Module: sip-router Branch: master Commit: c2dcf4dbd884b13423159a7d94a3cb9f4e1a3e87 URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=c2dcf4db...
Author: Alex Hermann alex@speakup.nl Committer: Charles Chance charles.chance@sipcentric.com Date: Thu Nov 13 19:54:26 2014 +0000
dmq: Let the handler know about the sending node
Try to find the sending node based on the From URI of the incoming request and hand it to the request handler.
---
modules/dmq/notification_peer.c | 2 +- modules/dmq/notification_peer.h | 2 +- modules/dmq/peer.c | 2 +- modules/dmq/peer.h | 5 +++-- modules/dmq/worker.c | 12 +++++++++++- modules/htable/ht_dmq.c | 2 +- modules/htable/ht_dmq.h | 2 +- 7 files changed, 19 insertions(+), 8 deletions(-)
diff --git a/modules/dmq/notification_peer.c b/modules/dmq/notification_peer.c index e7d8930..1d804bd 100644 --- a/modules/dmq/notification_peer.c +++ b/modules/dmq/notification_peer.c @@ -187,7 +187,7 @@ int run_init_callbacks() { /** * @brief dmq notification callback */ -int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp) +int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node) { int nodes_recv; str* response_body = NULL; diff --git a/modules/dmq/notification_peer.h b/modules/dmq/notification_peer.h index ff9871d..99be339 100644 --- a/modules/dmq/notification_peer.h +++ b/modules/dmq/notification_peer.h @@ -37,7 +37,7 @@ extern str notification_content_type; extern int *dmq_init_callback_done;
int add_notification_peer(); -int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp); +int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node); int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg); str* build_notification_body(); int build_node_str(dmq_node_t* node, char* buf, int buflen); diff --git a/modules/dmq/peer.c b/modules/dmq/peer.c index b93cf3a..f54a2c0 100644 --- a/modules/dmq/peer.c +++ b/modules/dmq/peer.c @@ -97,7 +97,7 @@ dmq_peer_t* find_peer(str peer_id) /** * @empty callback */ -int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp) +int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node) { return 0; } diff --git a/modules/dmq/peer.h b/modules/dmq/peer.h index bd1a15d..8b6fc32 100644 --- a/modules/dmq/peer.h +++ b/modules/dmq/peer.h @@ -28,6 +28,7 @@
#include <string.h> #include <stdlib.h> +#include "dmqnode.h" #include "../../lock_ops.h" #include "../../str.h" #include "../../mem/mem.h" @@ -41,7 +42,7 @@ typedef struct peer_response { str body; } peer_reponse_t;
-typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp); +typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp, dmq_node_t* node); typedef int(*init_callback_t)();
typedef struct dmq_peer { @@ -66,7 +67,7 @@ typedef dmq_peer_t* (*register_dmq_peer_t)(dmq_peer_t*);
dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer); dmq_peer_t* find_peer(str peer_id); -int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp); +int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
#endif
diff --git a/modules/dmq/worker.c b/modules/dmq/worker.c index 1ac77f0..8426730 100644 --- a/modules/dmq/worker.c +++ b/modules/dmq/worker.c @@ -29,6 +29,8 @@ #include "../../data_lump_rpl.h" #include "../../mod_fix.h" #include "../../sip_msg_clone.h" +#include "../../parser/parse_from.h" +#include "../../parser/parse_to.h"
/** * @brief set the body of a response @@ -74,6 +76,7 @@ void worker_loop(int id) dmq_job_t* current_job; peer_reponse_t peer_response; int ret_value; + dmq_node_t *dmq_node = NULL;
worker = &workers[id]; for(;;) { @@ -88,7 +91,14 @@ void worker_loop(int id) current_job = job_queue_pop(worker->queue); /* job_queue_pop might return NULL if queue is empty */ if(current_job) { - ret_value = current_job->f(current_job->msg, &peer_response); + /* extract the from uri */ + if (parse_from_header(current_job->msg) < 0) { + LM_ERR("bad sip message or missing From hdr\n"); + } else { + dmq_node = find_dmq_node_uri(node_list, &((struct to_body*)current_job->msg->from->parsed)->uri); + } + + ret_value = current_job->f(current_job->msg, &peer_response, dmq_node); if(ret_value < 0) { LM_ERR("running job failed\n"); continue; diff --git a/modules/htable/ht_dmq.c b/modules/htable/ht_dmq.c index 21755a7..7fa2959 100644 --- a/modules/htable/ht_dmq.c +++ b/modules/htable/ht_dmq.c @@ -89,7 +89,7 @@ int ht_dmq_broadcast(str* body) { /** * @brief ht dmq callback */ -int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp) +int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node) { int content_length; str body; diff --git a/modules/htable/ht_dmq.h b/modules/htable/ht_dmq.h index 7a0ca88..e9189ec 100644 --- a/modules/htable/ht_dmq.h +++ b/modules/htable/ht_dmq.h @@ -40,7 +40,7 @@ typedef enum { } ht_dmq_action_t;
int ht_dmq_initialize(); -int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp); +int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node); int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode); int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode); int ht_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);