Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: d9eb21488d3d763ae176aa1d10ee16a94cf5f908
URL:
http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=d9eb21488d3d763ae176aa1d10ee16a94cf5f908
Author: Alex Hermann <alex(a)speakup.nl>
Committer: Alex Hermann <alex(a)speakup.nl>
Date: Thu Aug 28 14:21:15 2014 +0200
dmq: Let the handler know about the sending node
Try to find a node based on the From URI of the incoming request and hand
it to the request handler.
---
modules/dialog/dlg_dmq.c | 2 +-
modules/dialog/dlg_dmq.h | 2 +-
modules/dmq/notification_peer.c | 2 +-
modules/dmq/notification_peer.h | 2 +-
modules/dmq/peer.c | 2 +-
modules/dmq/peer.h | 5 +++--
modules/dmq/worker.c | 12 +++++++++++-
modules/htable/ht_dmq.c | 2 +-
modules/htable/ht_dmq.h | 2 +-
9 files changed, 21 insertions(+), 10 deletions(-)
diff --git a/modules/dialog/dlg_dmq.c b/modules/dialog/dlg_dmq.c
index bd43da7..bce56e5 100644
--- a/modules/dialog/dlg_dmq.c
+++ b/modules/dialog/dlg_dmq.c
@@ -87,7 +87,7 @@ int dlg_dmq_broadcast(str* body) {
/**
* @brief ht dmq callback
*/
-int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
+int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
{
int content_length;
str body;
diff --git a/modules/dialog/dlg_dmq.h b/modules/dialog/dlg_dmq.h
index bd13757..efba9af 100644
--- a/modules/dialog/dlg_dmq.h
+++ b/modules/dialog/dlg_dmq.h
@@ -42,7 +42,7 @@ typedef enum {
} dlg_dmq_action_t;
int dlg_dmq_initialize();
-int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp);
+int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node);
int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock);
int dlg_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
#endif
diff --git a/modules/dmq/notification_peer.c b/modules/dmq/notification_peer.c
index 75c1386..e6b68ac 100644
--- a/modules/dmq/notification_peer.c
+++ b/modules/dmq/notification_peer.c
@@ -187,7 +187,7 @@ int run_init_callbacks() {
/**
* @brief dmq notification callback
*/
-int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
{
int nodes_recv;
str* response_body = NULL;
diff --git a/modules/dmq/notification_peer.h b/modules/dmq/notification_peer.h
index ff9871d..99be339 100644
--- a/modules/dmq/notification_peer.h
+++ b/modules/dmq/notification_peer.h
@@ -37,7 +37,7 @@ extern str notification_content_type;
extern int *dmq_init_callback_done;
int add_notification_peer();
-int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg);
str* build_notification_body();
int build_node_str(dmq_node_t* node, char* buf, int buflen);
diff --git a/modules/dmq/peer.c b/modules/dmq/peer.c
index b93cf3a..f54a2c0 100644
--- a/modules/dmq/peer.c
+++ b/modules/dmq/peer.c
@@ -97,7 +97,7 @@ dmq_peer_t* find_peer(str peer_id)
/**
* @empty callback
*/
-int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp)
+int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
{
return 0;
}
diff --git a/modules/dmq/peer.h b/modules/dmq/peer.h
index bd1a15d..8b6fc32 100644
--- a/modules/dmq/peer.h
+++ b/modules/dmq/peer.h
@@ -28,6 +28,7 @@
#include <string.h>
#include <stdlib.h>
+#include "dmqnode.h"
#include "../../lock_ops.h"
#include "../../str.h"
#include "../../mem/mem.h"
@@ -41,7 +42,7 @@ typedef struct peer_response {
str body;
} peer_reponse_t;
-typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp);
+typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp, dmq_node_t* node);
typedef int(*init_callback_t)();
typedef struct dmq_peer {
@@ -66,7 +67,7 @@ typedef dmq_peer_t* (*register_dmq_peer_t)(dmq_peer_t*);
dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
dmq_peer_t* find_peer(str peer_id);
-int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp);
+int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
#endif
diff --git a/modules/dmq/worker.c b/modules/dmq/worker.c
index a9ec65d..35d853d 100644
--- a/modules/dmq/worker.c
+++ b/modules/dmq/worker.c
@@ -29,6 +29,8 @@
#include "../../data_lump_rpl.h"
#include "../../mod_fix.h"
#include "../../sip_msg_clone.h"
+#include "../../parser/parse_from.h"
+#include "../../parser/parse_to.h"
/**
* @brief set the body of a response
@@ -78,6 +80,7 @@ void worker_loop(int id)
dmq_job_t* current_job;
peer_reponse_t peer_response;
int ret_value;
+ dmq_node_t *dmq_node = NULL;
worker = &workers[id];
for(;;) {
@@ -92,7 +95,14 @@ void worker_loop(int id)
current_job = job_queue_pop(worker->queue);
/* job_queue_pop might return NULL if queue is empty */
if(current_job) {
- ret_value = current_job->f(current_job->msg, &peer_response);
+ /* extract the from uri */
+ if (parse_from_header(current_job->msg) < 0) {
+ LM_ERR("bad sip message or missing From hdr\n");
+ } else {
+ dmq_node = find_dmq_node_uri(node_list, &((struct to_body*)current_job->msg->from->parsed)->uri);
+ }
+
+ ret_value = current_job->f(current_job->msg, &peer_response, dmq_node);
if(ret_value < 0) {
LM_ERR("running job failed\n");
continue;
diff --git a/modules/htable/ht_dmq.c b/modules/htable/ht_dmq.c
index 21755a7..7fa2959 100644
--- a/modules/htable/ht_dmq.c
+++ b/modules/htable/ht_dmq.c
@@ -89,7 +89,7 @@ int ht_dmq_broadcast(str* body) {
/**
* @brief ht dmq callback
*/
-int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
+int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
{
int content_length;
str body;
diff --git a/modules/htable/ht_dmq.h b/modules/htable/ht_dmq.h
index 7a0ca88..e9189ec 100644
--- a/modules/htable/ht_dmq.h
+++ b/modules/htable/ht_dmq.h
@@ -40,7 +40,7 @@ typedef enum {
} ht_dmq_action_t;
int ht_dmq_initialize();
-int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp);
+int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
int ht_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);