Module: sip-router
Branch: master
Commit: d99a9c7f47613078f9d8ab42959e954fc58518f7
URL:
http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=d99a9c7f47613078f9d8ab42959e954fc58518f7
Author: Charles Chance <charles.chance(a)sipcentric.com>
Committer: Charles Chance <charles.chance(a)sipcentric.com>
Date: Tue Sep 9 23:42:35 2014 +0100
dmq: add the following config functions:
- dmq_t_replicate(): Replicates current SIP message to all active DMQ nodes. Includes
built-in loop detection which can be skipped with optional parameter.
- dmq_is_from_node(): Checks whether current message has come from another node, based on
source IP.
---
modules/dmq/dmq.c | 6 ++++
modules/dmq/dmq_funcs.c | 73 +++++++++++++++++++++++++++++++++++++++++++++++
modules/dmq/dmq_funcs.h | 2 +
3 files changed, 81 insertions(+), 0 deletions(-)
diff --git a/modules/dmq/dmq.c b/modules/dmq/dmq.c
index 30405f1..06d9cfb 100644
--- a/modules/dmq/dmq.c
+++ b/modules/dmq/dmq.c
@@ -97,6 +97,12 @@ static cmd_export_t cmds[] = {
ANY_ROUTE},
{"dmq_bcast_message", (cmd_function)cfg_dmq_bcast_message, 3,
bcast_dmq_fixup, 0,
ANY_ROUTE},
+ {"dmq_t_replicate", (cmd_function)cfg_dmq_t_replicate, 0, 0, 0,
+ REQUEST_ROUTE},
+ {"dmq_t_replicate", (cmd_function)cfg_dmq_t_replicate, 1,
fixup_spve_null, 0,
+ REQUEST_ROUTE},
+ {"dmq_is_from_node", (cmd_function)cfg_dmq_is_from_node, 0, 0, 0,
+ REQUEST_ROUTE},
{"bind_dmq", (cmd_function)bind_dmq, 0, 0,
0},
{0, 0, 0, 0, 0, 0}
};
diff --git a/modules/dmq/dmq_funcs.c b/modules/dmq/dmq_funcs.c
index 6b09b2a..3c9c8c3 100644
--- a/modules/dmq/dmq_funcs.c
+++ b/modules/dmq/dmq_funcs.c
@@ -114,6 +114,30 @@ int build_uri_str(str* username, struct sip_uri* uri, str* from)
return 0;
}
+/* Checks if the request (sip_msg_t* msg) comes from another DMQ node based on
+ * source IP.
+ *
+ * Walks node_list->nodes while holding node_list->lock and compares the
+ * message's receive source address (msg->rcv.src_ip) with each node's
+ * ip_address. Nodes flagged as local are never considered a match.
+ *
+ * Returns 1 as soon as a non-local node satisfies ip_addr_cmp(), or -1 when
+ * the list is exhausted without a match. Only the address is compared here;
+ * no port or transport check is performed.
+ */
+int is_from_remote_node(sip_msg_t* msg)
+{
+ ip_addr_t* ip;
+ dmq_node_t* node;
+ int result = -1; /* default: no remote node matched */
+
+ ip = &msg->rcv.src_ip;
+
+ lock_get(&node_list->lock);
+ node = node_list->nodes;
+
+ while(node) {
+ if (!node->local && ip_addr_cmp(ip, &node->ip_address)) {
+ result = 1;
+ goto done; /* first match is enough; exit through the single unlock path */
+ }
+ node = node->next;
+ }
+done:
+ lock_release(&node_list->lock);
+ return result;
+}
+
/**
* @brief broadcast a dmq message
*
@@ -340,6 +364,55 @@ error:
return -1;
}
+/**
+ * @brief config file function for replicating SIP message to all nodes (wraps
+ * t_replicate)
+ *
+ * @param msg the SIP message currently being processed
+ * @param s   optional fixed-up parameter, read via get_int_fparam(); may be
+ *            NULL when the function is called without arguments
+ *
+ * Loop protection: replication is skipped (return -1) when the message is
+ * from another node according to is_from_remote_node() and either no
+ * parameter was given or the parameter was read successfully as 0. A
+ * non-zero parameter value bypasses the check; as written, so does a
+ * non-zero result from get_int_fparam().
+ *
+ * The node list is walked with node_list->lock held for the whole loop,
+ * including the tmb.t_replicate() calls. The first failing t_replicate()
+ * aborts the walk, so later nodes are not attempted.
+ *
+ * @return 0 after the walk completes, -1 when replication is skipped by the
+ *         loop check or a t_replicate() call fails.
+ *
+ * NOTE(review): success is reported as 0 - confirm this is the intended
+ * return code for a function exported to the routing script.
+ */
+int cfg_dmq_t_replicate(struct sip_msg* msg, char* s)
+{
+ dmq_node_t* node;
+ int i = 0; /* value of the optional parameter; stays 0 when s is NULL */
+
+ /* avoid loops - do not replicate if message has come from another node
+ * (override if optional parameter is set)
+ */
+ if ((!s || (get_int_fparam(&i, msg, (fparam_t*)s)==0 && !i))
+ && (is_from_remote_node(msg) > 0)) {
+ LM_DBG("message is from another node - skipping replication\n");
+ return -1;
+ }
+
+ lock_get(&node_list->lock);
+ node = node_list->nodes;
+ while(node) {
+ /* we do not send the message to the following:
+ * - ourself
+ * - any inactive nodes
+ */
+ if(node->local || node->status != DMQ_NODE_ACTIVE) {
+ LM_DBG("skipping node %.*s\n",
STR_FMT(&node->orig_uri));
+ node = node->next;
+ continue;
+ }
+ if(tmb.t_replicate(msg, &node->orig_uri) < 0) {
+ LM_ERR("error calling t_replicate\n");
+ goto error; /* stop at the first failure; lock is released below */
+ }
+ node = node->next;
+ }
+ lock_release(&node_list->lock);
+ return 0;
+error:
+ lock_release(&node_list->lock);
+ return -1;
+}
+
+/*
+ * @brief config file function to check if received message is from another DMQ node
+ * based on source IP
+ *
+ * Thin wrapper that forwards msg to is_from_remote_node() and returns its
+ * result unchanged: 1 when the source IP matches a non-local node, -1
+ * otherwise.
+ */
+int cfg_dmq_is_from_node(struct sip_msg* msg)
+{
+ return is_from_remote_node(msg);
+}
/**
* @brief pings the servers in the nodelist
diff --git a/modules/dmq/dmq_funcs.h b/modules/dmq/dmq_funcs.h
index a138788..55f67ac 100644
--- a/modules/dmq/dmq_funcs.h
+++ b/modules/dmq/dmq_funcs.h
@@ -50,6 +50,8 @@ int cfg_dmq_send_message(struct sip_msg* msg, char* peer, char* to,
char* body, char* content_type);
int cfg_dmq_bcast_message(struct sip_msg* msg, char* peer, char* body,
char* content_type);
+int cfg_dmq_t_replicate(struct sip_msg* msg, char* s);
+int cfg_dmq_is_from_node(struct sip_msg* msg);
dmq_peer_t* register_dmq_peer(dmq_peer_t* peer);
int dmq_send_message(dmq_peer_t* peer, str* body, dmq_node_t* node,
dmq_resp_cback_t* resp_cback, int max_forwards, str* content_type);