Module: kamailio Branch: master Commit: 29aec6800099a3b1d7a2b2c7e6d4b3865f0bc6e6 URL: https://github.com/kamailio/kamailio/commit/29aec6800099a3b1d7a2b2c7e6d4b3865f0bc6e6
Author: lazedo luis.azedo@factorlusitano.com Committer: lazedo luis.azedo@factorlusitano.com Date: 2017-04-14T05:47:52+01:00
kazoo: add consistent worker key
---
Modified: src/modules/kazoo/kz_amqp.c Modified: src/modules/kazoo/kz_amqp.h
---
Diff: https://github.com/kamailio/kamailio/commit/29aec6800099a3b1d7a2b2c7e6d4b3865f0bc6e6.diff Patch: https://github.com/kamailio/kamailio/commit/29aec6800099a3b1d7a2b2c7e6d4b3865f0bc6e6.patch
---
diff --git a/src/modules/kazoo/kz_amqp.c b/src/modules/kazoo/kz_amqp.c index f3a7c2c..d759490 100644 --- a/src/modules/kazoo/kz_amqp.c +++ b/src/modules/kazoo/kz_amqp.c @@ -384,6 +384,8 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind) kz_amqp_bytes_free(bind->event_key); if(bind->event_subkey.bytes) kz_amqp_bytes_free(bind->event_subkey); + if(bind->consistent_worker_key) + shm_free(bind->consistent_worker_key); shm_free(bind); }
@@ -1771,6 +1773,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload) int no_ack = 1; int federate = 0; int consistent_worker = 0; + str* consistent_worker_key = NULL; int wait_for_consumer_ack = 1; kz_amqp_queue_ptr queue = NULL; kz_amqp_exchange_ptr exchange = NULL; @@ -1825,6 +1828,11 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload) consistent_worker = json_object_get_int(tmpObj); }
+ tmpObj = kz_json_get_object(json_obj, "consistent-worker-key"); + if(tmpObj != NULL) { + consistent_worker_key = kz_str_dup_from_char((char*)json_object_get_string(tmpObj)); + } + tmpObj = kz_json_get_object(json_obj, "exchange-bindings"); if(tmpObj != NULL) { exchange_binding = kz_amqp_exchange_binding_from_json(tmpObj); @@ -1853,6 +1861,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload) bind->wait_for_consumer_ack = wait_for_consumer_ack; bind->federate = federate; bind->consistent_worker = consistent_worker; + bind->consistent_worker_key = consistent_worker_key;
kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding)); @@ -2866,6 +2875,8 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e char buffer[100]; kz_amqp_cmd_ptr cmd = NULL; kz_amqp_consumer_delivery_ptr ptr = NULL; + json_obj_ptr json_obj = NULL; + json_object* JObj = NULL; str* message_id = NULL; int idx = envelope->channel-1; int worker = 0; @@ -2878,7 +2889,7 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e } memset(json_data, 0, msg_size + 1); memcpy(json_data, (char*)envelope->message.body.bytes, msg_size); - json_obj_ptr json_obj = kz_json_parse(json_data); + json_obj = kz_json_parse(json_data); pkg_free(json_data); if (json_obj == NULL) { LM_ERR("error parsing json body\n"); @@ -2888,7 +2899,7 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e json_object_object_add(json_obj, BLF_JSON_BROKER_ZONE, json_object_new_string(server_ptr->zone->zone));
- json_object* JObj = kz_json_get_object(json_obj, BLF_JSON_SERVERID); + JObj = kz_json_get_object(json_obj, BLF_JSON_SERVERID); if(JObj != NULL) { const char* _kz_server_id_str = json_object_get_string(JObj); sprintf(buffer, "consumer://%d/%s", _kz_server_id, _kz_server_id_str); @@ -2932,8 +2943,14 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e
if(bind && bind->consistent_worker) { str rk; - rk.s = (char*)envelope->routing_key.bytes; - rk.len = (int)envelope->routing_key.len; + if(bind->consistent_worker_key != NULL && + (JObj = kz_json_get_object(json_obj, bind->consistent_worker_key->s)) != NULL) { + rk.s = (char*)json_object_get_string(JObj); + rk.len = strlen(rk.s); + } else { + rk.s = (char*)envelope->routing_key.bytes; + rk.len = (int)envelope->routing_key.len; + } worker = core_hash(&rk, NULL, dbk_consumer_workers); LM_DBG("computed worker for %.*s is %d\n", rk.len, rk.s, worker); } else { diff --git a/src/modules/kazoo/kz_amqp.h b/src/modules/kazoo/kz_amqp.h index c412156..b4a88ff 100644 --- a/src/modules/kazoo/kz_amqp.h +++ b/src/modules/kazoo/kz_amqp.h @@ -217,6 +217,7 @@ typedef struct { amqp_boolean_t wait_for_consumer_ack; amqp_boolean_t federate; amqp_boolean_t consistent_worker; + str* consistent_worker_key; } kz_amqp_bind, *kz_amqp_bind_ptr;
typedef struct {