Module: kamailio
Branch: master
Commit: 67db972a129a8f34ea7406618593df4eaf846a1b
URL:
https://github.com/kamailio/kamailio/commit/67db972a129a8f34ea7406618593df4eaf846a1b
Author: Luis Azedo <luis(a)2600hz.com>
Committer: Luis Azedo <luis.azedo(a)factorlusitano.com>
Date: 2015-07-01T15:59:57+01:00
kazoo : fix, send timeout callback to consumer process
---
Modified: modules/kazoo/kz_amqp.c
---
Diff:
https://github.com/kamailio/kamailio/commit/67db972a129a8f34ea7406618593df4eaf846a1b.diff
Patch:
https://github.com/kamailio/kamailio/commit/67db972a129a8f34ea7406618593df4eaf846a1b.patch
---
diff --git a/modules/kazoo/kz_amqp.c b/modules/kazoo/kz_amqp.c
index 96227c0..06e1599 100644
--- a/modules/kazoo/kz_amqp.c
+++ b/modules/kazoo/kz_amqp.c
@@ -2034,7 +2034,32 @@ void kz_amqp_cb_error(kz_amqp_cmd_ptr cmd)
int n = route_get(&main_rt, cmd->err_route);
struct action *a = main_rt.rlist[n];
tmb.t_continue(cmd->t_hash, cmd->t_label, a);
- kz_amqp_free_pipe_cmd(cmd);
+}
+
+int kz_send_worker_error_event(kz_amqp_cmd_ptr cmd)
+{
+ cmd->return_code = -1;
+ kz_amqp_consumer_delivery_ptr ptr = (kz_amqp_consumer_delivery_ptr)
shm_malloc(sizeof(kz_amqp_consumer_delivery));
+ if(ptr == NULL) {
+ LM_ERR("NO MORE SHARED MEMORY!");
+ return 0;
+ }
+ memset(ptr, 0, sizeof(kz_amqp_consumer_delivery));
+ ptr->cmd = cmd;
+
+ consumer++;
+ if(consumer >= dbk_consumer_processes) {
+ consumer = 0;
+ }
+
+ if (write(kz_worker_pipes[consumer], &ptr, sizeof(ptr)) != sizeof(ptr)) {
+ LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer,
strerror(errno), cmd->payload);
+ kz_amqp_free_consumer_delivery(ptr);
+ return 0;
+ }
+
+ return 1;
+
}
void kz_amqp_cmd_timeout_cb(int fd, short event, void *arg)
@@ -2047,7 +2072,7 @@ void kz_amqp_cmd_timeout_cb(int fd, short event, void *arg)
, retrieved_cmd ->message_id->len, retrieved_cmd ->message_id->s
);
if(retrieved_cmd->type == KZ_AMQP_CMD_ASYNC_CALL) {
- kz_amqp_cb_error(retrieved_cmd);
+ kz_send_worker_error_event(retrieved_cmd);
} else {
retrieved_cmd->return_code = -1;
lock_release(&retrieved_cmd->lock);
@@ -2577,6 +2602,9 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t*
envelope, kz_
}
if(idx < dbk_channels) {
cmd = kz_cmd_retrieve(message_id);
+ if(cmd)
+ cmd->return_code = AMQP_RESPONSE_NORMAL;
+
/*
if(cmd != NULL) {
cmd->return_code = 0;
@@ -2791,8 +2819,13 @@ void kz_amqp_consumer_worker_cb(int fd, short event, void *arg)
LM_DBG("consumer %d received payload %s\n", my_pid(), cmd->payload);
if(cmd->cmd) {
- kz_amqp_set_last_result(cmd->payload);
- kz_amqp_cb_ok(cmd->cmd);
+ if(cmd->cmd->return_code == AMQP_RESPONSE_NORMAL) {
+ kz_amqp_set_last_result(cmd->payload);
+ kz_amqp_cb_ok(cmd->cmd);
+ } else {
+ kz_amqp_reset_last_result();
+ kz_amqp_cb_error(cmd->cmd);
+ }
} else {
kz_amqp_consumer_event(cmd->payload, cmd->event_key, cmd->event_subkey);
}