Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: 720e06dddf148baa4c809528d0670c1ec429ae67
URL:
http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=720e06d…
Author: Alex Hermann <alex(a)speakup.nl>
Committer: Alex Hermann <alex(a)speakup.nl>
Date: Thu Aug 28 14:27:34 2014 +0200
dialog: Send initial DMQ-sync only to the node which requested it
Do not broadcast it to all nodes.
---
modules/dialog/dlg_dmq.c | 23 ++++++++++++++---------
modules/dialog/dlg_dmq.h | 2 +-
modules/dialog/dlg_handlers.c | 8 ++++----
modules/dialog/dlg_hash.c | 2 +-
4 files changed, 20 insertions(+), 15 deletions(-)
diff --git a/modules/dialog/dlg_dmq.c b/modules/dialog/dlg_dmq.c
index bce56e5..2f87f08 100644
--- a/modules/dialog/dlg_dmq.c
+++ b/modules/dialog/dlg_dmq.c
@@ -73,13 +73,18 @@ error:
}
-int dlg_dmq_broadcast(str* body) {
+int dlg_dmq_send(str* body, dmq_node_t* node) {
if (!dlg_dmq_peer) {
LM_ERR("dlg_dmq_peer is null!\n");
return -1;
}
- LM_DBG("sending broadcast...\n");
- dlg_dmqb.bcast_message(dlg_dmq_peer, body, 0, &dlg_dmq_resp_callback, 1,
&dlg_dmq_content_type);
+ if (node) {
+ LM_DBG("sending dmq message ...\n");
+ dlg_dmqb.send_message(dlg_dmq_peer, body, node, &dlg_dmq_resp_callback, 1,
&dlg_dmq_content_type);
+ } else {
+ LM_DBG("sending dmq broadcast...\n");
+ dlg_dmqb.bcast_message(dlg_dmq_peer, body, 0, &dlg_dmq_resp_callback, 1,
&dlg_dmq_content_type);
+ }
return 0;
}
@@ -293,7 +298,7 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp,
dmq_node_t* dm
break;
case DLG_DMQ_SYNC:
- dmq_send_all_dlgs();
+ dmq_send_all_dlgs(dmq_node);
break;
case DLG_DMQ_NONE:
@@ -343,7 +348,7 @@ int dlg_dmq_request_sync() {
}
jdoc.buf.len = strlen(jdoc.buf.s);
LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
- if (dlg_dmq_broadcast(&jdoc.buf)!=0) {
+ if (dlg_dmq_send(&jdoc.buf, 0)!=0) {
goto error;
}
@@ -362,7 +367,7 @@ error:
}
-int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock) {
+int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock,
dmq_node_t *node ) {
srjson_doc_t jdoc, prof_jdoc;
@@ -452,7 +457,7 @@ int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg,
int needl
}
jdoc.buf.len = strlen(jdoc.buf.s);
LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
- if (dlg_dmq_broadcast(&jdoc.buf)!=0) {
+ if (dlg_dmq_send(&jdoc.buf, node)!=0) {
goto error;
}
@@ -471,7 +476,7 @@ error:
}
-int dmq_send_all_dlgs() {
+int dmq_send_all_dlgs(dmq_node_t* dmq_node) {
int index;
dlg_entry_t entry;
dlg_cell_t *dlg;
@@ -485,7 +490,7 @@ int dmq_send_all_dlgs() {
for(dlg = entry.first; dlg != NULL; dlg = dlg->next){
dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
- dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0);
+ dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0, dmq_node);
}
dlg_unlock( d_table, &entry);
diff --git a/modules/dialog/dlg_dmq.h b/modules/dialog/dlg_dmq.h
index efba9af..6547a4b 100644
--- a/modules/dialog/dlg_dmq.h
+++ b/modules/dialog/dlg_dmq.h
@@ -43,6 +43,6 @@ typedef enum {
int dlg_dmq_initialize();
int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node);
-int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock);
+int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock,
dmq_node_t* node);
int dlg_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void*
param);
#endif
diff --git a/modules/dialog/dlg_handlers.c b/modules/dialog/dlg_handlers.c
index 29e1350..fa08484 100644
--- a/modules/dialog/dlg_handlers.c
+++ b/modules/dialog/dlg_handlers.c
@@ -571,7 +571,7 @@ static void dlg_onreply(struct cell* t, int type, struct tmcb_params
*param)
done:
if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) &&
new_state>old_state) {
- dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
+ dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0, 0);
}
done_early:
@@ -730,7 +730,7 @@ static void dlg_on_send(struct cell* t, int type, struct tmcb_params
*param)
/* sync over dmq */
if (dlg_enable_dmq) {
- dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 1);
+ dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 1, 0);
}
/* unref by 2: 1 set when adding in tm cb, 1 set by dlg_get_by_iuid() */
@@ -1394,7 +1394,7 @@ void dlg_onroute(struct sip_msg* req, str *route_params, void
*param)
done:
if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) &&
new_state>old_state) {
- dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
+ dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0, 0);
}
dlg_release(dlg);
@@ -1469,7 +1469,7 @@ void dlg_ontimeout(struct dlg_tl *tl)
}
if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) &&
new_state>old_state) {
- dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
+ dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0, 0);
}
return;
diff --git a/modules/dialog/dlg_hash.c b/modules/dialog/dlg_hash.c
index 58abd24..ac869ad 100644
--- a/modules/dialog/dlg_hash.c
+++ b/modules/dialog/dlg_hash.c
@@ -399,7 +399,7 @@ inline void destroy_dlg(struct dlg_cell *dlg)
run_dlg_callbacks( DLGCB_DESTROY , dlg, NULL, NULL, DLG_DIR_NONE, 0);
if (dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC))
- dlg_dmq_replicate_action(DLG_DMQ_RM, dlg, 0);
+ dlg_dmq_replicate_action(DLG_DMQ_RM, dlg, 0, 0);
/* delete the dialog from DB*/
if (dlg_db_mode)