Module: kamailio Branch: master Commit: 6a81d90c8c0ec0ca7a3c1823c53efb51b0556ed0 URL: https://github.com/kamailio/kamailio/commit/6a81d90c8c0ec0ca7a3c1823c53efb51...
Author: Stefan Mititelu stefan.mititelu@net2phone.com Committer: Daniel-Constantin Mierla miconda@gmail.com Date: 2024-09-18T10:01:10+02:00
kafka: export statistics for total/err messages
---
Modified: src/modules/kafka/kafka_mod.c Modified: src/modules/kafka/kfk.c
---
Diff: https://github.com/kamailio/kamailio/commit/6a81d90c8c0ec0ca7a3c1823c53efb51... Patch: https://github.com/kamailio/kamailio/commit/6a81d90c8c0ec0ca7a3c1823c53efb51...
---
diff --git a/src/modules/kafka/kafka_mod.c b/src/modules/kafka/kafka_mod.c index 43504c7b7cc..108ff3c126b 100644 --- a/src/modules/kafka/kafka_mod.c +++ b/src/modules/kafka/kafka_mod.c @@ -45,6 +45,7 @@ #include "../../core/kemi.h" #include "../../core/rpc.h" #include "../../core/rpc_lookup.h" +#include "../../core/counters.h"
#include "kfk.h"
@@ -64,6 +65,8 @@ static int w_kafka_send_key( /* * Variables and functions to deal with module parameters. */ +stat_var *total_messages; +stat_var *total_messages_err; int child_init_ok = 0; int init_without_kafka = 0; int log_without_overflow = 0; @@ -105,6 +108,10 @@ struct module_exports exports = { mod_destroy /* destroy function */ };
+/*! \brief We expose internal variables via the statistic framework below.*/ +stat_export_t mod_stats[] = {{"total_messages", 0, &total_messages}, + {"total_messages_err", 0, &total_messages_err}, {0, 0, 0}}; + static int mod_init(void) { /* Register RPC commands. */ @@ -119,6 +126,14 @@ static int mod_init(void) return -1; }
+#ifdef STATISTICS + /* register statistics */ + if(register_module_stats("kafka", mod_stats) != 0) { + LM_ERR("Failed to register core statistics\n"); + return -1; + } +#endif + return 0; }
diff --git a/src/modules/kafka/kfk.c b/src/modules/kafka/kfk.c index ed406385e5e..eb9b166d28c 100644 --- a/src/modules/kafka/kfk.c +++ b/src/modules/kafka/kfk.c @@ -35,7 +35,10 @@ #include "../../core/mem/pkg.h" #include "../../core/mem/shm_mem.h" #include "../../core/locking.h" +#include "../../core/counters.h"
+extern stat_var *total_messages; +extern stat_var *total_messages_err; extern int child_init_ok; extern int init_without_kafka; extern int log_without_overflow; @@ -1045,9 +1048,11 @@ static int kfk_stats_add(const char *topic, rd_kafka_resp_err_t err) lock_get(stats_lock);
stats_general->total++; + update_stat(total_messages, 1);
if(err) { stats_general->error++; + update_stat(total_messages_err, 1); }
LM_DBG("General stats: total = %" PRIu64 " error = %" PRIu64 "\n",