Module: sip-router Branch: lazedo/kazoo Commit: 0d9cf1bd0b34f0fe718d6f52edf72b5a890b62fe URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=0d9cf1bd0b34f0fe718d6f52edf72b5a890b62fe
Author: lazedo luis.azedo@factorlusitano.com Committer: lazedo luis.azedo@factorlusitano.com Date: Thu Sep 11 14:47:33 2014 +0100
support vhost in connection
---
modules/kazoo/kz_amqp.c | 59 ++++++++++++++++++++++++++++++++++++++++------ modules/kazoo/kz_amqp.h | 3 +- 2 files changed, 52 insertions(+), 10 deletions(-)
diff --git a/modules/kazoo/kz_amqp.c b/modules/kazoo/kz_amqp.c index 1e3c0d2..3931d27 100644 --- a/modules/kazoo/kz_amqp.c +++ b/modules/kazoo/kz_amqp.c @@ -127,6 +127,17 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind) shm_free(bind); }
+void kz_amqp_free_connection(kz_amqp_conn_ptr conn) +{ + if(!conn) + return; + + if(conn->url) + shm_free(conn->url); + shm_free(conn); +} + + void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd) { if(cmd == NULL) @@ -269,9 +280,9 @@ void kz_amqp_destroy() { if(kz_pool != NULL) { kz_amqp_conn_ptr conn = kz_pool->head; while(conn != NULL) { - kz_amqp_conn_ptr free = conn; + kz_amqp_conn_ptr tofree = conn; conn = conn->next; - shm_free(free); + kz_amqp_free_connection(tofree); } shm_free(kz_pool); } @@ -279,13 +290,45 @@ void kz_amqp_destroy() {
}
+#define KZ_URL_MAX_SIZE 50 +static char* KZ_URL_ROOT = "/"; + int kz_amqp_add_connection(modparam_t type, void* val) { kz_amqp_init_connection_pool(); // find a better way
+ char* url = (char*) val; + int len = strlen(url); + if(len > KZ_URL_MAX_SIZE) { + LM_ERR("connection url exceeds max size %d\n", KZ_URL_MAX_SIZE); + return -1; + } + kz_amqp_conn_ptr newConn = shm_malloc(sizeof(kz_amqp_conn)); memset(newConn, 0, sizeof(kz_amqp_conn));
+ newConn->url = shm_malloc( (KZ_URL_MAX_SIZE + 1) * sizeof(char) ); + memset(newConn->url, 0, (KZ_URL_MAX_SIZE + 1) * sizeof(char)); + // maintain compatibility + if (!strncmp((char*)val, "kazoo://", 8)) { + sprintf(newConn->url, "amqp://%s", (char*)(url+(8*sizeof(char))) ); + } else { + strcpy(newConn->url, url); + newConn->url[len] = '\0'; + } + + if(amqp_parse_url(newConn->url, &newConn->info) == AMQP_STATUS_BAD_URL) { + LM_ERR("ERROR PARSING URL \"%s\"\n", newConn->url); + goto error; + } + + + if(newConn->info.vhost == NULL) { + newConn->info.vhost = KZ_URL_ROOT; + } else if(newConn->info.vhost[0] == '/' && strlen(newConn->info.vhost) == 1) { // bug in amqp_parse_url ? + newConn->info.vhost++; + } + if(kz_pool->head == NULL) kz_pool->head = newConn;
@@ -294,9 +337,12 @@ int kz_amqp_add_connection(modparam_t type, void* val)
kz_pool->tail = newConn;
- amqp_parse_url((char*)val, &newConn->info); - return 0; + +error: + kz_amqp_free_connection(newConn); + return -1; + }
void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) { @@ -313,9 +359,6 @@ void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) { rmq->conn = NULL; rmq->socket = NULL; rmq->channel_count = 0; - -// lock_release(&kz_pool->lock); - }
} @@ -348,7 +391,7 @@ int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) { }
if (kz_amqp_error("Logging in", amqp_login(rmq->conn, - "/", //rmq->info.vhost, + rmq->info.vhost, 0, 131072, 0, diff --git a/modules/kazoo/kz_amqp.h b/modules/kazoo/kz_amqp.h index ce4aa5d..10b9095 100644 --- a/modules/kazoo/kz_amqp.h +++ b/modules/kazoo/kz_amqp.h @@ -30,11 +30,11 @@ extern int dbk_consumer_processes;
typedef struct kz_amqp_conn_t { kz_amqp_connection_info info; + char* url; amqp_connection_state_t conn; amqp_socket_t *socket; amqp_channel_t channel_count; amqp_channel_t channel_counter; -// gen_lock_t lock; struct kz_amqp_conn_t* next; } kz_amqp_conn, *kz_amqp_conn_ptr;
@@ -42,7 +42,6 @@ typedef struct { kz_amqp_conn_ptr current; kz_amqp_conn_ptr head; kz_amqp_conn_ptr tail; -// gen_lock_t lock; } kz_amqp_conn_pool, *kz_amqp_conn_pool_ptr;