Module: sip-router Branch: master Commit: d99a9c7f47613078f9d8ab42959e954fc58518f7 URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=d99a9c7f...
Author: Charles Chance charles.chance@sipcentric.com Committer: Charles Chance charles.chance@sipcentric.com Date: Tue Sep 9 23:42:35 2014 +0100
dmq: add the following config functions:
- dmq_t_replicate(): Replicates current SIP message to all active DMQ nodes. Includes built-in loop detection which can be skipped with optional parameter. - dmq_is_from_node(): Checks whether current message has come from another node, based on source IP.
---
modules/dmq/dmq.c | 6 ++++ modules/dmq/dmq_funcs.c | 73 +++++++++++++++++++++++++++++++++++++++++++++++ modules/dmq/dmq_funcs.h | 2 + 3 files changed, 81 insertions(+), 0 deletions(-)
diff --git a/modules/dmq/dmq.c b/modules/dmq/dmq.c index 30405f1..06d9cfb 100644 --- a/modules/dmq/dmq.c +++ b/modules/dmq/dmq.c @@ -97,6 +97,12 @@ static cmd_export_t cmds[] = { ANY_ROUTE}, {"dmq_bcast_message", (cmd_function)cfg_dmq_bcast_message, 3, bcast_dmq_fixup, 0, ANY_ROUTE}, + {"dmq_t_replicate", (cmd_function)cfg_dmq_t_replicate, 0, 0, 0, + REQUEST_ROUTE}, + {"dmq_t_replicate", (cmd_function)cfg_dmq_t_replicate, 1, fixup_spve_null, 0, + REQUEST_ROUTE}, + {"dmq_is_from_node", (cmd_function)cfg_dmq_is_from_node, 0, 0, 0, + REQUEST_ROUTE}, {"bind_dmq", (cmd_function)bind_dmq, 0, 0, 0}, {0, 0, 0, 0, 0, 0} }; diff --git a/modules/dmq/dmq_funcs.c b/modules/dmq/dmq_funcs.c index 6b09b2a..3c9c8c3 100644 --- a/modules/dmq/dmq_funcs.c +++ b/modules/dmq/dmq_funcs.c @@ -114,6 +114,30 @@ int build_uri_str(str* username, struct sip_uri* uri, str* from) return 0; }
+/* Checks if the request (sip_msg_t* msg) comes from another DMQ node based on source IP. */ +int is_from_remote_node(sip_msg_t* msg) +{ + ip_addr_t* ip; + dmq_node_t* node; + int result = -1; + + ip = &msg->rcv.src_ip; + + lock_get(&node_list->lock); + node = node_list->nodes; + + while(node) { + if (!node->local && ip_addr_cmp(ip, &node->ip_address)) { + result = 1; + goto done; + } + node = node->next; + } +done: + lock_release(&node_list->lock); + return result; +} + /** * @brief broadcast a dmq message * @@ -340,6 +364,55 @@ error: return -1; }
+/** + * @brief config file function for replicating SIP message to all nodes (wraps t_replicate) + */ +int cfg_dmq_t_replicate(struct sip_msg* msg, char* s) +{ + dmq_node_t* node; + int i = 0; + + /* avoid loops - do not replicate if message has come from another node + * (override if optional parameter is set) + */ + if ((!s || (get_int_fparam(&i, msg, (fparam_t*)s)==0 && !i)) + && (is_from_remote_node(msg) > 0)) { + LM_DBG("message is from another node - skipping replication\n"); + return -1; + } + + lock_get(&node_list->lock); + node = node_list->nodes; + while(node) { + /* we do not send the message to the following: + * - ourself + * - any inactive nodes + */ + if(node->local || node->status != DMQ_NODE_ACTIVE) { + LM_DBG("skipping node %.*s\n", STR_FMT(&node->orig_uri)); + node = node->next; + continue; + } + if(tmb.t_replicate(msg, &node->orig_uri) < 0) { + LM_ERR("error calling t_replicate\n"); + goto error; + } + node = node->next; + } + lock_release(&node_list->lock); + return 0; +error: + lock_release(&node_list->lock); + return -1; +} + +/* + * @brief config file function to check if received message is from another DMQ node based on source IP + */ +int cfg_dmq_is_from_node(struct sip_msg* msg) +{ + return is_from_remote_node(msg); +}
/** * @brief pings the servers in the nodelist diff --git a/modules/dmq/dmq_funcs.h b/modules/dmq/dmq_funcs.h index a138788..55f67ac 100644 --- a/modules/dmq/dmq_funcs.h +++ b/modules/dmq/dmq_funcs.h @@ -50,6 +50,8 @@ int cfg_dmq_send_message(struct sip_msg* msg, char* peer, char* to, char* body, char* content_type); int cfg_dmq_bcast_message(struct sip_msg* msg, char* peer, char* body, char* content_type); +int cfg_dmq_t_replicate(struct sip_msg* msg, char* s); +int cfg_dmq_is_from_node(struct sip_msg* msg); dmq_peer_t* register_dmq_peer(dmq_peer_t* peer); int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node, dmq_resp_cback_t* resp_cback, int max_forwards, str* content_type);