Hi,
As part of some work to get the maximum performance out of Kamailio RLS
and Presence, I would like to be able to hand off processing of presence
requests (NOTIFY, PUBLISH, and SUBSCRIBE) to some worker processes.
This way, even if large numbers of presence requests are received on a
single TCP connection, all that the Kamailio process managing that
connection has to do is to de-packetise the requests and hand them off.
I have been looking at the ASYNC module and it doesn't seem to do quite
the right thing for this. The issue is the sleep parameter. I want my
worker processes to process presence requests continuously from a queue,
not suspend them for a second or more and then process them. Is there a
simple change to ASYNC to get it to do this?
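
For reference, a minimal sketch of the ASYNC usage in question, assuming
the async module's "workers" parameter and its async_route() function.
The route name PRESENCE_HANDOFF and the worker count of 8 are
illustrative only, not taken from a real config:

    loadmodule "tm.so"
    loadmodule "tmx.so"
    loadmodule "async.so"

    # Number of async worker processes (illustrative value)
    modparam("async", "workers", 8)

    route[PRESENCE_HANDOFF] {    # hypothetical route name
        # Suspend the transaction and resume it in route[PRESENCE]
        # after 1 second. This fixed delay is the problem: the
        # request sits idle instead of being processed straight away.
        async_route("PRESENCE", "1");
        # Only reached if async_route() failed
        exit;
    }
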
I currently do what I need in a quite different way (see config fragment
below), but I think using ASYNC would be cleaner if it did what I
needed...
...
# ----- mqueue params -----
modparam("mqueue", "mqueue", "name=presence")
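# ----- pv params (sketch, not part of the original fragment) -----
# route[PRESENCE_PROCESS] below numbers the workers with $shv(pres).
# Assuming the pv module is loaded, the counter's starting value could
# be made explicit like this (optional; shown for illustration only):
modparam("pv", "shvset", "pres=i:0")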
# ----- rtimer params -----
modparam("rtimer", "timer", "name=presence0;interval=1;mode=1;")
modparam("rtimer", "exec", "timer=presence0;route=PRESENCE_PROCESS")
modparam("rtimer", "timer", "name=presence1;interval=1;mode=1;")
modparam("rtimer", "exec", "timer=presence1;route=PRESENCE_PROCESS")
...
modparam("rtimer", "timer", "name=presence7;interval=1;mode=1;")
modparam("rtimer", "exec", "timer=presence7;route=PRESENCE_PROCESS")
...
# Only presence and presence.winfo events are handled here
if (@event!="presence" && @event!="presence.winfo") {
    $var(ev_type) = @event;
    xlog("L_INFO", " $rm $var(ev_type) not a presence message for myself\n");
    return;
}

# Park the transaction so this process can go back to reading requests
if (!t_suspend()) {
    t_reply("500", "Server Internal Error");
    xlog("L_ERR", "Failed to suspend transaction for $rm\n");
    exit;
}
xlog("L_INFO", "Suspended transaction for $rm [$T(id_index):$T(id_label)]\n");

# Queue the transaction identifiers for the de-queue processes
if (!mq_add("presence", "$T(id_index)", "$T(id_label)")) {
    t_reply("500", "Server Internal Error");
    xlog("L_ERR", "Failed to queue transaction for $rm [$T(id_index):$T(id_label)]\n");
    exit;
}
exit;
...
route[PRESENCE_PROCESS] {
    # Give each de-queue process a sequence number for logging
    lock("pres");
    $var(pres) = $shv(pres);
    $shv(pres) = $shv(pres) + 1;
    unlock("pres");
    xlog("L_WARN", "Starting presence de-queue process $var(pres) (pid: $pp)\n");

    # Never returns: drain the queue, then sleep 100 ms before polling again
    while (1) {
        while (mq_fetch("presence")) {
            $var(id_index) = (int) $mqk(presence);
            $var(id_label) = (int) $mqv(presence);
            xlog("L_INFO", "Found queued presence transaction [$var(id_index):$var(id_label)]\n");
            t_continue("$var(id_index)", "$var(id_label)", "PRESENCE");
        }
        usleep(100000);
    }
}
route[PRESENCE] {
    xlog("L_INFO", "$rm: route[PRESENCE] process $var(pres)\n");
    if (is_method("NOTIFY")) {
        xlog("L_INFO", "Sending NOTIFY to RLS\n");
        rls_handle_notify();
    } else if (is_method("PUBLISH")) {
        xlog("L_INFO", "Sending PUBLISH to Presence\n");
        handle_publish();
    } else if (is_method("SUBSCRIBE")) {
        xlog("L_INFO", "Sending SUBSCRIBE to RLS\n");
        $var(ret_code) = rls_handle_subscribe();
        if ($var(ret_code) == 10) {
            xlog("L_INFO", " SUBSCRIBE not for RLS - sending to Presence\n");
            handle_subscribe();
        }
    } else {
        xlog("L_ERR", "Received non-(NOTIFY|PUBLISH|SUBSCRIBE) request from presence queue\n");
        t_reply("500", "Server Internal Error");
    }
    exit;
}
Thanks,
Peter
--
Peter Dunkley
Technical Director
Crocodile RCS Ltd