Module: sip-router
Branch: lazedo/kazoo
Commit: 0d9cf1bd0b34f0fe718d6f52edf72b5a890b62fe
URL:
http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=0d9cf1bd0b34f0fe718d6f52edf72b5a890b62fe
Author: lazedo <luis.azedo(a)factorlusitano.com>
Committer: lazedo <luis.azedo(a)factorlusitano.com>
Date: Thu Sep 11 14:47:33 2014 +0100
support vhost in connection
---
modules/kazoo/kz_amqp.c | 59 ++++++++++++++++++++++++++++++++++++++++------
modules/kazoo/kz_amqp.h | 3 +-
2 files changed, 52 insertions(+), 10 deletions(-)
diff --git a/modules/kazoo/kz_amqp.c b/modules/kazoo/kz_amqp.c
index 1e3c0d2..3931d27 100644
--- a/modules/kazoo/kz_amqp.c
+++ b/modules/kazoo/kz_amqp.c
@@ -127,6 +127,17 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind)
shm_free(bind);
}
+void kz_amqp_free_connection(kz_amqp_conn_ptr conn)
+{
+ if(!conn)
+ return;
+
+ if(conn->url)
+ shm_free(conn->url);
+ shm_free(conn);
+}
+
+
void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd)
{
if(cmd == NULL)
@@ -269,9 +280,9 @@ void kz_amqp_destroy() {
if(kz_pool != NULL) {
kz_amqp_conn_ptr conn = kz_pool->head;
while(conn != NULL) {
- kz_amqp_conn_ptr free = conn;
+ kz_amqp_conn_ptr tofree = conn;
conn = conn->next;
- shm_free(free);
+ kz_amqp_free_connection(tofree);
}
shm_free(kz_pool);
}
@@ -279,13 +290,45 @@ void kz_amqp_destroy() {
}
+#define KZ_URL_MAX_SIZE 50
+static char* KZ_URL_ROOT = "/";
+
int kz_amqp_add_connection(modparam_t type, void* val)
{
kz_amqp_init_connection_pool(); // find a better way
+ char* url = (char*) val;
+ int len = strlen(url);
+ if(len > KZ_URL_MAX_SIZE) {
+ LM_ERR("connection url exceeds max size %d\n", KZ_URL_MAX_SIZE);
+ return -1;
+ }
+
kz_amqp_conn_ptr newConn = shm_malloc(sizeof(kz_amqp_conn));
memset(newConn, 0, sizeof(kz_amqp_conn));
+ newConn->url = shm_malloc( (KZ_URL_MAX_SIZE + 1) * sizeof(char) );
+ memset(newConn->url, 0, (KZ_URL_MAX_SIZE + 1) * sizeof(char));
+ // maintain compatibility
+ if (!strncmp((char*)val, "kazoo://", 8)) {
+ sprintf(newConn->url, "amqp://%s", (char*)(url+(8*sizeof(char))) );
+ } else {
+ strcpy(newConn->url, url);
+ newConn->url[len] = '\0';
+ }
+
+ if(amqp_parse_url(newConn->url, &newConn->info) == AMQP_STATUS_BAD_URL) {
+ LM_ERR("ERROR PARSING URL \"%s\"\n", newConn->url);
+ goto error;
+ }
+
+
+ if(newConn->info.vhost == NULL) {
+ newConn->info.vhost = KZ_URL_ROOT;
+ } else if(newConn->info.vhost[0] == '/' && strlen(newConn->info.vhost) == 1) { // bug in amqp_parse_url ?
+ newConn->info.vhost++;
+ }
+
if(kz_pool->head == NULL)
kz_pool->head = newConn;
@@ -294,9 +337,12 @@ int kz_amqp_add_connection(modparam_t type, void* val)
kz_pool->tail = newConn;
- amqp_parse_url((char*)val, &newConn->info);
-
return 0;
+
+error:
+ kz_amqp_free_connection(newConn);
+ return -1;
+
}
void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) {
@@ -313,9 +359,6 @@ void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) {
rmq->conn = NULL;
rmq->socket = NULL;
rmq->channel_count = 0;
-
-// lock_release(&kz_pool->lock);
-
}
}
@@ -348,7 +391,7 @@ int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) {
}
if (kz_amqp_error("Logging in", amqp_login(rmq->conn,
- "/", //rmq->info.vhost,
+ rmq->info.vhost,
0,
131072,
0,
diff --git a/modules/kazoo/kz_amqp.h b/modules/kazoo/kz_amqp.h
index ce4aa5d..10b9095 100644
--- a/modules/kazoo/kz_amqp.h
+++ b/modules/kazoo/kz_amqp.h
@@ -30,11 +30,11 @@ extern int dbk_consumer_processes;
typedef struct kz_amqp_conn_t {
kz_amqp_connection_info info;
+ char* url;
amqp_connection_state_t conn;
amqp_socket_t *socket;
amqp_channel_t channel_count;
amqp_channel_t channel_counter;
-// gen_lock_t lock;
struct kz_amqp_conn_t* next;
} kz_amqp_conn, *kz_amqp_conn_ptr;
@@ -42,7 +42,6 @@ typedef struct {
kz_amqp_conn_ptr current;
kz_amqp_conn_ptr head;
kz_amqp_conn_ptr tail;
-// gen_lock_t lock;
} kz_amqp_conn_pool, *kz_amqp_conn_pool_ptr;