1. Overview

The Kazoo is a general purpose AMQP connector (tested with rabbitmq-server). It exposes publish/consume capabilities into Kamailio.

From a high-level, the purpose of the module might be for things like:

  • Integrate to an AMQP application to make real-time routing decisions (instead of using, say, a SQL database)

  • Provide a real-time integration into your program, instead of your database, so you can overlay additional logic in your preferred language while also utilizing a message bus

  • Utilize messaging to have a distributed messaging layer, such that machines processing requests/responses/events can go up/down or share the workload and your Kamailio node will still be happy

supported operations are:

  • publish json payloads to rabbitmq

  • publish json payloads to rabbitmq and wait for correlated response message

  • subscribe to an exchange with a routing key

The Kazoo module also has support to publish updates to presence module thru the kazoo_pua_publish function

2. How it works

The module works with a main forked process that does the communication with rabbitmq for issuing publishes, waiting for replies and consuming messages. When it consumes a message it defers the process to a worker process so that it doesn't block this main process.

2.1. event routes

The worker process issues an event-route where we can act on the received payload. The name of the event-route is composed by values extracted from the payload.

Kazoo module will try to execute the event route from most significant to less significant. define the event route like event_route[kazoo:consumer-event[-payload_key_value[-payload_subkey_value]]]

we can set the key/subkey pair on a subscription base. check the payload on subscribe.

Example 1.1. define the event route

modparam("kazoo", "amqp_consumer_event_key", "Event-Category")
modparam("kazoo", "amqp_consumer_event_subkey", "Event-Name")

# presence is the value extracted from Event-Category field in json payload
# update is the value extracted from Event-Name field in json payload
xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");

# presence is the value extracted from Event-Category field in json payload
xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");

# event-category is the name of the amqp_consumer_event_key parameter
# event-name is the name of the amqp_consumer_event_subkey parameter
# this event route is executed if we can't find the previous

# event-category is the name of the amqp_consumer_event_key parameter
# this event route is executed if we can't find the previous

# this event route is executed if we can't find the previous

2.2. acknowledge messages

Consumed messages have the option of being acknowledge in two ways:

  • immediately when received

  • after processing by the worker

3. Dependencies

3.1. Kamailio Modules

The following modules must be loaded before this module:

  • none.

3.2. External Libraries or Applications

  • librabbitmq.

  • libjson.

  • libuuid.

4. Parameters

4.1. amqp related

4.1.1. node_hostname(str)

The name of this host to register in rabbitmq.

Default value is NULL. you must set this parameter value for the module to work

Example 1.2. Set node_hostname parameter

modparam("kazoo", "node_hostname", "sipproxy.mydomain.com")

4.1.2. amqp_consumer_processes(int)

The number of worker processes to handle messages consumption.

Default value is 4.

Example 1.3. Set amqp_consumer_processes parameter

modparam("kazoo", "amqp_consumer_processes", 10)

4.1.3. amqp_consumer_event_key(str)

The default name of the field in json payload to compose the event name 1st part

Default value is Event-Category.

Example 1.4. Set amqp_consumer_event_key parameter

modparam("kazoo", "amqp_consumer_event_key", "My-JSON-Field-Name")

4.1.4. amqp_consumer_event_subkey(str)

The default name of the field in json payload to compose the event name 2nd part

Default value is Event-Name.

Example 1.5. Set amqp_consumer_event_subkey parameter

modparam("kazoo", "amqp_consumer_event_subkey", "My-JSON-SubField-Name")

4.1.5. amqp_max_channels(str)

The number of pre allocated channels for the connection.

Default value is 50.

Example 1.6. Set amqp_max_channels parameter

modparam("kazoo", "amqp_max_channels", 100)

4.1.6. amqp_connection(str)

The connection url to rabbitmq. can be set multiple times for failover.

Example 1.7. Set amqp_connection parameter

modparam("kazoo", "amqp_connection", "amqp://guest:guest@localhost:5672")
modparam("kazoo", "amqp_connection", "kazoo://guest:guest@otherhost:5672")

4.1.7. event_callback(str)

The name of the function in the kemi configuration file (embedded scripting language such as Lua, Python, ...) to be executed instead of event_route[...] blocks.

The function receives a string parameter with the name of the event, the values can be: 'kazoo:mod-init', 'kazoo:consumer-event'.

Example 1.8. Set event_callback parameter

    modparam("kazoo", "event_callback", "ksr_kazoo_event")

4.2. execution control

execution control of main loop can be controlled by changing the parameter values in this section.

The main loop has 3 sub-loops were it listen for actions to execute with a timeout. These group of parameters allow to set the maximum number of times the sub-loop is executed if it doesn't timeout.

On busy systems, we may have a condition where a sub-loop never times out because it always has data to process. The purpose of these parameters is to set a maximum number of times it executes before it handles control to the next sub-loop.

while(true) // main  loop
while(ACK or timeout)  // acknowledge from worker process
while(SEND or timeout) // anything to send ?
while(CONSUME or timeout) // any data on consumed exchanges ?

4.2.1. amqp_consumer_loop_count(int)

The consumer loop count.

Default value is 10.

Example 1.9. Set amqp_consumer_loop_count parameter

modparam("kazoo", "amqp_consumer_loop_count", 3)

4.2.2. amqp_internal_loop_count(int)

The internal listen for commands loop count.

Default value is 5.

Example 1.10. Set amqp_internal_loop_count parameter

modparam("kazoo", "amqp_internal_loop_count", 1)

4.2.3. amqp_consumer_ack_loop_count(int)

The work ack loop count.

Default value is 20.

Example 1.11. Set amqp_consumer_ack_loop_count parameter

modparam("kazoo", "amqp_consumer_ack_loop_count", 5)

4.2.4. consume_messages_on_reconnect(int)

This parameter indicates if the module ignores the loop counters on reconnect and consumes all the pending messages ready to be consumed.

Default value is 1.

Example 1.12. Set consume_messages_on_reconnect parameter

modparam("kazoo", "consume_messages_on_reconnect", 0)

4.2.5. single_consumer_on_reconnect(int)

When the main loop receives a message from rabbitmq it will defer the execution to a worker in a round-robin manner. This parameter allows to use the same worker when kazoo reconnects to rabbitmq.

Default value is 1.

Example 1.13. Set single_consumer_on_reconnect parameter

modparam("kazoo", "single_consumer_on_reconnect", 0)

4.3. timers

each functional parameter related to timers come with 2 reflected parameters. name_sec and name_micro

4.3.1. amqp_consumer_ack_timeout(str)

Timeout when checking for acknowledge from workers.

Default value is 100000 micro.

Example 1.14. Set amqp_consumer_ack_timeout parameter

modparam("kazoo", "amqp_consumer_ack_timeout_sec", 1)
modparam("kazoo", "amqp_consumer_ack_timeout_micro", 200000)

4.3.2. amqp_interprocess_timeout(str)

Timeout when checking for commands (publish/query) for sending to rabbitmq.

Default value is 100000 micro.

Example 1.15. Set amqp_interprocess_timeout parameter

modparam("kazoo", "amqp_interprocess_timeout_sec", 1)
modparam("kazoo", "amqp_interprocess_timeout_micro", 200000)

4.3.3. amqp_waitframe_timeout(str)

Timeout when checking for messages from rabbitmq.

Default value is 100000 micro.

Example 1.16. Set amqp_waitframe_timeout parameter

modparam("kazoo", "amqp_waitframe_timeout_sec", 1)
modparam("kazoo", "amqp_waitframe_timeout_micro", 200000)

4.3.4. amqp_query_timeout(str)

Timeout when checking for reply messages from rabbitmq for kazoo_query commands.

Default value is 2 sec.

Example 1.17. Set amqp_query_timeout parameter

modparam("kazoo", "amqp_query_timeout_sec", 1)
modparam("kazoo", "amqp_query_timeout_micro", 200000)

4.3.5. amqp_query_timeout_avp(str)

avp holding the value in seconds for Timeout when checking for reply messages from rabbitmq for kazoo_query commands.

Default value is NULL (no value).

Example 1.18. >Set amqp_query_timeout_avp parameter

modparam("kazoo", "amqp_query_timeout_avp", "$var(kz_timeout)")

    $var(kz_timeout) = 12;
    kazoo_query(exchange, routingkey, payload);


4.4. presence related

4.4.1. db_url(str)

The database for the presentity table.

If set, the kazoo_ppua_publish function will update the presentity status in the database.

Default value is NULL.

Example 1.19. Set db_url parameter

modparam("kazoo", "db_url", "mysql://kamailio:kamailiorw@localhost/kamailio")

4.4.2. presentity_table(str)

The name of the presentity table in the database.

Default value is presentity.

Example 1.20. Set presentity_table parameter

modparam("kazoo", "presentity_table", "my_presentity_table")

4.4.3. pua_mode(int)

Control if the module has to connect to presence database tables. Set it to 0 to not connect to database.

Default value is 1.

Example 1.21. Set pua_mode parameter

modparam("kazoo", "pua_mode", 0)

5. Functions

5.1. amqp related

5.1.1.  kazoo_publish(exchange, routing_key, json_payload [, amqp_headers])

The function publishes a json payload to rabbitmq. The routing_key parameter should be encoded. Optional AMQP-Headers are specified in the format key1=value1;key2=value2

This function can be used from ANY ROUTE.

Example 1.22. kazoo_publish usage

$var(amqp_payload_request) = $_s({"Event-Category" : "directory", "Event-Name" : "reg_success", "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci", "Realm" : "$fd", "Username" : "$fU", "From-User" : "$fU", "From-Host" : "$fd", "To-User" : "$tU", "To-Host" : "$td", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})" });
$var(amqp_routing_key) = "registration.success." + $(fd{kz.encode}) + "." + $fU;
kazoo_publish("callmgr", $var(amqp_routing_key), $var(amqp_payload_request));

5.1.2.  kazoo_query(exchange, routing_key, json_payload [, target_var] [, amqp_headers])

The function publishes a json payload to rabbitmq, waits for a correlated messageand puts the result in target_var. The routing_key parameter should be encoded. target_var is optional as the function also puts the result in pseudo-variable $kzR. Optional AMQP-Headers are specified in the format key1=value1;key2=value2

This function can be used from ANY ROUTE.

Example 1.23. kazoo_query usage

$var(amqp_payload_request) = "{'Event-Category' : 'call_event' , 'Event-Name' : 'query_user_channels_req', 'Realm' : '" + $fd + "', 'Username' : '" + $fU + "', 'Active-Only' : false }";
kazoo_encode("$ci", "$var(callid_encoded)");
$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request), "$var(amqp_result)")) {
   kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
   if($du != $null) {
       xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");

5.1.3.  kazoo_subscribe(exchange, exchange_type, queue, routing_key)

The function subscribes to exchange/type on queue with routing_key. The routing_key parameter should be encoded.

This function must be called from event_route[kazoo:mod-init].

Example 1.24. kazoo_subscribe usage

   kazoo_subscribe("dialoginfo", "direct", "BLF-QUEUE-MY_HOSTNAME", "BLF-MY_HOSTNAME");

    xlog("L_INFO","Received json payload : $kzE");

5.1.4.  kazoo_subscribe(json_description)

The function takes additional parameters to the subscribe function.

json payload fields description

  • exchange : str, required

  • type : str, required

  • queue : str, required

  • routing : str, required

  • auto_delete : int, default 1

  • durable : int, default 0

  • no_ack : int, default 1

  • wait_for_consumer_ack : int, default 0

  • event_key : str, no default

  • event_subkey : str, no default

This function must be called from event_route[kazoo:mod-init].

Example 1.25. kazoo_subscribe usage

    $var(payload) = "{ 'exchange' : 'dialoginfo' , 'type' : 'direct', 'queue' : 'BLF-QUEUE-MY_HOSTNAME', 'routing' : 'BLF-MY_HOSTNAME', 'auto_delete' : 0, 'durable' : 1, 'no_ack' : 0, 'wait_for_consumer_ack' : 1 }";

    xlog("L_INFO","Received json payload : $kzE");

5.2. presence related

5.2.1.  kazoo_pua_publish(json_payload)

The function build presentity state from json_payload and updates presentity table.

This function can be used from ANY ROUTE.

Example 1.26. kazoo_pua_publish usage

    xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
    pres_refresh_watchers("$(kzE{kz.json,From})", "$(kzE{kz.json,Event-Package})", 1);

5.3. other

5.3.1.  kazoo_encode(to_encode, target_var)

The function encodes the 1st parameter for amqp and puts the result in the 2nd parameter.

This function can be used from ANY ROUTE.

Example 1.27. kazoo_encode usage

kazoo_encode("$ci", "$var(callid_encoded)");
$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";

5.3.2.  kazoo_json(json_payload, field, target_var)

The function extracts the value from a json payload and puts the result in the 3rd parameter. It can use nested values for the query part.

This function can be used from ANY ROUTE.

Example 1.28. kazoo_json usage

kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
if($du != $null) {
  xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");

6. Exported pseudo-variables

  • $kzR Contains the payload result of kazoo_query execution.

  • $kzE Contains the payload of a consumed message

7. Transformations

The prefix for kazoo transformations is kz.

  • json

    Example 1.29. kz.json usage

    #kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
    $du = $kzR{kz.json,Channels[0].switch_url};
    if($du != $null) {
      xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");

  • encode

    Example 1.30. kz.encode usage

    #kazoo_encode("$ci", "$var(callid_encoded)");
    #$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
    $var(amqp_routing_key) = "call.status_req." + $(ci{kz.encode})