Module: sip-router Branch: master Commit: cab6c3abedb9036ebd75483c860b0caa74388e4c URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=cab6c3ab...
Author: Daniel-Constantin Mierla miconda@gmail.com Committer: Daniel-Constantin Mierla miconda@gmail.com Date: Mon Apr 14 14:16:48 2014 +0200
async: new function async_task_route(rname)
- execute actions in a routing block via core async framework
---
modules/async/async_mod.c | 74 +++++++++++++++++++++++++++++++++++++++++++ modules/async/async_sleep.c | 74 +++++++++++++++++++++++++++++++++++++++++++ modules/async/async_sleep.h | 4 ++- 3 files changed, 151 insertions(+), 1 deletions(-)
diff --git a/modules/async/async_mod.c b/modules/async/async_mod.c index 26e94b7..cc838f8 100644 --- a/modules/async/async_mod.c +++ b/modules/async/async_mod.c @@ -33,6 +33,7 @@ #include "../../pvar.h" #include "../../timer_proc.h" #include "../../route_struct.h" +#include "../../async_task.h" #include "../../modules/tm/tm_load.h"
#include "async_sleep.h" @@ -49,6 +50,8 @@ static int w_async_sleep(struct sip_msg* msg, char* sec, char* str2); static int fixup_async_sleep(void** param, int param_no); static int w_async_route(struct sip_msg* msg, char* rt, char* sec); static int fixup_async_route(void** param, int param_no); +static int w_async_task_route(struct sip_msg* msg, char* rt, char* p2); +static int fixup_async_task_route(void** param, int param_no);
/* tm */ struct tm_binds tmb; @@ -59,6 +62,8 @@ static cmd_export_t cmds[]={ 0, REQUEST_ROUTE|FAILURE_ROUTE}, {"async_sleep", (cmd_function)w_async_sleep, 1, fixup_async_sleep, 0, REQUEST_ROUTE|FAILURE_ROUTE}, + {"async_task_route", (cmd_function)w_async_task_route, 1, fixup_async_task_route, + 0, REQUEST_ROUTE|FAILURE_ROUTE}, {0, 0, 0, 0, 0, 0} };
@@ -129,6 +134,9 @@ static void mod_destroy(void) async_destroy_timer_list(); }
+/** + * + */ static int w_async_sleep(struct sip_msg* msg, char* sec, char* str2) { int s; @@ -159,6 +167,9 @@ static int w_async_sleep(struct sip_msg* msg, char* sec, char* str2) return -1; }
+/** + * + */ static int fixup_async_sleep(void** param, int param_no) { async_param_t *ap; @@ -179,6 +190,9 @@ static int fixup_async_sleep(void** param, int param_no) return 0; }
+/** + * + */ static int w_async_route(struct sip_msg* msg, char* rt, char* sec) { cfg_action_t *act; @@ -220,6 +234,9 @@ static int w_async_route(struct sip_msg* msg, char* rt, char* sec) return 0; }
+/** + * + */ static int fixup_async_route(void** param, int param_no) { if(param_no==1) @@ -233,3 +250,60 @@ static int fixup_async_route(void** param, int param_no) } return 0; } + +/** + * config wrapper for async_task_route(rname): gets the route block name from rt, looks it up in main_rt and hands its action list to async_send_task(); returns -1 on any failure, 0 on success (the third parameter is not used) + */ +static int w_async_task_route(struct sip_msg* msg, char* rt, char* sec) +{ + cfg_action_t *act; + str rn; + int ri; + + if(msg==NULL) + return -1; + + if(fixup_get_svalue(msg, (gparam_t*)rt, &rn)!=0) + { + LM_ERR("no async route block name\n"); + return -1; + } + + ri = route_get(&main_rt, rn.s); + if(ri<0) + { + LM_ERR("unable to find route block [%.*s]\n", rn.len, rn.s); + return -1; + } + act = main_rt.rlist[ri]; + if(act==NULL) + { + LM_ERR("empty action lists in route block [%.*s]\n", rn.len, rn.s); + return -1; + } + + if(async_send_task(msg, act)<0) + return -1; + /* force exit in config */ + return 0; +} + +/** + * fixup for async_task_route(): returns -1 when async_task_initialized() reports false, otherwise applies fixup_spve_null() to parameter 1 and returns 0 (-1 if that fixup fails) + */ +static int fixup_async_task_route(void** param, int param_no) +{ + if(!async_task_initialized()) { + LM_ERR("async task framework was not initialized" + " - set async_workers parameter in core\n"); + return -1; + } + + if(param_no==1) + { + if(fixup_spve_null(param, 1)<0) + return -1; + return 0; + } + return 0; +} diff --git a/modules/async/async_sleep.c b/modules/async/async_sleep.c index e486a3b..2dcd911 100644 --- a/modules/async/async_sleep.c +++ b/modules/async/async_sleep.c @@ -31,6 +31,7 @@ #include "../../ut.h" #include "../../locking.h" #include "../../timer.h" +#include "../../async_task.h" #include "../../modules/tm/tm_load.h"
#include "async_sleep.h" @@ -186,3 +187,76 @@ void async_timer_exec(unsigned int ticks, void *param) shm_free(ai); } } + + +/** + * async task callback: reads from param the route actions pointer followed by two unsigned ints (tindex, tlabel), as packed by async_send_task(), and calls tmb.t_continue() with them when the actions pointer is not NULL + */ +void async_exec_task(void *param) +{ + cfg_action_t *act; + unsigned int *p; + unsigned int tindex; + unsigned int tlabel; + + act = *((cfg_action_t**)param); + p = (unsigned int*)((char*)param + sizeof(cfg_action_t*)); + tindex = p[0]; + tlabel = p[1]; + + if(act!=NULL) + tmb.t_continue(tindex, tlabel, act); + /* param is freed along with the async task structure in core */ +} + +/** + * makes sure a transaction exists (tmb.t_newtran() is called when tmb.t_gett() finds none), suspends it with tmb.t_suspend() and pushes an async task whose trailing payload holds act, tindex and tlabel for async_exec_task(); returns 0 on success, -1 on error + */ +int async_send_task(sip_msg_t* msg, cfg_action_t *act) +{ + async_task_t *at; + tm_cell_t *t = 0; + unsigned int tindex; + unsigned int tlabel; + int dsize; + unsigned int *p; + + t = tmb.t_gett(); + if (t==NULL || t==T_UNDEFINED) + { + if(tmb.t_newtran(msg)<0) + { + LM_ERR("cannot create the transaction\n"); + return -1; + } + t = tmb.t_gett(); + if (t==NULL || t==T_UNDEFINED) + { + LM_ERR("cannot lookup the transaction\n"); + return -1; + } + } + dsize = sizeof(async_task_t) + sizeof(cfg_action_t*) + + 2*sizeof(unsigned int); /* task header + actions pointer + (tindex, tlabel) in one shm chunk */ + at = (async_task_t*)shm_malloc(dsize); + if(at==NULL) + { + LM_ERR("no more shm\n"); + return -1; + } + memset(at, 0, dsize); + if(tmb.t_suspend(msg, &tindex, &tlabel)<0) + { + LM_ERR("failed to suppend the processing\n"); + shm_free(at); + return -1; + } + at->exec = async_exec_task; + at->param = (char*)at + sizeof(async_task_t); /* payload starts right after the task header */ + *((cfg_action_t**)at->param) = act; + p = (unsigned int*)((char*)at->param + sizeof(cfg_action_t*)); + p[0] = tindex; + p[1] = tlabel; + async_task_push(at); /* NOTE(review): any result of async_task_push() is ignored here - confirm whether it can fail and, if so, whether at must be freed */ + return 0; +} diff --git a/modules/async/async_sleep.h b/modules/async/async_sleep.h index 6275bd4..2ae6bcf 100644 --- a/modules/async/async_sleep.h +++ b/modules/async/async_sleep.h @@ -42,8 +42,10 @@ int async_init_timer_list(void);
int async_destroy_timer_list(void);
-int async_sleep(struct sip_msg* msg, int seconds, cfg_action_t *act); +int async_sleep(sip_msg_t* msg, int seconds, cfg_action_t *act);
void async_timer_exec(unsigned int ticks, void *param);
+int async_send_task(sip_msg_t* msg, cfg_action_t *act); + #endif