Module: sip-router Branch: andrei/tcp_tls_changes Commit: c914809c9f70fe15d1a2459fa0668d722d83726c URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=c914809c...
Author: Andrei Pelinescu-Onciul andrei@iptel.org Committer: Andrei Pelinescu-Onciul andrei@iptel.org Date: Thu May 20 16:22:16 2010 +0200
tls: added a minimum overhead shm buffer queue
Minimum overhead buffer queue in shm memory, based on tcp_wbuffer_queue (tcp_conn.h).
---
modules/tls/sbufq.h | 292 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 292 insertions(+), 0 deletions(-)
diff --git a/modules/tls/sbufq.h b/modules/tls/sbufq.h new file mode 100644 index 0000000..6b59751 --- /dev/null +++ b/modules/tls/sbufq.h @@ -0,0 +1,292 @@ +/* + * $Id$ + * + * Copyright (C) 2010 iptelorg GmbH + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ +/** minimal overhead buffer queue in shm memory. + * @file modules/tls/sbufq.h + * @ingroup: tls + * Module: @ref tls + */ +/* + * History: + * -------- + * 2010-03-31 initial version, based on tcp_conn.h tcp_wbuffer_queue (andrei) +*/ + +#ifndef __sbufq_h +#define __sbufq_h + +#include "../../compiler_opt.h" +#include "../../ut.h" +#include "../../mem/shm_mem.h" +#include "../../timer_ticks.h" +#include "../../timer.h" +#include "../../dprint.h" +#include <string.h> + + +struct sbuf_elem { + struct sbuf_elem* next; + unsigned int b_size; /**< buf size */ + char buf[1]; /**< variable size buffer */ +}; + +struct sbuffer_queue { + struct sbuf_elem* first; + struct sbuf_elem* last; + ticks_t last_chg; /**< last change (creation time or partial flush)*/ + unsigned int queued; /**< total size */ + unsigned int offset; /**< offset in the first buffer where unflushed data + starts */ + unsigned int last_used; /**< how much of the last buffer is used */ +}; + + +/* sbufq_flush() output flags */ +#define F_BUFQ_EMPTY 1 +#define F_BUFQ_ERROR_FLUSH 2 + + +#define sbufq_empty(bq) ((bq)->first==0) +#define sbufq_non_empty(bq) ((bq)->first!=0) + + + +/** adds/appends data to a buffer queue. + * WARNING: it does no attempt to synchronize access/lock. If needed it should + * be called under lock. + * @param q - buffer queue + * @param data + * @param size + * @param min_buf_size - min size to allocate for new buffer elements + * @return 0 on success, -1 on error (mem. allocation) + */ +inline static int sbufq_add(struct sbuffer_queue* q, const void* data, + unsigned int size, unsigned int min_buf_size) +{ + struct sbuf_elem* b; + unsigned int last_free; + unsigned int b_size; + unsigned int crt_size; + ticks_t t; + + t=get_ticks_raw(); + + if (likely(q->last==0)) { + b_size=MAX_unsigned(min_buf_size, size); + b=shm_malloc(sizeof(*b)+b_size-sizeof(b->buf)); + if (unlikely(b==0)) + goto error; + b->b_size=b_size; + b->next=0; + q->last=b; + q->first=b; + q->last_used=0; + q->offset=0; + q->last_chg=get_ticks_raw(); + last_free=b_size; + crt_size=size; + goto data_cpy; + }else{ + b=q->last; + } + + while(size){ + last_free=b->b_size-q->last_used; + if (last_free==0){ + b_size=MAX_unsigned(min_buf_size, size); + b=shm_malloc(sizeof(*b)+b_size-sizeof(b->buf)); + if (unlikely(b==0)) + goto error; + b->b_size=b_size; + b->next=0; + q->last->next=b; + q->last=b; + q->last_used=0; + last_free=b->b_size; + } + crt_size=MIN_unsigned(last_free, size); +data_cpy: + memcpy(b->buf+q->last_used, data, crt_size); + q->last_used+=crt_size; + size-=crt_size; + data+=crt_size; + q->queued+=crt_size; + } + return 0; +error: + return -1; +} + + + +/** inserts data (at the beginning) in a buffer queue. + * Note: should never be called after sbufq_run(). + * WARNING: it does no attempt to synchronize access/lock. If needed it should + * be called under lock. + * @param q - buffer queue + * @param data + * @param size + * @param min_buf_size - min size to allocate for new buffer elements + * @return 0 on success, -1 on error (mem. allocation) + */ +inline static int sbufq_insert(struct sbuffer_queue* q, const void* data, + unsigned int size, unsigned int min_buf_size) +{ + struct sbuf_elem* b; + + if (likely(q->first==0)) /* if empty, use sbufq_add */ + return sbufq_add(q, data, size, min_buf_size); + + if (unlikely(q->offset)){ + LOG(L_CRIT, "BUG: non-null offset %d (bad call, should" + "never be called after sbufq_run())\n", q->offset); + goto error; + } + if ((q->first==q->last) && ((q->last->b_size-q->last_used)>=size)){ + /* one block with enough space in it for size bytes */ + memmove(q->first->buf+size, q->first->buf, size); + memcpy(q->first->buf, data, size); + q->last_used+=size; + }else{ + /* create a size bytes block directly */ + b=shm_malloc(sizeof(*b)+size-sizeof(b->buf)); + if (unlikely(b==0)) + goto error; + b->b_size=size; + /* insert it */ + b->next=q->first; + q->first=b; + memcpy(b->buf, data, size); + } + + q->queued+=size; + return 0; +error: + return -1; +} + + +/** destroy a buffer queue. + * Only the content is destroyed (shm_free()'d), the queue head is + * re-intialized. + * WARNING: it does no attempt to synchronize access/lock. If needed it should + * be called under lock. + * @param q - buffer queue + * @return - number of bytes that used to be queued (>=0). + */ +inline static unsigned int sbufq_destroy(struct sbuffer_queue* q) +{ + struct sbuf_elem* b; + struct sbuf_elem* next_b; + int unqueued; + + unqueued=0; + if (likely(q->first)){ + b=q->first; + do{ + next_b=b->next; + unqueued+=(b==q->last)?q->last_used:b->b_size; + if (b==q->first) + unqueued-=q->offset; + shm_free(b); + b=next_b; + }while(b); + } + memset(q, 0, sizeof(*q)); + return unqueued; +} + + + +/** tries to flush the queue. + * Tries to flush as much as possible from the given queue, using the + * given callback. + * WARNING: it does no attempt to synchronize access/lock. If needed it should + * be called under lock. + * @param q - buffer queue + * @param *flags - set to: + * F_BUFQ_EMPTY if the queued is completely flushed + * F_BUFQ_ERROR_FLUSH if the flush_f callback returned error. + * @param flush_f - flush function (callback). modeled after write(): + * flush_f(param1, param2, const void* buf, unsigned size). + * It should return the number of bytes "flushed" on + * success, or <0 on error. If the number of bytes + * "flushed" is smaller then the requested size, it + * would be assumed that no more bytes can be flushed + * and sbufq_flush will exit. + * @param flush_p1 - parameter for the flush function callback. + * @param flush_p2 - parameter for the flush function callback. + * @return -1 on internal error, or the number of bytes flushed on + * success (>=0). Note that the flags param is + * always set and it should be used to check for errors, since + * a flush_f() failure will not result in a negative return. + */ +inline static int sbufq_flush(struct sbuffer_queue* q, int* flags, + int (*flush_f)(void* p1, void* p2, + const void* buf, + unsigned size), + void* flush_p1, void* flush_p2) +{ + struct sbuf_elem *b; + int n; + int ret; + int block_size; + char* buf; + + *flags=0; + ret=0; + while(q->first){ + block_size=((q->first==q->last)?q->last_used:q->first->b_size)- + q->offset; + buf=q->first->buf+q->offset; + n=flush_f(flush_p1, flush_p2, buf, block_size); + if (likely(n>0)){ + ret+=n; + if (likely(n==block_size)){ + b=q->first; + q->first=q->first->next; + shm_free(b); + q->offset=0; + q->queued-=block_size; + ret+=block_size; + }else{ + q->offset+=n; + q->queued-=n; + ret+=n; + break; + } + }else{ + if (unlikely(n<0)) + *flags|=F_BUFQ_ERROR_FLUSH; + break; + } + } + if (likely(q->first==0)){ + q->last=0; + q->last_used=0; + q->offset=0; + *flags|=F_BUFQ_EMPTY; + } + return ret; +} + + + + +#endif /*__sbufq_h*/ + +/* vi: set ts=4 sw=4 tw=79:ai:cindent: */