Module: kamailio Branch: master Commit: f27df5650cf0e5ac17d7ff25db67b27ad33cc6e3 URL: https://github.com/kamailio/kamailio/commit/f27df5650cf0e5ac17d7ff25db67b27a...
Author: Emmanuel Schmidbauer emmanuel@getweave.com Committer: GitHub noreply@github.com Date: 2016-12-08T13:44:33-07:00
Merge pull request #884 from kamailio/nsq-max-in-flight
nsq: use max_in_flight value
---
Modified: src/modules/nsq/nsq_mod.c
---
Diff: https://github.com/kamailio/kamailio/commit/f27df5650cf0e5ac17d7ff25db67b27a... Patch: https://github.com/kamailio/kamailio/commit/f27df5650cf0e5ac17d7ff25db67b27a...
---
diff --git a/src/modules/nsq/nsq_mod.c b/src/modules/nsq/nsq_mod.c index 61d6334..6c23603 100644 --- a/src/modules/nsq/nsq_mod.c +++ b/src/modules/nsq/nsq_mod.c @@ -263,7 +263,7 @@ int set_non_blocking(int fd) /** * */ -int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel) +int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel, int max_in_flight) { struct ev_loop *loop; loop = ev_default_loop(0); @@ -279,6 +279,7 @@ int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel) LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n", topic, channel); // setup the reader rdr = new_nsq_reader(loop, topic, channel, (void *)ctx, NULL, NULL, NULL, nsq_message_handler); + rdr->max_in_flight = max_in_flight;
if (consumer_use_nsqd == 0) { snprintf(address, 128, "%.*s", nsq_lookupd_address.len, nsq_lookupd_address.s); @@ -300,6 +301,11 @@ static int mod_child_init(int rank) int pid; int i; int workers = dbn_consumer_workers / nsq_topic_channel_counter; + int max_in_flight = 1; + + if (nsq_max_in_flight > 1) { + max_in_flight = nsq_max_in_flight; + }
fire_init_event(rank);
@@ -318,7 +324,7 @@ static int mod_child_init(int rank) return -1; /* error */ if (pid==0){ close(nsq_worker_pipes_fds[i*2+1]); - return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, DEFAULT_CHANNEL)); + return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, DEFAULT_CHANNEL, max_in_flight)); } } } else { @@ -329,7 +335,7 @@ static int mod_child_init(int rank) return -1; /* error */ if (pid==0){ close(nsq_worker_pipes_fds[i*2+1]); - return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, tc->channel)); + return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, tc->channel, max_in_flight)); } } tc = tc->next;