Module: kamailio Branch: master Commit: 67db972a129a8f34ea7406618593df4eaf846a1b URL: https://github.com/kamailio/kamailio/commit/67db972a129a8f34ea7406618593df4eaf846a1b
Author: Luis Azedo luis@2600hz.com Committer: Luis Azedo luis.azedo@factorlusitano.com Date: 2015-07-01T15:59:57+01:00
kazoo : fix, send timeout callback to consumer process
---
Modified: modules/kazoo/kz_amqp.c
---
Diff: https://github.com/kamailio/kamailio/commit/67db972a129a8f34ea7406618593df4eaf846a1b.diff Patch: https://github.com/kamailio/kamailio/commit/67db972a129a8f34ea7406618593df4eaf846a1b.patch
---
diff --git a/modules/kazoo/kz_amqp.c b/modules/kazoo/kz_amqp.c
index 96227c0..06e1599 100644
--- a/modules/kazoo/kz_amqp.c
+++ b/modules/kazoo/kz_amqp.c
@@ -2034,7 +2034,32 @@ void kz_amqp_cb_error(kz_amqp_cmd_ptr cmd)
 	int n = route_get(&main_rt, cmd->err_route);
 	struct action *a = main_rt.rlist[n];
 	tmb.t_continue(cmd->t_hash, cmd->t_label, a);
-	kz_amqp_free_pipe_cmd(cmd);
+}
+
+int kz_send_worker_error_event(kz_amqp_cmd_ptr cmd)
+{
+	cmd->return_code = -1;
+	kz_amqp_consumer_delivery_ptr ptr = (kz_amqp_consumer_delivery_ptr) shm_malloc(sizeof(kz_amqp_consumer_delivery));
+	if(ptr == NULL) {
+		LM_ERR("NO MORE SHARED MEMORY!");
+		return 0;
+	}
+	memset(ptr, 0, sizeof(kz_amqp_consumer_delivery));
+	ptr->cmd = cmd;
+
+	consumer++;
+	if(consumer >= dbk_consumer_processes) {
+		consumer = 0;
+	}
+
+	if (write(kz_worker_pipes[consumer], &ptr, sizeof(ptr)) != sizeof(ptr)) {
+		LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), cmd->payload);
+		kz_amqp_free_consumer_delivery(ptr);
+		return 0;
+	}
+
+	return 1;
+
 }
 
 void kz_amqp_cmd_timeout_cb(int fd, short event, void *arg)
@@ -2047,7 +2072,7 @@ void kz_amqp_cmd_timeout_cb(int fd, short event, void *arg)
 				, retrieved_cmd ->message_id->len, retrieved_cmd ->message_id->s
 				);
 		if(retrieved_cmd->type == KZ_AMQP_CMD_ASYNC_CALL) {
-			kz_amqp_cb_error(retrieved_cmd);
+			kz_send_worker_error_event(retrieved_cmd);
 		} else {
 			retrieved_cmd->return_code = -1;
 			lock_release(&retrieved_cmd->lock);
@@ -2577,6 +2602,9 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_
 	}
 	if(idx < dbk_channels) {
 		cmd = kz_cmd_retrieve(message_id);
+		if(cmd)
+			cmd->return_code = AMQP_RESPONSE_NORMAL;
+
 		/*
 		if(cmd != NULL) {
 			cmd->return_code = 0;
@@ -2791,8 +2819,13 @@ void kz_amqp_consumer_worker_cb(int fd, short event, void *arg)
 	LM_DBG("consumer %d received payload %s\n", my_pid(), cmd->payload);
 
 	if(cmd->cmd) {
-		kz_amqp_set_last_result(cmd->payload);
-		kz_amqp_cb_ok(cmd->cmd);
+		if(cmd->cmd->return_code == AMQP_RESPONSE_NORMAL) {
+			kz_amqp_set_last_result(cmd->payload);
+			kz_amqp_cb_ok(cmd->cmd);
+		} else {
+			kz_amqp_reset_last_result();
+			kz_amqp_cb_error(cmd->cmd);
+		}
 	} else {
 		kz_amqp_consumer_event(cmd->payload, cmd->event_key, cmd->event_subkey);
 	}