NSQ Module

Weave Communications

Edited by

Emmanuel Schmidbauer


Table of Contents

1. Admin Guide
1. Overview
2. How it works
2.1. Acknowledge Messages
3. Dependencies
3.1. Kamailio Modules
3.2. External Libraries or Applications
4. Parameters
4.1. lookupd_address(str)
4.2. lookupd_port(int)
4.3. nsqd_address(str)
4.4. nsqd_port(int)
4.5. consumer_use_nsqd(int)
4.6. consumer_event_key(str)
4.7. consumer_event_sub_key(str)
4.8. max_in_flight(int)
4.9. consumer_workers(int)
4.10. topic_channel(str)
4.11. db_url(str)
4.12. presentity_table(str)
4.13. db_table_lock_type(int)
5. Functions
5.1. nsq_pua_publish(json_payload)
6. Pseudo Variables
7. Transformations
8. Event Routes

List of Examples

1.1. Set lookupd_address parameter
1.2. Set lookupd_port parameter
1.3. Set nsqd_address parameter
1.4. Set nsqd_port parameter
1.5. Set consumer_use_nsqd parameter
1.6. Set consumer_event_key parameter
1.7. Set consumer_event_sub_key parameter
1.8. Set max_in_flight parameter
1.9. Set consumer_workers parameter
1.10. Set topic_channel parameter
1.11. Set db_url parameter
1.12. Set presentity_table parameter
1.13. Set db_table_lock_type parameter
1.14. nsq_pua_publish usage
1.15. Example usage of $nsqE pseudo variable
1.16. nsq.json usage
1.17. Define the event routes

Chapter 1. Admin Guide

1. Overview

The module provides an NSQ consumer for the Kamailio configuration file. NSQ is a real-time distributed messaging platform; more details about it can be found at nsq.io.

From a high-level perspective, the module may be used to:

  • Provide real-time integration with your Kamailio configuration file, as an alternative to interacting with a database. This allows you to overlay additional logic in your preferred language while utilizing a message bus.

  • Rely on a distributed messaging layer, so that machines processing requests/responses/events can go up or down, or share the workload, without impacting Kamailio's activity.

Supported NSQ operations are:

  • Subscribe to a Topic and Channel

The NSQ module can also publish updates to the presence module through the nsq_pua_publish() function.

2. How it works

The module creates an additional NSQ manager process that handles the communication with NSQ for consuming messages. The manager hands each message off to other NSQ worker processes for processing, so that it blocks neither itself nor the SIP worker processes.
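
As a rough illustration of this process model, the following sketch loads the module, subscribes to one topic and channel, and handles consumed messages in an event route. The topic and channel names are placeholders, and the loadmodule line assumes the module is installed in the default module directory.

...
loadmodule "nsq.so"

# placeholder topic and channel names
modparam("nsq", "topic_channel", "My-NSQ-Topic:My-NSQ-Channel")
# consumer connections for the topic_channel above
modparam("nsq", "consumer_workers", 2)

event_route[nsq:consumer-event]
{
	# executed by an NSQ worker process, not by a SIP worker process
	xlog("L_INFO", "received payload $nsqE from NSQ\n");
}
...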

2.1. Acknowledge Messages

Consumed messages can be acknowledged in two ways:

  • immediately when received

  • after processing by the worker

3. Dependencies

3.1. Kamailio Modules

The following modules must be loaded before this module:

  • none.

3.2. External Libraries or Applications

The following libraries or applications must be installed:

  • libev.

  • libjson.

  • libevbuffsock.

  • libnsq.

4. Parameters

4.1. lookupd_address(str)

The nsqlookupd address.

Usage: nsq related.

Default value is 127.0.0.1.

Example 1.1. Set lookupd_address parameter

...
modparam("nsq", "lookupd_address", "nsqlookupd.mydomain.com")
...

4.2. lookupd_port(int)

The nsqlookupd TCP port.

Usage: nsq related.

Default value is 4161.

Example 1.2. Set lookupd_port parameter

...
modparam("nsq", "lookupd_port", 4161)
...

4.3. nsqd_address(str)

The nsqd address. You can specify connecting directly to nsqd instead of using nsqlookupd.

Usage: nsq related.

Default value is 127.0.0.1.

Example 1.3. Set nsqd_address parameter

...
modparam("nsq", "nsqd_address", "nsqd.mydomain.com")
...

4.4. nsqd_port(int)

The nsqd TCP port.

Usage: nsq related.

Default value is 4150.

Example 1.4. Set nsqd_port parameter

...
modparam("nsq", "nsqd_port", 4150)
...

4.5. consumer_use_nsqd(int)

Set to 1 if you'd like to connect to nsqd instead of nsqlookupd.

Usage: nsq related.

Default value is 0.

Example 1.5. Set consumer_use_nsqd parameter

...
modparam("nsq", "consumer_use_nsqd", 1)
...
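
The three nsqd-related parameters are normally set together. The following sketch connects the consumer directly to nsqd instead of going through nsqlookupd; the hostname is a placeholder, and the port is the documented default written out explicitly.

...
# connect directly to nsqd rather than discovering it via nsqlookupd
modparam("nsq", "consumer_use_nsqd", 1)
modparam("nsq", "nsqd_address", "nsqd.mydomain.com")
modparam("nsq", "nsqd_port", 4150)
...

With consumer_use_nsqd left at its default of 0, the lookupd_address and lookupd_port parameters are the ones to set instead.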

4.6. consumer_event_key(str)

The default name of the field in the JSON payload used to compose the first part of the event name.

Usage: nsq related.

Default value is Event-Category.

Example 1.6. Set consumer_event_key parameter

...
modparam("nsq", "consumer_event_key", "My-JSON-Field-Name")
...

4.7. consumer_event_sub_key(str)

The default name of the field in the JSON payload used to compose the second part of the event name.

Usage: nsq related.

Default value is Event-Name.

Example 1.7. Set consumer_event_sub_key parameter

...
modparam("nsq", "consumer_event_sub_key", "My-JSON-SubField-Name")
...

4.8. max_in_flight(int)

The number of messages the consumer can receive before nsqd expects a response.

Usage: nsq related.

Default value is 1.

Example 1.8. Set max_in_flight parameter

...
modparam("nsq", "max_in_flight", 2)
...

4.9. consumer_workers(int)

Number of consumer connections to NSQ per topic_channel.

Usage: nsq related.

Default value is 4.

Example 1.9. Set consumer_workers parameter

...
modparam("nsq", "consumer_workers", 2)
...

4.10. topic_channel(str)

The NSQ topic and channel, separated by a colon (:). It can be set multiple times to subscribe to multiple topics and channels. The value of consumer_workers is allocated per topic_channel.

Usage: nsq related.

Default value is Kamailio-Topic:Kamailio-Channel.

Example 1.10. Set topic_channel parameter

...
modparam("nsq", "topic_channel", "My-NSQ-Topic:My-NSQ-Channel")
modparam("nsq", "topic_channel", "My-NSQ-Topic-2:My-NSQ-Channel-2")
...
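
Because consumer_workers applies per topic_channel, the total number of consumer connections grows with the number of subscriptions. A sketch, reusing the two placeholder subscriptions from the example above: with consumer_workers set to 2, each subscription would get 2 connections, for 4 in total.

...
# 2 consumer connections per topic_channel
modparam("nsq", "consumer_workers", 2)

# 2 subscriptions x 2 connections each = 4 consumer connections
modparam("nsq", "topic_channel", "My-NSQ-Topic:My-NSQ-Channel")
modparam("nsq", "topic_channel", "My-NSQ-Topic-2:My-NSQ-Channel-2")
...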

4.11. db_url(str)

The database for the presentity table.

If set, the nsq_pua_publish function will update the presentity status in the database.

Usage: presence related.

Default value is NULL.

Example 1.11. Set db_url parameter

...
modparam("nsq", "db_url", "mysql://kamailio:kamailiorw@localhost/kamailio")
...

4.12. presentity_table(str)

The name of the presentity table in the database.

Default value is presentity.

Example 1.12. Set presentity_table parameter

...
modparam("nsq", "presentity_table", "my_presentity_table")
...

4.13. db_table_lock_type(int)

Enable (=1) or disable (=0) table locking during a transaction.

Default value is 1.

Example 1.13. Set db_table_lock_type parameter

...
modparam("nsq", "db_table_lock_type", 0)
...
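
The three presence-related parameters can be combined as in the following sketch. It assumes a MySQL backend, so the db_mysql module is loaded to match the mysql:// URL; the table name and lock type shown are the documented defaults written out explicitly.

...
# assumption: db_mysql is the database driver matching the mysql:// URL
loadmodule "db_mysql.so"
loadmodule "nsq.so"

modparam("nsq", "db_url", "mysql://kamailio:kamailiorw@localhost/kamailio")
modparam("nsq", "presentity_table", "presentity")
modparam("nsq", "db_table_lock_type", 1)
...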

5. Functions

5.1.  nsq_pua_publish(json_payload)

The function builds the presentity state from json_payload and updates the presentity table.

Usage: presence related.

This function can be used from ANY ROUTE.

Example 1.14. nsq_pua_publish usage

...
event_route[nsq:consumer-event-presence-update]
{
	xlog("L_INFO", "received $(nsqE{nsq.json,Event-Package}) update for $(nsqE{nsq.json,From})");
	nsq_pua_publish($nsqE);
	pres_refresh_watchers("$(nsqE{nsq.json,From})", "$(nsqE{nsq.json,Event-Package})", 1);
}
...

6. Pseudo Variables

  • $nsqE Contains the payload of a consumed message.

Example 1.15. Example usage of $nsqE pseudo variable

...
event_route[nsq:consumer-event]
{
	xlog("L_INFO", "received payload $nsqE from NSQ");
	...
}
...

7. Transformations

The prefix for nsq transformations is nsq.

You can use the transformation to extract values from the JSON-structured $nsqE pseudo variable.

  • json

    Example 1.16. nsq.json usage

    ...
    # extract value of "Custom-Data" from $nsqE pseudo variable and set it to $var(Custom-Data)
    $var(Custom-Data) = $(nsqE{nsq.json,Custom-Data});
    if($var(Custom-Data) != $null) {
    	xlog("L_INFO", "$ci|log|custom data: $var(Custom-Data)");
    }
    ...
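
To make the extraction concrete, suppose a consumed message carries the hypothetical payload shown in the comments below. The payload values and the variable names are assumptions made for illustration; only the field names Event-Category, Event-Name and From appear elsewhere in this document.

...
# hypothetical payload held in $nsqE:
# {"Event-Category": "presence", "Event-Name": "update", "From": "sip:alice@mydomain.com"}
$var(category) = $(nsqE{nsq.json,Event-Category});
$var(name) = $(nsqE{nsq.json,Event-Name});
xlog("L_INFO", "event $var(category)/$var(name) from $(nsqE{nsq.json,From})\n");
...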

8. Event Routes

The worker process executes an event route in which we can act on the received payload. The name of the event route is composed of values extracted from the payload.

The NSQ module tries to execute the event routes from most significant to least significant. Define the event route as event_route[nsq:consumer-event[-payload_key_value[-payload_subkey_value]]].

We can set the key/subkey pair on a per-subscription basis. Check the payload on subscribe.

Example 1.17. Define the event routes

...
modparam("nsq", "consumer_event_key", "Event-Category")
modparam("nsq", "consumer_event_sub_key", "Event-Name")
...

event_route[nsq:consumer-event-presence-update]
{
	# presence is the value extracted from Event-Category field in json payload
	# update is the value extracted from Event-Name field in json payload
	xlog("L_INFO", "received $(nsqE{nsq.json,Event-Package}) update for $(nsqE{nsq.json,From})");
	...
}

event_route[nsq:consumer-event-presence]
{
	# presence is the value extracted from Event-Category field in json payload
	xlog("L_INFO", "received $(nsqE{nsq.json,Event-Package}) update for $(nsqE{nsq.json,From})");
	...
}

event_route[nsq:consumer-event-event-category-event-name]
{
	# event-category is the name of the consumer_event_key parameter
	# event-name is the name of the consumer_event_sub_key parameter
	# this event route is executed if we can't find the previous
	...
}

event_route[nsq:consumer-event-event-category]
{
	# event-category is the name of the consumer_event_key parameter
	# this event route is executed if we can't find the previous
	...
}

event_route[nsq:consumer-event]
{
	# this event route is executed if we can't find the previous
}
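
Reading the example above from top to bottom gives the lookup order. As an illustration, for a hypothetical payload with Event-Category set to presence and Event-Name set to update, and the default key parameters, the routes would be tried in the order listed in the comments below. The catch-all body is only a sketch of one way to make unhandled events visible; the log level and message are arbitrary choices.

...
# lookup order for the hypothetical payload:
#   1. event_route[nsq:consumer-event-presence-update]
#   2. event_route[nsq:consumer-event-presence]
#   3. event_route[nsq:consumer-event-event-category-event-name]
#   4. event_route[nsq:consumer-event-event-category]
#   5. event_route[nsq:consumer-event]
event_route[nsq:consumer-event]
{
	# last resort: log payloads that no more specific route handled
	xlog("L_WARN", "unhandled NSQ event: $nsqE\n");
}
...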