summaryrefslogblamecommitdiffstats
path: root/bin/pool.c
blob: 1d8dabd4191d834d6142acfc284209678a8d0e87 (plain) (tree)
1
2
3
4
5
6
7
8
9

                                                 
                                      
  

                                                                 


                                                                 

                                                             
                                                                
                                                      
  

                                                                       

   
                              

                             


                                  










                                                                                  





















                                                    
                                                       




                                                                                
                                                                   








                                                  
                                 

                                    
                                   









                                                
                                              





                               
                         

                         
                           



                               



                                   


                           
                                       


          
               




                      
                     

                         

                           





                         

                                                  
                                            
                          

                                                    

                                       
                  
                                                            













                                                            



























                                                                          
    
                                  
 
                      














                                                 
                                                                  
                               
                          


                                          


                                   
 



                                             
 


                                   
 







                                          
 
/*
 * This file is part of LEM, a Lua Event Machine.
 * Copyright 2012 Emil Renner Berthing
 *
 * LEM is free software: you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * LEM is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with LEM.  If not, see <http://www.gnu.org/licenses/>.
 */

static unsigned int pool_jobs;
static unsigned int pool_min;
static unsigned int pool_max;
static unsigned int pool_threads;
static time_t pool_delay;
static pthread_mutex_t pool_mutex;
#if _POSIX_SPIN_LOCKS >= 200112L
static pthread_spinlock_t pool_dlock;
#define pool_done_init()   pthread_spin_init(&pool_dlock, PTHREAD_PROCESS_PRIVATE)
#define pool_done_lock()   pthread_spin_lock(&pool_dlock)
#define pool_done_unlock() pthread_spin_unlock(&pool_dlock)
#else
static pthread_mutex_t pool_dlock;
#define pool_done_init()   pthread_mutex_init(&pool_dlock, NULL)
#define pool_done_lock()   pthread_mutex_lock(&pool_dlock)
#define pool_done_unlock() pthread_mutex_unlock(&pool_dlock)
#endif
static pthread_cond_t pool_cond;
static struct lem_async *pool_head;
static struct lem_async *pool_tail;
static struct lem_async *pool_done;
static struct ev_async pool_watch;

static void *
pool_threadfunc(void *arg)
{
	struct lem_async *a;
	struct timespec ts;
	struct timeval tv;

	(void)arg;

	while (1) {
		gettimeofday(&tv, NULL);
		ts.tv_sec  = tv.tv_sec + pool_delay;
		ts.tv_nsec = 1000*tv.tv_usec;

		pthread_mutex_lock(&pool_mutex);
		while ((a = pool_head) == NULL) {
			if (pool_threads <= pool_min) {
				pthread_cond_wait(&pool_cond, &pool_mutex);
				continue;
			}

			if (pthread_cond_timedwait(&pool_cond, &pool_mutex, &ts)
					&& pool_threads > pool_min)
				goto out;
		}
		pool_head = a->next;
		pthread_mutex_unlock(&pool_mutex);

		lem_debug("Running job %p", a);
		a->work(a);
		lem_debug("Bye %p", a);

		pool_done_lock();
		a->next = pool_done;
		pool_done = a;
		pool_done_unlock();

		ev_async_send(LEM_ &pool_watch);
	}
out:
	pool_threads--;
	pthread_mutex_unlock(&pool_mutex);
	return NULL;
}

static void
pool_cb(EV_P_ struct ev_async *w, int revents)
{
	struct lem_async *a;
	struct lem_async *next;

	(void)revents;

	pool_done_lock();
	a = pool_done;
	pool_done = NULL;
	pool_done_unlock();

	for (; a; a = next) {
		pool_jobs--;
		next = a->next;
		if (a->reap)
			a->reap(a);
		else
			free(a);
	}

	if (pool_jobs == 0)
		ev_async_stop(EV_A_ w);
}

static int
pool_init(void)
{
	int ret;

	/*
	pool_jobs = 0;
	pool_min = 0;
	pool_threads = 0;
	*/
	pool_max = INT_MAX;
	pool_delay = 10;
	/*
	pool_head = NULL;
	pool_tail = NULL;
	pool_done = NULL;
	*/

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wstrict-aliasing"
	ev_async_init(&pool_watch, pool_cb);
#pragma GCC diagnostic pop

	ret = pthread_mutex_init(&pool_mutex, NULL);
	if (ret == 0)
		ret = pool_done_init();
	if (ret) {
		lem_log_error("error initializing lock: %s",
				strerror(ret));
		return -1;
	}

	ret = pthread_cond_init(&pool_cond, NULL);
	if (ret) {
		lem_log_error("error initializing cond: %s",
				strerror(ret));
		return -1;
	}

	return 0;
}

static void
pool_spawnthread(void)
{
	pthread_attr_t attr;
	pthread_t thread;
	int ret;

	ret = pthread_attr_init(&attr);
	if (ret)
		goto error;

	ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
	if (ret) {
		pthread_attr_destroy(&attr);
		goto error;
	}

	ret = pthread_create(&thread, &attr, pool_threadfunc, NULL);
	pthread_attr_destroy(&attr);
	if (ret)
		goto error;

	return;
error:
	lem_log_error("error spawning thread: %s", strerror(ret));
	lem_exit(EXIT_FAILURE);
}

void
lem_async_run(struct lem_async *a)
{
	int spawn = 0;

	if (pool_jobs == 0)
		ev_async_start(LEM_ &pool_watch);
	pool_jobs++;

	a->next = NULL;

	pthread_mutex_lock(&pool_mutex);
	if (pool_head == NULL) {
		pool_head = a;
		pool_tail = a;
	} else {
		pool_tail->next = a;
		pool_tail = a;
	}
	if (pool_jobs > pool_threads && pool_threads < pool_max) {
		pool_threads++;
		spawn = 1;
	}
	pthread_mutex_unlock(&pool_mutex);
	pthread_cond_signal(&pool_cond);
	if (spawn)
		pool_spawnthread();
}

void
lem_async_config(int delay, int min, int max)
{
	int spawn;

	pool_delay = (time_t)delay;
	pool_min = min;
	pool_max = max;

	pthread_mutex_lock(&pool_mutex);
	spawn = min - pool_threads;
	if (spawn > 0)
		pool_threads = min;
	pthread_mutex_unlock(&pool_mutex);

	for (; spawn > 0; spawn--)
		pool_spawnthread();
}