Hi,
I am not relaying or replying to messages directly here - except in the
error case. I am using the t_suspend()/t_continue() along with the
presence and RLS APIs. So what I have is the following:
#!substdef "!PRESENCE_PROCESS_SLEEP!100000!g"
modparam("mqueue", "mqueue", "name=presence")
modparam("rtimer", "timer",
"name=presenceMaster;interval=PRESENCE_PROCESS_SLEEPu;mode=1;")
modparam("rtimer", "exec",
"timer=presenceMaster;route=PRESENCE_MASTER_PROCESS")
modparam("mqueue", "mqueue",
"name=presenceWorker0")
modparam("rtimer", "timer",
"name=presenceWorker0;interval=1u;mode=1;")
modparam("rtimer", "exec",
"timer=presenceWorker0;route=PRESENCE_WORKER_PROCESS")
modparam("mqueue", "mqueue",
"name=presenceWorker1")
modparam("rtimer", "timer",
"name=presenceWorker1;interval=1u;mode=1;")
modparam("rtimer", "exec",
"timer=presenceWorker1;route=PRESENCE_WORKER_PROCESS")
...
modparam("mqueue", "mqueue",
"name=presenceWorkern")
modparam("rtimer", "timer",
"name=presenceWorkern;interval=1u;mode=1;")
modparam("rtimer", "exec",
"timer=presenceWorkern;route=PRESENCE_WORKER_PROCESS")
...
route {
...
# Some logic to determine this is a presence request (within or without dialog)
$var(queue) = "presence";
route(PRESENCE_ENQUEUE);
...
}
...
route[PRESENCE_ENQUEUE] {
xlog("L_WARN", "$rm: route[PRESENCE_ENQUEUE]\n");
if (!t_suspend()) {
t_reply("500", "Server Internal Error");
xlog("L_ERR", "Failed to suspend transaction for
$rm\n");
exit;
}
xlog("L_INFO", "Suspended transaction for $rm
[$T(id_index):$T(id_label)]\n");
if (!mq_add("$var(queue)", "$T(id_index)",
"$T(id_label)")) {
t_reply("500", "Server Internal Error");
xlog("L_ERR", "Failed to queue transaction for $rm
[$T(id_index):$T(id_label)] on $var(queue)\n");
exit;
}
exit;
}
route[PRESENCE_MASTER_PROCESS] {
xlog("L_INFO", "Running PRESENCE_MASTER_PROCESS\n");
while (mq_fetch("presence")) {
$var(id_index) = (int) $mqk(presence);
$var(id_label) = (int) $mqv(presence);
xlog("L_INFO", "presence: Found queued transaction
[$var(id_index):$var(id_label)]\n");
t_continue("$var(id_index)", "$var(id_label)",
"PRESENCE_DISTRIBUTE");
}
}
route[PRESENCE_DISTRIBUTE] {
xlog("L_WARN", "$rm: route[PRESENCE_DISTRIBUTE]\n");
# Some algorithm to distribute traffic across queues...
# Perhaps on request type (so we have a NOTIFIER that
# sends NOTIFY requests in order), perhaps on some form
# of hash... I am experimenting with this...
$var(queue) = "presenceWorker" + $var(queue_number);
xlog("L_INFO", "Adding to queue: $var(queue)\n");
route(PRESENCE_ENQUEUE);
}
route[PRESENCE_WORKER_PROCESS] {
lock("pres");
$var(pres) = $shv(pres);
$shv(pres) = $shv(pres) + 1;
unlock("pres");
$var(queue) = "presenceWorker" + $var(pres);
xlog("L_WARN", "Starting process: $var(queue) (pid:
$pp)\n");
while (1) {
while (mq_fetch($var(queue))) {
$var(id_index) = (int) $mqk($var(queue));
$var(id_label) = (int) $mqv($var(queue));
xlog("L_WARN", "$var(queue): found queued
transaction [$var(id_index):$var(id_label)]\n");
t_continue("$var(id_index)",
"$var(id_label)", "PRESENCE");
}
usleep(PRESENCE_PROCESS_SLEEP);
}
}
route[PRESENCE] {
xlog("L_WARN", "$rm: route[PRESENCE]:
$var(queue)\n");
if (is_method("NOTIFY")) {
xlog("L_INFO", "Sending NOTIFY to RLS\n");
rls_handle_notify();
} else if (is_method("PUBLISH")) {
xlog("L_INFO", "Sending PUBLISH to
Presence\n");
handle_publish();
} else if (is_method("SUBSCRIBE")) {
xlog("L_INFO", "Sending SUBSCRIBE to RLS\n");
$var(ret_code) = rls_handle_subscribe();
if ($var(ret_code) == 10) {
xlog("L_INFO", " SUBSCRIBE not for RLS -
sending to Presence\n");
handle_subscribe();
}
} else {
xlog("L_ERR", "Received
non-(NOTIFY|PUBLISH|SUBSCRIBE) request from presence queue\n");
t_reply("500", "Server Internal Error");
}
exit;
}
...
Previously, I had just a single queue "presence" which all of the
presence worker process took requests from. This meant that
t_suspend()/t_continue() was used just once and this worked (the
presence/RLS APIs respond to the requests statefully). The reason for
doing this in the first place is that I was getting problems with
back-end RLS traffic all using a single TCP connection, which meant all
the back-end presence requests were being handled by the same Kamailio
process, which caused a bottleneck (using UDP causes a different set of
problems under load and isn't really an option). Although the queue is
a FIFO the fact that different processes could take different amounts of
time means that things were happening out of order (Klaus and Anca have
had a discussion about just this kind of issue with presence on the
mailing list recently) and this is causing me problems.
What I have now (above) is presence requests being pulled from the TCP
buffer and suspended as quickly as possible. A presenceMaster process
then dequeues the request (continues it), performs some analysis to
determine which worker should deal with it, and then suspends it again
queuing it for the right worker. All of this works up until the
t_continue() for the worker (in the PRESENCE_WORKER_PROCESS) is called.
At this point the transaction is killed.
What I can't understand is why the first t_suspend()/t_continue() works
here, but the second fails. My previous version of this (with the
single queue and single t_suspend()/t_continue() call) worked fine, but
it seems that the sequence of t_suspend(), t_continue(), t_suspend(),
t_continue() - with no changes to or handling of the request in-between
- fails.
Thanks,
Peter
On Wed, 2012-03-28 at 15:13 +0200, Miklos Tirpak wrote:
Peter,
t_suspend() and t_continue() should work multiple times as long as they
are executed sequentially after each other, i.e. there cannot be two
branches suspended at the same time.
The error you get means to me that t_continue() executed the specified
route block, but in that route, the request was neither replied nor a
new branch was added. Hence, the transaction is hanging in memory and
the module sees no pending branch that could return a reply later.
Make sure that in the route block executed by t_continue() there is
either a t_reply() or you append a new branch and forward it with
t_relay() (or append a new branch and call t_suspend() again). I think
you also need to handle the failure of t_relay() and explicitly call
t_reply() when t_relay() fails in this route.
Regards,
Miklos
On 03/28/2012 02:21 PM, Daniel-Constantin Mierla wrote:
> Hello,
>
> I have been using it only once and didn't looked much deeper into the code.
>
> Maybe Miklos (cc-ed) can give faster more details, afaik he is the
> developer of that piece.
>
> Cheers,
> Daniel
>
> On 3/28/12 1:13 PM, Peter Dunkley wrote:
>> Hi,
>>
>> I am trying to use t_suspend()/t_continue() multiple times on the same
>> transaction. Calling t_suspend() more than once works, but the second
>> time I call t_continue() the transaction is killed and a 500 response
>> is sent. It is the "if (branch == t->nr_of_outgoings)" check from
the
>> code fragment below (from t_suspend.c:t_continue()) that results in
>> the transaction being killed - you can see the debug/error line I
>> added to determine this in the fragment.
>>
>> Is using t_suspend()/t_continue() multiple times something that should
>> work?
>>
>> Thanks,
>>
>> Peter
>>
>> if (t->uas.status < 200) {
>> /* No final reply has been sent yet.
>> * Check whether or not there is any pending branch.
>> */
>> for ( branch = 0;
>> branch < t->nr_of_outgoings;
>> branch++
>> ) {
>> if ((t->uac[branch].request.buffer != NULL)
>> && (t->uac[branch].last_received < 200)
>> )
>> break;
>> }
>>
>> if (branch == t->nr_of_outgoings) {
>> /* There is not any open branch so there is
>> * no chance that a final response will be received. */
>> ret = 0;
>> LM_ERR("branch == t->nr_of_outgoings\n");
>> goto kill_trans;
>> }
>> }
>>
>> --
>> Peter Dunkley
>> Technical Director
>> Crocodile RCS Ltd
>>
>>
>>
>> _______________________________________________
>> sr-dev mailing list
>> sr-dev(a)lists.sip-router.org
>>
http://lists.sip-router.org/cgi-bin/mailman/listinfo/sr-dev
>
> --
> Daniel-Constantin Mierla
> Kamailio Advanced Training, April 23-26, 2012, Berlin, Germany
>
http://www.asipto.com/index.php/kamailio-advanced-training/
>
--
Peter Dunkley
Technical Director
Crocodile RCS Ltd