Module: kamailio
Branch: master
Commit: 38a79cdc42b952ac0291d249241ca6499fc269aa
URL:
https://github.com/kamailio/kamailio/commit/38a79cdc42b952ac0291d249241ca64…
Author: Hugh Waite <hugh.waite(a)xura.com>
Committer: Hugh Waite <hugh.waite(a)xura.com>
Date: 2015-09-23T21:59:09+01:00
websocket: Add support for SIP message fragmentation
- websocket connections are created with a BUF_SIZE (64K) buffer used for concatenating
frames
- continuation frames (fragments) are supported for the SIP sub-protocol
---
Modified: modules/websocket/ws_conn.c
Modified: modules/websocket/ws_conn.h
Modified: modules/websocket/ws_frame.c
---
Diff:
https://github.com/kamailio/kamailio/commit/38a79cdc42b952ac0291d249241ca64…
Patch:
https://github.com/kamailio/kamailio/commit/38a79cdc42b952ac0291d249241ca64…
---
diff --git a/modules/websocket/ws_conn.c b/modules/websocket/ws_conn.c
index 87d593d..b8ede8b 100644
--- a/modules/websocket/ws_conn.c
+++ b/modules/websocket/ws_conn.c
@@ -201,19 +201,20 @@ int wsconn_add(struct receive_info rcv, unsigned int sub_protocol)
LM_DBG("wsconn_add id [%d]\n", id);
/* Allocate and fill in new WebSocket connection */
- wsc = shm_malloc(sizeof(ws_connection_t));
+ wsc = shm_malloc(sizeof(ws_connection_t) + BUF_SIZE);
if (wsc == NULL)
{
LM_ERR("allocating shared memory\n");
return -1;
}
- memset(wsc, 0, sizeof(ws_connection_t));
+ memset(wsc, 0, sizeof(ws_connection_t) + BUF_SIZE);
wsc->id = id;
wsc->id_hash = id_hash;
wsc->state = WS_S_OPEN;
wsc->rcv = rcv;
wsc->sub_protocol = sub_protocol;
wsc->run_event = 0;
+ wsc->frag_buf.s = ((char*)wsc) + sizeof(ws_connection_t);
atomic_set(&wsc->refcnt, 0);
LM_DBG("wsconn_add new wsc => [%p], ref => [%d]\n", wsc,
atomic_get(&wsc->refcnt));
diff --git a/modules/websocket/ws_conn.h b/modules/websocket/ws_conn.h
index 029d570..df6400a 100644
--- a/modules/websocket/ws_conn.h
+++ b/modules/websocket/ws_conn.h
@@ -60,6 +60,8 @@ typedef struct ws_connection
atomic_t refcnt;
int run_event;
+
+ str frag_buf;
} ws_connection_t;
typedef struct
diff --git a/modules/websocket/ws_frame.c b/modules/websocket/ws_frame.c
index e890770..16f7901 100644
--- a/modules/websocket/ws_frame.c
+++ b/modules/websocket/ws_frame.c
@@ -431,15 +431,6 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame,
frame->opcode = (buf[0] & 0xff) & BYTE0_MASK_OPCODE;
frame->mask = (buf[1] & 0xff) & BYTE1_MASK_MASK;
- if (!frame->fin)
- {
- LM_WARN("WebSocket fragmentation not supported in the sip "
- "sub-protocol\n");
- *err_code = 1002;
- *err_text = str_status_protocol_error;
- return -1;
- }
-
if (frame->rsv1 || frame->rsv2 || frame->rsv3)
{
LM_WARN("WebSocket reserved fields with non-zero values\n");
@@ -450,6 +441,10 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame,
switch(frame->opcode)
{
+ case OPCODE_CONTINUATION:
+ LM_DBG("supported continuation frame: 0x%x\n",
+ (unsigned char) frame->opcode);
+ break;
case OPCODE_TEXT_FRAME:
case OPCODE_BINARY_FRAME:
LM_DBG("supported non-control frame: 0x%x\n",
@@ -651,6 +646,36 @@ int ws_frame_receive(void *data)
switch(opcode)
{
+ case OPCODE_CONTINUATION:
+ if (likely(frame.wsc->sub_protocol == SUB_PROTOCOL_SIP))
+ {
+ if (frame.wsc->frag_buf.len + frame.payload_len >= BUF_SIZE)
+ {
+ LM_ERR("Buffer overflow assembling websocket fragments %d + %d = %d\n",
frame.wsc->frag_buf.len, frame.payload_len, frame.wsc->frag_buf.len +
frame.payload_len);
+ wsconn_put(frame.wsc);
+ return -1;
+ }
+ memcpy(frame.wsc->frag_buf.s + frame.wsc->frag_buf.len, frame.payload_data,
frame.payload_len);
+ frame.wsc->frag_buf.len += frame.payload_len;
+ frame.wsc->frag_buf.s[frame.wsc->frag_buf.len] = '\0';
+
+ if (frame.fin)
+ {
+ ret = receive_msg(frame.wsc->frag_buf.s,
+ frame.wsc->frag_buf.len,
+ tcpinfo->rcv);
+ wsconn_put(frame.wsc);
+ return ret;
+ }
+ wsconn_put(frame.wsc);
+ return 0;
+ }
+ else
+ {
+ LM_ERR("Unsupported fragmented sub-protocol");
+ wsconn_put(frame.wsc);
+ return -1;
+ }
case OPCODE_TEXT_FRAME:
case OPCODE_BINARY_FRAME:
if (likely(frame.wsc->sub_protocol == SUB_PROTOCOL_SIP))
@@ -659,11 +684,23 @@ int ws_frame_receive(void *data)
frame.payload_data);
update_stat(ws_sip_received_frames, 1);
- wsconn_put(frame.wsc);
+ if (frame.fin)
+ {
- return receive_msg(frame.payload_data,
+ wsconn_put(frame.wsc);
+
+ return receive_msg(frame.payload_data,
frame.payload_len,
tcpinfo->rcv);
+ }
+ else
+ {
+ memcpy(frame.wsc->frag_buf.s, frame.payload_data, frame.payload_len);
+ frame.wsc->frag_buf.len = frame.payload_len;
+ frame.wsc->frag_buf.s[frame.wsc->frag_buf.len] = '\0';
+ wsconn_put(frame.wsc);
+ return 0;
+ }
}
else if (frame.wsc->sub_protocol == SUB_PROTOCOL_MSRP)
{