Module: kamailio
Branch: 5.0
Commit: 9abd1e002e0b3f7a884093e7e76efc8afdd54a17
URL:
https://github.com/kamailio/kamailio/commit/9abd1e002e0b3f7a884093e7e76efc8afdd54a17
Author: Sebastian Damm <damm(a)sipgate.de>
Committer: Sebastian Damm <damm(a)sipgate.de>
Date: 2017-05-12T15:34:39+02:00
rabbitmq: don't create reply-to queue on publish
When using the rabbitmq_publish function, there is no need to create
a reply-to queue, because it will never be read. And since there is
never a real consumer, the queue will never be deleted. This
will eventually clog up the RabbitMQ server with millions of
generic reply queues.
This bug has been fixed in master already, so this is basically a
backport.
---
Modified: src/modules/rabbitmq/rabbitmq.c
---
Diff:
https://github.com/kamailio/kamailio/commit/9abd1e002e0b3f7a884093e7e76efc8afdd54a17.diff
Patch:
https://github.com/kamailio/kamailio/commit/9abd1e002e0b3f7a884093e7e76efc8afdd54a17.patch
---
diff --git a/src/modules/rabbitmq/rabbitmq.c b/src/modules/rabbitmq/rabbitmq.c
index 4cd69c9..05fa750 100644
--- a/src/modules/rabbitmq/rabbitmq.c
+++ b/src/modules/rabbitmq/rabbitmq.c
@@ -175,7 +175,6 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange,
char* in_rou
int reconnect_attempts = 0;
int log_ret;
str exchange, routingkey, messagebody, contenttype;
- amqp_bytes_t reply_to_queue;
// sanity checks
if (get_str_fparam(&exchange, msg, (fparam_t*)in_exchange) < 0) {
@@ -231,44 +230,13 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange,
char* in_rou
return RABBITMQ_ERR_CHANNEL;
}
- // alloc queue
- amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
amqp_empty_table);
- if (log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_queue_declare()") !=
AMQP_RESPONSE_NORMAL) {
- LM_ERR("FAIL: amqp_queue_declare()\n");
- amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
- return RABBITMQ_ERR_QUEUE;
- }
-
- // alloc bytes
- reply_to_queue = amqp_bytes_malloc_dup(r->queue);
- LM_DBG("%.*s\n", (int)reply_to_queue.len, (char*)reply_to_queue.bytes);
- if (reply_to_queue.bytes == NULL) {
- amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
- amqp_bytes_free(reply_to_queue);
- LM_ERR("Out of memory while copying queue name");
- return -1;
- }
-
// alloc properties
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG |
- AMQP_BASIC_REPLY_TO_FLAG |
AMQP_BASIC_CORRELATION_ID_FLAG;
props.content_type = amqp_cstring_bytes(contenttype.s);
props.delivery_mode = 2; /* persistent delivery mode */
- props.reply_to = amqp_bytes_malloc_dup(reply_to_queue);
- if (props.reply_to.bytes == NULL) {
- // debug
- LM_ERR("Out of memory while copying queue name");
-
- // cleanup
- amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
- amqp_bytes_free(reply_to_queue);
-
- // error
- return -1;
- }
props.correlation_id = amqp_cstring_bytes("1");
// publish
@@ -285,7 +253,6 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange,
char* in_rou
// cleanup
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
- amqp_bytes_free(reply_to_queue);
// error
return RABBITMQ_ERR_PUBLISH;
@@ -295,8 +262,6 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange,
char* in_rou
LM_DBG("SUCCESS: amqp_basic_publish()\n");
// cleanup
- amqp_bytes_free(props.reply_to);
- amqp_bytes_free(reply_to_queue);
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
// success