Module: sip-router Branch: master Commit: 10327c61d35e034f31c49a27f11f81ec82c22055 URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=10327c61...
Author: Peter Dunkley peter.dunkley@crocodile-rcs.com Committer: Peter Dunkley peter.dunkley@crocodile-rcs.com Date: Tue Aug 21 15:41:27 2012 +0100
modules_k/rls: Use database row/table locking where supported in DB-only mode
- Under load, there are many DB deadlocks when using (start|end)_transaction() with multiple presence processes and/or servers.
- Without (start|end)_transaction(), multiple processes/servers overwrite each other's changes.
- Using row locking (where possible) and table locking (where required) fixes these problems.
- IMPORTANT NOTE: DB-only, multi-process/multi-server presence will only work properly under high load when using a database driver that supports transactions and locking (currently just db_postgres).
---
modules_k/rls/notify.c | 10 ++++---- modules_k/rls/resource_notify.c | 40 +++++++++++++++++++++++++++++------ modules_k/rls/rls_db.c | 37 +++++++++++++++++++++++++++++--- modules_k/rls/subscribe.c | 44 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 115 insertions(+), 16 deletions(-)
diff --git a/modules_k/rls/notify.c b/modules_k/rls/notify.c index c8eb230..f0ec0aa 100644 --- a/modules_k/rls/notify.c +++ b/modules_k/rls/notify.c @@ -101,10 +101,10 @@ int send_full_notify(subs_t* subs, xmlNodePtr rl_node, str* rl_uri, int len_est; res_param_t param; int resource_added = 0; /* Flag to indicate that we have added at least one resource */ - multipart_body = NULL; + multipart_body=NULL; + db_query_f query_fn = rlpres_dbf.query_lock ? rlpres_dbf.query_lock : rlpres_dbf.query;
LM_DBG("start\n"); - /* query in alfabetical order */ if(CONSTR_RLSUBS_DID(subs, &rlsubs_did)<0) { @@ -136,15 +136,15 @@ int send_full_notify(subs_t* subs, xmlNodePtr rl_node, str* rl_uri,
if (dbmode == RLS_DB_ONLY && rlpres_dbf.start_transaction) { - if (rlpres_dbf.start_transaction(rlpres_db) < 0) + if (rlpres_dbf.start_transaction(rlpres_db, DB_LOCKING_WRITE) < 0) { LM_ERR("in start_transaction\n"); goto error; } }
- if(rlpres_dbf.query(rlpres_db, query_cols, 0, query_vals, result_cols, - 1, n_result_cols, &str_resource_uri_col, &result )< 0) + if(query_fn(rlpres_db, query_cols, 0, query_vals, result_cols, + 1, n_result_cols, NULL, &result )< 0) { LM_ERR("in sql query\n"); goto error; diff --git a/modules_k/rls/resource_notify.c b/modules_k/rls/resource_notify.c index 8937131..1b9b47c 100644 --- a/modules_k/rls/resource_notify.c +++ b/modules_k/rls/resource_notify.c @@ -203,6 +203,15 @@ static void send_notifies(db1_res_t *result, int did_col, int resource_uri_col, ERR_MEM(PKG_MEM_STR); }
+ if (dbmode == RLS_DB_ONLY && rls_dbf.start_transaction) + { + if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0) + { + LM_ERR("in start_transaction\n"); + goto error; + } + } + LM_DBG("found %d records with updated state\n", result->n); for(i= 0; i< result->n; i++) { @@ -420,9 +429,17 @@ static void send_notifies(db1_res_t *result, int did_col, int resource_uri_col, dialog= NULL; }
- -error: done: + if (dbmode == RLS_DB_ONLY && rls_dbf.end_transaction) + { + if (rls_dbf.end_transaction(rls_db) < 0) + { + LM_ERR("in end_transaction\n"); + goto error; + } + } + +error: if(bstr.s) pkg_free(bstr.s);
@@ -430,6 +447,13 @@ done: pkg_free(buf); if(dialog) pkg_free(dialog); + + if (dbmode == RLS_DB_ONLY && rls_dbf.abort_transaction) + { + if (rls_dbf.abort_transaction(rls_db) < 0) + LM_ERR("in abort_transaction\n"); + } + return; }
@@ -769,7 +793,7 @@ int rls_handle_notify(struct sip_msg* msg, char* c1, char* c2)
if (dbmode == RLS_DB_ONLY && rlpres_dbf.start_transaction) { - if (rlpres_dbf.start_transaction(rlpres_db) < 0) + if (rlpres_dbf.start_transaction(rlpres_db, DB_LOCKING_WRITE) < 0) { LM_ERR("in start_transaction\n"); goto error; @@ -883,6 +907,7 @@ static void timer_send_full_state_notifies(int round) xmlDocPtr doc = NULL; xmlNodePtr service_node = NULL; int now = (int)time(NULL); + db_query_f query_fn = rls_dbf.query_lock ? rls_dbf.query_lock : rls_dbf.query;
query_cols[0] = &str_updated_col; query_vals[0].type = DB1_INT; @@ -925,7 +950,7 @@ static void timer_send_full_state_notifies(int round)
if (dbmode == RLS_DB_ONLY && rls_dbf.start_transaction) { - if (rls_dbf.start_transaction(rls_db) < 0) + if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0) { LM_ERR("in start_transaction\n"); goto done; @@ -933,7 +958,7 @@ static void timer_send_full_state_notifies(int round) }
/* Step 1: Find rls_watchers that require full-state notification */ - if (rls_dbf.query(rls_db, query_cols, 0, query_vals, result_cols, + if (query_fn(rls_db, query_cols, 0, query_vals, result_cols, 1, n_result_cols, 0, &result) < 0) { LM_ERR("in sql query\n"); @@ -1051,6 +1076,7 @@ static void timer_send_update_notifies(int round) pres_state_col, content_type_col; int n_result_cols= 0; db1_res_t *result= NULL; + db_query_f query_fn = rlpres_dbf.query_lock ? rlpres_dbf.query_lock : rlpres_dbf.query;
query_cols[0]= &str_updated_col; query_vals[0].type = DB1_INT; @@ -1080,14 +1106,14 @@ static void timer_send_update_notifies(int round)
if (dbmode == RLS_DB_ONLY && rlpres_dbf.start_transaction) { - if (rlpres_dbf.start_transaction(rlpres_db) < 0) + if (rlpres_dbf.start_transaction(rlpres_db, DB_LOCKING_WRITE) < 0) { LM_ERR("in start_transaction\n"); goto done; } }
- if(rlpres_dbf.query(rlpres_db, query_cols, 0, query_vals, result_cols, + if(query_fn(rlpres_db, query_cols, 0, query_vals, result_cols, 1, n_result_cols, &str_rlsubs_did_col, &result)< 0) { LM_ERR("in sql query\n"); diff --git a/modules_k/rls/rls_db.c b/modules_k/rls/rls_db.c index 97ac7d6..ee406de 100644 --- a/modules_k/rls/rls_db.c +++ b/modules_k/rls/rls_db.c @@ -124,6 +124,7 @@ int delete_expired_subs_rlsdb( void ) int i; subs_t subs; str rlsubs_did = {0, 0}; + db_query_f query_fn = rls_dbf.query_lock ? rls_dbf.query_lock : rls_dbf.query;
if(rls_db == NULL) { @@ -148,7 +149,16 @@ int delete_expired_subs_rlsdb( void ) result_cols[r_to_tag_col=n_result_cols++] = &str_to_tag_col; result_cols[r_from_tag_col=n_result_cols++] = &str_from_tag_col;
- if(rls_dbf.query(rls_db, query_cols, query_ops, query_vals, result_cols, + if (rls_dbf.start_transaction) + { + if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0) + { + LM_ERR("in start_transaction\n"); + goto error; + } + } + + if(query_fn(rls_db, query_cols, query_ops, query_vals, result_cols, n_query_cols, n_result_cols, 0, &result )< 0) { LM_ERR("Can't query db\n"); @@ -213,11 +223,28 @@ int delete_expired_subs_rlsdb( void ) }
rls_dbf.free_result(rls_db, result); + + if (rls_dbf.end_transaction) + { + if (rls_dbf.end_transaction(rls_db) < 0) + { + LM_ERR("in end_transaction\n"); + goto error; + } + } + return 1;
error: if (result) rls_dbf.free_result(rls_db, result); if (rlsubs_did.s) pkg_free(rlsubs_did.s); + + if (rls_dbf.abort_transaction) + { + if (rls_dbf.abort_transaction(rls_db) < 0) + LM_ERR("in abort_transaction\n"); + } + return -1; }
@@ -718,7 +745,8 @@ int get_dialog_subscribe_rlsdb(subs_t *subs) int nr_rows; int r_remote_cseq, r_local_cseq, r_version; char *r_pres_uri, *r_record_route; - + db_query_f query_fn = rls_dbf.query_lock ? rls_dbf.query_lock : rls_dbf.query; + if(rls_db == NULL) { LM_ERR("null database connection\n"); @@ -761,7 +789,7 @@ int get_dialog_subscribe_rlsdb(subs_t *subs) result_cols[version_col = n_result_cols++] = &str_version_col; result_cols[rroute_col = n_result_cols++] = &str_record_route_col;
- if(rls_dbf.query(rls_db, query_cols, 0, query_vals, result_cols, + if(query_fn(rls_db, query_cols, 0, query_vals, result_cols, n_query_cols, n_result_cols, 0, &result )< 0) { LM_ERR("Can't query db\n"); @@ -865,6 +893,7 @@ subs_t *get_dialog_notify_rlsdb(str callid, str to_tag, str from_tag) subs_t *dest; event_t parsed_event; str ev_sname; + db_query_f query_fn = rls_dbf.query_lock ? rls_dbf.query_lock : rls_dbf.query;
if(rls_db == NULL) { @@ -919,7 +948,7 @@ subs_t *get_dialog_notify_rlsdb(str callid, str to_tag, str from_tag) result_cols[r_version_col=n_result_cols++] = &str_version_col; result_cols[r_expires_col=n_result_cols++] = &str_expires_col;
- if(rls_dbf.query(rls_db, query_cols, 0, query_vals, result_cols, + if(query_fn(rls_db, query_cols, 0, query_vals, result_cols, n_query_cols, n_result_cols, 0, &result )< 0) { LM_ERR("Can't query db\n"); diff --git a/modules_k/rls/subscribe.c b/modules_k/rls/subscribe.c index f719754..c770cbf 100644 --- a/modules_k/rls/subscribe.c +++ b/modules_k/rls/subscribe.c @@ -639,6 +639,15 @@ int rls_handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_doma /* search if a stored dialog */ if ( dbmode == RLS_DB_ONLY ) { + if (rls_dbf.start_transaction) + { + if (rls_dbf.start_transaction(rls_db, DB_LOCKING_WRITE) < 0) + { + LM_ERR("in start_transaction\n"); + goto error; + } + } + rt = get_dialog_subscribe_rlsdb(&subs);
if (rt <= 0) @@ -646,6 +655,16 @@ int rls_handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_doma LM_DBG("subscription dialog not found for <%.*s@%.*s>\n", subs.watcher_user.len, subs.watcher_user.s, subs.watcher_domain.len, subs.watcher_domain.s); + + if (rls_dbf.end_transaction) + { + if (rls_dbf.end_transaction(rls_db) < 0) + { + LM_ERR("in end_transaction\n"); + goto error; + } + } + goto forpresence; } else if(rt>=400) @@ -657,6 +676,16 @@ int rls_handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_doma LM_ERR("while sending reply\n"); goto error; } + + if (rls_dbf.end_transaction) + { + if (rls_dbf.end_transaction(rls_db) < 0) + { + LM_ERR("in end_transaction\n"); + goto error; + } + } + ret = 0; goto stop; } @@ -670,6 +699,15 @@ int rls_handle_subscribe(struct sip_msg* msg, str watcher_user, str watcher_doma LM_ERR("while updating resource list subscription\n"); goto error; } + + if (rls_dbf.end_transaction) + { + if (rls_dbf.end_transaction(rls_db) < 0) + { + LM_ERR("in end_transaction\n"); + goto error; + } + } } else { @@ -795,6 +833,12 @@ error: if (rlsubs_did.s != NULL) pkg_free(rlsubs_did.s);
+ if (rls_dbf.abort_transaction) + { + if (rls_dbf.abort_transaction(rls_db) < 0) + LM_ERR("in abort_transaction\n"); + } + return err_ret; }