summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEmil Renner Berthing <esmil@mailme.dk>2012-12-09 03:27:02 +0100
committerEmil Renner Berthing <esmil@mailme.dk>2012-12-17 10:11:06 +0100
commit9e69d37f9e7a00b9902ebe5618904871397c7999 (patch)
treeb76203d2288f18947d62441cd16c70ec4b2a0783
parentbf8c9594b6492af209b7d62c67d7f165d85b46e8 (diff)
downloadlem-9e69d37f9e7a00b9902ebe5618904871397c7999.tar.gz
lem-9e69d37f9e7a00b9902ebe5618904871397c7999.tar.xz
lem-9e69d37f9e7a00b9902ebe5618904871397c7999.zip
streams: add File type
-rw-r--r--lem/streams.lua1
-rw-r--r--lem/streams/core.c24
-rw-r--r--lem/streams/file.c196
-rw-r--r--lem/streams/stream.c157
-rwxr-xr-xtest/fread.lua72
5 files changed, 416 insertions, 34 deletions
diff --git a/lem/streams.lua b/lem/streams.lua
index f4f9d4f..51d9e4f 100644
--- a/lem/streams.lua
+++ b/lem/streams.lua
@@ -43,6 +43,7 @@ do
end
M.IStream.read = M.reader(M.IStream.readp)
+ M.File.read = M.reader(M.File.readp)
end
return M
diff --git a/lem/streams/core.c b/lem/streams/core.c
index e9ed9f4..b71dc06 100644
--- a/lem/streams/core.c
+++ b/lem/streams/core.c
@@ -38,6 +38,7 @@
#include <streams.h>
#include "sendfile.c"
+#include "file.c"
#include "stream.c"
#include "server.c"
#include "tcp.c"
@@ -168,6 +169,26 @@ luaopen_lem_streams_core(lua_State *L)
/* insert table */
lua_setfield(L, -2, "OStream");
+ /* create File metatable */
+ lua_newtable(L);
+ /* mt.__index = mt */
+ lua_pushvalue(L, -1);
+ lua_setfield(L, -2, "__index");
+ /* mt.__gc = <file_gc> */
+ lua_pushcfunction(L, file_gc);
+ lua_setfield(L, -2, "__gc");
+ /* mt.closed = <file_closed> */
+ lua_pushcfunction(L, file_closed);
+ lua_setfield(L, -2, "closed");
+ /* mt.close = <file_close> */
+ lua_pushcfunction(L, file_close);
+ lua_setfield(L, -2, "close");
+ /* mt.readp = <file_readp> */
+ lua_pushcfunction(L, file_readp);
+ lua_setfield(L, -2, "readp");
+ /* insert table */
+ lua_setfield(L, -2, "File");
+
/* create metatable for server objects */
lua_newtable(L);
/* mt.__index = mt */
@@ -204,7 +225,8 @@ luaopen_lem_streams_core(lua_State *L)
/* insert open function */
lua_getfield(L, -1, "IStream"); /* upvalue 1 = IStream */
lua_getfield(L, -2, "OStream"); /* upvalue 2 = OStream */
- lua_pushcclosure(L, stream_open, 2);
+ lua_getfield(L, -3, "File"); /* upvalue 3 = File */
+ lua_pushcclosure(L, stream_open, 3);
lua_setfield(L, -2, "open");
/* insert popen function */
lua_getfield(L, -1, "IStream"); /* upvalue 1 = IStream */
diff --git a/lem/streams/file.c b/lem/streams/file.c
new file mode 100644
index 0000000..85fb5a0
--- /dev/null
+++ b/lem/streams/file.c
@@ -0,0 +1,196 @@
+/*
+ * This file is part of LEM, a Lua Event Machine.
+ * Copyright 2011-2012 Emil Renner Berthing
+ *
+ * LEM is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * LEM is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with LEM. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+struct file {
+ struct lem_async a;
+ int fd;
+ int ret;
+ struct lem_parser *p;
+ struct lem_inputbuf buf;
+};
+
+static struct file *
+file_new(lua_State *T, int fd, int mt)
+{
+ struct file *f;
+
+ /* create userdata and set the metatable */
+ f = lua_newuserdata(T, sizeof(struct file));
+ lua_pushvalue(T, mt);
+ lua_setmetatable(T, -2);
+
+ /* initialize userdata */
+ f->a.T = NULL;
+ f->fd = fd;
+ f->buf.start = f->buf.end = 0;
+
+ return f;
+}
+
+static int
+file_gc(lua_State *T)
+{
+ struct file *f = lua_touserdata(T, 1);
+
+ lem_debug("collecting %p, fd = %d", f, f->fd);
+ if (f->fd >= 0)
+ (void)close(f->fd);
+
+ return 0;
+}
+
+static int
+file_closed(lua_State *T)
+{
+ struct file *f;
+
+ luaL_checktype(T, 1, LUA_TUSERDATA);
+ f = lua_touserdata(T, 1);
+ lua_pushboolean(T, f->fd < 0);
+ return 1;
+}
+
+static int
+file_close(lua_State *T)
+{
+ struct file *f;
+ int ret;
+
+ luaL_checktype(T, 1, LUA_TUSERDATA);
+ f = lua_touserdata(T, 1);
+ if (f->fd < 0) {
+ lua_pushnil(T);
+ lua_pushliteral(T, "already closed");
+ return 2;
+ }
+
+ if (f->a.T != NULL) {
+ lua_pushnil(T);
+ lua_pushliteral(T, "busy");
+ return 2;
+ }
+
+ lem_debug("collecting %d", f->fd);
+ ret = close(f->fd);
+ f->fd = -1;
+ if (ret) {
+ lua_pushnil(T);
+ lua_pushstring(T, strerror(errno));
+ return 2;
+ }
+
+ lua_pushboolean(T, 1);
+ return 1;
+}
+
+/*
+ * file:readp() method
+ */
+static void
+file_readp_work(struct lem_async *a)
+{
+ struct file *f = (struct file *)a;
+ ssize_t bytes = read(f->fd, f->buf.buf + f->buf.end,
+ LEM_INPUTBUF_SIZE - f->buf.end);
+
+ lem_debug("read %ld bytes from %d", bytes, f->fd);
+ if (bytes > 0) {
+ f->ret = 0;
+ f->buf.end += bytes;
+ } else if (bytes == 0) {
+ f->ret = -1;
+ } else {
+ close(f->fd);
+ f->fd = -1;
+ f->ret = errno;
+ }
+}
+
+static void
+file_readp_reap(struct lem_async *a)
+{
+ struct file *f = (struct file *)a;
+ lua_State *T = f->a.T;
+ int ret;
+
+ if (f->ret) {
+ enum lem_preason res = f->ret < 0 ? LEM_PCLOSED : LEM_PERROR;
+
+ f->a.T = NULL;
+
+ if (f->p->destroy && (ret = f->p->destroy(T, &f->buf, res)) > 0) {
+ lem_queue(T, ret);
+ return;
+ }
+
+ lua_pushnil(T);
+ if (res == LEM_PCLOSED)
+ lua_pushliteral(T, "eof");
+ else
+ lua_pushstring(T, strerror(errno));
+ lem_queue(T, 2);
+ return;
+ }
+
+ ret = f->p->process(T, &f->buf);
+ if (ret > 0) {
+ f->a.T = NULL;
+ lem_queue(T, ret);
+ return;
+ }
+
+ lem_async_put(&f->a);
+}
+
+static int
+file_readp(lua_State *T)
+{
+ struct file *f;
+ struct lem_parser *p;
+ int ret;
+
+ luaL_checktype(T, 1, LUA_TUSERDATA);
+ ret = lua_type(T, 2);
+ if (ret != LUA_TUSERDATA && ret != LUA_TLIGHTUSERDATA)
+ return luaL_argerror(T, 2, "expected userdata");
+
+ f = lua_touserdata(T, 1);
+ if (f->fd < 0) {
+ lua_pushnil(T);
+ lua_pushliteral(T, "closed");
+ return 2;
+ }
+
+ if (f->a.T != NULL) {
+ lua_pushnil(T);
+ lua_pushliteral(T, "busy");
+ return 2;
+ }
+
+ p = lua_touserdata(T, 2);
+ if (p->init)
+ p->init(T, &f->buf);
+
+ ret = p->process(T, &f->buf);
+ if (ret > 0)
+ return ret;
+
+ f->p = p;
+ lem_async_do(&f->a, T, file_readp_work, file_readp_reap);
+ return lua_yield(T, lua_gettop(T));
+}
diff --git a/lem/streams/stream.c b/lem/streams/stream.c
index 50ffc50..f499f1e 100644
--- a/lem/streams/stream.c
+++ b/lem/streams/stream.c
@@ -649,6 +649,119 @@ stream_sendfile(lua_State *T)
return lua_yield(T, 3);
}
+struct open {
+ struct lem_async a;
+ const char *path;
+ int fd;
+ int flags;
+ int type;
+};
+
+static void
+stream_open_work(struct lem_async *a)
+{
+ struct open *o = (struct open *)a;
+ int fd;
+ struct stat st;
+
+ fd = open(o->path, o->flags | O_NONBLOCK,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+ if (fd < 0)
+ goto error;
+
+ if (fstat(fd, &st))
+ goto error;
+
+ o->fd = fd;
+ lem_debug("st.st_mode & S_IFMT = %o", st.st_mode & S_IFMT);
+ switch (st.st_mode & S_IFMT) {
+ case S_IFSOCK:
+ case S_IFCHR:
+ case S_IFIFO:
+ o->type = 1;
+ break;
+
+ case S_IFREG:
+ case S_IFBLK:
+ o->type = 0;
+ break;
+
+ default:
+ o->type = -1;
+ }
+
+ return;
+
+error:
+ o->fd = -errno;
+}
+
+static void
+stream_open_reap(struct lem_async *a)
+{
+ struct open *o = (struct open *)a;
+ lua_State *T = o->a.T;
+ int fd = o->fd;
+ int flags = o->flags;
+ int ret = o->type;
+ struct istream *is;
+ struct ostream *os;
+
+ lem_debug("o->type = %d", ret);
+ free(o);
+
+ if (fd < 0) {
+ lua_pushnil(T);
+ lua_pushstring(T, strerror(-o->fd));
+ /*
+ switch (-o->fd) {
+ case ENOENT:
+ lua_pushliteral(T, "not found");
+ break;
+ case EACCES:
+ lua_pushliteral(T, "permission denied");
+ break;
+ default:
+ lua_pushstring(T, strerror(errno));
+ }
+ */
+ lem_queue(T, 2);
+ return;
+ }
+
+ if (ret < 0) {
+ lua_pushnil(T);
+ lua_pushliteral(T, "invalid type");
+ lem_queue(T, 2);
+ return;
+ }
+
+ if (ret == 0) {
+ file_new(T, fd, lua_upvalueindex(3));
+ lem_queue(T, 1);
+ return;
+ }
+
+ if ((flags & O_WRONLY) == 0)
+ is = istream_new(T, fd, lua_upvalueindex(1));
+ else
+ is = NULL;
+
+ if (flags & (O_RDWR | O_WRONLY))
+ os = ostream_new(T, fd, lua_upvalueindex(2));
+ else
+ os = NULL;
+
+ if (is && os) {
+ is->twin = os;
+ os->twin = is;
+ ret = 2;
+ } else
+ ret = 1;
+
+ lem_queue(T, ret);
+}
+
static int
mode_to_flags(const char *mode)
{
@@ -698,46 +811,24 @@ stream_open(lua_State *T)
{
const char *path = luaL_checkstring(T, 1);
int flags = mode_to_flags(luaL_optstring(T, 2, "r"));
- int fd;
- struct istream *is;
- struct ostream *os;
+ struct open *o;
+ int args;
if (flags < 0)
return luaL_error(T, "invalid mode string");
- fd = open(path, flags | O_NONBLOCK);
- if (fd < 0) {
- lua_pushnil(T);
- switch (errno) {
- case ENOENT:
- lua_pushliteral(T, "not found");
- break;
- case EACCES:
- lua_pushliteral(T, "permission denied");
- break;
- default:
- lua_pushstring(T, strerror(errno));
- }
- return 2;
- }
-
- if ((flags & O_WRONLY) == 0)
- is = istream_new(T, fd, lua_upvalueindex(1));
- else
- is = NULL;
+ o = lem_xmalloc(sizeof(struct open));
+ o->path = path;
+ o->flags = flags;
- if (flags & (O_RDWR | O_WRONLY))
- os = ostream_new(T, fd, lua_upvalueindex(2));
- else
- os = NULL;
+ lem_async_do(&o->a, T, stream_open_work, stream_open_reap);
- if (is && os) {
- is->twin = os;
- os->twin = is;
- return 2;
+ args = lua_gettop(T);
+ if (args > 2) {
+ lua_settop(T, 2);
+ args = 2;
}
-
- return 1;
+ return lua_yield(T, args);
}
static int
diff --git a/test/fread.lua b/test/fread.lua
new file mode 100755
index 0000000..56309d8
--- /dev/null
+++ b/test/fread.lua
@@ -0,0 +1,72 @@
+#!bin/lem
+--
+-- This file is part of LEM, a Lua Event Machine.
+-- Copyright 2011-2012 Emil Renner Berthing
+--
+-- LEM is free software: you can redistribute it and/or
+-- modify it under the terms of the GNU General Public License as
+-- published by the Free Software Foundation, either version 3 of
+-- the License, or (at your option) any later version.
+--
+-- LEM is distributed in the hope that it will be useful,
+-- but WITHOUT ANY WARRANTY; without even the implied warranty of
+-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+-- GNU General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License
+-- along with LEM. If not, see <http://www.gnu.org/licenses/>.
+--
+
+local utils = require 'lem.utils'
+local stream = require 'lem.streams'
+
+local write
+do
+ local stdout = stream.stdout
+ function write(str)
+ return stdout:write(str)
+ end
+end
+
+print("Press enter to read '" .. (arg[1] or arg[0]) .. "'")
+stream.stdin:read()
+
+local threads = 2
+for i = 1, threads do
+ utils.spawn(function()
+ local file = assert(stream.open(arg[1] or arg[0]))
+ assert(getmetatable(file) == stream.File, "Hmm...")
+
+ ---[[
+ local chunk, err
+ while true do
+ chunk, err = file:read()
+ if not chunk then break end
+ write('|')
+ --print(chunk)
+ end
+ assert(err == 'eof', "Hmm..")
+ --[=[
+ --]]
+ local line, err
+ while true do
+ line, err = file:read('*l')
+ if not line then break end
+ print(line)
+ end
+ --]=]
+
+ threads = threads - 1
+ end)
+end
+
+local yield = utils.yield
+while threads > 0 do
+ write('.')
+ yield()
+end
+
+print "\nDone. Press enter to continue."
+stream.stdin:read()
+
+-- vim: set ts=2 sw=2 noet: