Module: sip-router
Branch: master
Commit: e6a50c5c0957a5ad3e08e57ede5be775a41ac24f
URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=e6a50c5c...

Author: pd peter.dunkley@crocodile-rcs.com
Committer: pd peter.dunkley@crocodile-rcs.com
Date: Mon Jan 30 17:06:42 2012 +0000
modules_k/presence: Improved handling of retransmitted SUBSCRIBE requests
- handle_subscribe() doesn't handle retransmitted SUBSCRIBEs properly. This
  was noticed with back-end SUBSCRIBEs from RLS under heavy load (also tried
  TCP here but under-load this caused a different set of problems with buffer
  sizes and buffers taking too long to process).
- Although this was originally observed with RLS back-end SUBSCRIBEs it
  appears to be a general issue when UDP is used.
- There were two main problems:
  1) On an un-SUBSCRIBE the record in the hash-table or DB will be removed.
     If the un-SUBSCRIBE is retransmitted there is no record to be found and
     handle_subscribe() fails.
  2) After fixing 1, and on re-SUBSCRIBE, remote CSeq's with lower than
     expected values cause failures. This can also happen when there are
     retransmissions.
- The fix was to catch both these cases and treat them as a special class of
  error. In these two cases and when the protocol is UDP, a correct-looking
  2XX response is sent, but no further processing (database updates, sending
  NOTIFY, and so on) is performed on the SUBSCRIBE request.
- Also modified the query in get_database_info() to just use Call-ID, To-tag,
  and From-tag for dialog matching (so it duplicates the query from
  get_stored_info()) as the query that was there looked a little aggressive.
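For readers who want the decision logic without reading the whole patch, here is a minimal standalone sketch in C of the two special cases described above. It is not the code from subscribe.c: the names `sub_request` and `classify_subscribe` and the local `proto` enum are hypothetical, and the sketch always answers 200, whereas the patch sends 202 or 200 depending on the event type.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for the values used in subscribe.c. */
enum proto { PROTO_UDP, PROTO_TCP };

struct sub_request {
    enum proto proto;         /* transport the SUBSCRIBE arrived on */
    unsigned int expires;     /* 0 means un-SUBSCRIBE */
    unsigned int remote_cseq; /* CSeq number carried by the request */
};

/*
 * Returns the reply code to send. *process is set to 1 only when the request
 * should go through normal handling (database update, NOTIFY, and so on).
 * stored_cseq == NULL models "no matching subscription record found".
 */
int classify_subscribe(const struct sub_request *req,
                       const unsigned int *stored_cseq, int *process)
{
    assert(req != NULL && process != NULL);
    *process = 0;

    if (stored_cseq == NULL) {
        /* Case 1: record already removed, likely a retransmitted un-SUBSCRIBE */
        if (req->proto == PROTO_UDP && req->expires == 0)
            return 200;
        return 481;
    }

    if (req->remote_cseq <= *stored_cseq) {
        /* Case 2: CSeq not higher than the stored one, likely a retransmission */
        if (req->proto == PROTO_UDP)
            return 200;
        return 400;
    }

    *process = 1;
    return 200;
}
```

The point of the sketch is that over UDP both cases get a 2XX with no further processing, while over any other transport they remain the 481 and 400 errors they were before.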
---
modules_k/presence/subscribe.c |  247 +++++++++++++++++++++------------------
1 files changed, 133 insertions(+), 114 deletions(-)

diff --git a/modules_k/presence/subscribe.c b/modules_k/presence/subscribe.c
index d11287c..8add119 100644
--- a/modules_k/presence/subscribe.c
+++ b/modules_k/presence/subscribe.c
@@ -762,7 +762,10 @@ int handle_subscribe(struct sip_msg* msg, char* str1, char* str2)
     {
         if(get_stored_info(msg, &subs, &reply_code, &reply_str )< 0)
         {
-            LM_ERR("getting stored info\n");
+            if (msg->rcv.proto == PROTO_UDP)
+                LM_INFO("problem getting stored info - possible retransmission\n");
+            else
+                LM_ERR("getting stored info\n");
             goto error;
         }
         reason= subs.reason;
@@ -885,9 +888,26 @@ error:
     if(sent_reply== 0)
     {
-        if(send_error_reply(msg, reply_code, reply_str)< 0)
+        if (reply_code == 200)
         {
-            LM_ERR("failed to send reply on error case\n");
+            if(subs.event->type & PUBL_TYPE)
+            {
+                if(send_2XX_reply(msg, 202, subs.expires,
+                    &subs.local_contact) <0)
+                    LM_ERR("failed to send reply on error case\n");
+            }
+            else
+            {
+                /* For presence.winfo */
+                if(send_2XX_reply(msg, 200, subs.expires,
+                    &subs.local_contact) <0)
+                    LM_ERR("failed to send reply on error case\n");
+            }
+        }
+        else
+        {
+            if(send_error_reply(msg, reply_code, reply_str)< 0)
+                LM_ERR("failed to send reply on error case\n");
         }
     }
@@ -1082,21 +1102,21 @@ int extract_sdialog_info(subs_t* subs,struct sip_msg* msg, int mexp, subs->contact.s, subs->contact.len);
if (EVENT_DIALOG_SLA(subs->event->evp)) - { - /* user_contact@from_domain */ - if(parse_uri(subs->contact.s, subs->contact.len, &uri)< 0) - { - LM_ERR("failed to parse contact uri\n"); - goto error; - } - if(uandd_to_uri(uri.user, subs->from_domain, &subs->pres_uri)< 0) - { - LM_ERR("failed to construct uri\n"); - goto error; - } - LM_DBG("&&&&&&&&&&&&&&& dialog pres_uri= %.*s\n",subs->pres_uri.len, subs->pres_uri.s); - } - + { + /* user_contact@from_domain */ + if(parse_uri(subs->contact.s, subs->contact.len, &uri)< 0) + { + LM_ERR("failed to parse contact uri\n"); + goto error; + } + if(uandd_to_uri(uri.user, subs->from_domain, &subs->pres_uri)< 0) + { + LM_ERR("failed to construct uri\n"); + goto error; + } + LM_DBG("&&&&&&&&&&&&&&& dialog pres_uri= %.*s\n", + subs->pres_uri.len, subs->pres_uri.s); + }
/*process record route and add it to a string*/ if(*to_tag_gen && msg->record_route!=NULL) @@ -1152,18 +1172,17 @@ int get_stored_info(struct sip_msg* msg, subs_t* subs, int* reply_code, unsigned int hash_code;
/* first try to_user== pres_user and to_domain== pres_domain */ - - if(subs->pres_uri.s == NULL) - { - uandd_to_uri(subs->to_user, subs->to_domain, &pres_uri); - if(pres_uri.s== NULL) - { - LM_ERR("creating uri from user and domain\n"); - return -1; - } - } - else - pres_uri = subs->pres_uri; + if(subs->pres_uri.s == NULL) + { + uandd_to_uri(subs->to_user, subs->to_domain, &pres_uri); + if(pres_uri.s== NULL) + { + LM_ERR("creating uri from user and domain\n"); + return -1; + } + } + else + pres_uri = subs->pres_uri;
hash_code= core_hash(&pres_uri, &subs->event->name, shtable_size); lock_get(&subs_htable[hash_code].lock); @@ -1176,14 +1195,13 @@ int get_stored_info(struct sip_msg* msg, subs_t* subs, int* reply_code, } lock_release(&subs_htable[hash_code].lock);
- if(subs->pres_uri.s) - goto not_found; + if(subs->pres_uri.s) + goto not_found; - pkg_free(pres_uri.s); + pkg_free(pres_uri.s); pres_uri.s= NULL; -
- LM_DBG("record not found using R-URI search iteratively\n"); + LM_DBG("record not found using R-URI search iteratively\n"); /* take one row at a time */ for(i= 0; i< shtable_size; i++) { @@ -1207,26 +1225,35 @@ int get_stored_info(struct sip_msg* msg, subs_t* subs, int* reply_code, lock_release(&subs_htable[i].lock); }
+not_found: if(dbmode != DB_MEMORY_ONLY) { - return get_database_info(msg, subs, reply_code, reply_str); + return get_database_info(msg, subs, reply_code, reply_str); }
-not_found: - - LM_ERR("record not found in hash_table\n"); - *reply_code= 481; - *reply_str= pu_481_rpl; + if (msg->rcv.proto == PROTO_UDP && subs->expires == 0) + { + /* Assume it's a retransmission of an un-SUBSCRIBE */ + LM_INFO("No matching subscription dialog found in database - possible retransmission of un-SUBSCRIBE?\n"); + *reply_code= 200; + *reply_str= su_200_rpl; + } + else + { + /* It's definitely an error */ + LM_ERR("record not found in hash_table\n"); + *reply_code= 481; + *reply_str= pu_481_rpl; + }
return -1;
found_rec: - LM_DBG("Record found in hash_table\n"); - + if(!EVENT_DIALOG_SLA(s->event->evp)) subs->pres_uri= pres_uri; - + subs->version = s->version; subs->status= s->status; if(s->reason.s && s->reason.len) @@ -1257,14 +1284,25 @@ found_rec: if(subs->remote_cseq<= s->remote_cseq) { - LM_ERR("wrong sequence number;received: %d - stored: %d\n", - subs->remote_cseq, s->remote_cseq); - - *reply_code= 400; - *reply_str= pu_400_rpl;
lock_release(&subs_htable[i].lock); - goto error; + if (msg->rcv.proto == PROTO_UDP) + { + /* Assume it's a retransmission of a SUBSCRIBE */ + LM_INFO("Possible retransmission of SUBSCRIBE?\n"); + *reply_code= 200; + *reply_str= su_200_rpl; + return -1; + } + else + { + /* It's definitely an error */ + LM_ERR("wrong sequence number received: %d - stored: %d\n", + subs->remote_cseq, s->remote_cseq); + *reply_code= 400; + *reply_str= pu_400_rpl; + goto error; + } } lock_release(&subs_htable[i].lock);
@@ -1282,64 +1320,20 @@ error:
int get_database_info(struct sip_msg* msg, subs_t* subs, int* reply_code, str* reply_str) { - db_key_t query_cols[10]; - db_val_t query_vals[10]; - db_key_t result_cols[9]; + db_key_t query_cols[3]; + db_val_t query_vals[3]; + db_key_t result_cols[7]; db1_res_t *result= NULL; db_row_t *row ; db_val_t *row_vals ; int n_query_cols = 0; int n_result_cols = 0; int remote_cseq_col= 0, local_cseq_col= 0, status_col, reason_col; - int record_route_col, version_col; - int pres_uri_col; + int record_route_col, version_col, pres_uri_col; unsigned int remote_cseq; str pres_uri, record_route; str reason;
- query_cols[n_query_cols] = &str_to_user_col; - query_vals[n_query_cols].type = DB1_STR; - query_vals[n_query_cols].nul = 0; - query_vals[n_query_cols].val.str_val = subs->to_user; - n_query_cols++; - - query_cols[n_query_cols] = &str_to_domain_col; - query_vals[n_query_cols].type = DB1_STR; - query_vals[n_query_cols].nul = 0; - query_vals[n_query_cols].val.str_val = subs->to_domain; - n_query_cols++; - - query_cols[n_query_cols] = &str_watcher_username_col; - query_vals[n_query_cols].type = DB1_STR; - query_vals[n_query_cols].nul = 0; - query_vals[n_query_cols].val.str_val = subs->from_user; - n_query_cols++; - - query_cols[n_query_cols] = &str_watcher_domain_col; - query_vals[n_query_cols].type = DB1_STR; - query_vals[n_query_cols].nul = 0; - query_vals[n_query_cols].val.str_val = subs->from_domain; - n_query_cols++; - - query_cols[n_query_cols] = &str_event_col; - query_vals[n_query_cols].type = DB1_STR; - query_vals[n_query_cols].nul = 0; - query_vals[n_query_cols].val.str_val = subs->event->name; - n_query_cols++; - - query_cols[n_query_cols] = &str_event_id_col; - query_vals[n_query_cols].type = DB1_STR; - query_vals[n_query_cols].nul = 0; - if( subs->event_id.s != NULL) - { - query_vals[n_query_cols].val.str_val.s = subs->event_id.s; - query_vals[n_query_cols].val.str_val.len = subs->event_id.len; - } else { - query_vals[n_query_cols].val.str_val.s = ""; - query_vals[n_query_cols].val.str_val.len = 0; - } - n_query_cols++; - query_cols[n_query_cols] = &str_callid_col; query_vals[n_query_cols].type = DB1_STR; query_vals[n_query_cols].nul = 0; @@ -1385,29 +1379,29 @@ int get_database_info(struct sip_msg* msg, subs_t* subs, int* reply_code, str* r
if(result && result->n <=0) { - LM_ERR("No matching subscription dialog found in database\n"); - pa_dbf.free_result(pa_db, result); - *reply_code= 481; - *reply_str= pu_481_rpl; + + if (msg->rcv.proto == PROTO_UDP && subs->expires == 0) + { + /* Assume it's a retransmission of an un-SUBSCRIBE */ + LM_INFO("No matching subscription dialog found in database - possible retransmission of un-SUBSCRIBE?\n"); + *reply_code= 200; + *reply_str= su_200_rpl; + } + else + { + /* It's definitely an error */ + LM_ERR("No matching subscription dialog found in database\n"); + *reply_code= 481; + *reply_str= pu_481_rpl; + }
return -1; }
row = &result->rows[0]; row_vals = ROW_VALUES(row); - remote_cseq= row_vals[remote_cseq_col].val.int_val; - - if(subs->remote_cseq<= remote_cseq) - { - LM_ERR("wrong sequence number received: %d - stored: %d\n", - subs->remote_cseq, remote_cseq); - *reply_code= 400; - *reply_str= pu_400_rpl; - pa_dbf.free_result(pa_db, result); - return -1; - } - + subs->status= row_vals[status_col].val.int_val; reason.s= (char*)row_vals[reason_col].val.string_val; if(reason.s) @@ -1453,6 +1447,31 @@ int get_database_info(struct sip_msg* msg, subs_t* subs, int* reply_code, str* r subs->record_route.len= record_route.len; }
+ remote_cseq= row_vals[remote_cseq_col].val.int_val; + if(subs->remote_cseq<= remote_cseq) + { + pa_dbf.free_result(pa_db, result); + + if (msg->rcv.proto == PROTO_UDP) + { + /* Assume it's a retransmission of a SUBSCRIBE */ + LM_INFO("Possible retransmission of SUBSCRIBE?\n"); + *reply_code= 200; + *reply_str= su_200_rpl; + } + else + { + /* It's definitely an error */ + LM_ERR("wrong sequence number received: %d - stored: %d\n", + subs->remote_cseq, remote_cseq); + *reply_code= 400; + *reply_str= pu_400_rpl; + } + + return -1; + } + + pa_dbf.free_result(pa_db, result); result= NULL;
Hello,
I believe that this bug also affects the 3.2 branch, but the change is quite big and with the next release of 3.2 due tomorrow I thought it best to hold off "cherry-pick"ing it until after the release. That is, unless anyone else thinks it should go in there?
Regards,
Peter
On Mon, 2012-01-30 at 18:16 +0100, Peter Dunkley wrote:
Hello,
It can be held for the next minor release so it can be tested more, if you feel that is better (we have to include something there as well :-) ). From the commit log I understood that this usually happens under heavy RLS load with retransmissions. Doesn't creating the transaction and absorbing the retransmissions with tm help?
Cheers, Daniel
On 1/30/12 6:19 PM, Peter Dunkley wrote:
Hi,
Any retransmission will cause the problem, so anyone using UDP over the Internet to a Kamailio presence server where there is occasional packet-loss will see it. It was just first noticed here under heavy load.
By creating a new transaction and absorbing the retransmissions, do you mean calling t_newtran()/t_release() when the SUBSCRIBE is received?
If so I didn't think of that. It'd make sense to do that too. I think the presence module should cope with retransmissions (especially as we need it to cope in a multi-server environment with load-balancers/fail-over and a shared database). But if using t_newtran()/t_release() will handle retransmissions in the normal case it should help reduce the load on the database.
Thanks,
Peter
On Mon, 2012-01-30 at 18:26 +0100, Daniel-Constantin Mierla wrote:
Hello,
On 1/30/12 6:35 PM, Peter Dunkley wrote:
Yes, like in the default config, using t_newtran() before handling the SUBSCRIBE. t_check_trans() is used to figure out if there is already a transaction for that request, and it absorbs the request if it is a retransmission.
Not sure t_release() is explicitly needed anymore; Andrei did some work in this area a long time ago, iirc, but if used it is harmless, so it is still in the default config.
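To illustrate the idea of absorbing retransmissions at the transaction layer, here is a conceptual sketch in C. It is not how tm is implemented: the `seen_table` structure and `check_or_record()` are hypothetical, and the key is assumed to be a single string such as the top Via branch, whereas real server-transaction matching (RFC 3261, section 17.2.3) also considers sent-by and method.

```c
#include <assert.h>
#include <string.h>

#define SEEN_MAX 64   /* hypothetical capacity of the table */
#define KEY_MAX  128  /* hypothetical maximum key length, including NUL */

struct seen_table {
    char keys[SEEN_MAX][KEY_MAX];
    int count;
};

/*
 * Returns 1 if the key was already recorded (treat the request as a
 * retransmission and absorb it), 0 if it is new and has now been recorded,
 * and -1 if the table is full or the key is too long to store.
 */
int check_or_record(struct seen_table *t, const char *key)
{
    int i;

    assert(t != NULL && key != NULL);

    for (i = 0; i < t->count; i++) {
        if (strcmp(t->keys[i], key) == 0)
            return 1;
    }

    if (t->count >= SEEN_MAX || strlen(key) >= KEY_MAX)
        return -1;

    strcpy(t->keys[t->count], key);
    t->count++;
    return 0;
}
```

In this model only requests for which the function returns 0 would reach handle_subscribe(), so a retransmission never gets as far as the hash table or database lookup.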
Cheers, Daniel