Module: kamailio Branch: master Commit: bb5bd6e137c758e06736764d823f7437b78d6207 URL: https://github.com/kamailio/kamailio/commit/bb5bd6e137c758e06736764d823f7437...
Author: Stefan Mititelu stefan.mititelu@net2phone.com Committer: Daniel-Constantin Mierla miconda@gmail.com Date: 2024-09-18T10:01:10+02:00
kafka: Add interceptor for broker state
So far, neither librdkafka nor kamailio logs a message when a broker comes back up. The only way to find out that a broker is back up is to constantly check the kamcmd stats and see when the error counters stop growing.
The broker state interceptor is available since librdkafka v2.0.0-RC1 (Dec 21 2022), so this code is enabled only when building against that librdkafka version or newer.
---
Modified: src/modules/kafka/kfk.c
---
Diff: https://github.com/kamailio/kamailio/commit/bb5bd6e137c758e06736764d823f7437... Patch: https://github.com/kamailio/kamailio/commit/bb5bd6e137c758e06736764d823f7437...
---
diff --git a/src/modules/kafka/kfk.c b/src/modules/kafka/kfk.c index b2999a7d721..d9e8a8b9f6d 100644 --- a/src/modules/kafka/kfk.c +++ b/src/modules/kafka/kfk.c @@ -220,6 +220,31 @@ static void kfk_msg_delivered( } }
+#if RD_KAFKA_VERSION >= 0x020000ff +static rd_kafka_resp_err_t ic_broker_state_change(rd_kafka_t *rk, + int32_t broker_id, const char *secproto, const char *name, int port, + const char *state, void *ic_opaque) +{ + if(strcmp(state, "UP") == 0) { + LM_NOTICE("Connected broker: id: %d, proto: %s, name: %s, port: %d", + broker_id, secproto, name, port); + } + return RD_KAFKA_RESP_ERR_NO_ERROR; +} +#endif + +static rd_kafka_resp_err_t ic_on_new(rd_kafka_t *rk, + const rd_kafka_conf_t *conf, void *ic_opaque, char *errstr, + size_t errstr_size) +{ + +#if RD_KAFKA_VERSION >= 0x020000ff + rd_kafka_interceptor_add_on_broker_state_change( + rk, "ic_broker_state_change", ic_broker_state_change, NULL); +#endif + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + /** * \brief Initialize kafka functionality. * @@ -256,6 +281,9 @@ int kfk_init(char *brokers) /* Set message delivery callback. */ rd_kafka_conf_set_dr_msg_cb(rk_conf, kfk_msg_delivered);
+ /* Set interceptors init function. */ + rd_kafka_conf_interceptor_add_on_new(rk_conf, "ic_on_new", ic_on_new, NULL); + /* Configure properties: */ if(kfk_conf_configure()) { LM_ERR("Failed to configure general properties\n");