On Dec 6, 2022, at 8:44 AM, Jawaid Bazyar
<bazyar(a)gmail.com> wrote:
Create N queues, one assigned to each worker thread
Hash incoming messages to a queue based on call ID
Each worker thread works on its assigned queue.
This is, in essence, what the `route_locks_size` setting speaks to:
https://www.kamailio.org/wikidocs/cookbooks/5.6.x/core/#route_locks_size
... with the difference that the queues aren't mapped to a particular worker thread.
That would be hard to accomplish in Kamailio's preforked process model, since the
processes are just fork()s that all call recvfrom() / accept() / whatever. It's the
kernel that actually decides which process gets any given message or connection. You'd
have to literally add a distributor thread to an architecture where none exists.
Good news! You can devise your own -- with a little work. ;-)
1) Start by creating (let's say) 4 mqueues[1]:
modparam("mqueue", "mqueue", "name=proc1")
modparam("mqueue", "mqueue", "name=proc2")
modparam("mqueue", "mqueue", "name=proc3")
modparam("mqueue", "mqueue", "name=proc4")
2) Create 4 task routes (rtimer[2] tasks) to consume these routes:
modparam("rtimer", "timer",
"name=proc1;interval=10u;mode=1")
modparam("rtimer", "exec", "timer=proc1;route=PROC1")
modparam("rtimer", "timer",
"name=proc2;interval=10u;mode=1")
modparam("rtimer", "exec", "timer=proc2;route=PROC2")
modparam("rtimer", "timer",
"name=proc3;interval=10u;mode=1")
modparam("rtimer", "exec", "timer=proc3;route=PROC3")
modparam("rtimer", "timer",
"name=proc4;interval=10u;mode=1")
modparam("rtimer", "exec", "timer=proc4;route=PROC4")
3) Have each PROC[x] route consume its respective mqueue. Don't have to needlessly
duplicate, just create some wrappers that use an intermediate variable:
route[ONWARD] {
# In so many words.
record_route();
enum_query();
xlog("INVITE ENUM query - To URI $tU");
forward();
}
route[THE_REAL_PROC] {
while(mq_fetch("proc$var(qnum)")) {
$var(id) = $(mqk(term_proc){s.select,0,:}{s.int});
$var(label) = $(mqk(term_proc){s.select,1,:}{s.int});
# Resume transaction in this worker process (rtimer process).
t_continue("$var(id)", "$var(label)", "ONWARD");
}
}
route[PROC1] {
$var(qnum) = '1';
route(THE_REAL_PROC);
}
route[PROC2] {
$var(qnum) = '2';
route(THE_REAL_PROC);
}
route[PROC3] {
$var(qnum) = '3';
route(THE_REAL_PROC);
}
route[PROC4] {
$var(qnum) = '4';
route(THE_REAL_PROC);
}
4) Set children=1 / socket_workers=1 for the applicable listener.
5) Devise a request_route which will hash over the Call-ID, suspend the transaction[3],
and send the transaction index and label identifiers to one of these 4 queues.
core_hash() from cfgutils[4] is a great fit for that, but requires that the hash domain
size be a power of 2 -- which makes a number of queues like 4 or 8 a great choice. :-)
request_route {
...
$var(q) = core_hash("$ci", "", 2); # 2^2, aka 4 queues
if(!t_suspend()) {
sl_send_reply("500", "Internal server error - cannot
suspend");
exit;
}
mq_add("proc$var(q)", "$T(id_index):$T(id_label)");
}
6) This setup will guarantee that messages associated with one Call-ID will always go to
the exact same worker process.
Caveat emptor, of course: haven't tested this exactly as rendered, and it creates a
single bottleneck at the worker process that handles the listening.
-- Alex
[1]
https://kamailio.org/docs/modules/5.6.x/modules/mqueue.html
[2]
https://kamailio.org/docs/modules/5.6.x/modules/rtimer.html
[3]
https://kamailio.org/docs/modules/5.6.x/modules/tmx.html#tmx.f.t_suspend
https://kamailio.org/docs/modules/5.6.x/modules/tmx.html#tmx.f.t_continue
[4]
https://kamailio.org/docs/modules/5.6.x/modules/cfgutils.html
--
Alex Balashov | Principal | Evariste Systems LLC
Tel: +1-706-510-6800 / +1-800-250-5920 (toll-free)
Web:
http://www.evaristesys.com/,
http://www.csrpswitch.com/