Module: kamailio
Branch: master
Commit: bb5bd6e137c758e06736764d823f7437b78d6207
URL:
https://github.com/kamailio/kamailio/commit/bb5bd6e137c758e06736764d823f743…
Author: Stefan Mititelu <stefan.mititelu(a)net2phone.com>
Committer: Daniel-Constantin Mierla <miconda(a)gmail.com>
Date: 2024-09-18T10:01:10+02:00
kafka: Add interceptor for broker state
So far there is no message logged by librdkafka or kamailio when broker is
back up again. The only way to find out that broker is back up is to
constatntly check kamcmd stats when errors stop growing.
Broker state interceptor is available since librdkafka v2.0.0-RC1
(Dec 21 2022). So only if you are using that librdkafka version,
or newer than that, this code will be enabled.
---
Modified: src/modules/kafka/kfk.c
---
Diff:
https://github.com/kamailio/kamailio/commit/bb5bd6e137c758e06736764d823f743…
Patch:
https://github.com/kamailio/kamailio/commit/bb5bd6e137c758e06736764d823f743…
---
diff --git a/src/modules/kafka/kfk.c b/src/modules/kafka/kfk.c
index b2999a7d721..d9e8a8b9f6d 100644
--- a/src/modules/kafka/kfk.c
+++ b/src/modules/kafka/kfk.c
@@ -220,6 +220,31 @@ static void kfk_msg_delivered(
}
}
+#if RD_KAFKA_VERSION >= 0x020000ff
+static rd_kafka_resp_err_t ic_broker_state_change(rd_kafka_t *rk,
+ int32_t broker_id, const char *secproto, const char *name, int port,
+ const char *state, void *ic_opaque)
+{
+ if(strcmp(state, "UP") == 0) {
+ LM_NOTICE("Connected broker: id: %d, proto: %s, name: %s, port: %d",
+ broker_id, secproto, name, port);
+ }
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+#endif
+
+static rd_kafka_resp_err_t ic_on_new(rd_kafka_t *rk,
+ const rd_kafka_conf_t *conf, void *ic_opaque, char *errstr,
+ size_t errstr_size)
+{
+
+#if RD_KAFKA_VERSION >= 0x020000ff
+ rd_kafka_interceptor_add_on_broker_state_change(
+ rk, "ic_broker_state_change", ic_broker_state_change, NULL);
+#endif
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
/**
* \brief Initialize kafka functionality.
*
@@ -256,6 +281,9 @@ int kfk_init(char *brokers)
/* Set message delivery callback. */
rd_kafka_conf_set_dr_msg_cb(rk_conf, kfk_msg_delivered);
+ /* Set interceptors init function. */
+ rd_kafka_conf_interceptor_add_on_new(rk_conf, "ic_on_new", ic_on_new, NULL);
+
/* Configure properties: */
if(kfk_conf_configure()) {
LM_ERR("Failed to configure general properties\n");