Module: kamailio
Branch: master
Commit: 6a81d90c8c0ec0ca7a3c1823c53efb51b0556ed0
URL:
https://github.com/kamailio/kamailio/commit/6a81d90c8c0ec0ca7a3c1823c53efb51b0556ed0
Author: Stefan Mititelu <stefan.mititelu(a)net2phone.com>
Committer: Daniel-Constantin Mierla <miconda(a)gmail.com>
Date: 2024-09-18T10:01:10+02:00
kafka: export statistics for total/err messages
---
Modified: src/modules/kafka/kafka_mod.c
Modified: src/modules/kafka/kfk.c
---
Diff:
https://github.com/kamailio/kamailio/commit/6a81d90c8c0ec0ca7a3c1823c53efb51b0556ed0.diff
Patch:
https://github.com/kamailio/kamailio/commit/6a81d90c8c0ec0ca7a3c1823c53efb51b0556ed0.patch
---
diff --git a/src/modules/kafka/kafka_mod.c b/src/modules/kafka/kafka_mod.c
index 43504c7b7cc..108ff3c126b 100644
--- a/src/modules/kafka/kafka_mod.c
+++ b/src/modules/kafka/kafka_mod.c
@@ -45,6 +45,7 @@
#include "../../core/kemi.h"
#include "../../core/rpc.h"
#include "../../core/rpc_lookup.h"
+#include "../../core/counters.h"
#include "kfk.h"
@@ -64,6 +65,8 @@ static int w_kafka_send_key(
/*
* Variables and functions to deal with module parameters.
*/
+stat_var *total_messages;
+stat_var *total_messages_err;
int child_init_ok = 0;
int init_without_kafka = 0;
int log_without_overflow = 0;
@@ -105,6 +108,10 @@ struct module_exports exports = {
mod_destroy /* destroy function */
};
+/*! \brief We expose internal variables via the statistic framework below.*/
+stat_export_t mod_stats[] = {{"total_messages", 0, &total_messages},
+ {"total_messages_err", 0, &total_messages_err}, {0, 0, 0}};
+
static int mod_init(void)
{
/* Register RPC commands. */
@@ -119,6 +126,14 @@ static int mod_init(void)
return -1;
}
+#ifdef STATISTICS
+ /* register statistics */
+ if(register_module_stats("kafka", mod_stats) != 0) {
+ LM_ERR("Failed to register core statistics\n");
+ return -1;
+ }
+#endif
+
return 0;
}
diff --git a/src/modules/kafka/kfk.c b/src/modules/kafka/kfk.c
index ed406385e5e..eb9b166d28c 100644
--- a/src/modules/kafka/kfk.c
+++ b/src/modules/kafka/kfk.c
@@ -35,7 +35,10 @@
#include "../../core/mem/pkg.h"
#include "../../core/mem/shm_mem.h"
#include "../../core/locking.h"
+#include "../../core/counters.h"
+extern stat_var *total_messages;
+extern stat_var *total_messages_err;
extern int child_init_ok;
extern int init_without_kafka;
extern int log_without_overflow;
@@ -1045,9 +1048,11 @@ static int kfk_stats_add(const char *topic, rd_kafka_resp_err_t err)
lock_get(stats_lock);
stats_general->total++;
+ update_stat(total_messages, 1);
if(err) {
stats_general->error++;
+ update_stat(total_messages_err, 1);
}
LM_DBG("General stats: total = %" PRIu64 " error = %" PRIu64
"\n",