Table of Contents
List of Examples
node_hostname parameteramqp_consumer_processes parameteramqp_consumer_event_key parameteramqp_consumer_event_subkey parameteramqp_max_channels parameteramqp_connection parameterevent_callback parameteramqp_consumer_loop_count parameteramqp_internal_loop_count parameteramqp_consumer_ack_loop_count parameterconsume_messages_on_reconnect parametersingle_consumer_on_reconnect parameteramqp_consumer_ack_timeout parameteramqp_interprocess_timeout parameteramqp_waitframe_timeout parameteramqp_query_timeout parameteramqp_query_timeout_avp parameterdb_url parameterpresentity_table parameterpua_mode parameterkazoo_publish usagekazoo_query usagekazoo_subscribe usagekazoo_subscribe usagekazoo_pua_publish usagekazoo_encode usagekazoo_json usagekz.json usagekz.encode usageTable of Contents
The Kazoo is a general purpose AMQP connector (tested with rabbitmq-server). It exposes publish/consume capabilities into Kamailio.
From a high-level, the purpose of the module might be for things like:
Integrate to an AMQP application to make real-time routing decisions (instead of using, say, a SQL database)
Provide a real-time integration into your program, instead of your database, so you can overlay additional logic in your preferred language while also utilizing a message bus
Utilize messaging to have a distributed messaging layer, such that machines processing requests/responses/events can go up/down or share the workload and your Kamailio node will still be happy
supported operations are:
publish json payloads to rabbitmq
publish json payloads to rabbitmq and wait for correlated response message
subscribe to an exchange with a routing key
The Kazoo module also has support to publish updates to presence module thru the kazoo_pua_publish function
The module works with a main forked process that does the communication with rabbitmq for issuing publishes, waiting for replies and consuming messages. When it consumes a message it defers the process to a worker process so that it doesn't block this main process.
The worker process issues an event-route where we can act on the received payload. The name of the event-route is composed by values extracted from the payload.
Kazoo module will try to execute the event route from most significant to less significant. define the event route like event_route[kazoo:consumer-event[-payload_key_value[-payload_subkey_value]]]
we can set the key/subkey pair on a subscription base. check the payload on subscribe.
Example 1.1. define the event route
...
modparam("kazoo", "amqp_consumer_event_key", "Event-Category")
modparam("kazoo", "amqp_consumer_event_subkey", "Event-Name")
...
event_route[kazoo:consumer-event-presence-update]
{
# presence is the value extracted from Event-Category field in json payload
# update is the value extracted from Event-Name field in json payload
xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
...
}
event_route[kazoo:consumer-event-presence]
{
# presence is the value extracted from Event-Category field in json payload
xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
...
}
event_route[kazoo:consumer-event-event-category-event-name]
{
# event-category is the name of the amqp_consumer_event_key parameter
# event-name is the name of the amqp_consumer_event_subkey parameter
# this event route is executed if we can't find the previous
...
}
event_route[kazoo:consumer-event-event-category]
{
# event-category is the name of the amqp_consumer_event_key parameter
# this event route is executed if we can't find the previous
...
}
event_route[kazoo:consumer-event]
{
# this event route is executed if we can't find the previous
}
The name of this host to register in rabbitmq.
Default value is NULL. you must set this parameter value for the module to work
Example 1.2. Set node_hostname parameter
...
modparam("kazoo", "node_hostname", "sipproxy.mydomain.com")
...
The number of worker processes to handle messages consumption.
Default value is 4.
Example 1.3. Set amqp_consumer_processes parameter
...
modparam("kazoo", "amqp_consumer_processes", 10)
...
The default name of the field in json payload to compose the event name 1st part
Default value is “Event-Category”.
Example 1.4. Set amqp_consumer_event_key parameter
...
modparam("kazoo", "amqp_consumer_event_key", "My-JSON-Field-Name")
...
The default name of the field in json payload to compose the event name 2nd part
Default value is “Event-Name”.
Example 1.5. Set amqp_consumer_event_subkey parameter
...
modparam("kazoo", "amqp_consumer_event_subkey", "My-JSON-SubField-Name")
...
The number of pre allocated channels for the connection.
Default value is 50.
The connection url to rabbitmq. can be set multiple times for failover.
Example 1.7. Set amqp_connection parameter
...
modparam("kazoo", "amqp_connection", "amqp://guest:guest@localhost:5672")
modparam("kazoo", "amqp_connection", "kazoo://guest:guest@otherhost:5672")
...
The name of the function in the kemi configuration file (embedded scripting language such as Lua, Python, ...) to be executed instead of event_route[...] blocks.
The function receives a string parameter with the name of the event, the values can be: 'kazoo:mod-init', 'kazoo:consumer-event'.
Example 1.8. Set event_callback parameter
...
modparam("kazoo", "event_callback", "ksr_kazoo_event")
...
execution control of main loop can be controlled by changing the parameter values in this section.
The main loop has 3 sub-loops were it listen for actions to execute with a timeout. These group of parameters allow to set the maximum number of times the sub-loop is executed if it doesn't timeout.
On busy systems, we may have a condition where a sub-loop never times out because it always has data to process. The purpose of these parameters is to set a maximum number of times it executes before it handles control to the next sub-loop.
... while(true) // main loop while(ACK or timeout) // acknowledge from worker process while(SEND or timeout) // anything to send ? while(CONSUME or timeout) // any data on consumed exchanges ? ...
The consumer loop count.
Default value is 10.
Example 1.9. Set amqp_consumer_loop_count parameter
...
modparam("kazoo", "amqp_consumer_loop_count", 3)
...
The internal listen for commands loop count.
Default value is 5.
Example 1.10. Set amqp_internal_loop_count parameter
...
modparam("kazoo", "amqp_internal_loop_count", 1)
...
The work ack loop count.
Default value is 20.
Example 1.11. Set amqp_consumer_ack_loop_count parameter
...
modparam("kazoo", "amqp_consumer_ack_loop_count", 5)
...
This parameter indicates if the module ignores the loop counters on reconnect and consumes all the pending messages ready to be consumed.
Default value is 1.
Example 1.12. Set consume_messages_on_reconnect parameter
...
modparam("kazoo", "consume_messages_on_reconnect", 0)
...
When the main loop receives a message from rabbitmq it will defer the execution to a worker in a round-robin manner. This parameter allows to use the same worker when kazoo reconnects to rabbitmq.
Default value is 1.
Example 1.13. Set single_consumer_on_reconnect parameter
...
modparam("kazoo", "single_consumer_on_reconnect", 0)
...
each functional parameter related to timers come with 2 reflected parameters. name_sec and name_micro
Timeout when checking for acknowledge from workers.
Default value is 100000 micro.
Example 1.14. Set amqp_consumer_ack_timeout parameter
...
modparam("kazoo", "amqp_consumer_ack_timeout_sec", 1)
modparam("kazoo", "amqp_consumer_ack_timeout_micro", 200000)
...
Timeout when checking for commands (publish/query) for sending to rabbitmq.
Default value is 100000 micro.
Example 1.15. Set amqp_interprocess_timeout parameter
...
modparam("kazoo", "amqp_interprocess_timeout_sec", 1)
modparam("kazoo", "amqp_interprocess_timeout_micro", 200000)
...
Timeout when checking for messages from rabbitmq.
Default value is 100000 micro.
Example 1.16. Set amqp_waitframe_timeout parameter
...
modparam("kazoo", "amqp_waitframe_timeout_sec", 1)
modparam("kazoo", "amqp_waitframe_timeout_micro", 200000)
...
Timeout when checking for reply messages from rabbitmq for kazoo_query commands.
Default value is 2 sec.
Example 1.17. Set amqp_query_timeout parameter
...
modparam("kazoo", "amqp_query_timeout_sec", 1)
modparam("kazoo", "amqp_query_timeout_micro", 200000)
...
avp holding the value in seconds for Timeout when checking for reply messages from rabbitmq for kazoo_query commands.
Default value is NULL (no value).
Example 1.18. >Set amqp_query_timeout_avp parameter
...
modparam("kazoo", "amqp_query_timeout_avp", "$var(kz_timeout)")
route[SOME_ROUTE]
{
$var(kz_timeout) = 12;
kazoo_query(exchange, routingkey, payload);
}
...
The database for the presentity table.
If set, the kazoo_ppua_publish function will update the presentity status in the database.
Default value is “NULL”.
Example 1.19. Set db_url parameter
...
modparam("kazoo", "db_url", "mysql://kamailio:kamailiorw@localhost/kamailio")
...
The function publishes a json payload to rabbitmq. The routing_key parameter should be encoded. Optional AMQP-Headers are specified in the format key1=value1;key2=value2
This function can be used from ANY ROUTE.
Example 1.22. kazoo_publish usage
...
$var(amqp_payload_request) = $_s({"Event-Category" : "directory", "Event-Name" : "reg_success", "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci", "Realm" : "$fd", "Username" : "$fU", "From-User" : "$fU", "From-Host" : "$fd", "To-User" : "$tU", "To-Host" : "$td", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})" });
$var(amqp_routing_key) = "registration.success." + $(fd{kz.encode}) + "." + $fU;
kazoo_publish("callmgr", $var(amqp_routing_key), $var(amqp_payload_request));
...
The function publishes a json payload to rabbitmq, waits for a correlated messageand puts the result in target_var. The routing_key parameter should be encoded. target_var is optional as the function also puts the result in pseudo-variable $kzR. Optional AMQP-Headers are specified in the format key1=value1;key2=value2
This function can be used from ANY ROUTE.
Example 1.23. kazoo_query usage
...
$var(amqp_payload_request) = "{'Event-Category' : 'call_event' , 'Event-Name' : 'query_user_channels_req', 'Realm' : '" + $fd + "', 'Username' : '" + $fU + "', 'Active-Only' : false }";
kazoo_encode("$ci", "$var(callid_encoded)");
$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request), "$var(amqp_result)")) {
kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
if($du != $null) {
xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
return;
}
}
...
The function subscribes to exchange/type on queue with routing_key. The routing_key parameter should be encoded.
This function must be called from event_route[kazoo:mod-init].
Example 1.24. kazoo_subscribe usage
...
event_route[kazoo:mod-init]
{
kazoo_subscribe("dialoginfo", "direct", "BLF-QUEUE-MY_HOSTNAME", "BLF-MY_HOSTNAME");
}
event_route[kazoo:consumer-event]
{
xlog("L_INFO","Received json payload : $kzE");
}
...
The function takes additional parameters to the subscribe function.
json payload fields description
exchange : str, required
type : str, required
queue : str, required
routing : str, required
auto_delete : int, default 1
durable : int, default 0
no_ack : int, default 1
wait_for_consumer_ack : int, default 0
event_key : str, no default
event_subkey : str, no default
This function must be called from event_route[kazoo:mod-init].
Example 1.25. kazoo_subscribe usage
...
event_route[kazoo:mod-init]
{
$var(payload) = "{ 'exchange' : 'dialoginfo' , 'type' : 'direct', 'queue' : 'BLF-QUEUE-MY_HOSTNAME', 'routing' : 'BLF-MY_HOSTNAME', 'auto_delete' : 0, 'durable' : 1, 'no_ack' : 0, 'wait_for_consumer_ack' : 1 }";
kazoo_subscribe("$var(payload)");
}
event_route[kazoo:consumer-event]
{
xlog("L_INFO","Received json payload : $kzE");
}
...
The function build presentity state from json_payload and updates presentity table.
This function can be used from ANY ROUTE.
Example 1.26. kazoo_pua_publish usage
...
event_route[kazoo:consumer-event-presence-update]
{
xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
kazoo_pua_publish($kzE);
pres_refresh_watchers("$(kzE{kz.json,From})", "$(kzE{kz.json,Event-Package})", 1);
}
...
The function encodes the 1st parameter for amqp and puts the result in the 2nd parameter.
This function can be used from ANY ROUTE.
Example 1.27. kazoo_encode usage
...
kazoo_encode("$ci", "$var(callid_encoded)");
$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
...
The function extracts the value from a json payload and puts the result in the 3rd parameter. It can use nested values for the query part.
This function can be used from ANY ROUTE.
Example 1.28. kazoo_json usage
...
kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
if($du != $null) {
xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
return;
}
...
$kzR Contains the payload result of kazoo_query execution.
$kzE Contains the payload of a consumed message
The prefix for kazoo transformations is kz.
json
Example 1.29. kz.json usage
...
#kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
$du = $kzR{kz.json,Channels[0].switch_url};
if($du != $null) {
xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
return;
}
...
encode
Example 1.30. kz.encode usage
...
#kazoo_encode("$ci", "$var(callid_encoded)");
#$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
$var(amqp_routing_key) = "call.status_req." + $(ci{kz.encode})
...