summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEmil Renner Berthing <esmil@mailme.dk>2013-02-09 11:24:39 +0100
committerEmil Renner Berthing <esmil@mailme.dk>2013-06-08 19:11:53 +0200
commit067263bb5049d3a5fde10af7eda685b9a0aa7593 (patch)
tree53d1b43383312572f4b1e7f3268891a1b0754e55
parent76f4dbf52e1bc85fd2d42fdb4c8caf0edc9df175 (diff)
downloadlem-067263bb5049d3a5fde10af7eda685b9a0aa7593.tar.gz
lem-067263bb5049d3a5fde10af7eda685b9a0aa7593.tar.xz
lem-067263bb5049d3a5fde10af7eda685b9a0aa7593.zip
queue: add blocking multiple producer/consumer queue
-rw-r--r--Makefile.in3
-rw-r--r--lem/queue.lua88
-rwxr-xr-xtest/queue.lua85
3 files changed, 175 insertions, 1 deletions
diff --git a/Makefile.in b/Makefile.in
index 5c7e8a5..f4b58e2 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -29,7 +29,8 @@ llibs = \
lem/lfs.lua \
lem/http.lua \
lem/http/server.lua \
- lem/hathaway.lua
+ lem/queue.lua \
+ lem/hathaway.lua
clibs = \
lem/utils.so \
diff --git a/lem/queue.lua b/lem/queue.lua
new file mode 100644
index 0000000..c3e5e42
--- /dev/null
+++ b/lem/queue.lua
@@ -0,0 +1,88 @@
+--
+-- This file is part of LEM, a Lua Event Machine.
+-- Copyright 2013 Emil Renner Berthing
+--
+-- LEM is free software: you can redistribute it and/or modify it
+-- under the terms of the GNU Lesser General Public License as
+-- published by the Free Software Foundation, either version 3 of
+-- the License, or (at your option) any later version.
+--
+-- LEM is distributed in the hope that it will be useful, but
+-- WITHOUT ANY WARRANTY; without even the implied warranty of
+-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+-- GNU Lesser General Public License for more details.
+--
+-- You should have received a copy of the GNU Lesser General Public
+-- License along with LEM. If not, see <http://www.gnu.org/licenses/>.
+--
+
+local utils = require 'lem.utils'
+
+local M = {}
+
+local Queue = {}
+Queue.__index = Queue
+M.Queue = Queue
+
+local remove = table.remove
+local thisthread, suspend, resume =
+ utils.thisthread, utils.suspend, utils.resume
+
+function Queue:put(v)
+ local n = self.n + 1
+ self.n = n
+
+ if n >= 1 then
+ self[n] = v
+ else
+ resume(remove(self, 1), v)
+ end
+end
+
+function Queue:signal(...)
+ local n = self.n
+ if n < 0 then
+ for i = 1, -n do
+ resume(self[i], ...)
+ self[i] = nil
+ end
+ self.n = 0
+ end
+end
+
+function Queue:get()
+ local n = self.n - 1
+ self.n = n
+
+ if n >= 0 then
+ return remove(self, 1)
+ end
+
+ self[-n] = thisthread()
+ return suspend()
+end
+
+function Queue:reset()
+ self:signal(nil, 'reset')
+ for i = self.n, 1, -1 do
+ self[i] = nil
+ end
+ self.n = 0
+end
+
+local get = Queue.get
+function Queue:consume()
+ return get, self
+end
+
+function Queue:empty()
+ return self.n <= 0
+end
+
+function M.new()
+ return setmetatable({ n = 0 }, Queue)
+end
+
+return M
+
+-- vim: ts=2 sw=2 noet:
diff --git a/test/queue.lua b/test/queue.lua
new file mode 100755
index 0000000..54f025b
--- /dev/null
+++ b/test/queue.lua
@@ -0,0 +1,85 @@
+#!bin/lem
+--
+-- This file is part of LEM, a Lua Event Machine.
+-- Copyright 2011-2012 Emil Renner Berthing
+--
+-- LEM is free software: you can redistribute it and/or modify it
+-- under the terms of the GNU Lesser General Public License as
+-- published by the Free Software Foundation, either version 3 of
+-- the License, or (at your option) any later version.
+--
+-- LEM is distributed in the hope that it will be useful, but
+-- WITHOUT ANY WARRANTY; without even the implied warranty of
+-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+-- GNU Lesser General Public License for more details.
+--
+-- You should have received a copy of the GNU Lesser General Public
+-- License along with LEM. If not, see <http://www.gnu.org/licenses/>.
+--
+
+package.path = '?.lua'
+package.cpath = '?.so'
+
+local utils = require 'lem.utils'
+local queue = require 'lem.queue'
+
+local consumers = 0
+local function consumer(q, id)
+ consumers = consumers + 1
+ local sleeper = utils.newsleeper()
+ for v in q:consume() do
+ print(string.format('thread %d, n = %2d, received "%s"',
+ id, q.n, tostring(v)))
+ sleeper:sleep(0.04)
+ end
+ consumers = consumers - 1
+end
+
+local q, sleeper = queue.new(), utils.newsleeper()
+
+print "One consumer:\n"
+for i = 1, 5 do
+ q:put(i)
+end
+
+utils.spawn(consumer, q, 1)
+utils.yield()
+assert(consumers == 1)
+
+for i = 6, 10 do
+ q:put(i)
+ sleeper:sleep(0.1)
+end
+
+assert(q:empty())
+
+assert(consumers == 1)
+q:reset()
+utils.yield()
+assert(consumers == 0)
+
+print "\nFive consumers:\n"
+
+for i = 1, 10 do
+ q:put(i)
+end
+
+for i = 1, 5 do
+ utils.spawn(consumer, q, i)
+end
+utils.yield()
+assert(consumers == 5)
+
+for i = 11, 20 do
+ q:put(i)
+ sleeper:sleep(0.1)
+end
+
+assert(q:empty())
+assert(consumers == 5)
+
+q:signal(nil)
+utils.yield()
+assert(consumers == 0)
+
+-- vim: syntax=lua ts=2 sw=2 noet: