Module: kamailio Branch: master Commit: d104324ec86190d8ce0886f11fea9b131f46a877 URL: https://github.com/kamailio/kamailio/commit/d104324ec86190d8ce0886f11fea9b13...
Author: Stefan Mititelu stefan.mititelu@net2phone.com Committer: Daniel-Constantin Mierla miconda@gmail.com Date: 2024-09-18T10:01:10+02:00
kafka: add modparam metadata_timeout
---
Modified: src/modules/kafka/kafka_mod.c Modified: src/modules/kafka/kfk.c
---
Diff: https://github.com/kamailio/kamailio/commit/d104324ec86190d8ce0886f11fea9b13... Patch: https://github.com/kamailio/kamailio/commit/d104324ec86190d8ce0886f11fea9b13...
---
diff --git a/src/modules/kafka/kafka_mod.c b/src/modules/kafka/kafka_mod.c index 89f3fcd3c42..43504c7b7cc 100644 --- a/src/modules/kafka/kafka_mod.c +++ b/src/modules/kafka/kafka_mod.c @@ -67,6 +67,7 @@ static int w_kafka_send_key( int child_init_ok = 0; int init_without_kafka = 0; int log_without_overflow = 0; +int metadata_timeout = 2000; char *brokers_param = NULL; /**< List of brokers. */ static int kafka_conf_param(modparam_t type, void *val); static int kafka_topic_param(modparam_t type, void *val); @@ -88,7 +89,8 @@ static param_export_t params[] = {{"brokers", PARAM_STRING, &brokers_param}, (void *)kafka_conf_param}, {"topic", PARAM_STRING | USE_FUNC_PARAM, (void *)kafka_topic_param}, {"init_without_kafka", PARAM_INT, &init_without_kafka}, - {"log_without_overflow", PARAM_INT, &log_without_overflow}, {0, 0, 0}}; + {"log_without_overflow", PARAM_INT, &log_without_overflow}, + {"metadata_timeout", PARAM_INT, &metadata_timeout}, {0, 0, 0}};
/** * \brief Kafka :: Module interface diff --git a/src/modules/kafka/kfk.c b/src/modules/kafka/kfk.c index 68014b8b4fb..ed406385e5e 100644 --- a/src/modules/kafka/kfk.c +++ b/src/modules/kafka/kfk.c @@ -39,6 +39,7 @@ extern int child_init_ok; extern int init_without_kafka; extern int log_without_overflow; +extern int metadata_timeout;
/** * \brief data type for a configuration property. @@ -730,11 +731,6 @@ static int kfk_topic_list_configure() return 0; }
-/* -1 means RD_POLL_INFINITE */ -/* 100000 means 100 seconds */ -#define METADATA_TIMEOUT \ - 100000 /**< Timeout when asking for metadata in milliseconds. */ - /** * \brief check that a topic exists in cluster. * @@ -756,7 +752,7 @@ static int kfk_topic_exist(str *topic_name)
/* Get metadata for all topics. */ rd_kafka_resp_err_t res; - res = rd_kafka_metadata(rk, 1, NULL, &metadatap, METADATA_TIMEOUT); + res = rd_kafka_metadata(rk, 1, NULL, &metadatap, metadata_timeout); if(res != RD_KAFKA_RESP_ERR_NO_ERROR) { LM_ERR("Failed to get metadata: %s\n", rd_kafka_err2str(res)); goto error;