Module: kamailio Branch: master Commit: 21bdbff049571ca22f1205a041c8b1162005d319 URL: https://github.com/kamailio/kamailio/commit/21bdbff049571ca22f1205a041c8b1162005d319
Author: Daniel-Constantin Mierla miconda@gmail.com Committer: Daniel-Constantin Mierla miconda@gmail.com Date: 2020-03-20T17:10:05+01:00
dmq: init worker structure in child_init for PROC_INIT rank
- have it ready when other processes are started
---
Modified: src/modules/dmq/dmq.c Modified: src/modules/dmq/worker.c Modified: src/modules/dmq/worker.h
---
Diff: https://github.com/kamailio/kamailio/commit/21bdbff049571ca22f1205a041c8b1162005d319.diff Patch: https://github.com/kamailio/kamailio/commit/21bdbff049571ca22f1205a041c8b1162005d319.patch
---
diff --git a/src/modules/dmq/dmq.c b/src/modules/dmq/dmq.c index 2b64d0e907..4fa61d947b 100644 --- a/src/modules/dmq/dmq.c +++ b/src/modules/dmq/dmq.c @@ -230,11 +230,12 @@ static int mod_init(void) }
/* allocate workers array */ - workers = shm_malloc(num_workers * sizeof(*workers)); + workers = shm_malloc(num_workers * sizeof(dmq_worker_t)); if(workers == NULL) { LM_ERR("error in shm_malloc\n"); return -1; } + memset(workers, 0, num_workers * sizeof(dmq_worker_t));
dmq_init_callback_done = shm_malloc(sizeof(int)); if(!dmq_init_callback_done) { @@ -275,14 +276,24 @@ static int mod_init(void) static int child_init(int rank) { int i, newpid; + + if(rank == PROC_INIT) { + for(i = 0; i < num_workers; i++) { + if (init_worker(&workers[i]) < 0) { + LM_ERR("failed to init struct for worker[%d]\n", i); + return -1; + } + } + return 0; + } + if(rank == PROC_MAIN) { /* fork worker processes */ for(i = 0; i < num_workers; i++) { - init_worker(&workers[i]); LM_DBG("starting worker process %d\n", i); newpid = fork_process(PROC_RPC, "DMQ WORKER", 0); if(newpid < 0) { - LM_ERR("failed to fork process\n"); + LM_ERR("failed to fork worker process %d\n", i); return -1; } else if(newpid == 0) { /* child - this will loop forever */ @@ -307,7 +318,7 @@ static int child_init(int rank) } return 0; } - if(rank == PROC_INIT || rank == PROC_TCP_MAIN) { + if(rank == PROC_TCP_MAIN) { /* do nothing for the main process */ return 0; } diff --git a/src/modules/dmq/worker.c b/src/modules/dmq/worker.c index 14aadfc998..38b49e4512 100644 --- a/src/modules/dmq/worker.c +++ b/src/modules/dmq/worker.c @@ -224,7 +224,7 @@ int add_dmq_job(struct sip_msg *msg, dmq_peer_t *peer) /** * @brief init dmq worker */ -void init_worker(dmq_worker_t *worker) +int init_worker(dmq_worker_t *worker) { memset(worker, 0, sizeof(*worker)); if(worker_usleep <= 0) { @@ -233,6 +233,11 @@ void init_worker(dmq_worker_t *worker) lock_get(&worker->lock); } worker->queue = alloc_job_queue(); + if(worker->queue==NULL) { + LM_ERR("queue could not be initialized\n"); + return -1; + } + return 0; }
/** diff --git a/src/modules/dmq/worker.h b/src/modules/dmq/worker.h index 9f5fed71cf..cb4fce8ccf 100644 --- a/src/modules/dmq/worker.h +++ b/src/modules/dmq/worker.h @@ -56,7 +56,7 @@ struct dmq_worker
typedef struct dmq_worker dmq_worker_t;
-void init_worker(dmq_worker_t *worker); +int init_worker(dmq_worker_t *worker); int add_dmq_job(struct sip_msg *, dmq_peer_t *); void worker_loop(int id);