List of Examples
Set node_hostname parameter
Set amqp_consumer_processes parameter
Set amqp_consumer_event_key parameter
Set amqp_consumer_event_subkey parameter
Set amqp_max_channels parameter
Set amqp_connection parameter
Set amqp_consumer_loop_count parameter
Set amqp_internal_loop_count parameter
Set amqp_consumer_ack_loop_count parameter
Set consume_messages_on_reconnect parameter
Set single_consumer_on_reconnect parameter
Set amqp_consumer_ack_timeout parameter
Set amqp_interprocess_timeout parameter
Set amqp_waitframe_timeout parameter
Set amqp_query_timeout parameter
Set amqp_query_timeout_avp parameter
Set db_url parameter
Set presentity_table parameter
kazoo_publish usage
kazoo_query usage
kazoo_subscribe usage
kazoo_subscribe usage
kazoo_pua_publish usage
kazoo_encode usage
kazoo_json usage
kz.json usage
kz.encode usage
The Kazoo module is a general-purpose AMQP connector (tested with rabbitmq-server). It exposes publish and consume capabilities to Kamailio.
At a high level, the module can be used to:
Integrate with an AMQP application to make real-time routing decisions (instead of using, say, a SQL database).
Provide real-time integration with your own program rather than your database, so you can overlay additional logic in your preferred language while also using a message bus.
Build a distributed messaging layer, so that machines processing requests, responses and events can go up or down or share the workload while your Kamailio node keeps working.
Supported operations are:
Publish JSON payloads to RabbitMQ.
Publish JSON payloads to RabbitMQ and wait for a correlated response message.
Subscribe to an exchange with a routing key.
The Kazoo module can also publish updates to the presence module through the kazoo_pua_publish function.
The module runs a main forked process that handles all communication with RabbitMQ: issuing publishes, waiting for replies and consuming messages. When it consumes a message, it hands the message to a worker process so that the main process is not blocked.
The worker process executes an event route in which the received payload can be handled. The name of the event route is composed from values extracted from the payload.
The module tries the candidate event routes from most specific to least specific. Define the event route as event_route[kazoo:consumer-event[-payload_key_value[-payload_subkey_value]]].
The key/subkey pair can also be set per subscription; see the event_key and event_subkey payload fields of kazoo_subscribe.
Example 1.1. Define the event route
...
modparam("kazoo", "amqp_consumer_event_key", "Event-Category")
modparam("kazoo", "amqp_consumer_event_subkey", "Event-Name")
...
event_route[kazoo:consumer-event-presence-update] {
    # presence is the value extracted from Event-Category field in json payload
    # update is the value extracted from Event-Name field in json payload
    xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
    ...
}

event_route[kazoo:consumer-event-presence] {
    # presence is the value extracted from Event-Category field in json payload
    xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
    ...
}

event_route[kazoo:consumer-event-event-category-event-name] {
    # event-category is the name of the amqp_consumer_event_key parameter
    # event-name is the name of the amqp_consumer_event_subkey parameter
    # this event route is executed if we can't find the previous
    ...
}

event_route[kazoo:consumer-event-event-category] {
    # event-category is the name of the amqp_consumer_event_key parameter
    # this event route is executed if we can't find the previous
    ...
}

event_route[kazoo:consumer-event] {
    # this event route is executed if we can't find the previous
}
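The key/subkey pair can also be chosen per subscription. The following is a minimal sketch, assuming the event_key and event_subkey fields of the kazoo_subscribe JSON payload described later in this document. The exchange, queue and routing key names, the field names App-Name and App-Event, and the values billing and update are all hypothetical.

```cfg
...
event_route[kazoo:mod-init] {
    # App-Name and App-Event are hypothetical field names, used only for illustration
    $var(payload) = "{ 'exchange' : 'myexchange', 'type' : 'direct', 'queue' : 'MY-QUEUE-MY_HOSTNAME', 'routing' : 'MY-ROUTING-KEY', 'event_key' : 'App-Name', 'event_subkey' : 'App-Event' }";
    kazoo_subscribe("$var(payload)");
}

event_route[kazoo:consumer-event-billing-update] {
    # expected to run for a payload carrying 'App-Name' : 'billing' and 'App-Event' : 'update'
    xlog("L_INFO", "received payload : $kzE");
}

event_route[kazoo:consumer-event] {
    # fallback when no more specific event route is defined
    xlog("L_INFO", "received payload : $kzE");
}
...
```

Messages from other subscriptions would presumably still be dispatched using the module-wide amqp_consumer_event_key and amqp_consumer_event_subkey values.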
The name of this host, as registered in RabbitMQ.
Default value is NULL. You must set this parameter for the module to work.
Example 1.2. Set node_hostname parameter
...
modparam("kazoo", "node_hostname", "sipproxy.mydomain.com")
...
The number of worker processes that handle message consumption.
Default value is 4.
Example 1.3. Set amqp_consumer_processes parameter
...
modparam("kazoo", "amqp_consumer_processes", 10)
...
The default name of the JSON payload field whose value forms the first part of the event name.
Default value is “Event-Category”.
Example 1.4. Set amqp_consumer_event_key parameter
...
modparam("kazoo", "amqp_consumer_event_key", "My-JSON-Field-Name")
...
The default name of the JSON payload field whose value forms the second part of the event name.
Default value is “Event-Name”.
Example 1.5. Set amqp_consumer_event_subkey parameter
...
modparam("kazoo", "amqp_consumer_event_subkey", "My-JSON-SubField-Name")
...
The number of pre-allocated channels for the connection.
Default value is 50.
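A minimal sketch of setting this parameter, following the pattern of the other parameter examples. The value 100 is arbitrary and chosen only for illustration.

```cfg
...
# raise the number of pre-allocated channels above the default of 50
modparam("kazoo", "amqp_max_channels", 100)
...
```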
The parameters in this section control the execution of the main loop.
The main loop has 3 sub-loops, each of which listens with a timeout for actions to execute. This group of parameters sets the maximum number of times each sub-loop runs when it does not time out.
On busy systems, a sub-loop may never time out because it always has data to process. These parameters cap the number of iterations a sub-loop executes before it hands control to the next sub-loop.
...
while(true)                      // main loop
    while(ACK or timeout)        // acknowledge from worker process
    while(SEND or timeout)       // anything to send ?
    while(CONSUME or timeout)    // any data on consumed exchanges ?
...
The loop count for the consumer sub-loop, which reads data from consumed exchanges.
Default value is 10.
Example 1.8. Set amqp_consumer_loop_count parameter
...
modparam("kazoo", "amqp_consumer_loop_count", 3)
...
The loop count for the internal sub-loop, which listens for commands to send.
Default value is 5.
Example 1.9. Set amqp_internal_loop_count parameter
...
modparam("kazoo", "amqp_internal_loop_count", 1)
...
The loop count for the sub-loop that handles acknowledgements from worker processes.
Default value is 20.
Example 1.10. Set amqp_consumer_ack_loop_count parameter
...
modparam("kazoo", "amqp_consumer_ack_loop_count", 5)
...
This parameter indicates whether the module ignores the loop counters on reconnect and consumes all pending messages that are ready to be consumed.
Default value is 1.
Example 1.11. Set consume_messages_on_reconnect parameter
...
modparam("kazoo", "consume_messages_on_reconnect", 0)
...
When the main loop receives a message from RabbitMQ, it defers execution to a worker in a round-robin manner. This parameter allows the same worker to be used when kazoo reconnects to RabbitMQ.
Default value is 1.
Example 1.12. Set single_consumer_on_reconnect parameter
...
modparam("kazoo", "single_consumer_on_reconnect", 0)
...
Each timer-related parameter comes with 2 companion parameters: name_sec and name_micro.
Timeout when checking for acknowledgements from workers.
Default value is 100000 micro.
Example 1.13. Set amqp_consumer_ack_timeout parameter
...
modparam("kazoo", "amqp_consumer_ack_timeout_sec", 1)
modparam("kazoo", "amqp_consumer_ack_timeout_micro", 200000)
...
Timeout when checking for commands (publish/query) for sending to rabbitmq.
Default value is 100000 micro.
Example 1.14. Set amqp_interprocess_timeout parameter
...
modparam("kazoo", "amqp_interprocess_timeout_sec", 1)
modparam("kazoo", "amqp_interprocess_timeout_micro", 200000)
...
Timeout when checking for messages from rabbitmq.
Default value is 100000 micro.
Example 1.15. Set amqp_waitframe_timeout parameter
...
modparam("kazoo", "amqp_waitframe_timeout_sec", 1)
modparam("kazoo", "amqp_waitframe_timeout_micro", 200000)
...
Timeout when checking for reply messages from rabbitmq for kazoo_query commands.
Default value is 2 sec.
Example 1.16. Set amqp_query_timeout parameter
...
modparam("kazoo", "amqp_query_timeout_sec", 1)
modparam("kazoo", "amqp_query_timeout_micro", 200000)
...
AVP holding the timeout, in seconds, to use when checking for reply messages from RabbitMQ for kazoo_query commands.
Default value is NULL (no value).
Example 1.17. Set amqp_query_timeout_avp parameter
...
modparam("kazoo", "amqp_query_timeout_avp", "$var(kz_timeout)")

route[SOME_ROUTE] {
    $var(kz_timeout) = 12;
    kazoo_query(exchange, routingkey, payload);
}
...
kazoo_publish publishes a JSON payload to RabbitMQ. The routing_key parameter should be encoded.
This function can be used from ANY ROUTE.
Example 1.20. kazoo_publish usage
...
$var(amqp_payload_request) = "{'Event-Category' : 'directory', 'Event-Name' : 'reg_success', 'Contact' : '" + $var(fs_contact) +
    "', 'Call-ID' : '" + $ci + "', 'Realm' : '" + $fd +
    "', 'Username' : '" + $fU + "', 'From-User' : '" + $fU +
    "', 'From-Host' : '" + $fd + "', 'To-User' : '" + $tU +
    "', 'To-Host' : '" + $td + "', 'User-Agent' : '" + $ua +
    "' ," + $var(register_contants)+ " }";
$var(amqp_routing_key) = "registration.success." + $(fd{kz.encode}) + "." + $fU;
kazoo_publish("callmgr", $var(amqp_routing_key), $var(amqp_payload_request));
...
kazoo_query publishes a JSON payload to RabbitMQ, waits for a correlated message and puts the result in target_var. The routing_key parameter should be encoded. target_var is optional, as the function also puts the result in the pseudo-variable $kzR.
This function can be used from ANY ROUTE.
Example 1.21. kazoo_query usage
...
$var(amqp_payload_request) = "{'Event-Category' : 'call_event' , 'Event-Name' : 'query_user_channels_req', 'Realm' : '" + $fd + "', 'Username' : '" + $fU + "', 'Active-Only' : false }";
kazoo_encode("$ci", "$var(callid_encoded)");
$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request), "$var(amqp_result)")) {
    kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
    if($du != $null) {
        xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
        return;
    }
}
...
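Because target_var is optional, the reply can also be read directly from $kzR. The following is a minimal sketch of that variant. It assumes the routing key and payload variables have been prepared as in Example 1.21, and it uses the kz.json transformation described later in this document. The log text is illustrative only.

```cfg
...
# target_var omitted: the reply payload is read from $kzR instead
if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request))) {
    $du = $(kzR{kz.json,Channels[0].switch_url});
    if($du != $null) {
        xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
        return;
    }
}
...
```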
kazoo_subscribe subscribes to an exchange of the given type, on the given queue, with routing_key. The routing_key parameter should be encoded.
This function must be called from event_route[kazoo:mod-init].
Example 1.22. kazoo_subscribe usage
...
event_route[kazoo:mod-init] {
    kazoo_subscribe("dialoginfo", "direct", "BLF-QUEUE-MY_HOSTNAME", "BLF-MY_HOSTNAME");
}

event_route[kazoo:consumer-event] {
    xlog("L_INFO","Received json payload : $kzE");
}
...
kazoo_subscribe can also be called with a single JSON payload, which accepts additional subscription parameters.
JSON payload fields:
exchange : str, required
type : str, required
queue : str, required
routing : str, required
auto_delete : int, default 1
durable : int, default 0
no_ack : int, default 1
wait_for_consumer_ack : int, default 0
event_key : str, no default
event_subkey : str, no default
This function must be called from event_route[kazoo:mod-init].
Example 1.23. kazoo_subscribe usage
...
event_route[kazoo:mod-init] {
    $var(payload) = "{ 'exchange' : 'dialoginfo' , 'type' : 'direct', 'queue' : 'BLF-QUEUE-MY_HOSTNAME', 'routing' : 'BLF-MY_HOSTNAME', 'auto_delete' : 0, 'durable' : 1, 'no_ack' : 0, 'wait_for_consumer_ack' : 1 }";
    kazoo_subscribe("$var(payload)");
}

event_route[kazoo:consumer-event] {
    xlog("L_INFO","Received json payload : $kzE");
}
...
kazoo_pua_publish builds the presentity state from json_payload and updates the presentity table.
This function can be used from ANY ROUTE.
Example 1.24. kazoo_pua_publish usage
...
event_route[kazoo:consumer-event-presence-update] {
    xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
    kazoo_pua_publish($kzE);
    pres_refresh_watchers("$(kzE{kz.json,From})", "$(kzE{kz.json,Event-Package})", 1);
}
...
kazoo_encode encodes the 1st parameter for AMQP and puts the result in the 2nd parameter.
This function can be used from ANY ROUTE.
Example 1.25. kazoo_encode usage
...
kazoo_encode("$ci", "$var(callid_encoded)");
$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
...
kazoo_json extracts a value from a JSON payload (1st parameter) using the query in the 2nd parameter and puts the result in the 3rd parameter. The query can address nested values.
This function can be used from ANY ROUTE.
Example 1.26. kazoo_json usage
...
kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
if($du != $null) {
    xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
    return;
}
...
$kzR contains the payload returned by a kazoo_query execution.
$kzE contains the payload of a consumed message.
The prefix for kazoo transformations is kz.
json
Example 1.27. kz.json usage
...
#kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
$du = $(kzR{kz.json,Channels[0].switch_url});
if($du != $null) {
    xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
    return;
}
...
encode
Example 1.28. kz.encode usage
...
#kazoo_encode("$ci", "$var(callid_encoded)");
#$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
$var(amqp_routing_key) = "call.status_req." + $(ci{kz.encode});
...