Module: sip-router
Branch: mariusbucur/dmq
Commit: ba31d863c9475bafc7d6073e3a6ebdd0b40e207a
URL:
http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=ba31d863c9475bafc7d6073e3a6ebdd0b40e207a
Author: Marius Bucur <marius.bucur(a)1and1.ro>
Committer: Marius Bucur <marius.bucur(a)1and1.ro>
Date: Wed Apr 6 19:30:31 2011 +0300
Added support for binding the dmq module from within another module.
Also finished the implementation of the dmq worker queues.
---
modules_k/dmq/bind_dmq.c | 7 +++
modules_k/dmq/message.c | 29 +++++++++++
modules_k/dmq/message.h | 2 +
modules_k/dmq/peer.c | 43 ++++++++++++++++
modules_k/dmq/peer.h | 35 +++++++++++++
modules_k/dmq/worker.c | 120 ++++++++++++++++++++++++++++++++++++++++++++++
modules_k/dmq/worker.h | 43 ++++++++++++++++
7 files changed, 279 insertions(+), 0 deletions(-)
diff --git a/modules_k/dmq/bind_dmq.c b/modules_k/dmq/bind_dmq.c
new file mode 100644
index 0000000..6744753
--- /dev/null
+++ b/modules_k/dmq/bind_dmq.c
@@ -0,0 +1,7 @@
+#include "bind_dmq.h"
+#include "peer.h"
+
+/**
+ * Fill in the DMQ API structure for a module that wants to use DMQ.
+ *
+ * @param api structure to populate; must not be NULL
+ * @return 0 on success, -1 if api is NULL
+ */
+int bind_dmq(dmq_api_t* api) {
+	if(!api) {
+		LM_ERR("invalid parameter value\n");
+		return -1;
+	}
+	api->register_dmq_peer = register_dmq_peer;
+	return 0;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/message.c b/modules_k/dmq/message.c
new file mode 100644
index 0000000..55f385c
--- /dev/null
+++ b/modules_k/dmq/message.c
@@ -0,0 +1,29 @@
+#include "../../parser/parse_to.h"
+#include "../../parser/parse_uri.h"
+#include "../../parser/parse_content.h"
+#include "../../parser/parse_from.h"
+#include "../../ut.h"
+#include "worker.h"
+#include "peer.h"
+#include "message.h"
+
+/**
+ * Script-level handler for an incoming DMQ request.
+ *
+ * The user part of the request URI names the target peer. When a peer with
+ * that id is registered, a job for its callback is queued on a worker.
+ *
+ * @param msg  the received SIP message
+ * @param str1 unused apart from debug logging
+ * @param str2 unused apart from debug logging
+ * @return -1 if the request URI cannot be parsed or the job cannot be
+ *         queued, 0 otherwise (including when no peer matches)
+ */
+int handle_dmq_message(struct sip_msg* msg, char* str1, char* str2) {
+	dmq_peer_t* peer;
+	if ((parse_sip_msg_uri(msg) < 0) || (!msg->parsed_uri.user.s)) {
+		LM_ERR("cannot parse msg URI\n");
+		return -1;
+	}
+	LM_DBG("handle_dmq_message [%.*s %.*s] [%s %s]\n",
+		msg->first_line.u.request.method.len, msg->first_line.u.request.method.s,
+		msg->first_line.u.request.uri.len, msg->first_line.u.request.uri.s,
+		ZSW(str1), ZSW(str2));
+	/* the peer id is given as the userinfo part of the request URI */
+	peer = find_peer(msg->parsed_uri.user);
+	if(!peer) {
+		LM_DBG("no peer found for %.*s\n", msg->parsed_uri.user.len,
+			msg->parsed_uri.user.s);
+		return 0;
+	}
+	LM_DBG("handle_dmq_message peer found: %.*s\n", msg->parsed_uri.user.len,
+		msg->parsed_uri.user.s);
+	/* add_dmq_job reports failure with a negative value; propagate it
+	 * instead of pretending the message was handled.
+	 * NOTE(review): only the msg pointer is queued - confirm it stays valid
+	 * until the worker runs the callback. */
+	if(add_dmq_job(msg, peer) < 0) {
+		LM_ERR("failed to add dmq job\n");
+		return -1;
+	}
+	return 0;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/message.h b/modules_k/dmq/message.h
new file mode 100644
index 0000000..7e0cb95
--- /dev/null
+++ b/modules_k/dmq/message.h
@@ -0,0 +1,2 @@
+
+int handle_dmq_message(struct sip_msg*, char*, char*);
\ No newline at end of file
diff --git a/modules_k/dmq/peer.c b/modules_k/dmq/peer.c
new file mode 100644
index 0000000..1ec177f
--- /dev/null
+++ b/modules_k/dmq/peer.c
@@ -0,0 +1,43 @@
+#include "peer.h"
+
+/**
+ * Allocate an empty, zero-initialised peer list in shared memory.
+ *
+ * @return the new list, or NULL if shared memory could not be allocated
+ */
+dmq_peer_list_t* init_peer_list() {
+	dmq_peer_list_t* list = shm_malloc(sizeof(*list));
+	if(!list) {
+		LM_ERR("no more shm memory for the peer list\n");
+		return NULL;
+	}
+	memset(list, 0, sizeof(*list));
+	return list;
+}
+
+/**
+ * Look for a peer whose id matches the id of the given peer.
+ *
+ * The comparison is case-insensitive and only covers the shorter of the two
+ * ids.
+ * NOTE(review): because only the common prefix is compared, "abc" matches
+ * "abcdef" and an empty id matches everything - confirm this is intended.
+ *
+ * @param peer_list list to search
+ * @param peer      peer carrying the id to look for (only peer_id is read)
+ * @return the matching list entry, or 0 if there is none
+ */
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
+	dmq_peer_t* cur;
+	int len;
+	/* step to the next entry on every iteration; without this the loop
+	 * never terminates when the first entry does not match */
+	for(cur = peer_list->peers; cur; cur = cur->next) {
+		/* len - the minimum length of the two strings */
+		len = cur->peer_id.len < peer->peer_id.len ?
+			cur->peer_id.len : peer->peer_id.len;
+		if(strncasecmp(cur->peer_id.s, peer->peer_id.s, len) == 0) {
+			return cur;
+		}
+	}
+	return 0;
+}
+
+/**
+ * Copy a peer into shared memory and insert it at the head of the list.
+ *
+ * The copy is a plain structure assignment, so the peer_id and description
+ * buffers are shared with the caller's structure rather than duplicated.
+ * If shared memory is exhausted the peer is not added and an error is
+ * logged.
+ *
+ * @param peer_list list to extend
+ * @param peer      peer to copy
+ */
+void add_peer(dmq_peer_list_t* peer_list, dmq_peer_t* peer) {
+	dmq_peer_t* new_peer = shm_malloc(sizeof(*new_peer));
+	if(!new_peer) {
+		LM_ERR("no more shm memory for a new peer\n");
+		return;
+	}
+	*new_peer = *peer;
+	new_peer->next = peer_list->peers;
+	peer_list->peers = new_peer;
+}
+
+/**
+ * Register a peer in the global peer list.
+ *
+ * @param peer peer to register
+ * @return 0 after handing the peer to add_peer, -1 if search_peer_list
+ *         already finds a peer for this id
+ */
+int register_dmq_peer(dmq_peer_t* peer) {
+	dmq_peer_t* existing = search_peer_list(peer_list, peer);
+
+	if(existing == NULL) {
+		add_peer(peer_list, peer);
+		return 0;
+	}
+	LM_ERR("peer already exists: %.*s %.*s\n",
+		peer->peer_id.len, peer->peer_id.s,
+		peer->description.len, peer->description.s);
+	return -1;
+}
+
+/**
+ * Find a registered peer by id.
+ *
+ * @param peer_id id to look up in the global peer list
+ * @return whatever search_peer_list returns for that id
+ */
+dmq_peer_t* find_peer(str peer_id) {
+	/* temporary key: search_peer_list only reads the peer_id member */
+	dmq_peer_t key;
+	dmq_peer_t* match;
+
+	key.peer_id = peer_id;
+	match = search_peer_list(peer_list, &key);
+	return match;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/peer.h b/modules_k/dmq/peer.h
new file mode 100644
index 0000000..72c851f
--- /dev/null
+++ b/modules_k/dmq/peer.h
@@ -0,0 +1,35 @@
+#ifndef PEER_H
+#define PEER_H
+
+#include <string.h>
+#include <stdlib.h>
+#include "../../str.h"
+#include "../../mem/mem.h"
+#include "../../mem/shm_mem.h"
+#include "../../parser/msg_parser.h"
+
+/* Callback supplied by a peer. worker_loop calls it with the SIP message of
+ * a queued job and logs an error when it returns a negative value. */
+typedef int(*peer_callback_t)(struct sip_msg*);
+
+/* A DMQ peer; entries are chained through `next` into a singly linked list. */
+typedef struct dmq_peer {
+ /* identifier compared case-insensitively by search_peer_list */
+ str peer_id;
+ /* free-form text; only used in log output by register_dmq_peer */
+ str description;
+ /* function that add_dmq_job stores in the job for this peer */
+ peer_callback_t callback;
+ /* next peer in the list, NULL-terminated */
+ struct dmq_peer* next;
+} dmq_peer_t;
+
+/* Container for the list of registered peers. */
+typedef struct dmq_peer_list {
+ /* head of the list; add_peer inserts new entries here */
+ dmq_peer_t* peers;
+ /* NOTE(review): zeroed by init_peer_list and not updated by add_peer -
+  * presumably meant to hold the number of peers; confirm */
+ int count;
+} dmq_peer_list_t;
+
+/* global peer list used by register_dmq_peer and find_peer */
+extern dmq_peer_list_t* peer_list;
+
+/* allocate a zeroed peer list in shared memory */
+dmq_peer_list_t* init_peer_list();
+/* return the list entry whose peer_id matches peer->peer_id, or 0 */
+dmq_peer_t* search_peer_list(dmq_peer_list_t* peer_list, dmq_peer_t* peer);
+/* signature of register_dmq_peer, for storing it as a function pointer */
+typedef int (*register_dmq_peer_t)(dmq_peer_t*);
+
+/* add a peer to the global list; returns -1 if it is already present */
+int register_dmq_peer(dmq_peer_t* peer);
+/* look up a peer in the global list by id */
+dmq_peer_t* find_peer(str peer_id);
+
+
+#endif
\ No newline at end of file
diff --git a/modules_k/dmq/worker.c b/modules_k/dmq/worker.c
new file mode 100644
index 0000000..6eaae21
--- /dev/null
+++ b/modules_k/dmq/worker.c
@@ -0,0 +1,120 @@
+#include "dmq.h"
+#include "worker.h"
+
+/**
+ * Main loop of a DMQ worker; never returns.
+ *
+ * The worker blocks on its own lock until add_dmq_job releases it, then
+ * runs and frees queued jobs until its queue reports no more entries.
+ *
+ * @param id index of this worker in the workers array
+ */
+void worker_loop(int id) {
+	dmq_worker_t* self = &workers[id];
+	dmq_job_t* job;
+	int rc;
+
+	while(1) {
+		LM_DBG("dmq_worker [%d %d] getting lock\n", id, my_pid());
+		lock_get(&self->lock);
+		LM_DBG("dmq_worker [%d %d] lock acquired\n", id, my_pid());
+		/* the lock may have been released several times while we were
+		 * busy, so keep draining until the queue is empty */
+		do {
+			job = job_queue_pop(self->queue);
+			if(!job) {
+				/* nothing queued; fall through to the size check */
+				continue;
+			}
+			rc = job->f(job->msg);
+			if(rc < 0) {
+				LM_ERR("running job failed\n");
+			}
+			shm_free(job);
+			self->jobs_processed++;
+		} while(job_queue_size(self->queue) > 0);
+	}
+}
+
+/**
+ * Queue a job that runs the peer's callback on the given message.
+ *
+ * The first worker with an empty queue is chosen; if every worker has
+ * pending jobs, the one with the shortest queue gets the job. The chosen
+ * worker's lock is released afterwards to wake it up.
+ *
+ * @param msg  message handed to the callback (stored by pointer)
+ * @param peer peer whose callback will be run
+ * @return 0 on success, -1 if no workers exist
+ */
+int add_dmq_job(struct sip_msg* msg, dmq_peer_t* peer) {
+	dmq_job_t job;
+	dmq_worker_t* target;
+	dmq_worker_t* candidate;
+	int idx;
+	int idle_found = 0;
+
+	job.f = peer->callback;
+	job.msg = msg;
+	job.orig_peer = peer;
+	if(!num_workers) {
+		LM_ERR("error in add_dmq_job no workers spawned\n");
+		return -1;
+	}
+	/* start from the first worker and look for a better one */
+	target = &workers[0];
+	for(idx = 0; idx < num_workers; idx++) {
+		candidate = &workers[idx];
+		if(job_queue_size(candidate->queue) == 0) {
+			/* idle worker: no need to look any further */
+			target = candidate;
+			idle_found = 1;
+			break;
+		}
+		if(job_queue_size(candidate->queue) < job_queue_size(target->queue)) {
+			target = candidate;
+		}
+	}
+	if(!idle_found) {
+		LM_DBG("no available worker found, passing job to the least busy one\n");
+	}
+	/* job_queue_push stores its own copy of the job */
+	job_queue_push(target->queue, &job);
+	lock_release(&target->lock);
+	return 0;
+}
+
+/**
+ * Prepare a worker structure: zero it, give it a job queue and set up its
+ * lock in the acquired state.
+ *
+ * @param worker structure to initialise
+ */
+void init_worker(dmq_worker_t* worker) {
+	memset(worker, 0, sizeof(dmq_worker_t));
+	worker->queue = alloc_job_queue();
+	lock_init(&worker->lock);
+	/* take the lock right away so that worker_loop blocks on its first
+	 * lock_get until a job arrives */
+	lock_get(&worker->lock);
+}
+
+/**
+ * Allocate and initialise an empty job queue in shared memory.
+ *
+ * @return the new queue, or NULL if shared memory could not be allocated
+ */
+job_queue_t* alloc_job_queue() {
+	job_queue_t* queue = shm_malloc(sizeof(*queue));
+	if(!queue) {
+		LM_ERR("no more shm memory for a job queue\n");
+		return NULL;
+	}
+	atomic_set(&queue->count, 0);
+	queue->front = NULL;
+	queue->back = NULL;
+	lock_init(&queue->lock);
+	return queue;
+}
+
+/**
+ * Release a job queue together with any jobs still waiting in it.
+ *
+ * @param queue queue to destroy; NULL is ignored
+ */
+void destroy_job_queue(job_queue_t* queue) {
+	dmq_job_t* job;
+	if(!queue) {
+		return;
+	}
+	/* free jobs that were queued but never processed */
+	while((job = job_queue_pop(queue)) != NULL) {
+		shm_free(job);
+	}
+	/* counterpart of the lock_init done in alloc_job_queue */
+	lock_destroy(&queue->lock);
+	shm_free(queue);
+}
+
+/**
+ * Read the queue's job counter.
+ *
+ * @param queue queue to inspect
+ * @return current value of the atomic counter
+ */
+int job_queue_size(job_queue_t* queue) {
+	int pending = atomic_get(&queue->count);
+	return pending;
+}
+
+/**
+ * Append a copy of the given job to the back of the queue.
+ *
+ * The job is copied into shared memory, so the caller keeps ownership of
+ * its own structure. If shared memory is exhausted the job is dropped and
+ * an error is logged.
+ *
+ * @param queue queue to extend
+ * @param job   job to copy
+ */
+void job_queue_push(job_queue_t* queue, dmq_job_t* job) {
+	/* we need to copy the dmq_job into a newly created dmq_job in shm */
+	dmq_job_t* newjob = shm_malloc(sizeof(*newjob));
+	if(!newjob) {
+		LM_ERR("no more shm memory for a new job\n");
+		return;
+	}
+	*newjob = *job;
+
+	lock_get(&queue->lock);
+	/* the new job becomes the back; its next link points to the old back */
+	newjob->prev = NULL;
+	newjob->next = queue->back;
+	if(queue->back) {
+		queue->back->prev = newjob;
+	}
+	queue->back = newjob;
+	if(!queue->front) {
+		/* queue was empty, so the new job is also the front */
+		queue->front = newjob;
+	}
+	atomic_inc(&queue->count);
+	lock_release(&queue->lock);
+}
+/**
+ * Detach the job at the front of the queue.
+ *
+ * @param queue queue to take from
+ * @return the detached job (the caller frees it), or NULL if the queue is
+ *         empty
+ */
+dmq_job_t* job_queue_pop(job_queue_t* queue) {
+	dmq_job_t* oldest;
+	dmq_job_t* successor;
+
+	lock_get(&queue->lock);
+	oldest = queue->front;
+	if(oldest != NULL) {
+		successor = oldest->prev;
+		if(successor != NULL) {
+			/* the job queued right after this one becomes the front */
+			queue->front = successor;
+			successor->next = NULL;
+		} else {
+			/* that was the only job; the queue is now empty */
+			queue->front = NULL;
+			queue->back = NULL;
+		}
+		atomic_dec(&queue->count);
+	}
+	lock_release(&queue->lock);
+	return oldest;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/worker.h b/modules_k/dmq/worker.h
new file mode 100644
index 0000000..61eda09
--- /dev/null
+++ b/modules_k/dmq/worker.h
@@ -0,0 +1,43 @@
+#ifndef DMQ_WORKER_H
+#define DMQ_WORKER_H
+
+#include "peer.h"
+#include "../../locking.h"
+#include "../../atomic_ops.h"
+#include "../../parser/msg_parser.h"
+
+/* One unit of work for a DMQ worker; also a node of the job queue. */
+typedef struct dmq_job {
+ /* callback run by worker_loop; taken from the peer in add_dmq_job */
+ peer_callback_t f;
+ /* message passed to the callback */
+ struct sip_msg* msg;
+ /* peer the job was created for */
+ dmq_peer_t* orig_peer;
+ /* link towards the front of the queue (the job pushed before this one) */
+ struct dmq_job* next;
+ /* link towards the back of the queue (the job pushed after this one) */
+ struct dmq_job* prev;
+} dmq_job_t;
+
+/* Doubly linked queue of jobs: pushed at the back, popped at the front. */
+typedef struct job_queue {
+ /* number of queued jobs; incremented on push, decremented on pop */
+ atomic_t count;
+ /* most recently pushed job, NULL when the queue is empty */
+ struct dmq_job* back;
+ /* job that job_queue_pop returns next, NULL when the queue is empty */
+ struct dmq_job* front;
+ /* held by job_queue_push and job_queue_pop while they relink the list */
+ gen_lock_t lock;
+} job_queue_t;
+
+/* State of a single DMQ worker. */
+struct dmq_worker {
+ /* jobs waiting for this worker; allocated in init_worker */
+ job_queue_t* queue;
+ /* incremented by worker_loop after each job it has run */
+ int jobs_processed;
+ /* worker_loop blocks on this lock; add_dmq_job releases it after
+  * queueing a job */
+ gen_lock_t lock;
+ /* NOTE(review): not referenced by the worker code shown here -
+  * presumably the process id of the worker; confirm */
+ int pid;
+};
+
+typedef struct dmq_worker dmq_worker_t;
+
+/* zero the worker, allocate its queue and take its lock */
+void init_worker(dmq_worker_t* worker);
+/* queue a job for the peer's callback; returns -1 if there are no workers */
+int add_dmq_job(struct sip_msg*, dmq_peer_t*);
+/* endless loop executed by the worker with the given index */
+void worker_loop(int id);
+
+/* allocate an empty job queue in shared memory */
+job_queue_t* alloc_job_queue();
+/* free the queue structure */
+void destroy_job_queue(job_queue_t* queue);
+/* append a shared-memory copy of job to the queue */
+void job_queue_push(job_queue_t* queue, dmq_job_t* job);
+/* detach and return the front job, or NULL if the queue is empty */
+dmq_job_t* job_queue_pop(job_queue_t* queue);
+/* current number of queued jobs */
+int job_queue_size(job_queue_t* queue);
+
+#endif
\ No newline at end of file