Module: kamailio Branch: master Commit: 741bb148ddf4311679cfa6e379fa8bdbc8fac5e5 URL: https://github.com/kamailio/kamailio/commit/741bb148ddf4311679cfa6e379fa8bdbc8fac5e5
Author: Stefan Mititelu stefan.mititelu@net2phone.com Committer: Daniel-Constantin Mierla miconda@gmail.com Date: 2024-09-18T10:01:10+02:00
kafka: add modparam init_without_kafka
---
Modified: src/modules/kafka/kafka_mod.c Modified: src/modules/kafka/kfk.c
---
Diff: https://github.com/kamailio/kamailio/commit/741bb148ddf4311679cfa6e379fa8bdbc8fac5e5.diff Patch: https://github.com/kamailio/kamailio/commit/741bb148ddf4311679cfa6e379fa8bdbc8fac5e5.patch
---
diff --git a/src/modules/kafka/kafka_mod.c b/src/modules/kafka/kafka_mod.c index 26369d6ca31..e88bd37384b 100644 --- a/src/modules/kafka/kafka_mod.c +++ b/src/modules/kafka/kafka_mod.c @@ -64,6 +64,8 @@ static int w_kafka_send_key( /* * Variables and functions to deal with module parameters. */ +int child_init_ok = 0; +int init_without_kafka = 0; char *brokers_param = NULL; /**< List of brokers. */ static int kafka_conf_param(modparam_t type, void *val); static int kafka_topic_param(modparam_t type, void *val); @@ -84,7 +86,7 @@ static param_export_t params[] = {{"brokers", PARAM_STRING, &brokers_param}, {"configuration", PARAM_STRING | USE_FUNC_PARAM, (void *)kafka_conf_param}, {"topic", PARAM_STRING | USE_FUNC_PARAM, (void *)kafka_topic_param}, - {0, 0, 0}}; + {"init_without_kafka", PARAM_INT, &init_without_kafka}, {0, 0, 0}};
/** * \brief Kafka :: Module interface @@ -125,9 +127,15 @@ static int child_init(int rank) if(rank == PROC_INIT || rank == PROC_TCP_MAIN) return 0;
+ child_init_ok = 1; if(kfk_init(brokers_param)) { - LM_ERR("Failed to initialize Kafka\n"); - return -1; + child_init_ok = 0; + if(init_without_kafka) { + LM_ERR("Failed to initialize Kafka - continue\n"); + } else { + LM_ERR("Failed to initialize Kafka\n"); + return -1; + } } return 0; } diff --git a/src/modules/kafka/kfk.c b/src/modules/kafka/kfk.c index c6dc8398da2..76a5fea75d9 100644 --- a/src/modules/kafka/kfk.c +++ b/src/modules/kafka/kfk.c @@ -36,6 +36,9 @@ #include "../../core/mem/shm_mem.h" #include "../../core/locking.h"
+extern int child_init_ok; +extern int init_without_kafka; + /** * \brief data type for a configuration property. */ @@ -587,7 +590,9 @@ static int kfk_topic_configure(kfk_topic_t *ktopic) }
int topic_found = kfk_topic_exist(ktopic->topic_name); - if(topic_found == -1) { + if(init_without_kafka) { + ; + } else if(topic_found == -1) { LM_ERR("Failed to search for topic %.*s in cluster\n", ktopic->topic_name->len, ktopic->topic_name->s); goto error; @@ -828,6 +833,12 @@ int kfk_message_send(str *topic_name, str *message, str *key) /* Get topic from name. */ rd_kafka_topic_t *rkt = kfk_topic_get(topic_name);
+ if(!child_init_ok) { + LM_ERR("kafka module is unusable: child init NOT ok! Skip sending " + "message, message lost!"); + return -1; + } + if(!rkt) { LM_ERR("Topic not found: %.*s\n", topic_name->len, topic_name->s); return -1;