Module: kamailio Branch: master Commit: bd179897f8e38913bde17a9e5c27106506f04f26 URL: https://github.com/kamailio/kamailio/commit/bd179897f8e38913bde17a9e5c271065...
Author: Hugh Waite que273@users.noreply.github.com Committer: Hugh Waite que273@users.noreply.github.com Date: 2015-09-24T14:43:20+01:00
Merge pull request #344 from kamailio/hpw/websocket_fragmentation
websocket: Add support for SIP message fragmentation
Thanks! For some features, I like to check with other devs that I'm not in conflict with the existing style or causing any unintended consequences!
---
Modified: modules/websocket/ws_conn.c Modified: modules/websocket/ws_conn.h Modified: modules/websocket/ws_frame.c
---
Diff: https://github.com/kamailio/kamailio/commit/bd179897f8e38913bde17a9e5c271065... Patch: https://github.com/kamailio/kamailio/commit/bd179897f8e38913bde17a9e5c271065...
---
diff --git a/modules/websocket/ws_conn.c b/modules/websocket/ws_conn.c index 87d593d..b8ede8b 100644 --- a/modules/websocket/ws_conn.c +++ b/modules/websocket/ws_conn.c @@ -201,19 +201,20 @@ int wsconn_add(struct receive_info rcv, unsigned int sub_protocol) LM_DBG("wsconn_add id [%d]\n", id);
/* Allocate and fill in new WebSocket connection */ - wsc = shm_malloc(sizeof(ws_connection_t)); + wsc = shm_malloc(sizeof(ws_connection_t) + BUF_SIZE); if (wsc == NULL) { LM_ERR("allocating shared memory\n"); return -1; } - memset(wsc, 0, sizeof(ws_connection_t)); + memset(wsc, 0, sizeof(ws_connection_t) + BUF_SIZE); wsc->id = id; wsc->id_hash = id_hash; wsc->state = WS_S_OPEN; wsc->rcv = rcv; wsc->sub_protocol = sub_protocol; wsc->run_event = 0; + wsc->frag_buf.s = ((char*)wsc) + sizeof(ws_connection_t); atomic_set(&wsc->refcnt, 0);
LM_DBG("wsconn_add new wsc => [%p], ref => [%d]\n", wsc, atomic_get(&wsc->refcnt)); diff --git a/modules/websocket/ws_conn.h b/modules/websocket/ws_conn.h index 029d570..df6400a 100644 --- a/modules/websocket/ws_conn.h +++ b/modules/websocket/ws_conn.h @@ -60,6 +60,8 @@ typedef struct ws_connection
atomic_t refcnt; int run_event; + + str frag_buf; } ws_connection_t;
typedef struct diff --git a/modules/websocket/ws_frame.c b/modules/websocket/ws_frame.c index e890770..16f7901 100644 --- a/modules/websocket/ws_frame.c +++ b/modules/websocket/ws_frame.c @@ -431,15 +431,6 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame, frame->opcode = (buf[0] & 0xff) & BYTE0_MASK_OPCODE; frame->mask = (buf[1] & 0xff) & BYTE1_MASK_MASK; - if (!frame->fin) - { - LM_WARN("WebSocket fragmentation not supported in the sip " - "sub-protocol\n"); - *err_code = 1002; - *err_text = str_status_protocol_error; - return -1; - } - if (frame->rsv1 || frame->rsv2 || frame->rsv3) { LM_WARN("WebSocket reserved fields with non-zero values\n"); @@ -450,6 +441,10 @@ static int decode_and_validate_ws_frame(ws_frame_t *frame,
switch(frame->opcode) { + case OPCODE_CONTINUATION: + LM_DBG("supported continuation frame: 0x%x\n", + (unsigned char) frame->opcode); + break; case OPCODE_TEXT_FRAME: case OPCODE_BINARY_FRAME: LM_DBG("supported non-control frame: 0x%x\n", @@ -651,6 +646,36 @@ int ws_frame_receive(void *data)
switch(opcode) { + case OPCODE_CONTINUATION: + if (likely(frame.wsc->sub_protocol == SUB_PROTOCOL_SIP)) + { + if (frame.wsc->frag_buf.len + frame.payload_len >= BUF_SIZE) + { + LM_ERR("Buffer overflow assembling websocket fragments %d + %d = %d\n", frame.wsc->frag_buf.len, frame.payload_len, frame.wsc->frag_buf.len + frame.payload_len); + wsconn_put(frame.wsc); + return -1; + } + memcpy(frame.wsc->frag_buf.s + frame.wsc->frag_buf.len, frame.payload_data, frame.payload_len); + frame.wsc->frag_buf.len += frame.payload_len; + frame.wsc->frag_buf.s[frame.wsc->frag_buf.len] = '\0'; + + if (frame.fin) + { + ret = receive_msg(frame.wsc->frag_buf.s, + frame.wsc->frag_buf.len, + tcpinfo->rcv); + wsconn_put(frame.wsc); + return ret; + } + wsconn_put(frame.wsc); + return 0; + } + else + { + LM_ERR("Unsupported fragmented sub-protocol"); + wsconn_put(frame.wsc); + return -1; + } case OPCODE_TEXT_FRAME: case OPCODE_BINARY_FRAME: if (likely(frame.wsc->sub_protocol == SUB_PROTOCOL_SIP)) @@ -659,11 +684,23 @@ int ws_frame_receive(void *data) frame.payload_data); update_stat(ws_sip_received_frames, 1);
- wsconn_put(frame.wsc); + if (frame.fin) + {
- return receive_msg(frame.payload_data, + wsconn_put(frame.wsc); + + return receive_msg(frame.payload_data, frame.payload_len, tcpinfo->rcv); + } + else + { + memcpy(frame.wsc->frag_buf.s, frame.payload_data, frame.payload_len); + frame.wsc->frag_buf.len = frame.payload_len; + frame.wsc->frag_buf.s[frame.wsc->frag_buf.len] = '\0'; + wsconn_put(frame.wsc); + return 0; + } } else if (frame.wsc->sub_protocol == SUB_PROTOCOL_MSRP) {