Module: sip-router
Branch: mariusbucur/dmq
Commit: 04fa5b387dd48f3751671b87e321438119f34d96
URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=04fa5b387dd48f3751671b87e321438119f34d96
Author: Marius Bucur <marius.bucur(a)1and1.ro>
Committer: Marius Bucur <marius.bucur(a)1and1.ro>
Date: Fri Apr 8 19:33:11 2011 +0300
added dmq notification peer changes
---
modules_k/dmq/dmq.c | 14 ++++++++++++--
modules_k/dmq/dmq.h | 1 +
modules_k/dmq/peer.c | 11 +++++++++++
modules_k/dmq/peer.h | 1 +
modules_k/dmq/worker.c | 3 ++-
5 files changed, 27 insertions(+), 3 deletions(-)
diff --git a/modules_k/dmq/dmq.c b/modules_k/dmq/dmq.c
index 90b0ac2..7920361 100644
--- a/modules_k/dmq/dmq.c
+++ b/modules_k/dmq/dmq.c
@@ -50,6 +50,7 @@
#include "peer.h"
#include "bind_dmq.h"
#include "worker.h"
+#include "notification_peer.h"
#include "../../mod_fix.h"
static int mod_init(void);
@@ -75,6 +76,8 @@ sl_api_t slb;
/** module variables */
dmq_worker_t* workers;
dmq_peer_list_t* peer_list;
+// the dmq module is a peer itself for receiving notifications regarding nodes
+dmq_peer_t dmq_notification_peer;
/** module functions */
static int mod_init(void);
@@ -144,7 +147,7 @@ static int mod_init(void) {
/* load peer list - the list containing the module callbacks for dmq */
peer_list = init_peer_list();
- /* register worker processes */
+ /* register worker processes - add one because of the ping process */
register_procs(num_workers);
/* allocate workers array */
@@ -154,12 +157,14 @@ static int mod_init(void) {
return -1;
}
+ /* add first dmq peer - the dmq module itself to receive peer notify messages */
+
startup_time = (int) time(NULL);
return 0;
}
/**
- * Initialize children
+ * initialize children
*/
static int child_init(int rank) {
int i, newpid;
@@ -179,6 +184,11 @@ static int child_init(int rank) {
workers[i].pid = newpid;
}
}
+ /**
+ * add the dmq notification peer.
+ * the dmq is a peer itself so that it can receive node notifications
+ */
+ add_notification_peer();
return 0;
}
if(rank == PROC_INIT || rank == PROC_TCP_MAIN) {
diff --git a/modules_k/dmq/dmq.h b/modules_k/dmq/dmq.h
index 3bd9c84..6d0aac9 100644
--- a/modules_k/dmq/dmq.h
+++ b/modules_k/dmq/dmq.h
@@ -12,6 +12,7 @@
extern int num_workers;
extern dmq_worker_t* workers;
+extern dmq_peer_t dmq_notification_peer;
static inline int dmq_load_api(dmq_api_t* api) {
bind_dmq_f binddmq;
diff --git a/modules_k/dmq/peer.c b/modules_k/dmq/peer.c
index 1ec177f..4215289 100644
--- a/modules_k/dmq/peer.c
+++ b/modules_k/dmq/peer.c
@@ -3,6 +3,7 @@
+/**
+ * Allocate an empty peer list with shm_malloc, zero it and initialize its lock.
+ * Returns the new list, or NULL if the allocation fails.
+ */
dmq_peer_list_t* init_peer_list() {
	dmq_peer_list_t* peer_list = shm_malloc(sizeof(dmq_peer_list_t));
+	/* shm_malloc can fail; bail out before memset/lock_init touch the memory */
+	if(peer_list == NULL) {
+		LM_ERR("no more shm memory for the peer list\n");
+		return NULL;
+	}
	memset(peer_list, 0, sizeof(dmq_peer_list_t));
+	lock_init(&peer_list->lock);
	return peer_list;
}
@@ -15,6 +16,7 @@ dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
if(strncasecmp(cur->peer_id.s, peer->peer_id.s, len) == 0) {
return cur;
}
+ cur = cur->next;
}
return 0;
}
@@ -22,17 +24,26 @@ dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
+/**
+ * Prepend a copy of peer to peer_list.
+ * The struct is copied by value, then peer_id and description are given their
+ * own shm_malloc'ed buffers so the list entry does not alias the caller's strings.
+ * No locking is done here; register_dmq_peer takes peer_list->lock around the call.
+ * If the entry itself cannot be allocated the error is logged and the list is
+ * left unchanged.
+ */
void add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
	dmq_peer_t* new_peer = shm_malloc(sizeof(dmq_peer_t));
+	if(new_peer == NULL) {
+		LM_ERR("no more shm memory for the new peer\n");
+		return;
+	}
	*new_peer = *peer;
+
+	/* copy the str's */
+	/* NOTE(review): the two string buffer allocations below are still unchecked */
+	new_peer->peer_id.s = shm_malloc(peer->peer_id.len);
+	memcpy(new_peer->peer_id.s, peer->peer_id.s, peer->peer_id.len);
+	new_peer->description.s = shm_malloc(peer->description.len);
+	/* fill the description buffer (this line previously copied peer_id a second time) */
+	memcpy(new_peer->description.s, peer->description.s, peer->description.len);
+
	/* push the new entry at the head of the singly linked list */
	new_peer->next = peer_list->peers;
	peer_list->peers = new_peer;
}
+/**
+ * Register peer in the module-wide peer_list unless search_peer_list already
+ * finds an entry for it. Lookup and insertion both happen under peer_list->lock.
+ * Returns 0 after add_peer has been called, -1 if the peer already exists.
+ */
int register_dmq_peer(dmq_peer_t* peer) {
+	lock_get(&peer_list->lock);
	if(search_peer_list(peer_list, peer)) {
		LM_ERR("peer already exists: %.*s %.*s\n", peer->peer_id.len,
				peer->peer_id.s,
				peer->description.len, peer->description.s);
+		/* release before the early return, otherwise the lock stays held */
+		lock_release(&peer_list->lock);
		return -1;
	}
	add_peer(peer_list, peer);
+	lock_release(&peer_list->lock);
	return 0;
}
diff --git a/modules_k/dmq/peer.h b/modules_k/dmq/peer.h
index 72c851f..b9a09ee 100644
--- a/modules_k/dmq/peer.h
+++ b/modules_k/dmq/peer.h
@@ -18,6 +18,7 @@ typedef struct dmq_peer {
} dmq_peer_t;
typedef struct dmq_peer_list {
+ gen_lock_t lock; /* taken by register_dmq_peer around lookup and insertion */
dmq_peer_t* peers; /* head of the singly linked peer list; add_peer prepends here */
int count; /* NOTE(review): presumably the number of peers - not updated in the code shown here, confirm */
} dmq_peer_list_t;
diff --git a/modules_k/dmq/worker.c b/modules_k/dmq/worker.c
index 6eaae21..3a3005c 100644
--- a/modules_k/dmq/worker.c
+++ b/modules_k/dmq/worker.c
@@ -49,7 +49,8 @@ int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
}
}
if(!found_available) {
- LM_DBG("no available worker found, passing job to the least busy one\n");
+ LM_DBG("no available worker found, passing job to the least busy one [%d %d]\n",
+ worker->pid, job_queue_size(worker->queue));
}
job_queue_push(worker->queue, &new_job);
lock_release(&worker->lock);