diff options
author | Emil Renner Berthing <esmil@mailme.dk> | 2012-12-09 03:27:02 +0100 |
---|---|---|
committer | Emil Renner Berthing <esmil@mailme.dk> | 2012-12-17 10:11:06 +0100 |
commit | 9e69d37f9e7a00b9902ebe5618904871397c7999 (patch) | |
tree | b76203d2288f18947d62441cd16c70ec4b2a0783 | |
parent | bf8c9594b6492af209b7d62c67d7f165d85b46e8 (diff) | |
download | lem-9e69d37f9e7a00b9902ebe5618904871397c7999.tar.gz lem-9e69d37f9e7a00b9902ebe5618904871397c7999.tar.xz lem-9e69d37f9e7a00b9902ebe5618904871397c7999.zip |
streams: add File type
-rw-r--r-- | lem/streams.lua | 1 | ||||
-rw-r--r-- | lem/streams/core.c | 24 | ||||
-rw-r--r-- | lem/streams/file.c | 196 | ||||
-rw-r--r-- | lem/streams/stream.c | 157 | ||||
-rwxr-xr-x | test/fread.lua | 72 |
5 files changed, 416 insertions, 34 deletions
diff --git a/lem/streams.lua b/lem/streams.lua index f4f9d4f..51d9e4f 100644 --- a/lem/streams.lua +++ b/lem/streams.lua @@ -43,6 +43,7 @@ do end M.IStream.read = M.reader(M.IStream.readp) + M.File.read = M.reader(M.File.readp) end return M diff --git a/lem/streams/core.c b/lem/streams/core.c index e9ed9f4..b71dc06 100644 --- a/lem/streams/core.c +++ b/lem/streams/core.c @@ -38,6 +38,7 @@ #include <streams.h> #include "sendfile.c" +#include "file.c" #include "stream.c" #include "server.c" #include "tcp.c" @@ -168,6 +169,26 @@ luaopen_lem_streams_core(lua_State *L) /* insert table */ lua_setfield(L, -2, "OStream"); + /* create File metatable */ + lua_newtable(L); + /* mt.__index = mt */ + lua_pushvalue(L, -1); + lua_setfield(L, -2, "__index"); + /* mt.__gc = <file_gc> */ + lua_pushcfunction(L, file_gc); + lua_setfield(L, -2, "__gc"); + /* mt.closed = <file_closed> */ + lua_pushcfunction(L, file_closed); + lua_setfield(L, -2, "closed"); + /* mt.close = <file_close> */ + lua_pushcfunction(L, file_close); + lua_setfield(L, -2, "close"); + /* mt.readp = <file_readp> */ + lua_pushcfunction(L, file_readp); + lua_setfield(L, -2, "readp"); + /* insert table */ + lua_setfield(L, -2, "File"); + /* create metatable for server objects */ lua_newtable(L); /* mt.__index = mt */ @@ -204,7 +225,8 @@ luaopen_lem_streams_core(lua_State *L) /* insert open function */ lua_getfield(L, -1, "IStream"); /* upvalue 1 = IStream */ lua_getfield(L, -2, "OStream"); /* upvalue 2 = OStream */ - lua_pushcclosure(L, stream_open, 2); + lua_getfield(L, -3, "File"); /* upvalue 3 = File */ + lua_pushcclosure(L, stream_open, 3); lua_setfield(L, -2, "open"); /* insert popen function */ lua_getfield(L, -1, "IStream"); /* upvalue 1 = IStream */ diff --git a/lem/streams/file.c b/lem/streams/file.c new file mode 100644 index 0000000..85fb5a0 --- /dev/null +++ b/lem/streams/file.c @@ -0,0 +1,196 @@ +/* + * This file is part of LEM, a Lua Event Machine. + * Copyright 2011-2012 Emil Renner Berthing + * + * LEM is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * LEM is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with LEM. If not, see <http://www.gnu.org/licenses/>. + */ + +struct file { + struct lem_async a; + int fd; + int ret; + struct lem_parser *p; + struct lem_inputbuf buf; +}; + +static struct file * +file_new(lua_State *T, int fd, int mt) +{ + struct file *f; + + /* create userdata and set the metatable */ + f = lua_newuserdata(T, sizeof(struct file)); + lua_pushvalue(T, mt); + lua_setmetatable(T, -2); + + /* initialize userdata */ + f->a.T = NULL; + f->fd = fd; + f->buf.start = f->buf.end = 0; + + return f; +} + +static int +file_gc(lua_State *T) +{ + struct file *f = lua_touserdata(T, 1); + + lem_debug("collecting %p, fd = %d", f, f->fd); + if (f->fd >= 0) + (void)close(f->fd); + + return 0; +} + +static int +file_closed(lua_State *T) +{ + struct file *f; + + luaL_checktype(T, 1, LUA_TUSERDATA); + f = lua_touserdata(T, 1); + lua_pushboolean(T, f->fd < 0); + return 1; +} + +static int +file_close(lua_State *T) +{ + struct file *f; + int ret; + + luaL_checktype(T, 1, LUA_TUSERDATA); + f = lua_touserdata(T, 1); + if (f->fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "already closed"); + return 2; + } + + if (f->a.T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + lem_debug("collecting %d", f->fd); + ret = close(f->fd); + f->fd = -1; + if (ret) { + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + return 2; + } + + lua_pushboolean(T, 1); + return 1; +} + +/* + * file:readp() method + */ +static void +file_readp_work(struct lem_async *a) +{ + struct file *f = (struct file *)a; + ssize_t bytes = read(f->fd, f->buf.buf + f->buf.end, + LEM_INPUTBUF_SIZE - f->buf.end); + + lem_debug("read %ld bytes from %d", bytes, f->fd); + if (bytes > 0) { + f->ret = 0; + f->buf.end += bytes; + } else if (bytes == 0) { + f->ret = -1; + } else { + close(f->fd); + f->fd = -1; + f->ret = errno; + } +} + +static void +file_readp_reap(struct lem_async *a) +{ + struct file *f = (struct file *)a; + lua_State *T = f->a.T; + int ret; + + if (f->ret) { + enum lem_preason res = f->ret < 0 ? LEM_PCLOSED : LEM_PERROR; + + f->a.T = NULL; + + if (f->p->destroy && (ret = f->p->destroy(T, &f->buf, res)) > 0) { + lem_queue(T, ret); + return; + } + + lua_pushnil(T); + if (res == LEM_PCLOSED) + lua_pushliteral(T, "eof"); + else + lua_pushstring(T, strerror(errno)); + lem_queue(T, 2); + return; + } + + ret = f->p->process(T, &f->buf); + if (ret > 0) { + f->a.T = NULL; + lem_queue(T, ret); + return; + } + + lem_async_put(&f->a); +} + +static int +file_readp(lua_State *T) +{ + struct file *f; + struct lem_parser *p; + int ret; + + luaL_checktype(T, 1, LUA_TUSERDATA); + ret = lua_type(T, 2); + if (ret != LUA_TUSERDATA && ret != LUA_TLIGHTUSERDATA) + return luaL_argerror(T, 2, "expected userdata"); + + f = lua_touserdata(T, 1); + if (f->fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (f->a.T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + p = lua_touserdata(T, 2); + if (p->init) + p->init(T, &f->buf); + + ret = p->process(T, &f->buf); + if (ret > 0) + return ret; + + f->p = p; + lem_async_do(&f->a, T, file_readp_work, file_readp_reap); + return lua_yield(T, lua_gettop(T)); +} diff --git a/lem/streams/stream.c b/lem/streams/stream.c index 50ffc50..f499f1e 100644 --- a/lem/streams/stream.c +++ b/lem/streams/stream.c @@ -649,6 +649,119 @@ stream_sendfile(lua_State *T) return lua_yield(T, 3); } +struct open { + struct lem_async a; + const char *path; + int fd; + int flags; + int type; +}; + +static void +stream_open_work(struct lem_async *a) +{ + struct open *o = (struct open *)a; + int fd; + struct stat st; + + fd = open(o->path, o->flags | O_NONBLOCK, + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if (fd < 0) + goto error; + + if (fstat(fd, &st)) + goto error; + + o->fd = fd; + lem_debug("st.st_mode & S_IFMT = %o", st.st_mode & S_IFMT); + switch (st.st_mode & S_IFMT) { + case S_IFSOCK: + case S_IFCHR: + case S_IFIFO: + o->type = 1; + break; + + case S_IFREG: + case S_IFBLK: + o->type = 0; + break; + + default: + o->type = -1; + } + + return; + +error: + o->fd = -errno; +} + +static void +stream_open_reap(struct lem_async *a) +{ + struct open *o = (struct open *)a; + lua_State *T = o->a.T; + int fd = o->fd; + int flags = o->flags; + int ret = o->type; + struct istream *is; + struct ostream *os; + + lem_debug("o->type = %d", ret); + free(o); + + if (fd < 0) { + lua_pushnil(T); + lua_pushstring(T, strerror(-o->fd)); + /* + switch (-o->fd) { + case ENOENT: + lua_pushliteral(T, "not found"); + break; + case EACCES: + lua_pushliteral(T, "permission denied"); + break; + default: + lua_pushstring(T, strerror(errno)); + } + */ + lem_queue(T, 2); + return; + } + + if (ret < 0) { + lua_pushnil(T); + lua_pushliteral(T, "invalid type"); + lem_queue(T, 2); + return; + } + + if (ret == 0) { + file_new(T, fd, lua_upvalueindex(3)); + lem_queue(T, 1); + return; + } + + if ((flags & O_WRONLY) == 0) + is = istream_new(T, fd, lua_upvalueindex(1)); + else + is = NULL; + + if (flags & (O_RDWR | O_WRONLY)) + os = ostream_new(T, fd, lua_upvalueindex(2)); + else + os = NULL; + + if (is && os) { + is->twin = os; + os->twin = is; + ret = 2; + } else + ret = 1; + + lem_queue(T, ret); +} + static int mode_to_flags(const char *mode) { @@ -698,46 +811,24 @@ stream_open(lua_State *T) { const char *path = luaL_checkstring(T, 1); int flags = mode_to_flags(luaL_optstring(T, 2, "r")); - int fd; - struct istream *is; - struct ostream *os; + struct open *o; + int args; if (flags < 0) return luaL_error(T, "invalid mode string"); - fd = open(path, flags | O_NONBLOCK); - if (fd < 0) { - lua_pushnil(T); - switch (errno) { - case ENOENT: - lua_pushliteral(T, "not found"); - break; - case EACCES: - lua_pushliteral(T, "permission denied"); - break; - default: - lua_pushstring(T, strerror(errno)); - } - return 2; - } - - if ((flags & O_WRONLY) == 0) - is = istream_new(T, fd, lua_upvalueindex(1)); - else - is = NULL; + o = lem_xmalloc(sizeof(struct open)); + o->path = path; + o->flags = flags; - if (flags & (O_RDWR | O_WRONLY)) - os = ostream_new(T, fd, lua_upvalueindex(2)); - else - os = NULL; + lem_async_do(&o->a, T, stream_open_work, stream_open_reap); - if (is && os) { - is->twin = os; - os->twin = is; - return 2; + args = lua_gettop(T); + if (args > 2) { + lua_settop(T, 2); + args = 2; } - - return 1; + return lua_yield(T, args); } static int diff --git a/test/fread.lua b/test/fread.lua new file mode 100755 index 0000000..56309d8 --- /dev/null +++ b/test/fread.lua @@ -0,0 +1,72 @@ +#!bin/lem +-- +-- This file is part of LEM, a Lua Event Machine. +-- Copyright 2011-2012 Emil Renner Berthing +-- +-- LEM is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- LEM is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with LEM. If not, see <http://www.gnu.org/licenses/>. +-- + +local utils = require 'lem.utils' +local stream = require 'lem.streams' + +local write +do + local stdout = stream.stdout + function write(str) + return stdout:write(str) + end +end + +print("Press enter to read '" .. (arg[1] or arg[0]) .. "'") +stream.stdin:read() + +local threads = 2 +for i = 1, threads do + utils.spawn(function() + local file = assert(stream.open(arg[1] or arg[0])) + assert(getmetatable(file) == stream.File, "Hmm...") + + ---[[ + local chunk, err + while true do + chunk, err = file:read() + if not chunk then break end + write('|') + --print(chunk) + end + assert(err == 'eof', "Hmm..") + --[=[ + --]] + local line, err + while true do + line, err = file:read('*l') + if not line then break end + print(line) + end + --]=] + + threads = threads - 1 + end) +end + +local yield = utils.yield +while threads > 0 do + write('.') + yield() +end + +print "\nDone. Press enter to continue." +stream.stdin:read() + +-- vim: set ts=2 sw=2 noet: |