Module: sip-router
Branch: mariusbucur/dmq
Commit: 78eea80ffdaf09dd6d247455d77c47a64eacae48
URL:
http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=78eea80…
Author: Marius Bucur <marius.bucur(a)1and1.ro>
Committer: Marius Bucur <marius.bucur(a)1and1.ro>
Date: Wed Apr 6 20:12:00 2011 +0300
temporarily changed the presence module to test dmq functionality
---
modules_k/dmq/bind_dmq.h | 14 ++++++++
modules_k/dmq/dmq.c | 71 ++++++++++++++++++++++++-----------------
modules_k/dmq/dmq.h | 30 +++++++++++++++++
modules_k/dmq/dmq_worker.h | 8 -----
modules_k/presence/presence.c | 29 +++++++++++++++++
5 files changed, 115 insertions(+), 37 deletions(-)
diff --git a/modules_k/dmq/bind_dmq.h b/modules_k/dmq/bind_dmq.h
index e69de29..9c515a4 100644
--- a/modules_k/dmq/bind_dmq.h
+++ b/modules_k/dmq/bind_dmq.h
@@ -0,0 +1,14 @@
+#ifndef BIND_DMQ_H
+#define BIND_DMQ_H
+
+#include "peer.h"
+
+typedef struct dmq_api {
+ register_dmq_peer_t register_dmq_peer;
+} dmq_api_t;
+
+typedef int (*bind_dmq_f)(dmq_api_t* api);
+
+int bind_dmq(dmq_api_t* api);
+
+#endif
\ No newline at end of file
diff --git a/modules_k/dmq/dmq.c b/modules_k/dmq/dmq.c
index f5aae9a..90b0ac2 100644
--- a/modules_k/dmq/dmq.c
+++ b/modules_k/dmq/dmq.c
@@ -47,11 +47,11 @@
#include "../../lib/kmi/mi.h"
#include "../../lib/kcore/hash_func.h"
#include "dmq.h"
+#include "peer.h"
#include "bind_dmq.h"
-#include "dmq_worker.h"
+#include "worker.h"
#include "../../mod_fix.h"
-static int mi_child_init(void);
static int mod_init(void);
static int child_init(int);
static void destroy(void);
@@ -74,15 +74,18 @@ sl_api_t slb;
/** module variables */
dmq_worker_t* workers;
+dmq_peer_list_t* peer_list;
/** module functions */
static int mod_init(void);
static int child_init(int);
static void destroy(void);
-static int fixup_dmq(void** param, int param_no);
+static int handle_dmq_fixup(void** param, int param_no);
static cmd_export_t cmds[] = {
- {"handle_dmq_message", (cmd_function)handle_dmq_message, 0, fixup_dmq, 0,
+ {"handle_dmq_message", (cmd_function)handle_dmq_message, 0, handle_dmq_fixup,
0,
+ REQUEST_ROUTE},
+ {"bind_dmq", (cmd_function)bind_dmq, 0, 0, 0,
REQUEST_ROUTE},
{0, 0, 0, 0, 0, 0}
};
@@ -93,7 +96,6 @@ static param_export_t params[] = {
};
static mi_export_t mi_cmds[] = {
- {"cleanup", 0, 0, 0, mi_child_init},
{0, 0, 0, 0, 0}
};
@@ -117,7 +119,6 @@ struct module_exports exports = {
* init module function
*/
static int mod_init(void) {
- int i = 0;
if(register_mi_mod(exports.name, mi_cmds)!=0) {
LM_ERR("failed to register MI commands\n");
@@ -136,23 +137,21 @@ static int mod_init(void) {
/* load all TM stuff */
if(load_tm_api(&tmb)==-1) {
- LM_ERR("Can't load tm functions. Module TM not loaded?\n");
+ LM_ERR("can't load tm functions. TM module probably not loaded\n");
return -1;
}
- /* fork worker processes */
+ /* load peer list - the list containing the module callbacks for dmq */
+ peer_list = init_peer_list();
+
+ /* register worker processes */
+ register_procs(num_workers);
+
+ /* allocate workers array */
workers = shm_malloc(num_workers * sizeof(*workers));
- for(i = 0; i < num_workers; i++) {
- int newpid = fork_process(PROC_NOCHLDINIT, "DMQ WORKER", 0);
- if(newpid < 0) {
- LM_ERR("failed to form process\n");
- return -1;
- } else if(newpid == 0) {
- /* child */
- // worker loop
- } else {
- workers[i].pid = newpid;
- }
+ if(workers == NULL) {
+ LM_ERR("error in shm_malloc\n");
+ return -1;
}
startup_time = (int) time(NULL);
@@ -163,26 +162,40 @@ static int mod_init(void) {
* Initialize children
*/
static int child_init(int rank) {
- if (rank==PROC_INIT || rank==PROC_MAIN || rank==PROC_TCP_MAIN) {
+ int i, newpid;
+ if (rank == PROC_MAIN) {
+ /* fork worker processes */
+ for(i = 0; i < num_workers; i++) {
+ init_worker(&workers[i]);
+ LM_DBG("starting worker process %d\n", i);
+ newpid = fork_process(PROC_NOCHLDINIT, "DMQ WORKER", 0);
+ if(newpid < 0) {
+ LM_ERR("failed to form process\n");
+ return -1;
+ } else if(newpid == 0) {
+ // child - this will loop forever
+ worker_loop(i);
+ } else {
+ workers[i].pid = newpid;
+ }
+ }
+ return 0;
+ }
+ if(rank == PROC_INIT || rank == PROC_TCP_MAIN) {
/* do nothing for the main process */
return 0;
}
pid = my_pid();
-
- if(library_mode)
- return 0;
-
- return 0;
-}
-
-static int mi_child_init(void) {
return 0;
}
-
/*
* destroy function
*/
static void destroy(void) {
}
+
+static int handle_dmq_fixup(void** param, int param_no) {
+ return 0;
+}
\ No newline at end of file
diff --git a/modules_k/dmq/dmq.h b/modules_k/dmq/dmq.h
index d4b13de..3bd9c84 100644
--- a/modules_k/dmq/dmq.h
+++ b/modules_k/dmq/dmq.h
@@ -1,3 +1,33 @@
+#ifndef DMQ_H
+#define DMQ_H
+
+#include "../../dprint.h"
+#include "../../error.h"
+#include "../../sr_module.h"
+#include "bind_dmq.h"
+#include "peer.h"
+#include "worker.h"
#define DEFAULT_NUM_WORKERS 2
+
+extern int num_workers;
+extern dmq_worker_t* workers;
+
+static inline int dmq_load_api(dmq_api_t* api) {
+ bind_dmq_f binddmq;
+ binddmq = (bind_dmq_f)find_export("bind_dmq", 0, 0);
+ if ( binddmq == 0) {
+ LM_ERR("cannot find bind_dmq\n");
+ return -1;
+ }
+ if (binddmq(api) < 0)
+ {
+ LM_ERR("cannot bind dmq api\n");
+ return -1;
+ }
+ return 0;
+}
+
int handle_dmq_message(struct sip_msg* msg, char* str1 ,char* str2);
+
+#endif
\ No newline at end of file
diff --git a/modules_k/dmq/dmq_worker.h b/modules_k/dmq/dmq_worker.h
deleted file mode 100644
index e20b075..0000000
--- a/modules_k/dmq/dmq_worker.h
+++ /dev/null
@@ -1,8 +0,0 @@
-
-
-struct dmq_worker {
- void* queue;
- int pid;
-};
-
-typedef struct dmq_worker dmq_worker_t;
\ No newline at end of file
diff --git a/modules_k/presence/presence.c b/modules_k/presence/presence.c
index a1cb13f..e32e527 100644
--- a/modules_k/presence/presence.c
+++ b/modules_k/presence/presence.c
@@ -69,6 +69,7 @@
#include "../../lib/kmi/mi.h"
#include "../../lib/kcore/hash_func.h"
#include "../pua/hash.h"
+#include "../dmq/dmq.h"
#include "presence.h"
#include "publish.h"
#include "subscribe.h"
@@ -105,6 +106,9 @@ char* to_tag_pref = "10";
struct tm_binds tmb;
/* SL API structure */
sl_api_t slb;
+/* dmq API structure */
+dmq_api_t dmq;
+register_dmq_peer_t register_dmq;
/** module functions */
@@ -206,6 +210,22 @@ struct module_exports exports= {
child_init /* per-child init function */
};
+int dmq_callback(struct sip_msg* msg) {
+ LM_ERR("it worked - dmq module triggered the presence callback [%ld %d]\n",
time(0), my_pid());
+ sleep(4);
+ return 0;
+}
+
+static void add_dmq_peer() {
+ dmq_peer_t presence_peer;
+ presence_peer.peer_id.s = "presence";
+ presence_peer.peer_id.len = 8;
+ presence_peer.description.s = "presence";
+ presence_peer.description.len = 8;
+ presence_peer.callback = dmq_callback;
+ register_dmq(&presence_peer);
+}
+
/**
* init module function
*/
@@ -268,6 +288,15 @@ static int mod_init(void)
return -1;
}
+ if(dmq_load_api(&dmq) < 0) {
+ LM_ERR("cannot load dmq api\n");
+ return -1;
+ } else {
+ register_dmq = dmq.register_dmq_peer;
+ add_dmq_peer();
+ LM_DBG("presence-dmq loaded\n");
+ }
+
if(db_url.s== NULL)
{
LM_ERR("database url not set!\n");