Module: kamailio Branch: master Commit: 78c0275b9081d4ee18a89f702f3931e3c3f83489 URL: https://github.com/kamailio/kamailio/commit/78c0275b9081d4ee18a89f702f3931e3c3f83489
Author: joelbax 98022231+joelbax@users.noreply.github.com Committer: GitHub noreply@github.com Date: 2023-07-17T17:21:36+02:00
rabbitmq: Adding amqps support (#3511)
* rabbitmq: Adding amqps support
Adding support for secure AMQP connections over TLS (amqps).
* rabbitmq: Adding amqps support
Adding support for secure AMQP connections over TLS (amqps).
rabbitmq: Adding amqps support
Adding support for secure AMQP connections over TLS (amqps).
rabbitmq: Format fixes
Some style format fixes
* rabbitmq: Format fixes
Fixing some missing spaces
* rabbitmq: Typo fix
Fixing the typo "inilialized" (corrected to "initialized")
---
Modified: src/modules/rabbitmq/rabbitmq.c Modified: src/modules/rabbitmq/rabbitmq.h
---
Diff: https://github.com/kamailio/kamailio/commit/78c0275b9081d4ee18a89f702f3931e3c3f83489.diff Patch: https://github.com/kamailio/kamailio/commit/78c0275b9081d4ee18a89f702f3931e3c3f83489.patch
---
diff --git a/src/modules/rabbitmq/rabbitmq.c b/src/modules/rabbitmq/rabbitmq.c index f1c44ff113b..94341070508 100644 --- a/src/modules/rabbitmq/rabbitmq.c +++ b/src/modules/rabbitmq/rabbitmq.c @@ -52,6 +52,7 @@
#include <stdint.h> #include <amqp_tcp_socket.h> +#include <amqp_ssl_socket.h> #include <amqp.h> #include <amqp_framing.h>
@@ -73,10 +74,12 @@ static amqp_connection_state_t amqp_conn = NULL; /* module parameters */ static struct amqp_connection_info amqp_info; static char *amqp_url = RABBITMQ_DEFAULT_AMQP_URL; +static char *rmq_amqps_ca_file = NULL; static int max_reconnect_attempts = 1; static int timeout_sec = 1; static int timeout_usec = 0; static int direct_reply_to = 0; +static int amqp_ssl_init_called = 0;
/* module helper functions */ static int rabbitmq_connect(amqp_connection_state_t *conn); @@ -124,6 +127,7 @@ static cmd_export_t cmds[] = {
/* module parameters */ static param_export_t params[] = {{"url", PARAM_STRING, &amqp_url}, + {"amqps_ca_file", PARAM_STRING, &rmq_amqps_ca_file}, {"timeout_sec", PARAM_INT, &timeout_sec}, {"timeout_usec", PARAM_INT, &timeout_usec}, {"direct_reply_to", PARAM_INT, &direct_reply_to}, {0, 0, 0}}; @@ -557,25 +561,52 @@ static int rabbitmq_connect(amqp_connection_state_t *conn) int ret; int log_ret; // amqp_rpc_reply_t reply; + + // amqp_ssl_init_called should only be called once + if(amqp_info.ssl && !amqp_ssl_init_called) { + amqp_set_initialize_ssl_library(1); + amqp_ssl_init_called = 1; + LM_DBG("AMQP SSL library initialized\n"); + }
// establish a new connection to RabbitMQ server *conn = amqp_new_connection(); + if(!conn) { + LM_ERR("FAIL: create AMQP connection\n"); + return RABBITMQ_ERR_CREATE; + } log_ret = log_on_amqp_error( amqp_get_rpc_reply(*conn), "amqp_new_connection()"); if(log_ret != AMQP_RESPONSE_NORMAL && log_ret != AMQP_RESPONSE_NONE) { return RABBITMQ_ERR_CONNECT; }
- amqp_sock = amqp_tcp_socket_new(*conn); + amqp_sock = (amqp_info.ssl) ? amqp_ssl_socket_new(*conn) + : amqp_tcp_socket_new(*conn); if(!amqp_sock) { LM_ERR("FAIL: create TCP amqp_sock"); amqp_destroy_connection(*conn); return RABBITMQ_ERR_SOCK; }
+ if(rmq_amqps_ca_file) { + if(amqp_ssl_socket_set_cacert(amqp_sock, rmq_amqps_ca_file)) { + LM_ERR("Failed to set CA certificate for amqps connection\n"); + return RABBITMQ_ERR_SSL_CACERT; + } + } + +#if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR < 8 + amqp_ssl_socket_set_verify(amqp_sock, 1); +#else + amqp_ssl_socket_set_verify_peer(amqp_sock, 1); + amqp_ssl_socket_set_verify_hostname(amqp_sock, 1); +#endif + ret = amqp_socket_open(amqp_sock, amqp_info.host, amqp_info.port); if(ret != AMQP_STATUS_OK) { - LM_ERR("FAIL: open TCP sock, amqp_status=%d", ret); + LM_ERR("FAIL: open %s sock, amqp_status=%d", + (amqp_info.ssl) ? "SSL" : "TCP", ret); // amqp_destroy_connection(*conn); return RABBITMQ_ERR_SOCK; } diff --git a/src/modules/rabbitmq/rabbitmq.h b/src/modules/rabbitmq/rabbitmq.h index 30f39ca7fd5..b37c84674f9 100644 --- a/src/modules/rabbitmq/rabbitmq.h +++ b/src/modules/rabbitmq/rabbitmq.h @@ -49,8 +49,10 @@ typedef enum RABBITMQ_ERR_CHANNEL, RABBITMQ_ERR_QUEUE, RABBITMQ_ERR_PUBLISH, + RABBITMQ_ERR_CREATE, RABBITMQ_ERR_SOCK, RABBITMQ_ERR_CONSUME, + RABBITMQ_ERR_SSL_CACERT, RABBITMQ_ERR_NULL, } RABBITMQ_ENUM;