Module: kamailio Branch: master Commit: 5b852254e75365c2886baf1e4be8829ac10b9917 URL: https://github.com/kamailio/kamailio/commit/5b852254e75365c2886baf1e4be8829a...
Author: Daniel-Constantin Mierla miconda@gmail.com Committer: Daniel-Constantin Mierla miconda@gmail.com Date: 2024-11-22T13:28:20+01:00
core: framework to execute async event route with type-key-value
- execute event_route[core:tkv] in async fashion, with data passed in the form of [type, key, value] - the events have to be emitted from parts of the code and the execution of the event route should not block the emitting process
---
Modified: src/core/async_task.c Modified: src/core/async_task.h
---
Diff: https://github.com/kamailio/kamailio/commit/5b852254e75365c2886baf1e4be8829a... Patch: https://github.com/kamailio/kamailio/commit/5b852254e75365c2886baf1e4be8829a...
---
diff --git a/src/core/async_task.c b/src/core/async_task.c index bce7ed51faa..17c385a51fd 100644 --- a/src/core/async_task.c +++ b/src/core/async_task.c @@ -34,11 +34,15 @@ #include <arpa/inet.h> #include <fcntl.h> #include <errno.h> +#include <stdarg.h>
#include "dprint.h" #include "sr_module.h" #include "ut.h" #include "pt.h" +#include "kemi.h" +#include "fmsg.h" +#include "receive.h" #include "cfg/cfg_struct.h" #include "parser/parse_param.h"
@@ -525,3 +529,126 @@ int async_task_run(async_wgroup_t *awg, int idx)
return 0; } + + +/** + * + */ +static async_wgroup_t *_async_tkv_awg = NULL; +static async_tkv_param_t *_ksr_async_tkv_param = NULL; +static int _ksr_async_tkv_ridx = -1; + +/** + * + */ +void async_tkv_init(void) +{ + str gname = str_init("tkv"); + str evname = str_init("core:tkv"); + + _async_tkv_awg = async_task_group_find(&gname); + + _ksr_async_tkv_ridx = route_lookup(&event_rt, evname.s); + if(_ksr_async_tkv_ridx <= 0 + || event_rt.rlist[_ksr_async_tkv_ridx] == NULL) { + LM_DBG("event_route[%s] not defined - skipping\n", evname.s); + _ksr_async_tkv_ridx = -2; + return; + } +} + +/** + * + */ +void async_exec_tkv(void *param) +{ + async_tkv_param_t *adp; + sr_kemi_eng_t *keng = NULL; + sip_msg_t *fmsg = NULL; + str evname = str_init("core:tkv"); + str cbname = str_init("ksr_core_tkv"); + int rtype = 0; + + adp = (async_tkv_param_t *)param; + fmsg = faked_msg_next(); + rtype = get_route_type(); + _ksr_async_tkv_param = adp; + set_route_type(REQUEST_ROUTE); + keng = sr_kemi_eng_get(); + if(keng != NULL) { + if(sr_kemi_route(keng, fmsg, EVENT_ROUTE, &cbname, &evname) < 0) { + LM_ERR("error running event route kemi callback [%.*s]\n", + cbname.len, cbname.s); + } + } else { + if(_ksr_async_tkv_ridx >= 0) { + run_top_route(event_rt.rlist[_ksr_async_tkv_ridx], fmsg, 0); + } + } + ksr_msg_env_reset(); + set_route_type(rtype); + _ksr_async_tkv_param = NULL; + /* param is freed along with the async task strucutre in core */ +} + +/** + * + */ +int async_tkv_emit(int dtype, char *pkey, char *fmt, ...) +{ + async_task_t *at = NULL; + int dsize = 0; + async_tkv_param_t *adp; + va_list va; + char buf[KSR_ASYNC_TKV_SIZE]; + int n = 0; + int klen = 0; + sr_kemi_eng_t *keng = NULL; + + if(_async_tkv_awg == NULL) { + LM_DBG("the async group has not been set\n"); + return -1; + } + keng = sr_kemi_eng_get(); + if(keng == NULL && _ksr_async_tkv_ridx == -2) { + LM_DBG("the event_route[core:tkv] has not been defined\n"); + return -1; + } + + va_start(va, fmt); + n = vsnprintf(buf, KSR_ASYNC_TKV_SIZE, fmt, va); + va_end(va); + + if(n < 0 || n > KSR_ASYNC_TKV_SIZE) { + LM_ERR("failed to print the arguments for key: %s\n", pkey); + return -1; + } + + klen = strlen(pkey); + dsize = sizeof(async_task_t) + sizeof(async_tkv_param_t) + klen + 1 + n + 1; + at = (async_task_t *)shm_malloc(dsize); + if(at == NULL) { + SHM_MEM_ERROR; + return -1; + } + memset(at, 0, dsize); + at->exec = async_exec_tkv; + at->param = (char *)at + sizeof(async_task_t); + adp = (async_tkv_param_t *)at->param; + adp->dtype = dtype; + adp->skey.s = (char *)adp + sizeof(async_tkv_param_t); + adp->skey.len = klen; + memcpy(adp->skey.s, pkey, klen); + adp->skey.len = klen; + adp->sval.s = (char *)adp->skey.s + klen + 1; + memcpy(adp->sval.s, buf, n); + adp->sval.len = n; + + if(async_task_group_send(_async_tkv_awg, at) < 0) { + LM_ERR("failed to send task with key: %s\n", pkey); + shm_free(at); + return -1; + } + + return 0; +} diff --git a/src/core/async_task.h b/src/core/async_task.h index 58386bba8bd..b4276c2d27c 100644 --- a/src/core/async_task.h +++ b/src/core/async_task.h @@ -58,4 +58,15 @@ async_wgroup_t *async_task_group_find(str *gname); int async_task_group_push(str *gname, async_task_t *task); int async_task_group_send(async_wgroup_t *awg, async_task_t *task);
+typedef struct async_tkv_param +{ + int dtype; + str skey; + str sval; +} async_tkv_param_t; + +#define KSR_ASYNC_TKV_SIZE 512 +void async_tkv_init(void); +int async_tkv_emit(int dtype, char *pkey, char *fmt, ...); + #endif