Module: sip-router Branch: master Commit: 9467ebbfdeea640923dbb77b4bea97ccda795903 URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=9467ebbf...
Author: Daniel-Constantin Mierla miconda@gmail.com Committer: Daniel-Constantin Mierla miconda@gmail.com Date: Mon Apr 14 14:13:36 2014 +0200
core: framework for creating asynchronous pool of workers
- dedicated group of processes that can get tasks from other processes via a memory pipe - reacts immediately, no time-based polling - should reduce the need for other components to create extra processes for special handling
---
async_task.c | 220 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ async_task.h | 36 ++++++++++ 2 files changed, 256 insertions(+), 0 deletions(-)
diff --git a/async_task.c b/async_task.c new file mode 100644 index 0000000..f5aa818 --- /dev/null +++ b/async_task.c @@ -0,0 +1,220 @@ +/** + * Copyright (C) 2014 Daniel-Constantin Mierla (asipto.com) + * + * This file is part of Extensible SIP Router, a free SIP server. + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
+ */ + +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> + +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/un.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <fcntl.h> +#include <errno.h> + +#include "dprint.h" +#include "sr_module.h" +#include "ut.h" +#include "pt.h" +#include "cfg/cfg_struct.h" + + +#include "async_task.h" + +static int _async_task_workers = 0; /* number of worker processes; <= 0 means the framework is disabled */ +static int _async_task_sockets[2]; /* [0] is read by the workers, [1] is written by async_task_push() */ + +int async_task_run(int idx); + +/** + * Create the PF_UNIX datagram socket pair used to hand tasks to the workers; returns 0 on success, -1 if socketpair() fails. + */ +int async_task_init_sockets(void) +{ + if (socketpair(PF_UNIX, SOCK_DGRAM, 0, _async_task_sockets) < 0) { + LM_ERR("opening tasks dgram socket pair\n"); + return -1; + } + LM_DBG("inter-process event notification sockets initialized\n"); + return 0; +} + +/** + * Close _async_task_sockets[1], the end that async_task_push() writes to. + */ +void async_task_close_sockets_child(void) +{ + LM_DBG("closing the notification socket used by children\n"); + close(_async_task_sockets[1]); +} + +/** + * Close _async_task_sockets[0], the end that async_task_run() reads from. + */ +void async_task_close_sockets_parent(void) +{ + LM_DBG("closing the notification socket used by parent\n"); + close(_async_task_sockets[0]); +} + +/** + * Register the configured number of workers with the core and the cfg framework; does nothing when no workers are set. Always returns 0. + */ +int async_task_init(void) +{ + LM_DBG("start initializing asynk task framework\n"); + if(_async_task_workers<=0) + return 0; + + /* advertise new processes to core */ + register_procs(_async_task_workers); + + /* advertise new processes to cfg framework */ + cfg_register_child(_async_task_workers); + + return 0; +} + +/** + * Return 1 when a positive number of workers has been set, 0 otherwise. + */ +int async_task_initialized(void) +{ + if(_async_task_workers<=0) + return 0; + return 1; +} + +/** + * Per-process initialization driven by rank: PROC_INIT creates the socket pair, rank>0 closes the read end, PROC_MAIN forks the workers, any other rank is a no-op; returns 0 on success, -1 on error. + */ +int async_task_child_init(int rank) +{ + int pid; + int i; + + LM_DBG("child initializing asynk task framework\n"); + + if(_async_task_workers<=0) + return 0; + + if (rank==PROC_INIT) { + if(async_task_init_sockets()<0) { + LM_ERR("failed to initialize tasks sockets\n"); + return -1; + } + return 0; + } + + if(rank>0) { + async_task_close_sockets_parent(); + return 0; + } + if (rank!=PROC_MAIN) + return 0; + + for(i=0; i<_async_task_workers; 
i++) { + pid=fork_process(PROC_RPC, "Async Task Worker", 1); + if (pid<0) + return -1; /* error */ + if(pid==0) { + /* child */ + + /* initialize the config framework */ + if (cfg_child_init()) + return -1; + /* main function for workers; the index passed in is 1-based (i+1), while the error log below prints i */ + if(async_task_run(i+1)<0) { + LM_ERR("failed to initialize task worker process: %d\n", i); + return -1; + } + } + } + + return 0; +} + +/** + * Set the number of workers to n; a non-positive n is ignored and a call made after the workers were already set only logs a warning. Always returns 0. + */ +int async_task_set_workers(int n) +{ + if(_async_task_workers>0) { + LM_WARN("task workers already set\n"); + return 0; + } + if(n<=0) + return 0; + + _async_task_workers = n; + + return 0; +} + +/** + * Send the task pointer to the workers over the socket pair; returns -1 if write() fails, 0 on success, and also 0 without running or freeing the task when no workers are configured. + */ +int async_task_push(async_task_t *task) +{ + int len; + + if(_async_task_workers<=0) + return 0; + + len = write(_async_task_sockets[1], &task, sizeof(async_task_t*)); /* only the pointer value is sent, not the task contents */ + if(len<=0) { + LM_ERR("failed to pass the task to asynk workers\n"); + return -1; + } + LM_DBG("task sent [%p]\n", task); + return 0; +} + +/** + * Worker main loop: receive a task pointer, call its exec callback with param when exec is set, then shm_free() the task; the loop has no exit, so the final return is never reached. + */ +int async_task_run(int idx) +{ + async_task_t *ptask; + int received; + + LM_DBG("async task worker %d ready\n", idx); + + for( ; ; ) { + if ((received = recvfrom(_async_task_sockets[0], + &ptask, sizeof(async_task_t*), + 0, NULL, 0)) < 0) { + LM_ERR("failed to received task (%d: %s)\n", errno, strerror(errno)); + continue; + } + if(received != sizeof(async_task_t*)) { + LM_ERR("invalid task size %d\n", received); + continue; + } + if(ptask->exec!=NULL) { + LM_DBG("task executed [%p] (%p/%p)\n", ptask, + ptask->exec, ptask->param); + ptask->exec(ptask->param); + } + shm_free(ptask); /* the worker releases the task itself; presumably the pusher allocated it in shared memory - verify against callers */ + } + + return 0; +} diff --git a/async_task.h b/async_task.h new file mode 100644 index 0000000..4c317a1 --- /dev/null +++ b/async_task.h @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2014 Daniel-Constantin Mierla (asipto.com) + * + * This file is part of Extensible SIP Router, a free SIP server. 
+ * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + + +#ifndef _ASYNC_TASK_H_ +#define _ASYNC_TASK_H_ + +typedef void (*async_cbe_t)(void *p); /* callback type invoked by a worker with the task param */ + +typedef struct _async_task { + async_cbe_t exec; /* function the worker calls; when NULL nothing is executed and the task is only freed */ + void *param; /* argument passed to exec */ +} async_task_t; + +int async_task_init(void); +int async_task_child_init(int rank); +int async_task_initialized(void); +int async_task_set_workers(int n); +int async_task_push(async_task_t *task); + +#endif