From 067263bb5049d3a5fde10af7eda685b9a0aa7593 Mon Sep 17 00:00:00 2001 From: Emil Renner Berthing Date: Sat, 9 Feb 2013 11:24:39 +0100 Subject: queue: add blocking multiple producer/consumer queue --- Makefile.in | 3 +- lem/queue.lua | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ test/queue.lua | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 lem/queue.lua create mode 100755 test/queue.lua diff --git a/Makefile.in b/Makefile.in index 5c7e8a5..f4b58e2 100644 --- a/Makefile.in +++ b/Makefile.in @@ -29,7 +29,8 @@ llibs = \ lem/lfs.lua \ lem/http.lua \ lem/http/server.lua \ - lem/hathaway.lua + lem/queue.lua \ + lem/hathaway.lua clibs = \ lem/utils.so \ diff --git a/lem/queue.lua b/lem/queue.lua new file mode 100644 index 0000000..c3e5e42 --- /dev/null +++ b/lem/queue.lua @@ -0,0 +1,88 @@ +-- +-- This file is part of LEM, a Lua Event Machine. +-- Copyright 2013 Emil Renner Berthing +-- +-- LEM is free software: you can redistribute it and/or modify it +-- under the terms of the GNU Lesser General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- LEM is distributed in the hope that it will be useful, but +-- WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU Lesser General Public License for more details. +-- +-- You should have received a copy of the GNU Lesser General Public +-- License along with LEM. If not, see . +-- + +local utils = require 'lem.utils' + +local M = {} + +local Queue = {} +Queue.__index = Queue +M.Queue = Queue + +local remove = table.remove +local thisthread, suspend, resume = + utils.thisthread, utils.suspend, utils.resume + +function Queue:put(v) + local n = self.n + 1 + self.n = n + + if n >= 1 then + self[n] = v + else + resume(remove(self, 1), v) + end +end + +function Queue:signal(...) + local n = self.n + if n < 0 then + for i = 1, -n do + resume(self[i], ...) + self[i] = nil + end + self.n = 0 + end +end + +function Queue:get() + local n = self.n - 1 + self.n = n + + if n >= 0 then + return remove(self, 1) + end + + self[-n] = thisthread() + return suspend() +end + +function Queue:reset() + self:signal(nil, 'reset') + for i = self.n, 1, -1 do + self[i] = nil + end + self.n = 0 +end + +local get = Queue.get +function Queue:consume() + return get, self +end + +function Queue:empty() + return self.n <= 0 +end + +function M.new() + return setmetatable({ n = 0 }, Queue) +end + +return M + +-- vim: ts=2 sw=2 noet: diff --git a/test/queue.lua b/test/queue.lua new file mode 100755 index 0000000..54f025b --- /dev/null +++ b/test/queue.lua @@ -0,0 +1,85 @@ +#!bin/lem +-- +-- This file is part of LEM, a Lua Event Machine. +-- Copyright 2011-2012 Emil Renner Berthing +-- +-- LEM is free software: you can redistribute it and/or modify it +-- under the terms of the GNU Lesser General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- LEM is distributed in the hope that it will be useful, but +-- WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU Lesser General Public License for more details. +-- +-- You should have received a copy of the GNU Lesser General Public +-- License along with LEM. If not, see . +-- + +package.path = '?.lua' +package.cpath = '?.so' + +local utils = require 'lem.utils' +local queue = require 'lem.queue' + +local consumers = 0 +local function consumer(q, id) + consumers = consumers + 1 + local sleeper = utils.newsleeper() + for v in q:consume() do + print(string.format('thread %d, n = %2d, received "%s"', + id, q.n, tostring(v))) + sleeper:sleep(0.04) + end + consumers = consumers - 1 +end + +local q, sleeper = queue.new(), utils.newsleeper() + +print "One consumer:\n" +for i = 1, 5 do + q:put(i) +end + +utils.spawn(consumer, q, 1) +utils.yield() +assert(consumers == 1) + +for i = 6, 10 do + q:put(i) + sleeper:sleep(0.1) +end + +assert(q:empty()) + +assert(consumers == 1) +q:reset() +utils.yield() +assert(consumers == 0) + +print "\nFive consumers:\n" + +for i = 1, 10 do + q:put(i) +end + +for i = 1, 5 do + utils.spawn(consumer, q, i) +end +utils.yield() +assert(consumers == 5) + +for i = 11, 20 do + q:put(i) + sleeper:sleep(0.1) +end + +assert(q:empty()) +assert(consumers == 5) + +q:signal(nil) +utils.yield() +assert(consumers == 0) + +-- vim: syntax=lua ts=2 sw=2 noet: -- cgit v1.2.1