Module: kamailio
Branch: master
Commit: 5b852254e75365c2886baf1e4be8829ac10b9917
URL:
https://github.com/kamailio/kamailio/commit/5b852254e75365c2886baf1e4be8829ac10b9917
Author: Daniel-Constantin Mierla <miconda(a)gmail.com>
Committer: Daniel-Constantin Mierla <miconda(a)gmail.com>
Date: 2024-11-22T13:28:20+01:00
core: framework to execute async event route with type-key-value
- execute event_route[core:tkv] in async fashion, with data passed in
the form of [type, key, value]
- the events have to be emitted from parts of the code and the execution
of the event route should not block the emitting process
---
Modified: src/core/async_task.c
Modified: src/core/async_task.h
---
Diff:
https://github.com/kamailio/kamailio/commit/5b852254e75365c2886baf1e4be8829ac10b9917.diff
Patch:
https://github.com/kamailio/kamailio/commit/5b852254e75365c2886baf1e4be8829ac10b9917.patch
---
diff --git a/src/core/async_task.c b/src/core/async_task.c
index bce7ed51faa..17c385a51fd 100644
--- a/src/core/async_task.c
+++ b/src/core/async_task.c
@@ -34,11 +34,15 @@
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>
+#include <stdarg.h>
#include "dprint.h"
#include "sr_module.h"
#include "ut.h"
#include "pt.h"
+#include "kemi.h"
+#include "fmsg.h"
+#include "receive.h"
#include "cfg/cfg_struct.h"
#include "parser/parse_param.h"
@@ -525,3 +529,126 @@ int async_task_run(async_wgroup_t *awg, int idx)
return 0;
}
+
+
+/**
+ * File-local state of the type-key-value (tkv) async events:
+ * - _async_tkv_awg: the tkv async group, looked up in async_tkv_init()
+ * - _ksr_async_tkv_param: parameter of the task being executed by
+ *   async_exec_tkv(), NULL before and after the execution
+ * - _ksr_async_tkv_ridx: index of event_route[core:tkv] in event_rt,
+ *   -1 until async_tkv_init() runs, -2 when the route is not defined
+ */
+static async_wgroup_t *_async_tkv_awg = NULL;
+static async_tkv_param_t *_ksr_async_tkv_param = NULL;
+static int _ksr_async_tkv_ridx = -1;
+
+/**
+ * Prepare the type-key-value async events: resolve the "tkv" async group
+ * and the index of event_route[core:tkv].
+ *
+ * When the event route is not defined, the stored index is set to -2.
+ */
+void async_tkv_init(void)
+{
+	str grp = str_init("tkv");
+	str evt = str_init("core:tkv");
+	int ridx;
+
+	_async_tkv_awg = async_task_group_find(&grp);
+
+	ridx = route_lookup(&event_rt, evt.s);
+	if(ridx > 0 && event_rt.rlist[ridx] != NULL) {
+		/* the event route exists - keep its index for async_exec_tkv() */
+		_ksr_async_tkv_ridx = ridx;
+		return;
+	}
+
+	LM_DBG("event_route[%s] not defined - skipping\n", evt.s);
+	_ksr_async_tkv_ridx = -2;
+}
+
+/**
+ * Callback set on the async tasks created by async_tkv_emit().
+ *
+ * Runs on a faked SIP message either the KEMI callback ksr_core_tkv, when a
+ * KEMI engine is set, or event_route[core:tkv], when the route was found by
+ * async_tkv_init(). The task parameter is exposed via _ksr_async_tkv_param
+ * only for the duration of the execution.
+ *
+ * @param param pointer to the async_tkv_param_t of the task
+ */
+void async_exec_tkv(void *param)
+{
+	str evname = str_init("core:tkv");
+	str cbname = str_init("ksr_core_tkv");
+	sip_msg_t *msg = faked_msg_next();
+	int saved_rtype = get_route_type();
+	sr_kemi_eng_t *engine = NULL;
+
+	_ksr_async_tkv_param = (async_tkv_param_t *)param;
+	set_route_type(REQUEST_ROUTE);
+
+	engine = sr_kemi_eng_get();
+	if(engine == NULL) {
+		/* native config - run the event route only if it was defined */
+		if(_ksr_async_tkv_ridx >= 0) {
+			run_top_route(event_rt.rlist[_ksr_async_tkv_ridx], msg, 0);
+		}
+	} else if(sr_kemi_route(engine, msg, EVENT_ROUTE, &cbname, &evname) < 0) {
+		LM_ERR("error running event route kemi callback [%.*s]\n",
+				cbname.len, cbname.s);
+	}
+
+	/* restore the state that was in place before the execution */
+	ksr_msg_env_reset();
+	set_route_type(saved_rtype);
+	_ksr_async_tkv_param = NULL;
+	/* param is not freed here - it is freed along with the async task
+	 * structure in core */
+}
+
+/**
+ * Emit a type-key-value event to be processed asynchronously.
+ *
+ * The value is built printf-style from fmt and the variable arguments in a
+ * local buffer of KSR_ASYNC_TKV_SIZE bytes. The async task, its
+ * async_tkv_param_t, the key and the value are stored in a single chunk
+ * allocated with shm_malloc(), which is sent to the "tkv" async group with
+ * async_exec_tkv() as the callback.
+ *
+ * @param dtype type of the event data, stored as given
+ * @param pkey zero-terminated key of the event
+ * @param fmt printf-like format of the value, followed by its arguments
+ * @return 0 on success; -1 when the async group is not set, when neither a
+ *   KEMI engine nor event_route[core:tkv] is available, on invalid
+ *   parameters, when the value does not fit in the buffer, on allocation
+ *   failure or when the task cannot be sent
+ */
+int async_tkv_emit(int dtype, char *pkey, char *fmt, ...)
+{
+	async_task_t *at = NULL;
+	async_tkv_param_t *adp = NULL;
+	sr_kemi_eng_t *keng = NULL;
+	va_list va;
+	char buf[KSR_ASYNC_TKV_SIZE];
+	int dsize = 0;
+	int klen = 0;
+	int n = 0;
+
+	if(_async_tkv_awg == NULL) {
+		LM_DBG("the async group has not been set\n");
+		return -1;
+	}
+	keng = sr_kemi_eng_get();
+	if(keng == NULL && _ksr_async_tkv_ridx == -2) {
+		LM_DBG("the event_route[core:tkv] has not been defined\n");
+		return -1;
+	}
+	if(pkey == NULL || fmt == NULL) {
+		LM_ERR("invalid parameters\n");
+		return -1;
+	}
+
+	va_start(va, fmt);
+	n = vsnprintf(buf, sizeof(buf), fmt, va);
+	va_end(va);
+
+	/* vsnprintf() returns the length needed for the complete output, without
+	 * the ending zero - a value equal to or greater than the buffer size
+	 * means the output was truncated */
+	if(n < 0 || n >= (int)sizeof(buf)) {
+		LM_ERR("failed to print the arguments for key: %s\n", pkey);
+		return -1;
+	}
+
+	klen = strlen(pkey);
+	/* layout: [async_task_t][async_tkv_param_t][key '\0'][value '\0'] */
+	dsize = sizeof(async_task_t) + sizeof(async_tkv_param_t) + klen + 1 + n + 1;
+	at = (async_task_t *)shm_malloc(dsize);
+	if(at == NULL) {
+		SHM_MEM_ERROR;
+		return -1;
+	}
+	/* zeroing the chunk also provides the ending zero of key and value */
+	memset(at, 0, dsize);
+	at->exec = async_exec_tkv;
+	at->param = (char *)at + sizeof(async_task_t);
+	adp = (async_tkv_param_t *)at->param;
+	adp->dtype = dtype;
+	adp->skey.s = (char *)adp + sizeof(async_tkv_param_t);
+	memcpy(adp->skey.s, pkey, klen);
+	adp->skey.len = klen;
+	adp->sval.s = adp->skey.s + klen + 1;
+	memcpy(adp->sval.s, buf, n);
+	adp->sval.len = n;
+
+	if(async_task_group_send(_async_tkv_awg, at) < 0) {
+		LM_ERR("failed to send task with key: %s\n", pkey);
+		shm_free(at);
+		return -1;
+	}
+
+	return 0;
+}
diff --git a/src/core/async_task.h b/src/core/async_task.h
index 58386bba8bd..b4276c2d27c 100644
--- a/src/core/async_task.h
+++ b/src/core/async_task.h
@@ -58,4 +58,15 @@ async_wgroup_t *async_task_group_find(str *gname);
int async_task_group_push(str *gname, async_task_t *task);
int async_task_group_send(async_wgroup_t *awg, async_task_t *task);
+typedef struct async_tkv_param
+{
+ int dtype; /* type of the event data, as given to async_tkv_emit() */
+ str skey; /* key of the event */
+ str sval; /* value of the event, formatted by async_tkv_emit() */
+} async_tkv_param_t;
+
+#define KSR_ASYNC_TKV_SIZE 512 /* size of the buffer used by async_tkv_emit() to format the value */
+void async_tkv_init(void); /* looks up the tkv async group and event_route[core:tkv] */
+int async_tkv_emit(int dtype, char *pkey, char *fmt, ...); /* returns 0 on success, -1 on error */
+
#endif