Module: sip-router Branch: mariusbucur/dmq Commit: 04fa5b387dd48f3751671b87e321438119f34d96 URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=04fa5b387dd48f3751671b87e321438119f34d96
Author: Marius Bucur <marius.bucur@1and1.ro> Committer: Marius Bucur <marius.bucur@1and1.ro> Date: Fri Apr 8 19:33:11 2011 +0300
added dmq notification peer changes
---
modules_k/dmq/dmq.c | 14 ++++++++++++-- modules_k/dmq/dmq.h | 1 + modules_k/dmq/peer.c | 11 +++++++++++ modules_k/dmq/peer.h | 1 + modules_k/dmq/worker.c | 3 ++- 5 files changed, 27 insertions(+), 3 deletions(-)
diff --git a/modules_k/dmq/dmq.c b/modules_k/dmq/dmq.c index 90b0ac2..7920361 100644 --- a/modules_k/dmq/dmq.c +++ b/modules_k/dmq/dmq.c @@ -50,6 +50,7 @@ #include "peer.h" #include "bind_dmq.h" #include "worker.h" +#include "notification_peer.h" #include "../../mod_fix.h"
static int mod_init(void); @@ -75,6 +76,8 @@ sl_api_t slb; /** module variables */ dmq_worker_t* workers; dmq_peer_list_t* peer_list; +// the dmq module is a peer itself for receiving notifications regarding nodes +dmq_peer_t dmq_notification_peer;
/** module functions */ static int mod_init(void); @@ -144,7 +147,7 @@ static int mod_init(void) { /* load peer list - the list containing the module callbacks for dmq */ peer_list = init_peer_list(); - /* register worker processes */ + /* register worker processes - add one because of the ping process */ register_procs(num_workers); /* allocate workers array */ @@ -154,12 +157,14 @@ static int mod_init(void) { return -1; } + /* add first dmq peer - the dmq module itself to receive peer notify messages */ + startup_time = (int) time(NULL); return 0; }
/** - * Initialize children + * initialize children */ static int child_init(int rank) { int i, newpid; @@ -179,6 +184,11 @@ static int child_init(int rank) { workers[i].pid = newpid; } } + /** + * add the dmq notification peer. + * the dmq is a peer itself so that it can receive node notifications + */ + add_notification_peer(); return 0; } if(rank == PROC_INIT || rank == PROC_TCP_MAIN) { diff --git a/modules_k/dmq/dmq.h b/modules_k/dmq/dmq.h index 3bd9c84..6d0aac9 100644 --- a/modules_k/dmq/dmq.h +++ b/modules_k/dmq/dmq.h @@ -12,6 +12,7 @@
extern int num_workers; extern dmq_worker_t* workers; +extern dmq_peer_t dmq_notification_peer;
static inline int dmq_load_api(dmq_api_t* api) { bind_dmq_f binddmq; diff --git a/modules_k/dmq/peer.c b/modules_k/dmq/peer.c index 1ec177f..4215289 100644 --- a/modules_k/dmq/peer.c +++ b/modules_k/dmq/peer.c @@ -3,6 +3,7 @@ dmq_peer_list_t* init_peer_list() { dmq_peer_list_t* peer_list = shm_malloc(sizeof(dmq_peer_list_t)); memset(peer_list, 0, sizeof(dmq_peer_list_t)); + lock_init(&peer_list->lock); return peer_list; }
@@ -15,6 +16,7 @@ dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) { if(strncasecmp(cur->peer_id.s, peer->peer_id.s, len) == 0) { return cur; } + cur = cur->next; } return 0; } @@ -22,17 +24,26 @@ dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) { void add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) { dmq_peer_t* new_peer = shm_malloc(sizeof(dmq_peer_t)); *new_peer = *peer; + + /* copy the str's */ + new_peer->peer_id.s = shm_malloc(peer->peer_id.len); + memcpy(new_peer->peer_id.s, peer->peer_id.s, peer->peer_id.len); + new_peer->description.s = shm_malloc(peer->description.len); + memcpy(new_peer->peer_id.s, peer->peer_id.s, peer->peer_id.len); + new_peer->next = peer_list->peers; peer_list->peers = new_peer; }
int register_dmq_peer(dmq_peer_t* peer) { + lock_get(&peer_list->lock); if(search_peer_list(peer_list, peer)) { LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len, peer->peer_id.s, peer->description.len, peer->description.s); return -1; } add_peer(peer_list, peer); + lock_release(&peer_list->lock); return 0; }
diff --git a/modules_k/dmq/peer.h b/modules_k/dmq/peer.h index 72c851f..b9a09ee 100644 --- a/modules_k/dmq/peer.h +++ b/modules_k/dmq/peer.h @@ -18,6 +18,7 @@ typedef struct dmq_peer { } dmq_peer_t;
typedef struct dmq_peer_list { + gen_lock_t lock; dmq_peer_t* peers; int count; } dmq_peer_list_t; diff --git a/modules_k/dmq/worker.c b/modules_k/dmq/worker.c index 6eaae21..3a3005c 100644 --- a/modules_k/dmq/worker.c +++ b/modules_k/dmq/worker.c @@ -49,7 +49,8 @@ int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) { } } if(!found_available) { - LM_DBG("no available worker found, passing job to the least busy one\n"); + LM_DBG("no available worker found, passing job to the least busy one [%d %d]\n", + worker->pid, job_queue_size(worker->queue)); } job_queue_push(worker->queue, &new_job); lock_release(&worker->lock);