summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEmil Renner Berthing <esmil@mailme.dk>2012-12-05 18:52:24 +0100
committerEmil Renner Berthing <esmil@mailme.dk>2012-12-15 02:59:29 +0100
commitbfc5c4fc2c51aae7f7b08f925ba00251503e969f (patch)
tree1ad46ba448c1eb0b960a055cae66b8f9e2884291
parentf351a72d2d2872adc942d90bcdf36b046b2ed88f (diff)
downloadlem-bfc5c4fc2c51aae7f7b08f925ba00251503e969f.tar.gz
lem-bfc5c4fc2c51aae7f7b08f925ba00251503e969f.tar.xz
lem-bfc5c4fc2c51aae7f7b08f925ba00251503e969f.zip
pool: add thread pool
-rw-r--r--bin/lem.c11
-rw-r--r--bin/pool.c181
-rw-r--r--include/lem.h19
3 files changed, 211 insertions, 0 deletions
diff --git a/bin/lem.c b/bin/lem.c
index 21fbee5..d6cbabd 100644
--- a/bin/lem.c
+++ b/bin/lem.c
@@ -22,6 +22,9 @@
#include <signal.h>
#include <stdio.h>
#include <assert.h>
+#include <sys/time.h>
+#include <time.h>
+#include <pthread.h>
#include <lem.h>
#include <lualib.h>
@@ -258,6 +261,8 @@ runqueue_pop(EV_P_ struct ev_idle *w, int revents)
lem_exit(EXIT_FAILURE);
}
+#include "pool.c"
+
static int
queue_file(int argc, char *argv[], int fidx)
{
@@ -327,6 +332,12 @@ main(int argc, char *argv[])
rq.first = rq.last = 0;
rq.mask = LEM_INITIAL_QUEUESIZE - 1;
+ /* initialize threadpool */
+ if (pool_init(10)) {
+ lem_log_error("Error initializing threadpool");
+ goto error;
+ }
+
/* load file */
if (queue_file(argc, argv, 1))
goto error;
diff --git a/bin/pool.c b/bin/pool.c
new file mode 100644
index 0000000..e604c31
--- /dev/null
+++ b/bin/pool.c
@@ -0,0 +1,181 @@
+/*
+ * This file is part of LEM, a Lua Event Machine.
+ * Copyright 2011-2012 Emil Renner Berthing
+ *
+ * LEM is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * LEM is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with LEM. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#define POOL_THREADS_MIN 1
+static unsigned int pool_jobs;
+static unsigned int pool_threads;
+static time_t pool_delay;
+static pthread_mutex_t pool_mutex;
+static pthread_cond_t pool_cond;
+static struct lem_async *pool_head;
+static struct lem_async *pool_tail;
+static struct lem_async *pool_done;
+static struct ev_async pool_watch;
+
+static void *
+pool_threadfunc(void *arg)
+{
+ struct lem_async *a;
+ struct timespec ts;
+ struct timeval tv;
+
+ (void)arg;
+
+ while (1) {
+ gettimeofday(&tv, NULL);
+ ts.tv_sec = tv.tv_sec + pool_delay;
+ ts.tv_nsec = 1000*tv.tv_usec;
+
+ pthread_mutex_lock(&pool_mutex);
+ while ((a = pool_head) == NULL) {
+ if (pool_threads <= POOL_THREADS_MIN) {
+ pthread_cond_wait(&pool_cond, &pool_mutex);
+ continue;
+ }
+
+ if (pthread_cond_timedwait(&pool_cond, &pool_mutex, &ts)
+ && pool_threads > POOL_THREADS_MIN)
+ goto out;
+ }
+ pool_head = a->next;
+ pthread_mutex_unlock(&pool_mutex);
+
+ lem_debug("Running job %p", a);
+ a->work(a);
+ lem_debug("Bye %p", a);
+
+ pthread_mutex_lock(&pool_mutex);
+ a->next = pool_done;
+ pool_done = a;
+ pthread_mutex_unlock(&pool_mutex);
+
+ ev_async_send(LEM_ &pool_watch);
+ }
+out:
+ pool_threads--;
+ pthread_mutex_unlock(&pool_mutex);
+ return NULL;
+}
+
+static void
+pool_cb(EV_A_ struct ev_async *w, int revents)
+{
+ struct lem_async *a;
+ struct lem_async *next;
+
+ (void)revents;
+
+ pthread_mutex_lock(&pool_mutex);
+ a = pool_done;
+ pool_done = NULL;
+ pthread_mutex_unlock(&pool_mutex);
+
+ for (; a; a = next) {
+ pool_jobs--;
+ next = a->next;
+ a->reap(a);
+ }
+
+ if (pool_jobs == 0)
+ ev_async_stop(LEM_ w);
+}
+
+static int
+pool_init(time_t delay)
+{
+ int ret;
+
+ /*
+ pool_jobs = 0;
+ pool_threads = 0;
+ */
+ pool_delay = delay;
+ /*
+ pool_head = NULL;
+ pool_tail = NULL;
+ pool_done = NULL;
+ */
+
+ ev_async_init(&pool_watch, pool_cb);
+
+ ret = pthread_mutex_init(&pool_mutex, NULL);
+ if (ret) {
+ lem_log_error("error initializing mutex: %s",
+ strerror(ret));
+ return -1;
+ }
+
+ ret = pthread_cond_init(&pool_cond, NULL);
+ if (ret) {
+ lem_log_error("error initializing cond: %s",
+ strerror(ret));
+ return -1;
+ }
+
+ return 0;
+}
+
+void
+lem_async_put(struct lem_async *a)
+{
+ int ret = 0;
+
+ if (pool_jobs == 0)
+ ev_async_start(LEM_ &pool_watch);
+ pool_jobs++;
+
+ a->next = NULL;
+
+ pthread_mutex_lock(&pool_mutex);
+ if (pool_head == NULL) {
+ pool_head = a;
+ pool_tail = a;
+ } else {
+ pool_tail->next = a;
+ pool_tail = a;
+ }
+ if (pool_jobs > pool_threads) {
+ pool_threads++;
+ ret = 1;
+ }
+ pthread_mutex_unlock(&pool_mutex);
+ pthread_cond_signal(&pool_cond);
+ if (ret) {
+ pthread_attr_t attr;
+ pthread_t thread;
+
+ ret = pthread_attr_init(&attr);
+ if (ret)
+ goto error;
+
+ ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ if (ret) {
+ pthread_attr_destroy(&attr);
+ goto error;
+ }
+
+ ret = pthread_create(&thread, &attr, pool_threadfunc, NULL);
+ pthread_attr_destroy(&attr);
+ if (ret)
+ goto error;
+ }
+ return;
+error:
+ lem_log_error("error spawning thread: %s", strerror(ret));
+ lem_exit(EXIT_FAILURE);
+}
diff --git a/include/lem.h b/include/lem.h
index 1cf35ee..803c72a 100644
--- a/include/lem.h
+++ b/include/lem.h
@@ -53,10 +53,29 @@ extern struct ev_loop *lem_loop;
# define LEM_
#endif
+struct lem_async {
+ lua_State *T;
+ void (*work)(struct lem_async *a);
+ void (*reap)(struct lem_async *a);
+ struct lem_async *next;
+};
+
void *lem_xmalloc(size_t size);
lua_State *lem_newthread(void);
void lem_forgetthread(lua_State *T);
void lem_queue(lua_State *T, int nargs);
void lem_exit(int status);
+void lem_async_put(struct lem_async *a);
+
+static inline void
+lem_async_do(struct lem_async *a, lua_State *T,
+ void (*work)(struct lem_async *),
+ void (*reap)(struct lem_async *))
+{
+ a->T = T;
+ a->work = work;
+ a->reap = reap;
+ lem_async_put(a);
+}
#endif