List of Examples
1.1. define the event route
1.2. Set node_hostname parameter
1.3. Set amqp_consumer_processes parameter
1.4. Set amqp_consumer_event_key parameter
1.5. Set amqp_consumer_event_subkey parameter
1.6. Set amqp_max_channels parameter
1.7. Set amqp_connection parameter
1.8. Set event_callback parameter
1.9. Set amqp_consumer_loop_count parameter
1.10. Set amqp_internal_loop_count parameter
1.11. Set amqp_consumer_ack_loop_count parameter
1.12. Set consume_messages_on_reconnect parameter
1.13. Set single_consumer_on_reconnect parameter
1.14. Set amqp_consumer_ack_timeout parameter
1.15. Set amqp_interprocess_timeout parameter
1.16. Set amqp_waitframe_timeout parameter
1.17. Set amqp_query_timeout parameter
1.18. Set amqp_query_timeout_avp parameter
1.19. Set db_url parameter
1.20. Set presentity_table parameter
1.21. Set pua_mode parameter
1.22. kazoo_publish usage
1.23. kazoo_query usage
1.24. kazoo_subscribe usage
1.25. kazoo_subscribe usage
1.26. kazoo_pua_publish usage
1.27. kazoo_encode usage
1.28. kazoo_json usage
1.29. kz.json usage
1.30. kz.encode usage
The Kazoo module is a general-purpose AMQP connector (tested with rabbitmq-server). It exposes publish/consume capabilities to Kamailio.
At a high level, the module can be used to:
Integrate with an AMQP application to make real-time routing decisions (instead of using, say, a SQL database)
Provide a real-time integration into your program, instead of your database, so you can overlay additional logic in your preferred language while also utilizing a message bus
Use messaging as a distributed layer, so that machines processing requests/responses/events can go up or down or share the workload while your Kamailio node keeps working
Supported operations are:
Publish JSON payloads to rabbitmq
Publish JSON payloads to rabbitmq and wait for a correlated response message
Subscribe to an exchange with a routing key
The Kazoo module can also publish updates to the presence module through the kazoo_pua_publish function.
The module works with a main forked process that handles all communication with rabbitmq: issuing publishes, waiting for replies and consuming messages. When it consumes a message, it hands the message to a worker process so that the main process is not blocked.
The worker process executes an event route in which the received payload can be handled. The name of the event route is composed from values extracted from the payload.
The Kazoo module tries the event routes from most specific to least specific. Define the event route as event_route[kazoo:consumer-event[-payload_key_value[-payload_subkey_value]]]
The key/subkey pair can also be set per subscription; see the payload fields of kazoo_subscribe.
Example 1.1. define the event route
...
modparam("kazoo", "amqp_consumer_event_key", "Event-Category")
modparam("kazoo", "amqp_consumer_event_subkey", "Event-Name")
...
event_route[kazoo:consumer-event-presence-update] {
    # presence is the value extracted from Event-Category field in json payload
    # update is the value extracted from Event-Name field in json payload
    xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
    ...
}

event_route[kazoo:consumer-event-presence] {
    # presence is the value extracted from Event-Category field in json payload
    xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
    ...
}

event_route[kazoo:consumer-event-event-category-event-name] {
    # event-category is the name of the amqp_consumer_event_key parameter
    # event-name is the name of the amqp_consumer_event_subkey parameter
    # this event route is executed if we can't find the previous
    ...
}

event_route[kazoo:consumer-event-event-category] {
    # event-category is the name of the amqp_consumer_event_key parameter
    # this event route is executed if we can't find the previous
    ...
}

event_route[kazoo:consumer-event] {
    # this event route is executed if we can't find the previous
}
The name of this host to register in rabbitmq.
Default value is NULL. You must set this parameter for the module to work.
Example 1.2. Set node_hostname parameter
...
modparam("kazoo", "node_hostname", "sipproxy.mydomain.com")
...
The number of worker processes that handle message consumption.
Default value is 4.
Example 1.3. Set amqp_consumer_processes parameter
...
modparam("kazoo", "amqp_consumer_processes", 10)
...
The default name of the JSON payload field whose value forms the first part of the event name.
Default value is “Event-Category”.
Example 1.4. Set amqp_consumer_event_key parameter
...
modparam("kazoo", "amqp_consumer_event_key", "My-JSON-Field-Name")
...
The default name of the JSON payload field whose value forms the second part of the event name.
Default value is “Event-Name”.
Example 1.5. Set amqp_consumer_event_subkey parameter
...
modparam("kazoo", "amqp_consumer_event_subkey", "My-JSON-SubField-Name")
...
The number of pre-allocated channels for the connection (the amqp_max_channels parameter).
Default value is 50.
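No example accompanies the amqp_max_channels parameter here. A minimal sketch, assuming it takes an integer like the other numeric parameters of this module (the value 100 is purely illustrative, not a recommendation):

```cfg
...
# assumption: integer value, set like the other numeric kazoo parameters
modparam("kazoo", "amqp_max_channels", 100)
...
```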
The connection URL to rabbitmq. It can be set multiple times for failover.
Example 1.7. Set amqp_connection parameter
...
modparam("kazoo", "amqp_connection", "amqp://guest:guest@localhost:5672")
modparam("kazoo", "amqp_connection", "kazoo://guest:guest@otherhost:5672")
...
The name of the function in the KEMI configuration file (embedded scripting language such as Lua, Python, ...) to be executed instead of event_route[...] blocks.
The function receives a string parameter with the name of the event. The possible values are 'kazoo:mod-init' and 'kazoo:consumer-event'.
Example 1.8. Set event_callback parameter
...
modparam("kazoo", "event_callback", "ksr_kazoo_event")
...
The execution of the main loop can be tuned with the parameters in this section.
The main loop has three sub-loops in which it listens, with a timeout, for actions to execute. This group of parameters sets the maximum number of times a sub-loop is executed if it doesn't time out.
On busy systems, a sub-loop may never time out because it always has data to process. These parameters cap the number of iterations a sub-loop runs before it hands control to the next sub-loop.
...
while(true)                      // main loop
    while(ACK or timeout)        // acknowledge from worker process
    while(SEND or timeout)       // anything to send ?
    while(CONSUME or timeout)    // any data on consumed exchanges ?
...
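As a rough sketch of how the three loop counters might be tuned together on a busy system: the values below are illustrative assumptions rather than recommendations, and the mapping of each parameter to a sub-loop is inferred from the parameter descriptions that follow.

```cfg
...
# ACK sub-loop: acknowledgements coming back from worker processes
modparam("kazoo", "amqp_consumer_ack_loop_count", 10)
# SEND sub-loop: publish/query commands waiting to be sent to rabbitmq
modparam("kazoo", "amqp_internal_loop_count", 5)
# CONSUME sub-loop: messages arriving on consumed exchanges
modparam("kazoo", "amqp_consumer_loop_count", 5)
...
```

Lower counts give the other sub-loops a turn sooner, at the cost of more passes through the main loop.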
The loop count for the consumer sub-loop (messages consumed from rabbitmq).
Default value is 10.
Example 1.9. Set amqp_consumer_loop_count parameter
...
modparam("kazoo", "amqp_consumer_loop_count", 3)
...
The loop count for the internal sub-loop that listens for commands.
Default value is 5.
Example 1.10. Set amqp_internal_loop_count parameter
...
modparam("kazoo", "amqp_internal_loop_count", 1)
...
The loop count for the worker acknowledgement sub-loop.
Default value is 20.
Example 1.11. Set amqp_consumer_ack_loop_count parameter
...
modparam("kazoo", "amqp_consumer_ack_loop_count", 5)
...
This parameter indicates whether, on reconnect, the module ignores the loop counters and consumes all pending messages that are ready to be consumed.
Default value is 1.
Example 1.12. Set consume_messages_on_reconnect parameter
...
modparam("kazoo", "consume_messages_on_reconnect", 0)
...
When the main loop receives a message from rabbitmq, it hands the execution to a worker in a round-robin manner. This parameter makes the module use the same worker when kazoo reconnects to rabbitmq.
Default value is 1.
Example 1.13. Set single_consumer_on_reconnect parameter
...
modparam("kazoo", "single_consumer_on_reconnect", 0)
...
Each timer-related parameter is exposed as two parameters: name_sec (seconds) and name_micro (microseconds).
Timeout when checking for acknowledgements from workers.
Default value is 100000 microseconds.
Example 1.14. Set amqp_consumer_ack_timeout parameter
...
modparam("kazoo", "amqp_consumer_ack_timeout_sec", 1)
modparam("kazoo", "amqp_consumer_ack_timeout_micro", 200000)
...
Timeout when checking for commands (publish/query) to send to rabbitmq.
Default value is 100000 microseconds.
Example 1.15. Set amqp_interprocess_timeout parameter
...
modparam("kazoo", "amqp_interprocess_timeout_sec", 1)
modparam("kazoo", "amqp_interprocess_timeout_micro", 200000)
...
Timeout when checking for messages from rabbitmq.
Default value is 100000 microseconds.
Example 1.16. Set amqp_waitframe_timeout parameter
...
modparam("kazoo", "amqp_waitframe_timeout_sec", 1)
modparam("kazoo", "amqp_waitframe_timeout_micro", 200000)
...
Timeout when checking for reply messages from rabbitmq for kazoo_query commands.
Default value is 2 seconds.
Example 1.17. Set amqp_query_timeout parameter
...
modparam("kazoo", "amqp_query_timeout_sec", 1)
modparam("kazoo", "amqp_query_timeout_micro", 200000)
...
The AVP holding the timeout, in seconds, used when checking for reply messages from rabbitmq for kazoo_query commands.
Default value is NULL (no value).
Example 1.18. Set amqp_query_timeout_avp parameter
...
modparam("kazoo", "amqp_query_timeout_avp", "$var(kz_timeout)")

route[SOME_ROUTE] {
    $var(kz_timeout) = 12;
    # exchange, routingkey and payload are placeholders
    kazoo_query(exchange, routingkey, payload);
}
...
The database URL for the presentity table.
If set, the kazoo_pua_publish function will update the presentity status in the database.
Default value is “NULL”.
Example 1.19. Set db_url parameter
...
modparam("kazoo", "db_url", "mysql://kamailio:kamailiorw@localhost/kamailio")
...
The kazoo_publish function publishes a JSON payload to rabbitmq. The routing_key parameter should be encoded. Optional AMQP headers are specified in the format key1=value1;key2=value2
This function can be used from ANY ROUTE.
Example 1.22. kazoo_publish usage
...
$var(amqp_payload_request) = $_s({"Event-Category" : "directory", "Event-Name" : "reg_success", "Contact" : "$(ct{s.escape.common}{s.replace,\','}{s.replace,$$,})", "Call-ID" : "$ci", "Realm" : "$fd", "Username" : "$fU", "From-User" : "$fU", "From-Host" : "$fd", "To-User" : "$tU", "To-Host" : "$td", "User-Agent" : "$(ua{s.escape.common}{s.replace,\','}{s.replace,$$,})" });
$var(amqp_routing_key) = "registration.success." + $(fd{kz.encode}) + "." + $fU;
kazoo_publish("callmgr", $var(amqp_routing_key), $var(amqp_payload_request));
...
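The optional AMQP headers are not used in the example above. A minimal sketch, assuming they are passed as a fourth string argument in the key1=value1;key2=value2 format described for this function; the header names and values are hypothetical:

```cfg
...
# assumption: headers go in a fourth argument; x-origin and x-priority are made-up names
$var(amqp_headers) = "x-origin=kamailio;x-priority=low";
kazoo_publish("callmgr", $var(amqp_routing_key), $var(amqp_payload_request), $var(amqp_headers));
...
```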
The kazoo_query function publishes a JSON payload to rabbitmq, waits for a correlated message and puts the result in target_var. The routing_key parameter should be encoded. target_var is optional, as the function also puts the result in the pseudo-variable $kzR. Optional AMQP headers are specified in the format key1=value1;key2=value2
This function can be used from ANY ROUTE.
Example 1.23. kazoo_query usage
...
$var(amqp_payload_request) = "{'Event-Category' : 'call_event' , 'Event-Name' : 'query_user_channels_req', 'Realm' : '" + $fd + "', 'Username' : '" + $fU + "', 'Active-Only' : false }";
kazoo_encode("$ci", "$var(callid_encoded)");
$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request), "$var(amqp_result)")) {
    kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
    if($du != $null) {
        xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
        return;
    }
}
...
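Since target_var is optional, the reply can also be read from $kzR. A minimal sketch, assuming the three-argument form is accepted as the description implies and reusing the variables from the example above:

```cfg
...
if(kazoo_query("callevt", $var(amqp_routing_key), $var(amqp_payload_request))) {
    # no target_var was given, so the reply payload is read from $kzR
    xlog("L_INFO", "$ci|log|query reply: $kzR");
}
...
```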
The kazoo_subscribe function subscribes to exchange/type on queue with routing_key. The routing_key parameter should be encoded.
This function must be called from event_route[kazoo:mod-init].
Example 1.24. kazoo_subscribe usage
...
event_route[kazoo:mod-init] {
    kazoo_subscribe("dialoginfo", "direct", "BLF-QUEUE-MY_HOSTNAME", "BLF-MY_HOSTNAME");
}

event_route[kazoo:consumer-event] {
    xlog("L_INFO","Received json payload : $kzE");
}
...
This form of kazoo_subscribe takes a single JSON payload, which allows additional subscription parameters to be set.
JSON payload fields:
exchange : str, required
type : str, required
queue : str, required
routing : str, required
auto_delete : int, default 1
durable : int, default 0
no_ack : int, default 1
wait_for_consumer_ack : int, default 0
event_key : str, no default
event_subkey : str, no default
This function must be called from event_route[kazoo:mod-init].
Example 1.25. kazoo_subscribe usage
...
event_route[kazoo:mod-init] {
    $var(payload) = "{ 'exchange' : 'dialoginfo' , 'type' : 'direct', 'queue' : 'BLF-QUEUE-MY_HOSTNAME', 'routing' : 'BLF-MY_HOSTNAME', 'auto_delete' : 0, 'durable' : 1, 'no_ack' : 0, 'wait_for_consumer_ack' : 1 }";
    kazoo_subscribe("$var(payload)");
}

event_route[kazoo:consumer-event] {
    xlog("L_INFO","Received json payload : $kzE");
}
...
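The event_key and event_subkey fields let a single subscription override the key/subkey pair used to compose the event route name. A minimal sketch, assuming the route name is built from the field values in the same way as with the module-wide parameters; the payload values dialog and update are hypothetical:

```cfg
...
event_route[kazoo:mod-init] {
    $var(payload) = "{ 'exchange' : 'dialoginfo' , 'type' : 'direct', 'queue' : 'BLF-QUEUE-MY_HOSTNAME', 'routing' : 'BLF-MY_HOSTNAME', 'event_key' : 'Event-Package', 'event_subkey' : 'Event-Name' }";
    kazoo_subscribe("$var(payload)");
}

# hypothetical match: a payload with Event-Package = dialog and Event-Name = update
event_route[kazoo:consumer-event-dialog-update] {
    xlog("L_INFO", "received dialog update : $kzE");
}
...
```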
The kazoo_pua_publish function builds the presentity state from json_payload and updates the presentity table.
This function can be used from ANY ROUTE.
Example 1.26. kazoo_pua_publish usage
...
event_route[kazoo:consumer-event-presence-update] {
    xlog("L_INFO", "received $(kzE{kz.json,Event-Package}) update for $(kzE{kz.json,From})");
    kazoo_pua_publish($kzE);
    pres_refresh_watchers("$(kzE{kz.json,From})", "$(kzE{kz.json,Event-Package})", 1);
}
...
The kazoo_encode function encodes the first parameter for AMQP and puts the result in the second parameter.
This function can be used from ANY ROUTE.
Example 1.27. kazoo_encode usage
...
kazoo_encode("$ci", "$var(callid_encoded)");
$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
...
The kazoo_json function extracts a value from a JSON payload and puts the result in the third parameter. The query (second parameter) can address nested values.
This function can be used from ANY ROUTE.
Example 1.28. kazoo_json usage
...
kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
if($du != $null) {
    xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
    return;
}
...
$kzR contains the payload result of the kazoo_query execution.
$kzE contains the payload of a consumed message.
The prefix for kazoo transformations is kz.
json
Example 1.29. kz.json usage
...
#kazoo_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
$du = $(kzR{kz.json,Channels[0].switch_url});
if($du != $null) {
    xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
    return;
}
...
encode
Example 1.30. kz.encode usage
...
#kazoo_encode("$ci", "$var(callid_encoded)");
#$var(amqp_routing_key) = "call.status_req.$var(callid_encoded)";
$var(amqp_routing_key) = "call.status_req." + $(ci{kz.encode});
...