Module: sip-router Branch: master Commit: b1aadf4cf9ea8fa91465fadc24ad268398a74880 URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=b1aadf4c...
Author: Alex Hermann alex@speakup.nl Committer: Charles Chance charles.chance@sipcentric.com Date: Thu Nov 13 19:45:32 2014 +0000
dmq: Add init_callback() to API
The init_callback is called after DMQ has synced with the notification_peer. This callback can thus be used to send/broadcast messages as early as possible.
---
modules/dmq/dmq.c | 12 +++++++++++- modules/dmq/notification_peer.c | 34 +++++++++++++++++++++++++++++++++- modules/dmq/notification_peer.h | 1 + modules/dmq/peer.h | 2 ++ modules/htable/ht_dmq.c | 1 + 5 files changed, 48 insertions(+), 2 deletions(-)
diff --git a/modules/dmq/dmq.c b/modules/dmq/dmq.c index 7e4defb..6d53baf 100644 --- a/modules/dmq/dmq.c +++ b/modules/dmq/dmq.c @@ -239,7 +239,14 @@ static int mod_init(void) LM_ERR("error in shm_malloc\n"); return -1; } - + + dmq_init_callback_done = shm_malloc(sizeof(int)); + if (!dmq_init_callback_done) { + LM_ERR("no more shm\n"); + return -1; + } + *dmq_init_callback_done = 0; + /** * add the dmq notification peer. * the dmq is a peer itself so that it can receive node notifications @@ -326,6 +333,9 @@ static void destroy(void) { if (dmq_server_socket.s) { pkg_free(dmq_server_socket.s); } + if (dmq_init_callback_done) { + shm_free(dmq_init_callback_done); + } }
static int handle_dmq_fixup(void** param, int param_no) diff --git a/modules/dmq/notification_peer.c b/modules/dmq/notification_peer.c index 2acb9a8..e7d8930 100644 --- a/modules/dmq/notification_peer.c +++ b/modules/dmq/notification_peer.c @@ -29,6 +29,9 @@ str notification_content_type = str_init("text/plain"); dmq_resp_cback_t notification_callback = {¬ification_resp_callback_f, 0};
+int *dmq_init_callback_done; + + /** * @brief add notification peer */ @@ -36,6 +39,7 @@ int add_notification_peer() { dmq_peer_t not_peer; not_peer.callback = dmq_notification_callback; + not_peer.init_callback = NULL; not_peer.description.s = "notification_peer"; not_peer.description.len = 17; not_peer.peer_id.s = "notification_peer"; @@ -165,6 +169,21 @@ error: return -1; }
+ +int run_init_callbacks() { + dmq_peer_t* crt; + + crt = peer_list->peers; + while(crt) { + if (crt->init_callback) { + crt->init_callback(); + } + crt = crt->next; + } + return 0; +} + + /** * @brief dmq notification callback */ @@ -206,6 +225,10 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) ¬ification_callback, maxforwards, ¬ification_content_type); } pkg_free(response_body); + if (!*dmq_init_callback_done) { + *dmq_init_callback_done = 1; + run_init_callbacks(); + } return 0; error: return -1; @@ -292,8 +315,17 @@ int notification_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param) { int ret; + int nodes_recv; + LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param); - if(code == 408) { + if(code == 200) { + nodes_recv = extract_node_list(node_list, msg); + LM_DBG("received %d new or changed nodes\n", nodes_recv); + if (!*dmq_init_callback_done) { + *dmq_init_callback_done = 1; + run_init_callbacks(); + } + } else if(code == 408) { /* deleting node - the server did not respond */ LM_ERR("deleting server %.*s because of failed request\n", STR_FMT(&node->orig_uri)); if (STR_EQ(node->orig_uri, dmq_notification_address)) { diff --git a/modules/dmq/notification_peer.h b/modules/dmq/notification_peer.h index 72df4ec..ff9871d 100644 --- a/modules/dmq/notification_peer.h +++ b/modules/dmq/notification_peer.h @@ -34,6 +34,7 @@ #include "dmq_funcs.h"
extern str notification_content_type; +extern int *dmq_init_callback_done;
int add_notification_peer(); int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp); diff --git a/modules/dmq/peer.h b/modules/dmq/peer.h index 7ce4434..bd1a15d 100644 --- a/modules/dmq/peer.h +++ b/modules/dmq/peer.h @@ -42,11 +42,13 @@ typedef struct peer_response { } peer_reponse_t;
typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp); +typedef int(*init_callback_t)();
typedef struct dmq_peer { str peer_id; str description; peer_callback_t callback; + init_callback_t init_callback; struct dmq_peer* next; } dmq_peer_t;
diff --git a/modules/htable/ht_dmq.c b/modules/htable/ht_dmq.c index 284c3dc..21755a7 100644 --- a/modules/htable/ht_dmq.c +++ b/modules/htable/ht_dmq.c @@ -59,6 +59,7 @@ int ht_dmq_initialize() }
not_peer.callback = ht_dmq_handle_msg; + not_peer.init_callback = NULL; not_peer.description.s = "htable"; not_peer.description.len = 6; not_peer.peer_id.s = "htable";