Module: kamailio
Branch: master
Commit: bd179897f8e38913bde17a9e5c27106506f04f26
URL:
https://github.com/kamailio/kamailio/commit/bd179897f8e38913bde17a9e5c27106…
Author: Hugh Waite <que273(a)users.noreply.github.com>
Committer: Hugh Waite <que273(a)users.noreply.github.com>
Date: 2015-09-24T14:43:20+01:00
Merge pull request #344 from kamailio/hpw/websocket_fragmentation
websocket: Add support for SIP message fragmentation
Thanks!
For some features, I like to check with other devs that I'm not in conflict with the
existing style or causing any unintended consequences!
---
Modified: modules/websocket/ws_conn.c
Modified: modules/websocket/ws_conn.h
Modified: modules/websocket/ws_frame.c
---
Diff:
https://github.com/kamailio/kamailio/commit/bd179897f8e38913bde17a9e5c27106…
Patch:
https://github.com/kamailio/kamailio/commit/bd179897f8e38913bde17a9e5c27106…
---
diff --git a/modules/websocket/ws_conn.c b/modules/websocket/ws_conn.c
index 87d593d..b8ede8b 100644
--- a/modules/websocket/ws_conn.c
+++ b/modules/websocket/ws_conn.c
@@ -201,19 +201,20 @@ int wsconn_add(struct receive_info rcv, unsigned int sub_protocol)
LM_DBG("wsconn_add id [%d]\n", id);
/* Allocate and fill in new WebSocket connection */
- wsc = shm_malloc(sizeof(ws_connection_t));
+ wsc = shm_malloc(sizeof(ws_connection_t) + BUF_SIZE);
if (wsc == NULL)
{
LM_ERR("allocating shared memory\n");
return -1;
}
- memset(wsc, 0, sizeof(ws_connection_t));
+ memset(wsc, 0, sizeof(ws_connection_t) + BUF_SIZE);
wsc->id = id;
wsc->id_hash = id_hash;
wsc->state = WS_S_OPEN;
wsc->rcv = rcv;
wsc->sub_protocol = sub_protocol;
wsc->run_event = 0;
+ wsc->frag_buf.s = ((char*)wsc) + sizeof(ws_connection_t);
atomic_set(&wsc->refcnt, 0);
LM_DBG("wsconn_add new wsc => [%p], ref => [%d]\n", wsc,
atomic_get(&wsc->refcnt));
diff --git a/modules/websocket/ws_conn.h b/modules/websocket/ws_conn.h
index 029d570..df6400a 100644
--- a/modules/websocket/ws_conn.h
+++ b/modules/websocket/ws_conn.h
@@ -60,6 +60,8 @@ typedef struct ws_connection
atomic_t refcnt;
int run_event;
+
+ str frag_buf;
} ws_connection_t;
typedef struct
diff --git a/modules/websocket/ws_frame.c b/modules/websocket/ws_frame.c
index e890770..16f7901 100644
--- a/modules/websocket/ws_frame.c
+++ b/modules/websocket/ws_frame.c
@@ -431,15 +431,6 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame,
frame->opcode = (buf[0] & 0xff) & BYTE0_MASK_OPCODE;
frame->mask = (buf[1] & 0xff) & BYTE1_MASK_MASK;
- if (!frame->fin)
- {
- LM_WARN("WebSocket fragmentation not supported in the sip "
- "sub-protocol\n");
- *err_code = 1002;
- *err_text = str_status_protocol_error;
- return -1;
- }
-
if (frame->rsv1 || frame->rsv2 || frame->rsv3)
{
LM_WARN("WebSocket reserved fields with non-zero values\n");
@@ -450,6 +441,10 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame,
switch(frame->opcode)
{
+ case OPCODE_CONTINUATION:
+ LM_DBG("supported continuation frame: 0x%x\n",
+ (unsigned char) frame->opcode);
+ break;
case OPCODE_TEXT_FRAME:
case OPCODE_BINARY_FRAME:
LM_DBG("supported non-control frame: 0x%x\n",
@@ -651,6 +646,36 @@ int ws_frame_receive(void *data)
switch(opcode)
{
+ case OPCODE_CONTINUATION:
+ if (likely(frame.wsc->sub_protocol == SUB_PROTOCOL_SIP))
+ {
+ if (frame.wsc->frag_buf.len + frame.payload_len >= BUF_SIZE)
+ {
+ LM_ERR("Buffer overflow assembling websocket fragments %d + %d = %d\n",
frame.wsc->frag_buf.len, frame.payload_len, frame.wsc->frag_buf.len +
frame.payload_len);
+ wsconn_put(frame.wsc);
+ return -1;
+ }
+ memcpy(frame.wsc->frag_buf.s + frame.wsc->frag_buf.len, frame.payload_data,
frame.payload_len);
+ frame.wsc->frag_buf.len += frame.payload_len;
+ frame.wsc->frag_buf.s[frame.wsc->frag_buf.len] = '\0';
+
+ if (frame.fin)
+ {
+ ret = receive_msg(frame.wsc->frag_buf.s,
+ frame.wsc->frag_buf.len,
+ tcpinfo->rcv);
+ wsconn_put(frame.wsc);
+ return ret;
+ }
+ wsconn_put(frame.wsc);
+ return 0;
+ }
+ else
+ {
+ LM_ERR("Unsupported fragmented sub-protocol");
+ wsconn_put(frame.wsc);
+ return -1;
+ }
case OPCODE_TEXT_FRAME:
case OPCODE_BINARY_FRAME:
if (likely(frame.wsc->sub_protocol == SUB_PROTOCOL_SIP))
@@ -659,11 +684,23 @@ int ws_frame_receive(void *data)
frame.payload_data);
update_stat(ws_sip_received_frames, 1);
- wsconn_put(frame.wsc);
+ if (frame.fin)
+ {
- return receive_msg(frame.payload_data,
+ wsconn_put(frame.wsc);
+
+ return receive_msg(frame.payload_data,
frame.payload_len,
tcpinfo->rcv);
+ }
+ else
+ {
+ memcpy(frame.wsc->frag_buf.s, frame.payload_data, frame.payload_len);
+ frame.wsc->frag_buf.len = frame.payload_len;
+ frame.wsc->frag_buf.s[frame.wsc->frag_buf.len] = '\0';
+ wsconn_put(frame.wsc);
+ return 0;
+ }
}
else if (frame.wsc->sub_protocol == SUB_PROTOCOL_MSRP)
{