Module: sip-router Branch: master Commit: 244d4d4729b295be999acb3a4ca4cf156a9bfbbf URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=244d4d47...
Author: Andrei Pelinescu-Onciul andrei@iptel.org Committer: Andrei Pelinescu-Onciul andrei@iptel.org Date: Thu Sep 24 18:06:22 2009 +0200
mi_rpc: support for async mi commands
async mi commands can now be executed, if the underlying rpc transport module supports it (e.g. xmlrpc).
---
modules/mi_rpc/mi_rpc_mod.c | 135 ++++++++++++++++++++++++++++++++++++++----- 1 files changed, 120 insertions(+), 15 deletions(-)
diff --git a/modules/mi_rpc/mi_rpc_mod.c b/modules/mi_rpc/mi_rpc_mod.c index 9c68839..b678f25 100644 --- a/modules/mi_rpc/mi_rpc_mod.c +++ b/modules/mi_rpc/mi_rpc_mod.c @@ -93,7 +93,7 @@ struct mi_root *mi_rpc_read_params(rpc_t *rpc, void *ctx) { name.s = 0; name.len = 0; - + if(value.len>=2 && value.s[0]=='-' && value.s[1]=='-') { /* name */ @@ -256,12 +256,66 @@ static int mi_rpc_print_tree(rpc_t* rpc, void* ctx, struct mi_root *tree, return 0; }
+ + +/* structure used to pack the rpc dyn. ctx and the print mode */ +struct mi_rpc_handler_param{ + rpc_delayed_ctx_t* dctx; + enum mi_rpc_print_mode mode; +}; + +/* send reply and close async context */ +static void mi_rpc_async_close(struct mi_root* mi_rpl, + struct mi_handler* mi_h, + int done) +{ + rpc_delayed_ctx_t* dctx; + rpc_t* rpc; + void* c; + enum mi_rpc_print_mode mode; + + dctx=0; + if (done){ + if (mi_h->param==0){ + BUG("null param\n"); + shm_free(mi_h); + goto error; + } + dctx=((struct mi_rpc_handler_param*)mi_h->param)->dctx; + if (dctx==0){ + BUG("null dctx\n"); + shm_free(mi_h->param); + shm_free(mi_h); + mi_h->param=0; + goto error; + } + mode=((struct mi_rpc_handler_param*)mi_h->param)->mode; + rpc=&dctx->rpc; + c=dctx->reply_ctx; + + mi_rpc_print_tree(rpc, c, mi_rpl, mode); + + rpc->delayed_ctx_close(dctx); + shm_free(mi_h->param); + mi_h->param=0; + shm_free(mi_h); + } /* else: no provisional support => do nothing */ +error: + if (mi_rpl) + free_mi_tree(mi_rpl); + return; +} + + + static void rpc_mi_exec(rpc_t *rpc, void *ctx, enum mi_rpc_print_mode mode) { str cmd; struct mi_cmd *mic; struct mi_root *mi_req; struct mi_root *mi_rpl; + struct mi_handler* mi_async_h; + struct mi_rpc_handler_param* mi_handler_param;
if (rpc->scan(ctx, "S", &cmd) < 1) { @@ -269,7 +323,11 @@ static void rpc_mi_exec(rpc_t *rpc, void *ctx, enum mi_rpc_print_mode mode) rpc->fault(ctx, 500, "command parameter missing"); return; } - + + mi_async_h=0; + mi_req = 0; + mi_rpl=0; + mic = lookup_mi_cmd(cmd.s, cmd.len); if(mic==0) { @@ -280,12 +338,15 @@ static void rpc_mi_exec(rpc_t *rpc, void *ctx, enum mi_rpc_print_mode mode)
if (mic->flags&MI_ASYNC_RPL_FLAG) { - LM_ERR("async mi cmd support not implemented yet\n"); - rpc->fault(ctx, 500, "async my cmd not implemented yet"); - return; + if (rpc->capabilities==0 || + !(rpc->capabilities(ctx) & RPC_DELAYED_REPLY)) + { + rpc->fault(ctx, 500, + "this rpc transport does not support async mode"); + return; + } }
- mi_req = 0; if(!(mic->flags&MI_NO_INPUT_FLAG)) { mi_req = mi_rpc_read_params(rpc, ctx); @@ -293,29 +354,73 @@ static void rpc_mi_exec(rpc_t *rpc, void *ctx, enum mi_rpc_print_mode mode) { LM_ERR("cannot parse parameters\n"); rpc->fault(ctx, 500, "cannot parse parameters"); - return; + goto error; } + if (mic->flags&MI_ASYNC_RPL_FLAG) + { + /* build mi async handler */ + mi_handler_param=shm_malloc(sizeof(*mi_handler_param)); + if (mi_handler_param==0){ + rpc->fault(ctx, 500, "out of memory"); + return; + } + mi_async_h=shm_malloc(sizeof(*mi_async_h)); + if (mi_async_h==0){ + shm_free(mi_handler_param); + mi_handler_param=0; + rpc->fault(ctx, 500, "out of memory"); + return; + } + memset(mi_async_h, 0, sizeof(*mi_async_h)); + mi_async_h->handler_f=mi_rpc_async_close; + mi_handler_param->mode=mode; + mi_handler_param->dctx=rpc->delayed_ctx_new(ctx); + if (mi_handler_param->dctx==0){ + rpc->fault(ctx, 500, "internal error: async ctx" + " creation failed"); + goto error; + } + /* switch context, since replies are not allowed anymore on the + original one */ + rpc=&mi_handler_param->dctx->rpc; + ctx=mi_handler_param->dctx->reply_ctx; + mi_async_h->param=mi_handler_param; + } + mi_req->async_hdl=mi_async_h; } mi_rpl=run_mi_cmd(mic, mi_req);
if(mi_rpl == 0) { rpc->fault(ctx, 500, "execution failed"); - if (mi_req) free_mi_tree(mi_req); - return; + goto error; }
if (mi_rpl!=MI_ROOT_ASYNC_RPL) { mi_rpc_print_tree(rpc, ctx, mi_rpl, mode); + goto end; + }else if (mi_async_h==0){ + /* async reply, but command not listed as async */ + rpc->fault(ctx, 500, "bad mi command: unexpected async reply"); + goto error; + } + mi_async_h=0; /* don't delete it */ +end: +error: + if (mi_req) + free_mi_tree(mi_req); + if (mi_rpl && mi_rpl!=MI_ROOT_ASYNC_RPL) free_mi_tree(mi_rpl); - if (mi_req) free_mi_tree(mi_req); - return; + if (mi_async_h){ + if (mi_async_h->param){ + if (((struct mi_rpc_handler_param*)mi_async_h->param)->dctx) + rpc->delayed_ctx_close(((struct mi_rpc_handler_param*) + mi_async_h->param)->dctx); + shm_free(mi_async_h->param); + } + shm_free(mi_async_h); } - - /* async cmd -- not yet */ - rpc->fault(ctx, 500, "no async handling yet"); - if (mi_req) free_mi_tree(mi_req); return; }