Kafka Module

Vicente Hernando

Edited by

Vicente Hernando

Javier Gallart


Table of Contents

1. Admin Guide
1. Overview
2. Dependencies
2.1. Kamailio Modules
2.2. External Libraries or Applications
2.3. Parameters
2.3.1. brokers (string)
2.3.2. configuration (string)
2.3.3. topic (string)
2.3.4. init_without_kafka (int)
2.3.5. metadata_timeout (int)
2.3.6. log_without_overflow (int)
2.4. Functions
2.4.1. kafka_send(topic, msg)
2.4.2. kafka_send_key(topic, msg, key)
2.5. RPC Commands
2.5.1. kafka.stats
2.5.2. kafka.stats_topic

List of Examples

1.1. Set brokers parameter
1.2. Set configuration parameter
1.3. Set topic parameter
1.4. Set init_without_kafka parameter
1.5. Set metadata_timeout parameter
1.6. Set log_without_overflow parameter
1.7. kafka_send usage
1.8. kafka_send_key usage
1.9. kafka.stats usage
1.10. kafka.stats_topic usage

Chapter 1. Admin Guide

1. Overview

This module produces and sends messages to a Kafka server.

2. Dependencies

2.1. Kamailio Modules

The following modules must be loaded before this module:

  • none.

2.2. External Libraries or Applications

The following libraries or applications must be installed before running Kamailio with this module loaded:

  • librdkafka: the Apache Kafka C/C++ client library.

2.3. Parameters

2.3.1. brokers (string)

Specifies a list of brokers separated by commas.

From librdkafka documentation:

brokerlist is a ,-separated list of brokers in the format: <broker1>,<broker2>,...

Where each broker is in either the host or URL based format:

  • <host>[:<port>]
  • <proto>://<host>[:port]

<proto> is either PLAINTEXT, SSL, SASL, SASL_PLAINTEXT

The two formats can be mixed but ultimately the value of the security.protocol config property decides what brokers are allowed.

This parameter is mandatory. There is no default value.

Example 1.1. Set brokers parameter

...
modparam("kafka", "brokers", "localhost:9092")
modparam("kafka", "brokers", "broker1:10000,broker2")
modparam("kafka", "brokers", "SSL://broker3:9000,ssl://broker2")
...
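
The note about security.protocol can be illustrated with a short sketch. It assumes two hypothetical TLS-enabled brokers (broker1 and broker2 on port 9093) and sets the librdkafka property security.protocol through the configuration parameter of this module. The host names and the port are placeholders, and a real TLS setup would normally need further librdkafka ssl.* properties (for example ssl.ca.location) that are not shown here.

```cfg
# Sketch only: broker host names and port 9093 are placeholders.
loadmodule "kafka.so"

# URL-based broker entries that use the SSL protocol prefix.
modparam("kafka", "brokers", "SSL://broker1:9093,SSL://broker2:9093")

# Keep security.protocol consistent with the broker list above.
modparam("kafka", "configuration", "security.protocol=ssl")
```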

2.3.2. configuration (string)

Specifies a set of general properties.

Each configuration property follows the pattern name = value. Properties are separated by semicolons (;).

This parameter is optional. If it is present, it can be set only once.

Example 1.2. Set configuration parameter

...
# The parameter can be set only once: the two lines below are alternatives.
modparam("kafka", "configuration", "topic.metadata.refresh.interval.ms=20000;queue.buffering.max.messages=1000000;metadata.request.timeout.ms=90000")

# Alternative with librdkafka debug output enabled (debug=all).
modparam("kafka", "configuration", "topic.metadata.refresh.interval.ms=20000;queue.buffering.max.messages=500000;debug=all;metadata.request.timeout.ms=900000")
...

2.3.3. topic (string)

Specifies a topic name and a set of topic properties.

The topic named in the topic parameter must already exist on the Kafka servers.

The parameter value is a list of attribute = value pairs separated by semicolons.

The name attribute gives the topic name and is mandatory. All other attributes are names of topic properties and are optional.

This parameter is optional. Each topic needs its own topic parameter, so the parameter may be set several times.

Example 1.3. Set topic parameter

...
modparam("kafka", "topic", "name=my_topic;request.required.acks=0;request.timeout.ms=10000")
modparam("kafka", "topic", "name=second_topic;request.required.acks=0;request.timeout.ms=10000")
modparam("kafka", "topic", "name=third_topic")
...

2.3.4. init_without_kafka (int)

If set to any value other than 0, Kamailio starts even when the Kafka brokers are not available at startup time.

Default value is 0 (disabled).

Example 1.4. Set init_without_kafka parameter

...
modparam("kafka", "init_without_kafka", 1)
...

2.3.5. metadata_timeout (int)

Specifies, in milliseconds, how long Kamailio waits for topic metadata at startup time.

Default value is 2000 milliseconds (2 seconds).

Example 1.5. Set metadata_timeout parameter

...
modparam("kafka", "metadata_timeout", 1000)
...

2.3.6. log_without_overflow (int)

If set to any value other than 0, the module skips logging most of the error messages that can occur for each Kafka message sent to the broker. This is useful, for example, when a Kafka broker goes down, so that syslog is not flooded with error messages.

This module's stats can always be checked, e.g. via RPC commands, to see whether errors happened. Those errors can have two causes:

  • An error occurred in the config functions kafka_send()/kafka_send_key(). In this case the message was not even enqueued by librdkafka.
  • An error was reported in the delivery callback, which is managed by librdkafka. In this case the message was enqueued by librdkafka but could not be delivered to the broker.

Default value is 0 (disabled).

Example 1.6. Set log_without_overflow parameter

...
modparam("kafka", "log_without_overflow", 1)
...
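
As a rough illustration of how the two tolerance-related parameters can be combined, the following sketch lets Kamailio start while the broker is unreachable and keeps most per-message errors out of syslog. The broker address is a placeholder, and whether this trade-off is acceptable depends on the deployment.

```cfg
# Sketch only: localhost:9092 is a placeholder broker address.
modparam("kafka", "brokers", "localhost:9092")

# Start Kamailio even if no broker is reachable at startup.
modparam("kafka", "init_without_kafka", 1)

# Skip most per-message error logs; rely on the module's stats instead.
modparam("kafka", "log_without_overflow", 1)
```

With such a setup most failures no longer show up in the logs, so the RPC commands kafka.stats and kafka.stats_topic become the main way to notice them.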

2.4. Functions

2.4.1.  kafka_send(topic, msg)

Sends a message to a specific topic via the Kafka server.

This function returns -1 for all sorts of errors, so execution of the script continues.

Parameters:

  • topic: (string) name of the topic. It is mandatory.
  • msg: (string) message to send. It is mandatory.

Available via KEMI framework as kafka.send.

Example 1.7. kafka_send usage

...
# Send "test message" to topic "my_topic"
kafka_send("my_topic", "test message");
...
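
Because the function returns -1 on error, its result can be tested in the routing script. The following is a minimal sketch, assuming that the xlog module is loaded for logging and that my_topic has been declared with the topic parameter. The log text is illustrative.

```cfg
# Sketch only: requires the xlog module; my_topic is assumed to be configured.
if (!kafka_send("my_topic", "test message")) {
    # Reached when kafka_send() returned -1, i.e. the message was not enqueued.
    xlog("L_ERR", "kafka_send to my_topic failed\n");
}
```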

2.4.2.  kafka_send_key(topic, msg, key)

Sends a message with an associated key to a specific topic via the Kafka server.

This function returns -1 for all sorts of errors, so execution of the script continues.

Parameters:

  • topic: (string) name of the topic. It is mandatory.
  • msg: (string) message to send. It is mandatory.
  • key: (string) associate this key with the message. It is mandatory.

Available via KEMI framework as kafka.send_key.

Example 1.8. kafka_send_key usage

...
# Send "test message" to topic "my_topic" with key "my_key"
kafka_send_key("my_topic", "test message", "my_key");
...
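
A common reason to set a key is to keep related messages together: with Kafka's default partitioning, messages that share a key are typically assigned to the same partition. The sketch below uses the SIP Call-ID ($ci) as the key. It assumes that kafka_send_key() expands pseudo-variables in its parameters, which this document does not state, and that the xlog module is loaded. The message layout is purely illustrative.

```cfg
# Sketch only: assumes pseudo-variables are expanded in the parameters.
# $rm = SIP method, $ci = Call-ID of the current message.
if (!kafka_send_key("my_topic", "method=$rm call-id=$ci", "$ci")) {
    xlog("L_WARN", "kafka_send_key failed for call $ci\n");
}
```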

2.5. RPC Commands

2.5.1. kafka.stats

Shows the total number of messages sent and the number of messages that failed to be delivered.

Example 1.9. kafka.stats usage

...
kamcmd kafka.stats
Total messages: 26  Errors: 0
...

2.5.2. kafka.stats_topic

Shows the number of messages sent and the number of messages that failed to be delivered for a specific topic.

Parameter: topic (string) name of the topic. Required.

Example 1.10. kafka.stats_topic usage

...
# Show statistics for my_topic.
kamcmd kafka.stats_topic "my_topic"
Topic: my_topic  Total messages: 17  Errors: 0
...