Module: kamailio
Branch: master
Commit: 78c0275b9081d4ee18a89f702f3931e3c3f83489
URL:
https://github.com/kamailio/kamailio/commit/78c0275b9081d4ee18a89f702f3931e3c3f83489
Author: joelbax <98022231+joelbax(a)users.noreply.github.com>
Committer: GitHub <noreply(a)github.com>
Date: 2023-07-17T17:21:36+02:00
rabbitmq: Adding amqps support (#3511)
* rabbitmq: Adding amqps support
Adding support for secure AMQP connections over TLS (amqps).
* rabbitmq: Adding amqps support
Adding support for secure AMQP connections over TLS (amqps).
rabbitmq: Adding amqps support
Adding support for secure AMQP connections over TLS (amqps).
rabbitmq: Format fixes
Some style format fixes
* rabbitmq: Format fixes
Fixing some missing spaces
* rabbitmq: Typo fix
Replacing the misspelling "inilialized" with "initialized"
---
Modified: src/modules/rabbitmq/rabbitmq.c
Modified: src/modules/rabbitmq/rabbitmq.h
---
Diff:
https://github.com/kamailio/kamailio/commit/78c0275b9081d4ee18a89f702f3931e3c3f83489.diff
Patch:
https://github.com/kamailio/kamailio/commit/78c0275b9081d4ee18a89f702f3931e3c3f83489.patch
---
diff --git a/src/modules/rabbitmq/rabbitmq.c b/src/modules/rabbitmq/rabbitmq.c
index f1c44ff113b..94341070508 100644
--- a/src/modules/rabbitmq/rabbitmq.c
+++ b/src/modules/rabbitmq/rabbitmq.c
@@ -52,6 +52,7 @@
#include <stdint.h>
#include <amqp_tcp_socket.h>
+#include <amqp_ssl_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
@@ -73,10 +74,12 @@ static amqp_connection_state_t amqp_conn = NULL;
/* module parameters */
static struct amqp_connection_info amqp_info;
static char *amqp_url = RABBITMQ_DEFAULT_AMQP_URL;
+static char *rmq_amqps_ca_file = NULL;
static int max_reconnect_attempts = 1;
static int timeout_sec = 1;
static int timeout_usec = 0;
static int direct_reply_to = 0;
+static int amqp_ssl_init_called = 0;
/* module helper functions */
static int rabbitmq_connect(amqp_connection_state_t *conn);
@@ -124,6 +127,7 @@ static cmd_export_t cmds[] = {
/* module parameters */
static param_export_t params[] = {{"url", PARAM_STRING, &amqp_url},
+ {"amqps_ca_file", PARAM_STRING, &rmq_amqps_ca_file},
{"timeout_sec", PARAM_INT, &timeout_sec},
{"timeout_usec", PARAM_INT, &timeout_usec},
{"direct_reply_to", PARAM_INT, &direct_reply_to}, {0, 0, 0}};
@@ -557,25 +561,52 @@ static int rabbitmq_connect(amqp_connection_state_t *conn)
int ret;
int log_ret;
// amqp_rpc_reply_t reply;
+
+ // amqp_ssl_init_called should only be called once
+ if(amqp_info.ssl && !amqp_ssl_init_called) {
+ amqp_set_initialize_ssl_library(1);
+ amqp_ssl_init_called = 1;
+ LM_DBG("AMQP SSL library initialized\n");
+ }
// establish a new connection to RabbitMQ server
*conn = amqp_new_connection();
+ if(!conn) {
+ LM_ERR("FAIL: create AMQP connection\n");
+ return RABBITMQ_ERR_CREATE;
+ }
log_ret = log_on_amqp_error(
amqp_get_rpc_reply(*conn), "amqp_new_connection()");
if(log_ret != AMQP_RESPONSE_NORMAL && log_ret != AMQP_RESPONSE_NONE) {
return RABBITMQ_ERR_CONNECT;
}
- amqp_sock = amqp_tcp_socket_new(*conn);
+ amqp_sock = (amqp_info.ssl) ? amqp_ssl_socket_new(*conn)
+ : amqp_tcp_socket_new(*conn);
if(!amqp_sock) {
LM_ERR("FAIL: create TCP amqp_sock");
amqp_destroy_connection(*conn);
return RABBITMQ_ERR_SOCK;
}
+ if(rmq_amqps_ca_file) {
+ if(amqp_ssl_socket_set_cacert(amqp_sock, rmq_amqps_ca_file)) {
+ LM_ERR("Failed to set CA certificate for amqps connection\n");
+ return RABBITMQ_ERR_SSL_CACERT;
+ }
+ }
+
+#if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR < 8
+ amqp_ssl_socket_set_verify(amqp_sock, 1);
+#else
+ amqp_ssl_socket_set_verify_peer(amqp_sock, 1);
+ amqp_ssl_socket_set_verify_hostname(amqp_sock, 1);
+#endif
+
ret = amqp_socket_open(amqp_sock, amqp_info.host, amqp_info.port);
if(ret != AMQP_STATUS_OK) {
- LM_ERR("FAIL: open TCP sock, amqp_status=%d", ret);
+ LM_ERR("FAIL: open %s sock, amqp_status=%d",
+ (amqp_info.ssl) ? "SSL" : "TCP", ret);
// amqp_destroy_connection(*conn);
return RABBITMQ_ERR_SOCK;
}
diff --git a/src/modules/rabbitmq/rabbitmq.h b/src/modules/rabbitmq/rabbitmq.h
index 30f39ca7fd5..b37c84674f9 100644
--- a/src/modules/rabbitmq/rabbitmq.h
+++ b/src/modules/rabbitmq/rabbitmq.h
@@ -49,8 +49,10 @@ typedef enum
RABBITMQ_ERR_CHANNEL,
RABBITMQ_ERR_QUEUE,
RABBITMQ_ERR_PUBLISH,
+ RABBITMQ_ERR_CREATE,
RABBITMQ_ERR_SOCK,
RABBITMQ_ERR_CONSUME,
+ RABBITMQ_ERR_SSL_CACERT,
RABBITMQ_ERR_NULL,
} RABBITMQ_ENUM;