kamailio.org
sr-dev
September 2014
sr-dev@lists.kamailio.org
33 participants
375 discussions
git:alexh/dialog-sync-wip: dialog: Only DMQ-sync out our "own" dialogs
by Alex Hermann
Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: c809cae1b1357852bd72cf8e37dc21420bd79112
URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=c809cae…

Author: Alex Hermann <alex(a)speakup.nl>
Committer: Alex Hermann <alex(a)speakup.nl>
Date: Mon Sep 1 17:53:25 2014 +0200

dialog: Only DMQ-sync out our "own" dialogs

Do not sync-out dialogs we got from another proxy.

---

 modules/dialog/dlg_dmq.c | 8 +++++---
 1 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/modules/dialog/dlg_dmq.c b/modules/dialog/dlg_dmq.c
index 60b90d5..56da577 100644
--- a/modules/dialog/dlg_dmq.c
+++ b/modules/dialog/dlg_dmq.c
@@ -375,7 +375,7 @@ int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needl
 	LM_DBG("replicating action [%d] on [%u:%u] to dmq peers\n", action, dlg->h_entry, dlg->h_id);
 
 	if (action == DLG_DMQ_UPDATE) {
-		if ((dlg->iflags & DLG_IFLAG_DMQ_SYNC) && ((dlg->dflags & DLG_FLAG_CHANGED_PROF) == 0)) {
+		if (!node && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && ((dlg->dflags & DLG_FLAG_CHANGED_PROF) == 0)) {
 			LM_DBG("dlg not changed, no sync\n");
 			return 1;
 		}
@@ -490,8 +490,10 @@ int dmq_send_all_dlgs(dmq_node_t* dmq_node) {
 		dlg_lock( d_table, &entry);
 
 		for(dlg = entry.first; dlg != NULL; dlg = dlg->next){
-			dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
-			dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0, dmq_node);
+			if (dlg->iflags & DLG_IFLAG_DMQ_SYNC) {
+				dlg->dflags |= DLG_FLAG_CHANGED_PROF;
+				dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0, dmq_node);
+			}
 		}
 
 		dlg_unlock( d_table, &entry);
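
The rule in this commit (send only dialogs that carry the sync flag, and force them out by marking them changed) can be sketched in isolation. This is a minimal illustration, not the module's code: `struct dialog`, `IFLAG_DMQ_SYNC`, `FLAG_CHANGED_PROF` and `mark_own_dialogs` are hypothetical stand-ins for `dlg_cell_t`, `DLG_IFLAG_DMQ_SYNC`, `DLG_FLAG_CHANGED_PROF` and the loop in `dmq_send_all_dlgs()`, and the flag values are assumed.

```c
#include <stddef.h>

/* Hypothetical flag values; the real ones live in the dialog module headers. */
#define IFLAG_DMQ_SYNC    (1u << 0) /* dialog is owned by this proxy */
#define FLAG_CHANGED_PROF (1u << 1) /* dialog changed since the last sync */

/* Hypothetical, reduced stand-in for dlg_cell_t. */
struct dialog {
	unsigned int iflags;
	unsigned int dflags;
	struct dialog *next;
};

/* Walk a dialog list and mark only locally owned dialogs for sync-out.
 * Dialogs received from another proxy are left untouched.
 * Returns the number of dialogs that would be sent. */
static int mark_own_dialogs(struct dialog *head)
{
	int sent = 0;
	struct dialog *d;

	for (d = head; d != NULL; d = d->next) {
		if (d->iflags & IFLAG_DMQ_SYNC) {
			d->dflags |= FLAG_CHANGED_PROF; /* force the update to go out */
			sent++;
		}
	}
	return sent;
}
```

A dialog without the sync flag is skipped entirely, so a proxy never re-exports state it only learned from a peer.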
git:alexh/dialog-sync-wip: dialog: Send initial DMQ-sync only to the node which requested it
by Alex Hermann
Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: 720e06dddf148baa4c809528d0670c1ec429ae67
URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=720e06d…

Author: Alex Hermann <alex(a)speakup.nl>
Committer: Alex Hermann <alex(a)speakup.nl>
Date: Thu Aug 28 14:27:34 2014 +0200

dialog: Send initial DMQ-sync only to the node which requested it

Do not broadcast it to all nodes.

---

 modules/dialog/dlg_dmq.c | 23 ++++++++++++++---------
 modules/dialog/dlg_dmq.h | 2 +-
 modules/dialog/dlg_handlers.c | 8 ++++----
 modules/dialog/dlg_hash.c | 2 +-
 4 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/modules/dialog/dlg_dmq.c b/modules/dialog/dlg_dmq.c
index bce56e5..2f87f08 100644
--- a/modules/dialog/dlg_dmq.c
+++ b/modules/dialog/dlg_dmq.c
@@ -73,13 +73,18 @@ error:
 }
 
 
-int dlg_dmq_broadcast(str* body) {
+int dlg_dmq_send(str* body, dmq_node_t* node) {
 	if (!dlg_dmq_peer) {
 		LM_ERR("dlg_dmq_peer is null!\n");
 		return -1;
 	}
-	LM_DBG("sending broadcast...\n");
-	dlg_dmqb.bcast_message(dlg_dmq_peer, body, 0, &dlg_dmq_resp_callback, 1, &dlg_dmq_content_type);
+	if (node) {
+		LM_DBG("sending dmq message ...\n");
+		dlg_dmqb.send_message(dlg_dmq_peer, body, node, &dlg_dmq_resp_callback, 1, &dlg_dmq_content_type);
+	} else {
+		LM_DBG("sending dmq broadcast...\n");
+		dlg_dmqb.bcast_message(dlg_dmq_peer, body, 0, &dlg_dmq_resp_callback, 1, &dlg_dmq_content_type);
+	}
 	return 0;
 }
 
@@ -293,7 +298,7 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dm
 			break;
 
 		case DLG_DMQ_SYNC:
-			dmq_send_all_dlgs();
+			dmq_send_all_dlgs(dmq_node);
 			break;
 
 		case DLG_DMQ_NONE:
@@ -343,7 +348,7 @@ int dlg_dmq_request_sync() {
 	}
 	jdoc.buf.len = strlen(jdoc.buf.s);
 	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
-	if (dlg_dmq_broadcast(&jdoc.buf)!=0) {
+	if (dlg_dmq_send(&jdoc.buf, 0)!=0) {
 		goto error;
 	}
 
@@ -362,7 +367,7 @@ error:
 }
 
 
-int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock) {
+int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock, dmq_node_t *node ) {
 
 	srjson_doc_t jdoc, prof_jdoc;
 
@@ -452,7 +457,7 @@ int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needl
 	}
 	jdoc.buf.len = strlen(jdoc.buf.s);
 	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
-	if (dlg_dmq_broadcast(&jdoc.buf)!=0) {
+	if (dlg_dmq_send(&jdoc.buf, node)!=0) {
 		goto error;
 	}
 
@@ -471,7 +476,7 @@ error:
 }
 
 
-int dmq_send_all_dlgs() {
+int dmq_send_all_dlgs(dmq_node_t* dmq_node) {
 	int index;
 	dlg_entry_t entry;
 	dlg_cell_t *dlg;
@@ -485,7 +490,7 @@ int dmq_send_all_dlgs() {
 
 		for(dlg = entry.first; dlg != NULL; dlg = dlg->next){
 			dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
-			dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0);
+			dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0, dmq_node);
 		}
 
 		dlg_unlock( d_table, &entry);
diff --git a/modules/dialog/dlg_dmq.h b/modules/dialog/dlg_dmq.h
index efba9af..6547a4b 100644
--- a/modules/dialog/dlg_dmq.h
+++ b/modules/dialog/dlg_dmq.h
@@ -43,6 +43,6 @@ typedef enum {
 
 int dlg_dmq_initialize();
 int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node);
-int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock);
+int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock, dmq_node_t* node);
 int dlg_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
 #endif
diff --git a/modules/dialog/dlg_handlers.c b/modules/dialog/dlg_handlers.c
index 29e1350..fa08484 100644
--- a/modules/dialog/dlg_handlers.c
+++ b/modules/dialog/dlg_handlers.c
@@ -571,7 +571,7 @@ static void dlg_onreply(struct cell* t, int type, struct tmcb_params *param)
 
 done:
 	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state>old_state) {
-		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
+		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0, 0);
 	}
 
 done_early:
@@ -730,7 +730,7 @@ static void dlg_on_send(struct cell* t, int type, struct tmcb_params *param)
 
 	/* sync over dmq */
 	if (dlg_enable_dmq) {
-		dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 1);
+		dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 1, 0);
 	}
 
 	/* unref by 2: 1 set when adding in tm cb, 1 set by dlg_get_by_iuid() */
@@ -1394,7 +1394,7 @@ void dlg_onroute(struct sip_msg* req, str *route_params, void *param)
 
 done:
 	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state>old_state) {
-		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
+		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0, 0);
 	}
 
 	dlg_release(dlg);
@@ -1469,7 +1469,7 @@ void dlg_ontimeout(struct dlg_tl *tl)
 	}
 
 	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state>old_state) {
-		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
+		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0, 0);
 	}
 
 	return;
diff --git a/modules/dialog/dlg_hash.c b/modules/dialog/dlg_hash.c
index 58abd24..ac869ad 100644
--- a/modules/dialog/dlg_hash.c
+++ b/modules/dialog/dlg_hash.c
@@ -399,7 +399,7 @@ inline void destroy_dlg(struct dlg_cell *dlg)
 	run_dlg_callbacks( DLGCB_DESTROY , dlg, NULL, NULL, DLG_DIR_NONE, 0);
 
 	if (dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC))
-		dlg_dmq_replicate_action(DLG_DMQ_RM, dlg, 0);
+		dlg_dmq_replicate_action(DLG_DMQ_RM, dlg, 0, 0);
 
 	/* delete the dialog from DB*/
 	if (dlg_db_mode)
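
The unicast-or-broadcast choice made by `dlg_dmq_send()` can be shown as a standalone sketch. Everything here is hypothetical: `struct transport`, `dispatch` and the `demo_*` stubs stand in for the DMQ API entries `send_message` and `bcast_message`, whose real signatures take more arguments than shown.

```c
#include <stddef.h>

/* Hypothetical, reduced stand-in for dmq_node_t. */
struct node {
	const char *uri;
};

/* Hypothetical transport hooks standing in for DMQ's send_message and
 * bcast_message API entries. */
struct transport {
	int (*send_one)(const char *body, const struct node *to);
	int (*send_all)(const char *body);
};

/* Deliver to a single node when a target is given, otherwise broadcast.
 * Returns -1 on bad arguments, else the transport's return value. */
static int dispatch(const struct transport *t, const char *body,
		const struct node *target)
{
	if (t == NULL || body == NULL)
		return -1;
	return target ? t->send_one(body, target) : t->send_all(body);
}

/* Demo stubs that only count calls, so the choice can be observed. */
static int unicast_count;
static int broadcast_count;

static int demo_send_one(const char *body, const struct node *to)
{
	(void)body;
	(void)to;
	unicast_count++;
	return 0;
}

static int demo_send_all(const char *body)
{
	(void)body;
	broadcast_count++;
	return 0;
}
```

Passing a NULL target keeps the old broadcast behaviour, which is why the existing call sites in the diff simply gain a trailing `0` argument.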
git:alexh/dialog-sync-wip: dmq: Let the handler know about the sending node
by Alex Hermann
Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: d9eb21488d3d763ae176aa1d10ee16a94cf5f908
URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=d9eb214…

Author: Alex Hermann <alex(a)speakup.nl>
Committer: Alex Hermann <alex(a)speakup.nl>
Date: Thu Aug 28 14:21:15 2014 +0200

dmq: Let the handler know about the sending node

Try to find a node based on the from uri of the incoming request and hand
it to the request handler.

---

 modules/dialog/dlg_dmq.c | 2 +-
 modules/dialog/dlg_dmq.h | 2 +-
 modules/dmq/notification_peer.c | 2 +-
 modules/dmq/notification_peer.h | 2 +-
 modules/dmq/peer.c | 2 +-
 modules/dmq/peer.h | 5 +++--
 modules/dmq/worker.c | 12 +++++++++++-
 modules/htable/ht_dmq.c | 2 +-
 modules/htable/ht_dmq.h | 2 +-
 9 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/modules/dialog/dlg_dmq.c b/modules/dialog/dlg_dmq.c
index bd43da7..bce56e5 100644
--- a/modules/dialog/dlg_dmq.c
+++ b/modules/dialog/dlg_dmq.c
@@ -87,7 +87,7 @@ int dlg_dmq_broadcast(str* body) {
 /**
  * @brief ht dmq callback
  */
-int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
+int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
 {
 	int content_length;
 	str body;
diff --git a/modules/dialog/dlg_dmq.h b/modules/dialog/dlg_dmq.h
index bd13757..efba9af 100644
--- a/modules/dialog/dlg_dmq.h
+++ b/modules/dialog/dlg_dmq.h
@@ -42,7 +42,7 @@ typedef enum {
 } dlg_dmq_action_t;
 
 int dlg_dmq_initialize();
-int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp);
+int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node);
 int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock);
 int dlg_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
 #endif
diff --git a/modules/dmq/notification_peer.c b/modules/dmq/notification_peer.c
index 75c1386..e6b68ac 100644
--- a/modules/dmq/notification_peer.c
+++ b/modules/dmq/notification_peer.c
@@ -187,7 +187,7 @@ int run_init_callbacks() {
 /**
  * @brief dmq notification callback
  */
-int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
 {
 	int nodes_recv;
 	str* response_body = NULL;
diff --git a/modules/dmq/notification_peer.h b/modules/dmq/notification_peer.h
index ff9871d..99be339 100644
--- a/modules/dmq/notification_peer.h
+++ b/modules/dmq/notification_peer.h
@@ -37,7 +37,7 @@ extern str notification_content_type;
 extern int *dmq_init_callback_done;
 
 int add_notification_peer();
-int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);
+int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
 int extract_node_list(dmq_node_list_t* update_list, struct sip_msg* msg);
 str* build_notification_body();
 int build_node_str(dmq_node_t* node, char* buf, int buflen);
diff --git a/modules/dmq/peer.c b/modules/dmq/peer.c
index b93cf3a..f54a2c0 100644
--- a/modules/dmq/peer.c
+++ b/modules/dmq/peer.c
@@ -97,7 +97,7 @@ dmq_peer_t* find_peer(str peer_id)
 /**
  * @empty callback
  */
-int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp)
+int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
 {
 	return 0;
 }
diff --git a/modules/dmq/peer.h b/modules/dmq/peer.h
index bd1a15d..8b6fc32 100644
--- a/modules/dmq/peer.h
+++ b/modules/dmq/peer.h
@@ -28,6 +28,7 @@
 
 #include <string.h>
 #include <stdlib.h>
+#include "dmqnode.h"
 #include "../../lock_ops.h"
 #include "../../str.h"
 #include "../../mem/mem.h"
@@ -41,7 +42,7 @@ typedef struct peer_response {
 	str body;
 } peer_reponse_t;
 
-typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp);
+typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp, dmq_node_t* node);
 typedef int(*init_callback_t)();
 
 typedef struct dmq_peer {
@@ -66,7 +67,7 @@ typedef dmq_peer_t* (*register_dmq_peer_t)(dmq_peer_t*);
 
 dmq_peer_t* add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
 dmq_peer_t* find_peer(str peer_id);
-int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp);
+int empty_peer_callback(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
 
 #endif
 
diff --git a/modules/dmq/worker.c b/modules/dmq/worker.c
index a9ec65d..35d853d 100644
--- a/modules/dmq/worker.c
+++ b/modules/dmq/worker.c
@@ -29,6 +29,8 @@
 #include "../../data_lump_rpl.h"
 #include "../../mod_fix.h"
 #include "../../sip_msg_clone.h"
+#include "../../parser/parse_from.h"
+#include "../../parser/parse_to.h"
 
 /**
  * @brief set the body of a response
@@ -78,6 +80,7 @@ void worker_loop(int id)
 	dmq_job_t* current_job;
 	peer_reponse_t peer_response;
 	int ret_value;
+	dmq_node_t *dmq_node = NULL;
 
 	worker = &workers[id];
 	for(;;) {
@@ -92,7 +95,14 @@ void worker_loop(int id)
 		current_job = job_queue_pop(worker->queue);
 		/* job_queue_pop might return NULL if queue is empty */
 		if(current_job) {
-			ret_value = current_job->f(current_job->msg, &peer_response);
+			/* extract the from uri */
+			if (parse_from_header(current_job->msg) < 0) {
+				LM_ERR("bad sip message or missing From hdr\n");
+			} else {
+				dmq_node = find_dmq_node_uri(node_list, &((struct to_body*)current_job->msg->from->parsed)->uri);
+			}
+
+			ret_value = current_job->f(current_job->msg, &peer_response, dmq_node);
 			if(ret_value < 0) {
 				LM_ERR("running job failed\n");
 				continue;
diff --git a/modules/htable/ht_dmq.c b/modules/htable/ht_dmq.c
index 21755a7..7fa2959 100644
--- a/modules/htable/ht_dmq.c
+++ b/modules/htable/ht_dmq.c
@@ -89,7 +89,7 @@ int ht_dmq_broadcast(str* body) {
 /**
  * @brief ht dmq callback
  */
-int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
+int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node)
 {
 	int content_length;
 	str body;
diff --git a/modules/htable/ht_dmq.h b/modules/htable/ht_dmq.h
index 7a0ca88..e9189ec 100644
--- a/modules/htable/ht_dmq.h
+++ b/modules/htable/ht_dmq.h
@@ -40,7 +40,7 @@ typedef enum {
 } ht_dmq_action_t;
 
 int ht_dmq_initialize();
-int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp);
+int ht_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* dmq_node);
 int ht_dmq_replicate_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
 int ht_dmq_replay_action(ht_dmq_action_t action, str* htname, str* cname, int type, int_str* val, int mode);
 int ht_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
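
The lookup step described in the commit message (map the From URI of an incoming request to a known node) might look roughly like the sketch below. It assumes a plain exact string comparison on a simplified `struct node`; the real `find_dmq_node_uri()` works on a parsed SIP URI and its matching rules are not shown in the diff, so `find_node_by_uri` is a hypothetical helper.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical, reduced stand-in for dmq_node_t in a singly linked list. */
struct node {
	const char *uri;
	struct node *next;
};

/* Return the known node whose URI equals from_uri, or NULL when the sender
 * is unknown or no From URI could be extracted. Handlers must therefore
 * tolerate a NULL node. */
static struct node *find_node_by_uri(struct node *list, const char *from_uri)
{
	struct node *n;

	if (from_uri == NULL)
		return NULL;
	for (n = list; n != NULL; n = n->next) {
		if (strcmp(n->uri, from_uri) == 0)
			return n;
	}
	return NULL;
}
```

In the diff, `dmq_node` is declared outside the `for(;;)` loop and assigned only in the `else` branch, so it appears that a failed From parse would pass the previous job's node to the handler. The sketch sidesteps that by computing a fresh result (possibly NULL) on every call.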
git:alexh/dialog-sync-wip: dialog: DMQ-sync dialogs with peers on startup
by Alex Hermann
Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: 144737c482d39ae276e8a09f7dea01c5fa685bc6
URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=144737c…

Author: Alex Hermann <alex(a)speakup.nl>
Committer: Alex Hermann <alex(a)speakup.nl>
Date: Tue Aug 26 16:26:03 2014 +0200

dialog: DMQ-sync dialogs with peers on startup

Use DMQ's init_callback() to request the peers to send all dialogs.

---

 modules/dialog/dlg_dmq.c | 75 ++++++++++++++++++++++++++++++++++++++-
 modules/dialog/dlg_dmq.h | 1 +
 modules/dmq/dmq.c | 9 ++++-
 modules/dmq/notification_peer.c | 19 ++++++++--
 modules/dmq/notification_peer.h | 1 +
 5 files changed, 99 insertions(+), 6 deletions(-)

diff --git a/modules/dialog/dlg_dmq.c b/modules/dialog/dlg_dmq.c
index f659a70..bd43da7 100644
--- a/modules/dialog/dlg_dmq.c
+++ b/modules/dialog/dlg_dmq.c
@@ -35,6 +35,10 @@ dmq_api_t dlg_dmqb;
 dmq_peer_t* dlg_dmq_peer = NULL;
 dmq_resp_cback_t dlg_dmq_resp_callback = {&dlg_dmq_resp_callback_f, 0};
 
+int dmq_send_all_dlgs();
+int dlg_dmq_request_sync();
+
+
 /**
  * @brief add notification peer
  */
@@ -51,7 +55,7 @@ int dlg_dmq_initialize()
 	}
 
 	not_peer.callback = dlg_dmq_handle_msg;
-	not_peer.init_callback = NULL;
+	not_peer.init_callback = dlg_dmq_request_sync;
 	not_peer.description.s = "dialog";
 	not_peer.description.len = 6;
 	not_peer.peer_id.s = "dialog";
@@ -288,6 +292,10 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
 			unref++;
 			break;
 
+		case DLG_DMQ_SYNC:
+			dmq_send_all_dlgs();
+			break;
+
 		case DLG_DMQ_NONE:
 			break;
 	}
@@ -314,6 +322,46 @@ error:
 }
 
 
+int dlg_dmq_request_sync() {
+	srjson_doc_t jdoc;
+
+	LM_DBG("requesting sync from dmq peers\n");
+
+	srjson_InitDoc(&jdoc, NULL);
+
+	jdoc.root = srjson_CreateObject(&jdoc);
+	if(jdoc.root==NULL) {
+		LM_ERR("cannot create json root\n");
+		goto error;
+	}
+
+	srjson_AddNumberToObject(&jdoc, jdoc.root, "action", DLG_DMQ_SYNC);
+	jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
+	if(jdoc.buf.s==NULL) {
+		LM_ERR("unable to serialize data\n");
+		goto error;
+	}
+	jdoc.buf.len = strlen(jdoc.buf.s);
+	LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
+	if (dlg_dmq_broadcast(&jdoc.buf)!=0) {
+		goto error;
+	}
+
+	jdoc.free_fn(jdoc.buf.s);
+	jdoc.buf.s = NULL;
+	srjson_DestroyDoc(&jdoc);
+	return 0;
+
+error:
+	if(jdoc.buf.s!=NULL) {
+		jdoc.free_fn(jdoc.buf.s);
+		jdoc.buf.s = NULL;
+	}
+	srjson_DestroyDoc(&jdoc);
+	return -1;
+}
+
+
 int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needlock) {
 
 	srjson_doc_t jdoc, prof_jdoc;
@@ -391,6 +439,7 @@ int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg, int needl
 			break;
 
 		case DLG_DMQ_NONE:
+		case DLG_DMQ_SYNC:
 			break;
 	}
 	if (needlock)
@@ -422,6 +471,30 @@ error:
 }
 
 
+int dmq_send_all_dlgs() {
+	int index;
+	dlg_entry_t entry;
+	dlg_cell_t *dlg;
+
+	LM_DBG("sending all dialogs \n");
+
+	for(index = 0; index< d_table->size; index++){
+		/* lock the whole entry */
+		entry = (d_table->entries)[index];
+		dlg_lock( d_table, &entry);
+
+		for(dlg = entry.first; dlg != NULL; dlg = dlg->next){
+			dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
+			dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0);
+		}
+
+		dlg_unlock( d_table, &entry);
+	}
+
+	return 0;
+}
+
+
 /**
  * @brief dmq response callback
  */
diff --git a/modules/dialog/dlg_dmq.h b/modules/dialog/dlg_dmq.h
index ac38010..bd13757 100644
--- a/modules/dialog/dlg_dmq.h
+++ b/modules/dialog/dlg_dmq.h
@@ -38,6 +38,7 @@ typedef enum {
 	DLG_DMQ_UPDATE,
 	DLG_DMQ_STATE,
 	DLG_DMQ_RM,
+	DLG_DMQ_SYNC,
 } dlg_dmq_action_t;
 
 int dlg_dmq_initialize();
diff --git a/modules/dmq/dmq.c b/modules/dmq/dmq.c
index 30405f1..515ba49 100644
--- a/modules/dmq/dmq.c
+++ b/modules/dmq/dmq.c
@@ -229,7 +229,14 @@ static int mod_init(void)
 		LM_ERR("error in shm_malloc\n");
 		return -1;
 	}
-	
+
+	dmq_init_callback_done = shm_malloc(sizeof(int));
+	if (!dmq_init_callback_done) {
+		LM_ERR("no more shm\n");
+		return -1;
+	}
+	*dmq_init_callback_done = 0;
+
 	/**
 	 * add the dmq notification peer.
 	 * the dmq is a peer itself so that it can receive node notifications
diff --git a/modules/dmq/notification_peer.c b/modules/dmq/notification_peer.c
index e7704f0..75c1386 100644
--- a/modules/dmq/notification_peer.c
+++ b/modules/dmq/notification_peer.c
@@ -29,6 +29,9 @@
 str notification_content_type = str_init("text/plain");
 dmq_resp_cback_t notification_callback = {&notification_resp_callback_f, 0};
 
+int *dmq_init_callback_done;
+
+
 /**
  * @brief add notification peer
  */
@@ -186,7 +189,6 @@ int run_init_callbacks() {
  */
 int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
 {
-	static int firstrun = 1;
 	int nodes_recv;
 	str* response_body = NULL;
 	int maxforwards = 0;
@@ -223,9 +225,9 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
 				&notification_callback, maxforwards, &notification_content_type);
 	}
 	pkg_free(response_body);
-	if (firstrun) {
+	if (!*dmq_init_callback_done) {
+		*dmq_init_callback_done = 1;
 		run_init_callbacks();
-		firstrun = 0;
 	}
 	return 0;
 error:
@@ -312,8 +314,17 @@ int notification_resp_callback_f(struct sip_msg* msg, int code,
 		dmq_node_t* node, void* param)
 {
 	int ret;
+	int nodes_recv;
+
 	LM_DBG("notification_callback_f triggered [%p %d %p]\n", msg, code, param);
-	if(code == 408) {
+	if(code == 200) {
+		nodes_recv = extract_node_list(node_list, msg);
+		LM_DBG("received %d new or changed nodes\n", nodes_recv);
+		if (!*dmq_init_callback_done) {
+			*dmq_init_callback_done = 1;
+			run_init_callbacks();
+		}
+	} else if(code == 408) {
 		/* deleting node - the server did not respond */
 		LM_ERR("deleting server %.*s because of failed request\n", STR_FMT(&node->orig_uri));
 		if (STR_EQ(node->orig_uri, dmq_notification_address)) {
diff --git a/modules/dmq/notification_peer.h b/modules/dmq/notification_peer.h
index 72df4ec..ff9871d 100644
--- a/modules/dmq/notification_peer.h
+++ b/modules/dmq/notification_peer.h
@@ -34,6 +34,7 @@
 #include "dmq_funcs.h"
 
 extern str notification_content_type;
+extern int *dmq_init_callback_done;
 
 int add_notification_peer();
 int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp);
git:alexh/dialog-sync-wip: dmq: Add init_callback() to API
by Alex Hermann
Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: 6ceddd9bf832220b05684ff439eb085b8daff240
URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=6ceddd9…

Author: Alex Hermann <alex(a)speakup.nl>
Committer: Alex Hermann <alex(a)speakup.nl>
Date: Tue Aug 26 16:23:21 2014 +0200

dmq: Add init_callback() to API

The init_callback is called after DMQ has synced with the
notification_peer. This callback can thus be used to send/broadcast
messages as early as possible.

---

 modules/dialog/dlg_dmq.c | 1 +
 modules/dmq/notification_peer.c | 21 +++++++++++++++++++++
 modules/dmq/peer.h | 2 ++
 modules/htable/ht_dmq.c | 1 +
 4 files changed, 25 insertions(+), 0 deletions(-)

diff --git a/modules/dialog/dlg_dmq.c b/modules/dialog/dlg_dmq.c
index 2b11453..f659a70 100644
--- a/modules/dialog/dlg_dmq.c
+++ b/modules/dialog/dlg_dmq.c
@@ -51,6 +51,7 @@ int dlg_dmq_initialize()
 	}
 
 	not_peer.callback = dlg_dmq_handle_msg;
+	not_peer.init_callback = NULL;
 	not_peer.description.s = "dialog";
 	not_peer.description.len = 6;
 	not_peer.peer_id.s = "dialog";
diff --git a/modules/dmq/notification_peer.c b/modules/dmq/notification_peer.c
index 2f459e1..e7704f0 100644
--- a/modules/dmq/notification_peer.c
+++ b/modules/dmq/notification_peer.c
@@ -36,6 +36,7 @@ int add_notification_peer()
 {
 	dmq_peer_t not_peer;
 	not_peer.callback = dmq_notification_callback;
+	not_peer.init_callback = NULL;
 	not_peer.description.s = "notification_peer";
 	not_peer.description.len = 17;
 	not_peer.peer_id.s = "notification_peer";
@@ -165,11 +166,27 @@ error:
 	return -1;
 }
 
+
+int run_init_callbacks() {
+	dmq_peer_t* crt;
+
+	crt = peer_list->peers;
+	while(crt) {
+		if (crt->init_callback) {
+			crt->init_callback();
+		}
+		crt = crt->next;
+	}
+	return 0;
+}
+
+
 /**
  * @brief dmq notification callback
  */
 int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
 {
+	static int firstrun = 1;
 	int nodes_recv;
 	str* response_body = NULL;
 	int maxforwards = 0;
@@ -206,6 +223,10 @@ int dmq_notification_callback(struct sip_msg* msg, peer_reponse_t* resp)
 				&notification_callback, maxforwards, &notification_content_type);
 	}
 	pkg_free(response_body);
+	if (firstrun) {
+		run_init_callbacks();
+		firstrun = 0;
+	}
 	return 0;
 error:
 	return -1;
diff --git a/modules/dmq/peer.h b/modules/dmq/peer.h
index 7ce4434..bd1a15d 100644
--- a/modules/dmq/peer.h
+++ b/modules/dmq/peer.h
@@ -42,11 +42,13 @@ typedef struct peer_response {
 } peer_reponse_t;
 
 typedef int(*peer_callback_t)(struct sip_msg*, peer_reponse_t* resp);
+typedef int(*init_callback_t)();
 
 typedef struct dmq_peer {
 	str peer_id;
 	str description;
 	peer_callback_t callback;
+	init_callback_t init_callback;
 	struct dmq_peer* next;
 } dmq_peer_t;
 
diff --git a/modules/htable/ht_dmq.c b/modules/htable/ht_dmq.c
index 284c3dc..21755a7 100644
--- a/modules/htable/ht_dmq.c
+++ b/modules/htable/ht_dmq.c
@@ -59,6 +59,7 @@ int ht_dmq_initialize()
 	}
 
 	not_peer.callback = ht_dmq_handle_msg;
+	not_peer.init_callback = NULL;
 	not_peer.description.s = "htable";
 	not_peer.description.len = 6;
 	not_peer.peer_id.s = "htable";
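
The init_callback mechanism in this commit amounts to an optional function pointer per peer plus a run-once guard. A minimal standalone sketch, assuming simplified types: `struct peer`, `run_init_callbacks_once` and `demo_request_sync` are hypothetical names, and the guard is a plain `int` passed by pointer rather than the `static int firstrun` used in the diff.

```c
#include <stddef.h>

typedef int (*init_callback_t)(void);

/* Hypothetical, reduced stand-in for dmq_peer_t. */
struct peer {
	const char *id;
	init_callback_t init_callback; /* optional, may be NULL */
	struct peer *next;
};

/* Invoke every registered init callback, but only on the first call.
 * *done is the run-once guard. Returns the number of callbacks invoked. */
static int run_init_callbacks_once(struct peer *peers, int *done)
{
	int called = 0;
	struct peer *p;

	if (*done)
		return 0;
	*done = 1;
	for (p = peers; p != NULL; p = p->next) {
		if (p->init_callback) {
			p->init_callback();
			called++;
		}
	}
	return called;
}

/* Demo callback that only counts invocations. */
static int sync_requests;

static int demo_request_sync(void)
{
	sync_requests++;
	return 0;
}
```

Peers that register `NULL`, as dialog, htable and the notification peer all do in this commit, are skipped by the loop, so existing modules keep their behaviour until they opt in.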
git:alexh/dialog-sync-wip: dmq: Don't delete the original notification_peer on timeout
by Alex Hermann
Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: e73aa4032873b3fca88bb809d6d4ce4f28c0f237
URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=e73aa40…

Author: Alex Hermann <alex(a)speakup.nl>
Committer: Alex Hermann <alex(a)speakup.nl>
Date: Tue Aug 26 16:37:08 2014 +0200

dmq: Don't delete the original notification_peer on timeout

---

 modules/dmq/notification_peer.c | 5 ++++-
 1 files changed, 4 insertions(+), 1 deletions(-)

diff --git a/modules/dmq/notification_peer.c b/modules/dmq/notification_peer.c
index 4a76934..2f459e1 100644
--- a/modules/dmq/notification_peer.c
+++ b/modules/dmq/notification_peer.c
@@ -295,9 +295,12 @@ int notification_resp_callback_f(struct sip_msg* msg, int code,
 	if(code == 408) {
 		/* deleting node - the server did not respond */
 		LM_ERR("deleting server %.*s because of failed request\n", STR_FMT(&node->orig_uri));
+		if (STR_EQ(node->orig_uri, dmq_notification_address)) {
+			LM_ERR("not deleting notification_peer\n");
+			return 0;
+		}
 		ret = del_dmq_node(node_list, node);
 		LM_DBG("del_dmq_node returned %d\n", ret);
 	}
 	return 0;
 }
-
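
The decision this commit adds can be expressed as a small predicate. This is a sketch under assumptions: URIs are plain C strings compared with `strcmp` (the module uses `str` values and `STR_EQ`), and `should_delete_node` is a hypothetical name.

```c
#include <stddef.h>
#include <string.h>

/* Decide whether a node should be removed after a request to it failed.
 * Returns 1 to delete, 0 to keep. A node is only considered for deletion
 * on a 408 timeout, and the configured notification address is never
 * dropped. */
static int should_delete_node(int code, const char *node_uri,
		const char *notification_uri)
{
	if (code != 408)
		return 0;
	if (notification_uri != NULL && strcmp(node_uri, notification_uri) == 0)
		return 0;
	return 1;
}
```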
git:alexh/dialog-sync-wip: dialog: Only allow strictly increasing states in DMQ sync
by Alex Hermann
Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: 4a5e99771f0b76b6ddd7ecd129f46a73a3e0d1e8
URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=4a5e997…

Author: Alex Hermann <alex(a)speakup.nl>
Committer: Alex Hermann <alex(a)speakup.nl>
Date: Tue Aug 26 18:51:02 2014 +0200

dialog: Only allow strictly increasing states in DMQ sync

Catches most out-of-order sync messages.

---

 modules/dialog/dlg_dmq.c | 8 +++++++-
 modules/dialog/dlg_handlers.c | 6 +++---
 modules/dialog/dlg_hash.h | 3 ++-
 3 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/modules/dialog/dlg_dmq.c b/modules/dialog/dlg_dmq.c
index 3ea532e..2b11453 100644
--- a/modules/dialog/dlg_dmq.c
+++ b/modules/dialog/dlg_dmq.c
@@ -212,7 +212,7 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
 				dlg_json_to_profiles(dlg, &prof_jdoc);
 				srjson_DestroyDoc(&prof_jdoc);
 			}
-			if (dlg->state == state) {
+			if (state == dlg->state) {
 				break;
 			}
 			/* intentional fallthrough */
@@ -222,6 +222,12 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
 				LM_ERR("dialog [%u:%u] not found\n", iuid.h_entry, iuid.h_id);
 				goto error;
 			}
+			if (state < dlg->state) {
+				LM_NOTICE("Ignoring backwards state change on dlg [%u:%u] with callid [%.*s] from state [%u] to state [%u]\n",
+					iuid.h_entry, iuid.h_id,
+					dlg->callid.len, dlg->callid.s, dlg->state, state);
+				break;
+			}
 			LM_DBG("State update dlg [%u:%u] with callid [%.*s] from state [%u] to state [%u]\n", iuid.h_entry, iuid.h_id,
 					dlg->callid.len, dlg->callid.s, dlg->state, state);
 			switch (state) {
diff --git a/modules/dialog/dlg_handlers.c b/modules/dialog/dlg_handlers.c
index af5385b..29e1350 100644
--- a/modules/dialog/dlg_handlers.c
+++ b/modules/dialog/dlg_handlers.c
@@ -570,7 +570,7 @@ static void dlg_onreply(struct cell* t, int type, struct tmcb_params *param)
 	if (unref) dlg_unref(dlg, unref);
 
 done:
-	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state!=old_state) {
+	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state>old_state) {
 		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
 	}
 
@@ -1393,7 +1393,7 @@ void dlg_onroute(struct sip_msg* req, str *route_params, void *param)
 	}
 
 done:
-	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state!=old_state) {
+	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state>old_state) {
 		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
 	}
 
@@ -1468,7 +1468,7 @@ void dlg_ontimeout(struct dlg_tl *tl)
 		dlg_unref(dlg, 1);
 	}
 
-	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state!=old_state) {
+	if(dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC) && new_state>old_state) {
 		dlg_dmq_replicate_action(DLG_DMQ_STATE, dlg, 0);
 	}
 
diff --git a/modules/dialog/dlg_hash.h b/modules/dialog/dlg_hash.h
index 66b86e0..426bad6 100644
--- a/modules/dialog/dlg_hash.h
+++ b/modules/dialog/dlg_hash.h
@@ -50,7 +50,8 @@
 #include "dlg_cb.h"
 
 
-/* states of a dialog */
+/* states of a dialog
+ * order is important, numbering must represent normal state stange flow */
 #define DLG_STATE_UNCONFIRMED 1 /*!< unconfirmed dialog */
 #define DLG_STATE_EARLY 2 /*!< early dialog */
 #define DLG_STATE_CONFIRMED_NA 3 /*!< confirmed dialog without a ACK yet */
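
Because the state numbers are ordered, the out-of-order protection reduces to a monotonic comparison. The sketch below shows only that comparison, with hypothetical names (`STATE_*`, `apply_remote_state`); the three state values match the `#define`s visible in the diff, and later states are left out because the diff does not show them.

```c
/* State numbering as shown in the dlg_hash.h hunk; order matters. */
#define STATE_UNCONFIRMED  1
#define STATE_EARLY        2
#define STATE_CONFIRMED_NA 3

/* Apply a state received from a peer only if it moves the dialog forward.
 * Returns 1 when the update was applied, 0 when it was ignored because it
 * repeats the current state or would move the dialog backwards. */
static int apply_remote_state(int *current, int incoming)
{
	if (incoming <= *current)
		return 0;
	*current = incoming;
	return 1;
}
```

As the commit message says, this catches most reordered messages, not all: the check only detects messages that arrive after a later state has already been applied.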
git:alexh/dialog-sync-wip: dialog: Don't try to DMQ-sync non-synced dlgs on destroy
by Alex Hermann
Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: ed3d8ffe3fa256be871d32b0e8cb9a7c541df36e
URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=ed3d8ff…

Author: Alex Hermann <alex(a)speakup.nl>
Committer: Alex Hermann <alex(a)speakup.nl>
Date: Tue Aug 26 16:36:15 2014 +0200

dialog: Don't try to DMQ-sync non-synced dlgs on destroy

---

 modules/dialog/dlg_hash.c | 4 ++--
 1 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/modules/dialog/dlg_hash.c b/modules/dialog/dlg_hash.c
index 169bef6..58abd24 100644
--- a/modules/dialog/dlg_hash.c
+++ b/modules/dialog/dlg_hash.c
@@ -398,7 +398,7 @@ inline void destroy_dlg(struct dlg_cell *dlg)

     run_dlg_callbacks( DLGCB_DESTROY , dlg, NULL, NULL, DLG_DIR_NONE, 0);

-    if (dlg_enable_dmq)
+    if (dlg_enable_dmq && (dlg->iflags & DLG_IFLAG_DMQ_SYNC))
         dlg_dmq_replicate_action(DLG_DMQ_RM, dlg, 0);

     /* delete the dialog from DB*/
@@ -462,9 +462,9 @@ void destroy_dlg_table(void)
         while (dlg) {
             l_dlg = dlg;
             dlg = dlg->next;
+            l_dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
             destroy_dlg(l_dlg);
         }
-
     }

     shm_free(d_table);
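
The effect of the two hunks can be tried out with a small stand-alone sketch. Everything here is hypothetical scaffolding: `RM_IFLAG_DMQ_SYNC` is an assumed bit value (the real `DLG_IFLAG_DMQ_SYNC` definition is not shown in this thread), `struct rm_dlg` keeps only the fields needed, and a counter stands in for the call to `dlg_dmq_replicate_action(DLG_DMQ_RM, ...)`.

```c
#include <assert.h>
#include <stddef.h>

/* Assumed bit value for illustration only. */
#define RM_IFLAG_DMQ_SYNC (1u << 0)

/* Hypothetical stand-in for struct dlg_cell. */
struct rm_dlg {
    unsigned int iflags;
    struct rm_dlg *next;
};

int rm_dmq_enabled = 1;       /* stands in for dlg_enable_dmq */
unsigned int rm_sent = 0;     /* counts simulated DLG_DMQ_RM messages */

/* Replicate the removal only for dialogs that are flagged for DMQ sync. */
void rm_destroy_dlg(struct rm_dlg *dlg)
{
    assert(dlg != NULL);
    if (rm_dmq_enabled && (dlg->iflags & RM_IFLAG_DMQ_SYNC))
        rm_sent++;
}

/* Table teardown: clear the flag first so no removal is replicated. */
void rm_destroy_table(struct rm_dlg *head)
{
    struct rm_dlg *cur;
    while (head) {
        cur = head;
        head = head->next;
        cur->iflags &= ~RM_IFLAG_DMQ_SYNC;
        rm_destroy_dlg(cur);
    }
}
```

In this sketch, destroying a single flagged dialog bumps the counter, while tearing down the whole table does not, which presumably matches the intent of not telling peers to drop dialogs just because the local table is being freed.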
git:alexh/dialog-sync-wip: dialog: Unset DMQ_SYNC flag upon restore from DB on another host
by Alex Hermann
Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: b5ca1979ccc2d39f4c905fc036b9b4f321e20325
URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=b5ca197…

Author: Alex Hermann <alex(a)speakup.nl>
Committer: Alex Hermann <alex(a)speakup.nl>
Date: Fri Aug 22 15:30:07 2014 +0200

dialog: Unset DMQ_SYNC flag upon restore from DB on another host

Don't try to sync non-local dialogs via DMQ after DB restore.

---

 modules/dialog/dlg_db_handler.c | 4 ++++
 1 files changed, 4 insertions(+), 0 deletions(-)

diff --git a/modules/dialog/dlg_db_handler.c b/modules/dialog/dlg_db_handler.c
index fcc04e8..9a4d1c8 100644
--- a/modules/dialog/dlg_db_handler.c
+++ b/modules/dialog/dlg_db_handler.c
@@ -422,6 +422,10 @@ static int load_dialog_info_from_db(int dlg_hash_size, int fetch_num_rows)
                 srjson_DestroyDoc(&jdoc);
             }
             dlg->iflags = (unsigned int)VAL_INT(values+22);
+            if (!dlg->bind_addr[DLG_CALLER_LEG] || !dlg->bind_addr[DLG_CALLEE_LEG]) {
+                /* non-local socket, probably not our dialog */
+                dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
+            }
             /*restore the timer values */
             if (0 != insert_dlg_timer( &(dlg->tl), (int)dlg->tl.timeout )) {
                 LM_CRIT("Unable to insert dlg %p [%u:%u] "
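
As a rough illustration of the restore-time check, the sketch below restores the stored flags and then clears the sync bit when either leg has no bound socket. The names `struct restore_dlg`, `struct restore_socket`, `restore_iflags()` and the `RESTORE_*` constants are all hypothetical, and the flag and leg index values are assumptions; only the shape of the condition is taken from the patch.

```c
#include <assert.h>
#include <stddef.h>

#define RESTORE_IFLAG_DMQ_SYNC (1u << 0) /* assumed bit value */
#define RESTORE_CALLER_LEG 0             /* assumed leg indexes */
#define RESTORE_CALLEE_LEG 1

/* Hypothetical stand-in for a listening socket descriptor. */
struct restore_socket {
    int fd;
};

/* Hypothetical stand-in for struct dlg_cell. */
struct restore_dlg {
    unsigned int iflags;
    struct restore_socket *bind_addr[2];
};

/* Restore flags from storage; drop the sync bit if a leg's socket could
 * not be resolved locally, since the dialog is then probably owned by
 * another host. */
void restore_iflags(struct restore_dlg *dlg, unsigned int stored_iflags)
{
    assert(dlg != NULL);
    dlg->iflags = stored_iflags;
    if (!dlg->bind_addr[RESTORE_CALLER_LEG] || !dlg->bind_addr[RESTORE_CALLEE_LEG])
        dlg->iflags &= ~RESTORE_IFLAG_DMQ_SYNC;
}
```

Other bits in the stored flags are left untouched; only the sync bit is masked out.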
git:alexh/dialog-sync-wip: dialog: Correctly detect and handle state in DLG_DMQ_UPDATE
by Alex Hermann
Module: sip-router
Branch: alexh/dialog-sync-wip
Commit: f42a9fbf0a1206c5e3ee0eadb591495526354ca5
URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=f42a9fb…

Author: Alex Hermann <alex(a)speakup.nl>
Committer: Alex Hermann <alex(a)speakup.nl>
Date: Tue Aug 26 16:28:29 2014 +0200

dialog: Correctly detect and handle state in DLG_DMQ_UPDATE

---

 modules/dialog/dlg_dmq.c | 7 ++++---
 1 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/modules/dialog/dlg_dmq.c b/modules/dialog/dlg_dmq.c
index c8ea38c..3ea532e 100644
--- a/modules/dialog/dlg_dmq.c
+++ b/modules/dialog/dlg_dmq.c
@@ -97,7 +97,7 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
     str profiles = {0, 0}, callid = {0, 0};
     str dummy = {0, 0};
     unsigned int init_ts = 0, start_ts = 0, lifetime = 0;
-    unsigned int state = 0;
+    unsigned int state = 1;

     /* received dmq message */
     LM_DBG("dmq message received\n");
@@ -194,8 +194,6 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
                     dlg->h_id = iuid.h_id;
                     /* prevent DB sync */
                     dlg->dflags &= ~(DLG_FLAG_NEW|DLG_FLAG_CHANGED);
-                    dlg->init_ts = init_ts;
-                    dlg->start_ts = start_ts;
                 } else {
                     /* remove existing profiles */
                     if (dlg->profile_links!=NULL) {
@@ -204,6 +202,9 @@ int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp)
                     }
                 }

+                dlg->init_ts = init_ts;
+                dlg->start_ts = start_ts;
+
                 /* add profiles */
                 if(profiles.s!=NULL) {
                     srjson_InitDoc(&prof_jdoc, NULL);