Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: 720e06dddf148baa4c809528d0670c1ec429ae67
URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=720e06dddf148baa4c809528d0670c1ec429ae67
Author: Alex Hermann alex@speakup.nl
Committer: Alex Hermann alex@speakup.nl
Date: Thu Aug 28 14:27:34 2014 +0200
dialog: Send initial DMQ-sync only to the node which requested it
Do not broadcast it to all nodes.
---
 modules/dialog/dlg_dmq.c | 23 ++++++++++++++---------
 modules/dialog/dlg_dmq.h | 2 +-
 modules/dialog/dlg_handlers.c | 8 ++++----
 modules/dialog/dlg_hash.c | 2 +-
 4 files changed, 20 insertions(+), 15 deletions(-)
diff --git a/modules/dialog/dlg_dmq.c b/modules/dialog/dlg_dmq.c index bce56e5..2f87f08 100644 --- a/modules/dialog/dlg_dmq.c +++ b/modules/dialog/dlg_dmq.c @@ -73,13 +73,18 @@ error: }
-int dlg_dmq_broadcast(str* body) { +int dlg_dmq_send(str* body, dmq_node_t* node) { if (!dlg_dmq_peer) { LM_ERR("dlg_dmq_peer is null!\n"); return -1; } - LM_DBG("sending broadcast...\n"); - dlg_dmqb.bcast_message(dlg_dmq_peer, body, 0, &dlg_dmq_resp_callback, 1, &dlg_dmq_content_type); + if (node) { + LM_DBG("sending dmq message ...\n"); + dlg_dmqb.send_message(dlg_dmq_peer, body, node, &dlg_dmq_resp_callback, 1, &dlg_dmq_content_type); + } else { + LM_DBG("sending dmq broadcast...\n"); + dlg_dmqb.bcast_message(dlg_dmq_peer, body, 0, &dlg_dmq_resp_callback, 1, &dlg_dmq_content_type); + } return 0; }
@@ -293,7 +298,7 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dm break;
case DLG_DMQ_SYNC: - dmq_send_all_dlgs(); + dmq_send_all_dlgs(dmq_node); break;
case DLG_DMQ_NONE: @@ -343,7 +348,7 @@ int dlg_dmq_request_sync() { } jdoc.buf.len = strlen(jdoc.buf.s); LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s); - if (dlg_dmq_broadcast(&jdoc.buf)!=0) { + if (dlg_dmq_send(&jdoc.buf, 0)!=0) { goto error; }
@@ -362,7 +367,7 @@ error: }
-int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock) { +int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock, dmq_node_t *node ) {
srjson_doc_t jdoc, prof_jdoc;
@@ -452,7 +457,7 @@ int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needl } jdoc.buf.len = strlen(jdoc.buf.s); LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s); - if (dlg_dmq_broadcast(&jdoc.buf)!=0) { + if (dlg_dmq_send(&jdoc.buf, node)!=0) { goto error; }
@@ -471,7 +476,7 @@ error: }
-int dmq_send_all_dlgs() { +int dmq_send_all_dlgs(dmq_node_t* dmq_node) { int index; dlg_entry_t entry; dlg_cell_t *dlg; @@ -485,7 +490,7 @@ int dmq_send_all_dlgs() {
for(dlg = entry.first; dlg != NULL; dlg = dlg->next){ dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC; - dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0); + dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0, dmq_node); }
dlg_unlock( d_table, &entry); diff --git a/modules/dialog/dlg_dmq.h b/modules/dialog/dlg_dmq.h index efba9af..6547a4b 100644 --- a/modules/dialog/dlg_dmq.h +++ b/modules/dialog/dlg_dmq.h @@ -43,6 +43,6 @@ typedef enum {
int dlg_dmq_initialize(); int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node); -int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock); +int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock, dmq_node_t* node); int dlg_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param); #endif diff --git a/modules/dialog/dlg_handlers.c b/modules/dialog/dlg_handlers.c index 29e1350..fa08484 100644 --- a/modules/dialog/dlg_handlers.c +++ b/modules/dialog/dlg_handlers.c @@ -571,7 +571,7 @@ static void dlg_onreply(struct cell* t, int type, struct tmcb_params *param)
done: if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state>old_state) { - dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0); + dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0, 0); }
done_early: @@ -730,7 +730,7 @@ static void dlg_on_send(struct cell* t, int type, struct tmcb_params *param)
/* sync over dmq */ if (dlg_enable_dmq) { - dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 1); + dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 1, 0); }
/* unref by 2: 1 set when adding in tm cb, 1 set by dlg_get_by_iuid() */ @@ -1394,7 +1394,7 @@ void dlg_onroute(struct sip_msg* req, str *route_params, void *param)
done: if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state>old_state) { - dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0); + dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0, 0); }
dlg_release(dlg); @@ -1469,7 +1469,7 @@ void dlg_ontimeout(struct dlg_tl *tl) }
if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state>old_state) { - dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0); + dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0, 0); }
return; diff --git a/modules/dialog/dlg_hash.c b/modules/dialog/dlg_hash.c index 58abd24..ac869ad 100644 --- a/modules/dialog/dlg_hash.c +++ b/modules/dialog/dlg_hash.c @@ -399,7 +399,7 @@ inline void destroy_dlg(struct dlg_cell *dlg) run_dlg_callbacks( DLGCB_DESTROY , dlg, NULL, NULL, DLG_DIR_NONE, 0);
if (dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC)) - dlg_dmq_replicate_action(DLG_DMQ_RM, dlg, 0); + dlg_dmq_replicate_action(DLG_DMQ_RM, dlg, 0, 0);
/* delete the dialog from DB*/ if (dlg_db_mode)