Module: kamailio Branch: master Commit: a544b14e0bdc331820a505517a52477879734e7c URL: https://github.com/kamailio/kamailio/commit/a544b14e0bdc331820a505517a524778...
Author: Stefan Mititelu stefan.mititelu@net2phone.com Committer: Daniel-Constantin Mierla miconda@gmail.com Date: 2024-09-18T10:01:10+02:00
kafka: add modparam log_without_overflow
---
Modified: src/modules/kafka/kafka_mod.c Modified: src/modules/kafka/kfk.c
---
Diff: https://github.com/kamailio/kamailio/commit/a544b14e0bdc331820a505517a524778... Patch: https://github.com/kamailio/kamailio/commit/a544b14e0bdc331820a505517a524778...
---
diff --git a/src/modules/kafka/kafka_mod.c b/src/modules/kafka/kafka_mod.c index e88bd37384b..89f3fcd3c42 100644 --- a/src/modules/kafka/kafka_mod.c +++ b/src/modules/kafka/kafka_mod.c @@ -66,6 +66,7 @@ static int w_kafka_send_key( */ int child_init_ok = 0; int init_without_kafka = 0; +int log_without_overflow = 0; char *brokers_param = NULL; /**< List of brokers. */ static int kafka_conf_param(modparam_t type, void *val); static int kafka_topic_param(modparam_t type, void *val); @@ -86,7 +87,8 @@ static param_export_t params[] = {{"brokers", PARAM_STRING, &brokers_param}, {"configuration", PARAM_STRING | USE_FUNC_PARAM, (void *)kafka_conf_param}, {"topic", PARAM_STRING | USE_FUNC_PARAM, (void *)kafka_topic_param}, - {"init_without_kafka", PARAM_INT, &init_without_kafka}, {0, 0, 0}}; + {"init_without_kafka", PARAM_INT, &init_without_kafka}, + {"log_without_overflow", PARAM_INT, &log_without_overflow}, {0, 0, 0}};
/** * \brief Kafka :: Module interface diff --git a/src/modules/kafka/kfk.c b/src/modules/kafka/kfk.c index 76a5fea75d9..68014b8b4fb 100644 --- a/src/modules/kafka/kfk.c +++ b/src/modules/kafka/kfk.c @@ -38,6 +38,7 @@
extern int child_init_ok; extern int init_without_kafka; +extern int log_without_overflow;
/** * \brief data type for a configuration property. @@ -124,6 +125,12 @@ static void kfk_logger( const rd_kafka_t *rk, int level, const char *fac, const char *buf) {
+ if(log_without_overflow && strstr(buf, "Connection refused") != NULL) { + // libkafka will keep retrying to connect if kafka server is down + // FIX: ignore these types of errors not to get overflowed + return; + } + switch(level) { case LOG_EMERG: LM_NPRL("RDKAFKA fac: %s : %s : %s\n", fac, @@ -190,8 +197,14 @@ static void kfk_msg_delivered( kfk_stats_add(topic_name, rkmessage->err);
if(rkmessage->err) { - LM_ERR("RDKAFKA Message delivery failed: %s\n", - rd_kafka_err2str(rkmessage->err)); + if(log_without_overflow) { + // libkafka will log all undelivered msgs as ERR + // FIX: ignore these types of errors not to get overflowed; check stats instead + ; + } else { + LM_ERR("RDKAFKA Message delivery failed: %s\n", + rd_kafka_err2str(rkmessage->err)); + } } else { LM_DBG("RDKAFKA Message delivered (%zd bytes, offset %" PRId64 ", " "partition %" PRId32 "): %.*s\n", @@ -865,7 +878,11 @@ int kfk_message_send(str *topic_name, str *message, str *key) NULL) == -1) { rd_kafka_resp_err_t err = rd_kafka_last_error(); - LM_ERR("Error sending message: %s\n", rd_kafka_err2str(err)); + if(!log_without_overflow) { + LM_ERR("Error sending message: %s\n", rd_kafka_err2str(err)); + } else { + return 0; + }
return -1; }