diff options
author | Emil Renner Berthing <esmil@mailme.dk> | 2012-12-15 22:58:18 +0100 |
---|---|---|
committer | Emil Renner Berthing <esmil@mailme.dk> | 2012-12-17 10:11:06 +0100 |
commit | 9066bb7971a8ed5b0a91c4fdfe57d7312993022e (patch) | |
tree | 9b3dafcb77dd52a28cb060758b7acdb14e95e29b | |
parent | 36babe570b4b1718f1ae8e16940a033d896225ef (diff) | |
download | lem-9066bb7971a8ed5b0a91c4fdfe57d7312993022e.tar.gz lem-9066bb7971a8ed5b0a91c4fdfe57d7312993022e.tar.xz lem-9066bb7971a8ed5b0a91c4fdfe57d7312993022e.zip |
pool: make threadpool dynamically configurable
-rw-r--r-- | bin/lem.c | 2 | ||||
-rw-r--r-- | bin/pool.c | 85 | ||||
-rw-r--r-- | include/lem.h | 1 | ||||
-rw-r--r-- | lem/utils.c | 29 |
4 files changed, 88 insertions, 29 deletions
@@ -334,7 +334,7 @@ main(int argc, char *argv[]) rq.mask = LEM_INITIAL_QUEUESIZE - 1; /* initialize threadpool */ - if (pool_init(10)) { + if (pool_init()) { lem_log_error("Error initializing threadpool"); goto error; } @@ -16,8 +16,9 @@ * along with LEM. If not, see <http://www.gnu.org/licenses/>. */ -#define POOL_THREADS_MIN 1 static unsigned int pool_jobs; +static unsigned int pool_min; +static unsigned int pool_max; static unsigned int pool_threads; static time_t pool_delay; static pthread_mutex_t pool_mutex; @@ -54,13 +55,13 @@ pool_threadfunc(void *arg) pthread_mutex_lock(&pool_mutex); while ((a = pool_head) == NULL) { - if (pool_threads <= POOL_THREADS_MIN) { + if (pool_threads <= pool_min) { pthread_cond_wait(&pool_cond, &pool_mutex); continue; } if (pthread_cond_timedwait(&pool_cond, &pool_mutex, &ts) - && pool_threads > POOL_THREADS_MIN) + && pool_threads > pool_min) goto out; } pool_head = a->next; @@ -107,15 +108,17 @@ pool_cb(EV_A_ struct ev_async *w, int revents) } static int -pool_init(time_t delay) +pool_init(void) { int ret; /* pool_jobs = 0; + pool_min = 0; pool_threads = 0; */ - pool_delay = delay; + pool_max = INT_MAX; + pool_delay = 10; /* pool_head = NULL; pool_tail = NULL; @@ -143,10 +146,38 @@ pool_init(time_t delay) return 0; } +static void +pool_spawnthread(void) +{ + pthread_attr_t attr; + pthread_t thread; + int ret; + + ret = pthread_attr_init(&attr); + if (ret) + goto error; + + ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if (ret) { + pthread_attr_destroy(&attr); + goto error; + } + + ret = pthread_create(&thread, &attr, pool_threadfunc, NULL); + pthread_attr_destroy(&attr); + if (ret) + goto error; + + return; +error: + lem_log_error("error spawning thread: %s", strerror(ret)); + lem_exit(EXIT_FAILURE); +} + void lem_async_put(struct lem_async *a) { - int ret = 0; + int spawn = 0; if (pool_jobs == 0) ev_async_start(LEM_ &pool_watch); @@ -162,33 +193,31 @@ lem_async_put(struct lem_async *a) pool_tail->next = a; pool_tail = a; } - if (pool_jobs > pool_threads) { + if (pool_jobs > pool_threads && pool_threads < pool_max) { pool_threads++; - ret = 1; + spawn = 1; } pthread_mutex_unlock(&pool_mutex); pthread_cond_signal(&pool_cond); - if (ret) { - pthread_attr_t attr; - pthread_t thread; + if (spawn) + pool_spawnthread(); +} - ret = pthread_attr_init(&attr); - if (ret) - goto error; +void +lem_async_config(int delay, int min, int max) +{ + int spawn; - ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (ret) { - pthread_attr_destroy(&attr); - goto error; - } + pool_delay = (time_t)delay; + pool_min = min; + pool_max = max; - ret = pthread_create(&thread, &attr, pool_threadfunc, NULL); - pthread_attr_destroy(&attr); - if (ret) - goto error; - } - return; -error: - lem_log_error("error spawning thread: %s", strerror(ret)); - lem_exit(EXIT_FAILURE); + pthread_mutex_lock(&pool_mutex); + spawn = min - pool_threads; + if (spawn > 0) + pool_threads = min; + pthread_mutex_unlock(&pool_mutex); + + for (; spawn > 0; spawn--) + pool_spawnthread(); } diff --git a/include/lem.h b/include/lem.h index 803c72a..68450ec 100644 --- a/include/lem.h +++ b/include/lem.h @@ -66,6 +66,7 @@ void lem_forgetthread(lua_State *T); void lem_queue(lua_State *T, int nargs); void lem_exit(int status); void lem_async_put(struct lem_async *a); +void lem_async_config(int delay, int min, int max); static inline void lem_async_do(struct lem_async *a, lua_State *T, diff --git a/lem/utils.c b/lem/utils.c index 02bff8f..9c09c28 100644 --- a/lem/utils.c +++ b/lem/utils.c @@ -193,6 +193,31 @@ utils_updatenow(lua_State *T) return 1; } +static int +utils_poolconfig(lua_State *T) +{ + lua_Number n; + int delay; + int min; + int max; + + n = luaL_checknumber(T, 1); + delay = (int)n; + luaL_argcheck(T, (lua_Number)delay == n && delay >= 0, + 1, "not an integer in proper range"); + n = luaL_checknumber(T, 2); + min = (int)n; + luaL_argcheck(T, (lua_Number)min == n && min >= 0, + 2, "not an integer in proper range"); + n = luaL_checknumber(T, 3); + max = (int)n; + luaL_argcheck(T, (lua_Number)max == n && max > 0 && max >= min, + 3, "not an integer in proper range"); + + lem_async_config(delay, min, max); + return 0; +} + int luaopen_lem_utils(lua_State *L) { @@ -244,5 +269,9 @@ luaopen_lem_utils(lua_State *L) lua_pushcfunction(L, utils_updatenow); lua_setfield(L, -2, "updatenow"); + /* set poolconfig function */ + lua_pushcfunction(L, utils_poolconfig); + lua_setfield(L, -2, "poolconfig"); + return 1; } |