KAZOO Module

2600hz Inc.

Edited by

Luis Azedo


Table of Contents

1. Admin Guide
1. Overview
2. How it works
2.1. event routes
2.2. acknowledge messages
3. Dependencies
3.1. Kamailio Modules
3.2. External Libraries or Applications
4. Parameters
4.1. amqp related
4.1.1. node_hostname(str)
4.1.2. amqp_consumer_processes(int)
4.1.3. amqp_consumer_event_key(str)
4.1.4. amqp_consumer_event_subkey(str)
4.1.5. amqp_max_channels(int)
4.1.6. amqp_connection(str)
4.1.7. event_callback(str)
4.2. execution control
4.2.1. amqp_consumer_loop_count(int)
4.2.2. amqp_internal_loop_count(int)
4.2.3. amqp_consumer_ack_loop_count(int)
4.2.4. consume_messages_on_reconnect(int)
4.2.5. single_consumer_on_reconnect(int)
4.3. timers
4.3.1. amqp_consumer_ack_timeout(str)
4.3.2. amqp_interprocess_timeout(str)
4.3.3. amqp_waitframe_timeout(str)
4.3.4. amqp_query_timeout(str)
4.3.5. amqp_query_timeout_avp(str)
4.4. presence related
4.4.1. db_url(str)
4.4.2. presentity_table(str)
4.4.3. pua_mode(int)
5. Functions
5.1. amqp related
5.1.1. kazoo_publish(exchange, routing_key, json_payload [, amqp_headers])
5.1.2. kazoo_query(exchange, routing_key, json_payload [, target_var] [, amqp_headers])
5.1.3. kazoo_subscribe(exchange, exchange_type, queue, routing_key)
5.1.4. kazoo_subscribe(json_description)
5.2. presence related
5.2.1. kazoo_pua_publish(json_payload)
5.3. other
5.3.1. kazoo_encode(to_encode, target_var)
5.3.2. kazoo_json(json_payload, field, target_var)
6. Exported pseudo-variables
7. Transformations

List of Examples

1.1. define the event route
1.2. Set node_hostname parameter
1.3. Set amqp_consumer_processes parameter
1.4. Set amqp_consumer_event_key parameter
1.5. Set amqp_consumer_event_subkey parameter
1.6. Set amqp_max_channels parameter
1.7. Set amqp_connection parameter
1.8. Set event_callback parameter
1.9. Set amqp_consumer_loop_count parameter
1.10. Set amqp_internal_loop_count parameter
1.11. Set amqp_consumer_ack_loop_count parameter
1.12. Set consume_messages_on_reconnect parameter
1.13. Set single_consumer_on_reconnect parameter
1.14. Set amqp_consumer_ack_timeout parameter
1.15. Set amqp_interprocess_timeout parameter
1.16. Set amqp_waitframe_timeout parameter
1.17. Set amqp_query_timeout parameter
1.18. Set amqp_query_timeout_avp parameter
1.19. Set db_url parameter
1.20. Set presentity_table parameter
1.21. Set pua_mode parameter
1.22. kazoo_publish usage
1.23. kazoo_query usage
1.24. kazoo_subscribe usage
1.25. kazoo_subscribe usage
1.26. kazoo_pua_publish usage
1.27. kazoo_encode usage
1.28. kazoo_json usage
1.29. kz.json usage
1.30. kz.encode usage

Chapter 1. Admin Guide

1. Overview

The Kazoo module is a general-purpose AMQP connector (tested with rabbitmq-server). It exposes publish/consume capabilities to Kamailio.

At a high level, the module can be used to:

  • Integrate with an AMQP application to make real-time routing decisions (instead of using, say, a SQL database)

  • Provide a real-time integration into your program, instead of your database, so you can overlay additional logic in your preferred language while also utilizing a message bus

  • Utilize messaging to have a distributed messaging layer, such that machines processing requests/responses/events can go up/down or share the workload and your Kamailio node will still be happy

Supported operations are:

  • publish json payloads to rabbitmq

  • publish json payloads to rabbitmq and wait for correlated response message

  • subscribe to an exchange with a routing key

The Kazoo module also supports publishing updates to the presence module through the kazoo_pua_publish function.

2. How it works

The module forks a main process that handles all communication with rabbitmq: issuing publishes, waiting for replies and consuming messages. When it consumes a message, it hands the processing over to a worker process so that the main process is not blocked.

2.1. event routes

The worker process executes an event route in which we can act on the received payload. The name of the event route is composed from values extracted from the payload.

The Kazoo module tries the candidate event routes from most specific to least specific and executes the first one that is defined. Define the event route as event_route[kazoo:consumer-event[-payload_key_value[-payload_subkey_value]]]

The key/subkey pair can also be set per subscription. See the event_key and event_subkey fields of the json description accepted by kazoo_subscribe.

Example 1.1. define the event route

...
modparam("kazoo", "amqp_consumer_event_key", "Event-Category")
modparam("kazoo", "amqp_consumer_event_subkey", "Event-Name")
...

event_route[kazoo:consumer-event-presence-update]
{
# presence is the value extracted from Event-Category field in json payload
# update is the value extracted from Event-Name field in json payload
xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
...
}

event_route[kazoo:consumer-event-presence]
{
# presence is the value extracted from Event-Category field in json payload
xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
...
}

event_route[kazoo:consumer-event-event-category-event-name]
{
# event-category is the name of the amqp_consumer_event_key parameter
# event-name is the name of the amqp_consumer_event_subkey parameter
# this event route is executed if we can't find the previous
...
}

event_route[kazoo:consumer-event-event-category]
{
# event-category is the name of the amqp_consumer_event_key parameter
# this event route is executed if we can't find the previous
...
}

event_route[kazoo:consumer-event]
{
# this event route is executed if we can't find the previous
}
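
The per-subscription key/subkey pair mentioned above can be sketched as follows, using the event_key and event_subkey fields of the json description accepted by kazoo_subscribe. The exchange, queue, routing key and the field names App-Name and Action are hypothetical and only illustrate how the resulting event route would be named.

...
event_route[kazoo:mod-init]
{
    # hypothetical subscription: route names are built from App-Name/Action
    # instead of the module-wide amqp_consumer_event_key/subkey
    $var(payload) = "{ 'exchange' : 'myapp' , 'type' : 'topic', 'queue' : 'MYAPP-QUEUE-MY_HOSTNAME', 'routing' : 'myapp.events', 'event_key' : 'App-Name', 'event_subkey' : 'Action' }";
    kazoo_subscribe("$var(payload)");
}

event_route[kazoo:consumer-event-billing-charge]
{
    # expected to run for payloads carrying "App-Name" : "billing" and "Action" : "charge"
    xlog("L_INFO", "received billing charge : $kzE");
}
...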

2.2. acknowledge messages

Consumed messages can be acknowledged in two ways:

  • immediately when received

  • after processing by the worker
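
A minimal sketch of how the two modes might be selected per subscription. It assumes that no_ack set to 0 enables acknowledgements and that wait_for_consumer_ack decides whether the acknowledge is sent on receipt (0) or after the worker finishes (1). This mapping is an assumption drawn from the field names in the json description of kazoo_subscribe, and the exchange, queue and routing names are placeholders.

...
event_route[kazoo:mod-init]
{
    # assumed: acknowledged immediately when received
    $var(fast) = "{ 'exchange' : 'dialoginfo' , 'type' : 'direct', 'queue' : 'BLF-QUEUE-MY_HOSTNAME', 'routing' : 'BLF-MY_HOSTNAME', 'no_ack' : 0, 'wait_for_consumer_ack' : 0 }";
    kazoo_subscribe("$var(fast)");

    # assumed: acknowledged only after the worker has processed the message
    $var(safe) = "{ 'exchange' : 'presence' , 'type' : 'direct', 'queue' : 'PRES-QUEUE-MY_HOSTNAME', 'routing' : 'PRES-MY_HOSTNAME', 'no_ack' : 0, 'wait_for_consumer_ack' : 1 }";
    kazoo_subscribe("$var(safe)");
}
...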

3. Dependencies

3.1. Kamailio Modules

The following modules must be loaded before this module:

  • none.

3.2. External Libraries or Applications

  • librabbitmq.

  • libjson.

  • libuuid.

4. Parameters

4.1. amqp related

4.1.1. node_hostname(str)

The name of this host to register in rabbitmq.

Default value is NULL. You must set this parameter for the module to work.

Example 1.2. Set node_hostname parameter

...
modparam("kazoo", "node_hostname", "sipproxy.mydomain.com")
...
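
Since node_hostname has no usable default, a minimal module setup might look like the sketch below. It assumes the module is built as kazoo.so and that a rabbitmq broker is reachable on localhost with the default guest account; the hostname and credentials are placeholders.

...
loadmodule "kazoo.so"

# hostname and broker credentials are placeholders
modparam("kazoo", "node_hostname", "sipproxy.mydomain.com")
modparam("kazoo", "amqp_connection", "amqp://guest:guest@localhost:5672")
...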

4.1.2. amqp_consumer_processes(int)

The number of worker processes that handle message consumption.

Default value is 4.

Example 1.3. Set amqp_consumer_processes parameter

...
modparam("kazoo", "amqp_consumer_processes", 10)
...

4.1.3. amqp_consumer_event_key(str)

The default name of the field in the json payload whose value forms the first part of the event name.

Default value is Event-Category.

Example 1.4. Set amqp_consumer_event_key parameter

...
modparam("kazoo", "amqp_consumer_event_key", "My-JSON-Field-Name")
...

4.1.4. amqp_consumer_event_subkey(str)

The default name of the field in the json payload whose value forms the second part of the event name.

Default value is Event-Name.

Example 1.5. Set amqp_consumer_event_subkey parameter

...
modparam("kazoo", "amqp_consumer_event_subkey", "My-JSON-SubField-Name")
...

4.1.5. amqp_max_channels(int)

The number of pre-allocated channels for the connection.

Default value is 50.

Example 1.6. Set amqp_max_channels parameter

...
modparam("kazoo", "amqp_max_channels", 100)
...

4.1.6. amqp_connection(str)

The connection url to rabbitmq. It can be set multiple times for failover.

Example 1.7. Set amqp_connection parameter

...
modparam("kazoo", "amqp_connection", "amqp://guest:guest@localhost:5672")
modparam("kazoo", "amqp_connection", "kazoo://guest:guest@otherhost:5672")
...

4.1.7. event_callback(str)

The name of the function in the kemi configuration file (embedded scripting language such as Lua, Python, ...) to be executed instead of event_route[...] blocks.

The function receives a string parameter with the name of the event. The possible values are 'kazoo:mod-init' and 'kazoo:consumer-event'.

Example 1.8. Set event_callback parameter

    ...
    modparam("kazoo", "event_callback", "ksr_kazoo_event")
    ...

4.2. execution control

The execution of the main loop can be tuned by changing the parameter values in this section.

The main loop has 3 sub-loops where it listens, with a timeout, for actions to execute. This group of parameters sets the maximum number of times each sub-loop is executed if it doesn't time out.

On busy systems, a sub-loop may never time out because it always has data to process. The purpose of these parameters is to set the maximum number of times a sub-loop executes before it hands control to the next sub-loop.

...
while(true) // main loop
    while(ACK or timeout)     // acknowledge from worker process
    while(SEND or timeout)    // anything to send ?
    while(CONSUME or timeout) // any data on consumed exchanges ?
...
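
As a sketch of how the counters relate to the sub-loops above, the fragment below lowers all three limits so that no single sub-loop can hold the main loop for long. The mapping of each parameter to a sub-loop is inferred from the parameter descriptions in this section, and the values are arbitrary, not recommendations.

...
# ACK sub-loop: acknowledges coming back from workers
modparam("kazoo", "amqp_consumer_ack_loop_count", 5)
# SEND sub-loop: publish/query commands waiting to be sent
modparam("kazoo", "amqp_internal_loop_count", 2)
# CONSUME sub-loop: messages arriving from subscribed exchanges
modparam("kazoo", "amqp_consumer_loop_count", 5)
...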

4.2.1. amqp_consumer_loop_count(int)

The maximum number of iterations of the sub-loop that consumes messages from rabbitmq.

Default value is 10.

Example 1.9. Set amqp_consumer_loop_count parameter

...
modparam("kazoo", "amqp_consumer_loop_count", 3)
...

4.2.2. amqp_internal_loop_count(int)

The maximum number of iterations of the sub-loop that listens for internal commands (publish/query) to send to rabbitmq.

Default value is 5.

Example 1.10. Set amqp_internal_loop_count parameter

...
modparam("kazoo", "amqp_internal_loop_count", 1)
...

4.2.3. amqp_consumer_ack_loop_count(int)

The maximum number of iterations of the sub-loop that checks for acknowledgements from workers.

Default value is 20.

Example 1.11. Set amqp_consumer_ack_loop_count parameter

...
modparam("kazoo", "amqp_consumer_ack_loop_count", 5)
...

4.2.4. consume_messages_on_reconnect(int)

This parameter indicates if the module ignores the loop counters on reconnect and consumes all the pending messages ready to be consumed.

Default value is 1.

Example 1.12. Set consume_messages_on_reconnect parameter

...
modparam("kazoo", "consume_messages_on_reconnect", 0)
...

4.2.5. single_consumer_on_reconnect(int)

When the main loop receives a message from rabbitmq, it defers the execution to a worker in a round-robin manner. This parameter allows the same worker to be used when kazoo reconnects to rabbitmq.

Default value is 1.

Example 1.13. Set single_consumer_on_reconnect parameter

...
modparam("kazoo", "single_consumer_on_reconnect", 0)
...

4.3. timers

Each timer described in this section is configured through two actual parameters, name_sec and name_micro, which hold the seconds and microseconds parts of the timeout.
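
For instance, assuming the two parts are added together to form the timeout, a sub-second value is expressed by setting the _sec part to 0. The sketch below sets the kazoo_query reply timeout to half a second; the value is illustrative only.

...
# 0 s + 500000 us = 0.5 s (assumes both parts are summed)
modparam("kazoo", "amqp_query_timeout_sec", 0)
modparam("kazoo", "amqp_query_timeout_micro", 500000)
...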

4.3.1. amqp_consumer_ack_timeout(str)

Timeout when checking for acknowledge from workers.

Default value is 100000 microseconds.

Example 1.14. Set amqp_consumer_ack_timeout parameter

...
modparam("kazoo", "amqp_consumer_ack_timeout_sec", 1)
modparam("kazoo", "amqp_consumer_ack_timeout_micro", 200000)
...

4.3.2. amqp_interprocess_timeout(str)

Timeout when checking for commands (publish/query) for sending to rabbitmq.

Default value is 100000 microseconds.

Example 1.15. Set amqp_interprocess_timeout parameter

...
modparam("kazoo", "amqp_interprocess_timeout_sec", 1)
modparam("kazoo", "amqp_interprocess_timeout_micro", 200000)
...

4.3.3. amqp_waitframe_timeout(str)

Timeout when checking for messages from rabbitmq.

Default value is 100000 microseconds.

Example 1.16. Set amqp_waitframe_timeout parameter

...
modparam("kazoo", "amqp_waitframe_timeout_sec", 1)
modparam("kazoo", "amqp_waitframe_timeout_micro", 200000)
...

4.3.4. amqp_query_timeout(str)

Timeout when checking for reply messages from rabbitmq for kazoo_query commands.

Default value is 2 sec.

Example 1.17. Set amqp_query_timeout parameter

...
modparam("kazoo", "amqp_query_timeout_sec", 1)
modparam("kazoo", "amqp_query_timeout_micro", 200000)
...

4.3.5. amqp_query_timeout_avp(str)

AVP holding the timeout value, in seconds, used when checking for reply messages from rabbitmq for kazoo_query commands.

Default value is NULL (no value).

Example 1.18. Set amqp_query_timeout_avp parameter

...
modparam("kazoo", "amqp_query_timeout_avp", "$var(kz_timeout)")

route[SOME_ROUTE]
{
    $var(kz_timeout) = 12;
    kazoo_query(exchange, routingkey, payload);
}

...

4.4. presence related

4.4.1. db_url(str)

The database for the presentity table.

If set, the kazoo_pua_publish function will update the presentity status in the database.

Default value is NULL.

Example 1.19. Set db_url parameter

...
modparam("kazoo", "db_url", "mysql://kamailio:kamailiorw@localhost/kamailio")
...

4.4.2. presentity_table(str)

The name of the presentity table in the database.

Default value is presentity.

Example 1.20. Set presentity_table parameter

...
modparam("kazoo", "presentity_table", "my_presentity_table")
...

4.4.3. pua_mode(int)

Controls whether the module connects to the presence database tables. Set it to 0 to not connect to the database.

Default value is 1.

Example 1.21. Set pua_mode parameter

...
modparam("kazoo", "pua_mode", 0)
...

5. Functions

5.1. amqp related

5.1.1.  kazoo_publish(exchange, routing_key, json_payload [, amqp_headers])

The function publishes a json payload to rabbitmq. The routing_key parameter should be encoded. Optional AMQP-Headers are specified in the format key1=value1;key2=value2

This function can be used from ANY ROUTE.

Example 1.22. kazoo_publish usage

...
$var(amqp_payload_request) = $_s({"Event-Category" : "directory", "Event-Name" : "reg_success", "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci", "Realm" : "$fd", "Username" : "$fU", "From-User" : "$fU", "From-Host" : "$fd", "To-User" : "$tU", "To-Host" : "$td", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})" });
$var(amqp_routing_key) = "registration.success." + $(fd{kz.encode}) + "." + $fU;
kazoo_publish("callmgr", $var(amqp_routing_key), $var(amqp_payload_request));
...
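
The optional amqp_headers parameter is not covered by the example above. A minimal sketch, reusing the variables from that example and assuming the headers are passed as a single string in the key1=value1;key2=value2 format; the header names are hypothetical:

...
# X-Source-Node and X-Priority are made-up header names
kazoo_publish("callmgr", $var(amqp_routing_key), $var(amqp_payload_request), "X-Source-Node=sipproxy;X-Priority=low");
...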

5.1.2.  kazoo_query(exchange, routing_key, json_payload [, target_var] [, amqp_headers])

The function publishes a json payload to rabbitmq, waits for a correlated message and puts the result in target_var. The routing_key parameter should be encoded. target_var is optional, as the function also puts the result in the pseudo-variable $kzR. Optional AMQP headers are specified in the format key1=value1;key2=value2

This function can be used from ANY ROUTE.

Example 1.23. kazoo_query usage

...
$var(amqp_payload_request) = "{'Event-Category' : 'call_event' , 'Event-Name' : 'query_user_channels_req', 'Realm' : '" + $fd + "', 'Username' : '" + $fU + "', 'Active-Only' : false }";
kazoo_encode("$ci", "$var(callid_encoded)");
$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request), "$var(amqp_result)")) {
   kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
   if($du != $null) {
       xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
       return;
   }
}
...
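
When target_var is omitted, the reply can be read from $kzR instead. The sketch below reuses the variables from the example above and assumes that a false return value covers both a failed publish and a reply that does not arrive within amqp_query_timeout.

...
if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request))) {
   # no target_var given, the reply payload is read from $kzR
   xlog("L_INFO", "$ci|log|query reply : $kzR");
} else {
   # assumed to be reached on failure or when no reply arrives in time
   xlog("L_WARN", "$ci|log|no reply to user channels query");
}
...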

5.1.3.  kazoo_subscribe(exchange, exchange_type, queue, routing_key)

The function subscribes to exchange/type on queue with routing_key. The routing_key parameter should be encoded.

This function must be called from event_route[kazoo:mod-init].

Example 1.24. kazoo_subscribe usage

...
event_route[kazoo:mod-init]
{
   kazoo_subscribe("dialoginfo", "direct", "BLF-QUEUE-MY_HOSTNAME", "BLF-MY_HOSTNAME");
}

event_route[kazoo:consumer-event]
{
    xlog("L_INFO","Received json payload : $kzE");
}
...

5.1.4.  kazoo_subscribe(json_description)

This form of the function takes a json description, which allows additional parameters to be passed to the subscribe function.

Fields of the json description:

  • exchange : str, required

  • type : str, required

  • queue : str, required

  • routing : str, required

  • auto_delete : int, default 1

  • durable : int, default 0

  • no_ack : int, default 1

  • wait_for_consumer_ack : int, default 0

  • event_key : str, no default

  • event_subkey : str, no default

This function must be called from event_route[kazoo:mod-init].

Example 1.25. kazoo_subscribe usage

...
event_route[kazoo:mod-init]
{
    $var(payload) = "{ 'exchange' : 'dialoginfo' , 'type' : 'direct', 'queue' : 'BLF-QUEUE-MY_HOSTNAME', 'routing' : 'BLF-MY_HOSTNAME', 'auto_delete' : 0, 'durable' : 1, 'no_ack' : 0, 'wait_for_consumer_ack' : 1 }";
    kazoo_subscribe("$var(payload)");
}

event_route[kazoo:consumer-event]
{
    xlog("L_INFO","Received json payload : $kzE");
}
...

5.2. presence related

5.2.1.  kazoo_pua_publish(json_payload)

The function builds the presentity state from json_payload and updates the presentity table.

This function can be used from ANY ROUTE.

Example 1.26. kazoo_pua_publish usage

...
event_route[kazoo:consumer-event-presence-update]
{
    xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
    kazoo_pua_publish($kzE);
    pres_refresh_watchers("$(kzE{kz.json,From})", "$(kzE{kz.json,Event-Package})", 1);
}
...

5.3. other

5.3.1.  kazoo_encode(to_encode, target_var)

The function encodes the 1st parameter for amqp and puts the result in the 2nd parameter.

This function can be used from ANY ROUTE.

Example 1.27. kazoo_encode usage

...
kazoo_encode("$ci", "$var(callid_encoded)");
$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
...

5.3.2.  kazoo_json(json_payload, field, target_var)

The function extracts the value of a field from a json payload and puts the result in the 3rd parameter. The field parameter can address nested values.

This function can be used from ANY ROUTE.

Example 1.28. kazoo_json usage

...
kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
if($du != $null) {
  xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
  return;
}
...
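
As a further sketch of nested access, the fragment below reads a field of a nested object from a consumed message. It assumes that object members are addressed with the same dotted path syntax used in the example above; Custom-Channel-Vars and Account-ID are hypothetical field names.

...
# "Custom-Channel-Vars" and "Account-ID" are hypothetical field names
kazoo_json("$kzE", "Custom-Channel-Vars.Account-ID", "$var(account_id)");
xlog("L_INFO", "received event for account $var(account_id)");
...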

6. Exported pseudo-variables

  • $kzR Contains the payload result of kazoo_query execution.

  • $kzE Contains the payload of a consumed message.

7. Transformations

The prefix for kazoo transformations is kz.

  • json

    Example 1.29. kz.json usage

    ...
    #kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
    $du = $(kzR{kz.json,Channels[0].switch_url});
    if($du != $null) {
      xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
      return;
    }
    ...

  • encode

    Example 1.30. kz.encode usage

    ...
    #kazoo_encode("$ci", "$var(callid_encoded)");
    #$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
    $var(amqp_routing_key) = "call.status_req." + $(ci{kz.encode});
    ...