diff options
author | Emil Renner Berthing <esmil@mailme.dk> | 2012-12-11 23:59:37 +0100 |
---|---|---|
committer | Emil Renner Berthing <esmil@mailme.dk> | 2012-12-17 10:11:06 +0100 |
commit | f25118928aa35861b370a6529c651a28f5c8859b (patch) | |
tree | 63f07f33cd207f947ea43bef5d92f5311b2e758b | |
parent | b738f523cc7a56602e07ff54bd11203355a64af9 (diff) | |
download | lem-f25118928aa35861b370a6529c651a28f5c8859b.tar.gz lem-f25118928aa35861b370a6529c651a28f5c8859b.tar.xz lem-f25118928aa35861b370a6529c651a28f5c8859b.zip |
io: rework streams
-rw-r--r-- | lem/hathaway.lua | 18 | ||||
-rw-r--r-- | lem/http.lua | 16 | ||||
-rw-r--r-- | lem/http/core.c | 2 | ||||
-rw-r--r-- | lem/io.lua | 4 | ||||
-rw-r--r-- | lem/io/core.c | 122 | ||||
-rw-r--r-- | lem/io/queue.lua | 18 | ||||
-rw-r--r-- | lem/io/server.c | 88 | ||||
-rw-r--r-- | lem/io/stream.c | 508 | ||||
-rw-r--r-- | lem/io/tcp.c | 26 | ||||
-rwxr-xr-x | test/ctest.lua | 8 | ||||
-rwxr-xr-x | test/httptest.lua | 15 | ||||
-rwxr-xr-x | test/multiplexing.lua | 2 | ||||
-rwxr-xr-x | test/stest.lua | 9 |
13 files changed, 290 insertions, 546 deletions
diff --git a/lem/hathaway.lua b/lem/hathaway.lua index bdc4535..2cde2ca 100644 --- a/lem/hathaway.lua +++ b/lem/hathaway.lua @@ -226,14 +226,13 @@ do return true end - local function handler(istream, ostream) + local function handler(client) repeat - local req, err = istream:read('HTTPRequest') + local req, err = client:read('HTTPRequest') if not req then M.debug(err) break end local method, uri, version = req.method, req.uri, req.version M.debug(format("%s %s HTTP/%s", method, uri, version)) - req.ostream = ostream local res = new_response(req) if version ~= '1.0' and version ~= '1.1' then @@ -322,31 +321,30 @@ do i = i + 1 robe[i] = '\r\n' - local ok, err = ostream:cork() + local ok, err = client:cork() if not ok then M.debug(err) break end - local ok, err = ostream:write(concat(robe)) + local ok, err = client:write(concat(robe)) if not ok then M.debug(err) break end if method ~= 'HEAD' then if file then - ok, err = ostream:sendfile(file) + ok, err = client:sendfile(file) if close then file:close() end else - ok, err = ostream:write(concat(res)) + ok, err = client:write(concat(res)) end if not ok then M.debug(err) break end end - local ok, err = ostream:uncork() + local ok, err = client:uncork() if not ok then M.debug(err) break end until version == '1.0' or req.headers['Connection'] == 'close' or headers['Connection'] == 'close' - istream:close() - ostream:close() + client:close() end function M.Hathaway(address, port) diff --git a/lem/http.lua b/lem/http.lua index 6dd25b9..8a14961 100644 --- a/lem/http.lua +++ b/lem/http.lua @@ -35,40 +35,40 @@ function http.Request:body() if len <= 0 then return body end if self.headers['Expect'] == '100-continue' then - local ok, err = self.ostream:send('HTTP/1.1 100 Continue\r\n\r\n') + local ok, err = self.client:send('HTTP/1.1 100 Continue\r\n\r\n') if not ok then return nil, err end end local err - body, err = self.istream:read(len) + body, err = self.client:read(len) if not body then return nil, err end return body end function http.Response:body_chunked() - local istream = self.istream + local client = self.client local t, n = {}, 0 local line, err while true do - line, err = istream:read('*l') + line, err = client:read('*l') if not line then return nil, err end local num = tonumber(line, 16) if not num then return nil, 'expectation failed' end if num == 0 then break end - local data, err = istream:read(num) + local data, err = client:read(num) if not data then return nil, err end n = n + 1 t[n] = data - line, err = istream:read('*l') + line, err = client:read('*l') if not line then return nil, err end end - line, err = istream:read('*l') + line, err = client:read('*l') if not line then return nil, err end return t @@ -85,7 +85,7 @@ function http.Response:body() num = tonumber(num) if not num then return nil, 'invalid content length' end - return self.istream:read(num) + return self.client:read(num) end return http diff --git a/lem/http/core.c b/lem/http/core.c index 3c2d6b3..bfaa99b 100644 --- a/lem/http/core.c +++ b/lem/http/core.c @@ -165,7 +165,7 @@ parse_http_init(lua_State *T) lua_settop(T, 2); lua_createtable(T, 0, 5); lua_pushvalue(T, 1); - lua_setfield(T, -2, "istream"); + lua_setfield(T, -2, "client"); } static void @@ -42,12 +42,12 @@ do end end - io.IStream.read = io.reader(io.IStream.readp) + io.Stream.read = io.reader(io.Stream.readp) io.File.read = io.reader(io.File.readp) end do - local _write, stdout = io.OStream.write, io.stdout + local _write, stdout = io.Stream.write, io.stdout function io.write(str) return _write(stdout, str) diff --git a/lem/io/core.c b/lem/io/core.c index da3241e..5f964be 100644 --- a/lem/io/core.c +++ b/lem/io/core.c @@ -20,6 +20,7 @@ #include <unistd.h> #include <errno.h> #include <string.h> +#include <assert.h> #include <fcntl.h> #include <sys/stat.h> @@ -83,16 +84,12 @@ module_index(lua_State *T) return 2; } - if (fd == 0) - (void)istream_new(T, fd, lua_upvalueindex(1)); - else - (void)ostream_new(T, fd, lua_upvalueindex(2)); + stream_new(T, fd, lua_upvalueindex(1)); /* save this object so we don't initialize it again */ lua_pushvalue(T, 2); lua_pushvalue(T, -2); lua_rawset(T, 1); - return 1; } @@ -124,52 +121,49 @@ luaopen_lem_io_core(lua_State *L) lua_pushcclosure(L, sendfile_open, 1); lua_setfield(L, -2, "sendfile"); - /* create metatable for input stream objects */ + /* create File metatable */ lua_newtable(L); /* mt.__index = mt */ lua_pushvalue(L, -1); lua_setfield(L, -2, "__index"); - /* mt.__gc = <stream_close> */ - lua_pushcfunction(L, istream_close); + /* mt.__gc = <file_gc> */ + lua_pushcfunction(L, file_gc); lua_setfield(L, -2, "__gc"); - /* mt.closed = <stream_closed> */ - lua_pushcfunction(L, stream_closed); + /* mt.closed = <file_closed> */ + lua_pushcfunction(L, file_closed); lua_setfield(L, -2, "closed"); - /* mt.busy = <stream_busy> */ - lua_pushcfunction(L, stream_busy); - lua_setfield(L, -2, "busy"); - /* mt.interrupt = <stream_interrupt> */ - lua_pushcfunction(L, stream_interrupt); - lua_setfield(L, -2, "interrupt"); - /* mt.close = <istream_close> */ - lua_pushcfunction(L, istream_close); + /* mt.close = <file_close> */ + lua_pushcfunction(L, file_close); lua_setfield(L, -2, "close"); - /* mt.readp = <stream_readp> */ - lua_pushcfunction(L, stream_readp); + /* mt.readp = <file_readp> */ + lua_pushcfunction(L, file_readp); lua_setfield(L, -2, "readp"); + /* mt.write = <file_write> */ + lua_pushcfunction(L, file_write); + lua_setfield(L, -2, "write"); + /* mt.seek = <file_seek> */ + lua_pushcfunction(L, file_seek); + lua_setfield(L, -2, "seek"); /* insert table */ - lua_setfield(L, -2, "IStream"); + lua_setfield(L, -2, "File"); - /* create metatable for output stream objects */ + /* create Stream metatable */ lua_newtable(L); /* mt.__index = mt */ lua_pushvalue(L, -1); lua_setfield(L, -2, "__index"); - /* mt.__gc = <ostream_close> */ - lua_pushcfunction(L, ostream_close); + /* mt.__gc = <stream_gc> */ + lua_pushcfunction(L, stream_gc); lua_setfield(L, -2, "__gc"); /* mt.closed = <stream_closed> */ lua_pushcfunction(L, stream_closed); lua_setfield(L, -2, "closed"); - /* mt.busy = <stream_busy> */ - lua_pushcfunction(L, stream_busy); - lua_setfield(L, -2, "busy"); - /* mt.interrupt = <stream_interrupt> */ - lua_pushcfunction(L, stream_interrupt); - lua_setfield(L, -2, "interrupt"); - /* mt.close = <ostream_close> */ - lua_pushcfunction(L, ostream_close); + /* mt.close = <stream_close> */ + lua_pushcfunction(L, stream_close); lua_setfield(L, -2, "close"); + /* mt.readp = <stream_readp> */ + lua_pushcfunction(L, stream_readp); + lua_setfield(L, -2, "readp"); /* mt.write = <stream_write> */ lua_pushcfunction(L, stream_write); lua_setfield(L, -2, "write"); @@ -179,39 +173,13 @@ luaopen_lem_io_core(lua_State *L) /* mt.uncork = <stream_uncork> */ lua_pushcfunction(L, stream_uncork); lua_setfield(L, -2, "uncork"); - /* mt.sendfile = <ostream_sendfile> */ + /* mt.sendfile = <stream_sendfile> */ lua_pushcfunction(L, stream_sendfile); lua_setfield(L, -2, "sendfile"); /* insert table */ - lua_setfield(L, -2, "OStream"); - - /* create File metatable */ - lua_newtable(L); - /* mt.__index = mt */ - lua_pushvalue(L, -1); - lua_setfield(L, -2, "__index"); - /* mt.__gc = <file_gc> */ - lua_pushcfunction(L, file_gc); - lua_setfield(L, -2, "__gc"); - /* mt.closed = <file_closed> */ - lua_pushcfunction(L, file_closed); - lua_setfield(L, -2, "closed"); - /* mt.close = <file_close> */ - lua_pushcfunction(L, file_close); - lua_setfield(L, -2, "close"); - /* mt.readp = <file_readp> */ - lua_pushcfunction(L, file_readp); - lua_setfield(L, -2, "readp"); - /* mt.write = <file_write> */ - lua_pushcfunction(L, file_write); - lua_setfield(L, -2, "write"); - /* mt.seek = <file_seek> */ - lua_pushcfunction(L, file_seek); - lua_setfield(L, -2, "seek"); - /* insert table */ - lua_setfield(L, -2, "File"); + lua_setfield(L, -2, "Stream"); - /* create metatable for server objects */ + /* create Server metatable */ lua_newtable(L); /* mt.__index = mt */ lua_pushvalue(L, -1); @@ -232,34 +200,29 @@ luaopen_lem_io_core(lua_State *L) lua_pushcfunction(L, server_interrupt); lua_setfield(L, -2, "interrupt"); /* mt.accept = <server_accept> */ - lua_getfield(L, -2, "IStream"); /* upvalue 1 = IStream */ - lua_getfield(L, -3, "OStream"); /* upvalue 2 = OStream */ - lua_pushcclosure(L, server_accept, 2); + lua_getfield(L, -2, "Stream"); /* upvalue 1 = Stream */ + lua_pushcclosure(L, server_accept, 1); lua_setfield(L, -2, "accept"); /* mt.autospawn = <server_autospawn> */ - lua_getfield(L, -2, "IStream"); /* upvalue 1 = IStream */ - lua_getfield(L, -3, "OStream"); /* upvalue 2 = OStream */ - lua_pushcclosure(L, server_autospawn, 2); + lua_getfield(L, -2, "Stream"); /* upvalue 1 = Stream */ + lua_pushcclosure(L, server_autospawn, 1); lua_setfield(L, -2, "autospawn"); /* insert table */ lua_setfield(L, -2, "Server"); /* insert open function */ - lua_getfield(L, -1, "IStream"); /* upvalue 1 = IStream */ - lua_getfield(L, -2, "OStream"); /* upvalue 2 = OStream */ - lua_getfield(L, -3, "File"); /* upvalue 3 = File */ - lua_pushcclosure(L, stream_open, 3); + lua_getfield(L, -1, "File"); /* upvalue 1 = File */ + lua_getfield(L, -2, "Stream"); /* upvalue 2 = Stream */ + lua_pushcclosure(L, stream_open, 2); lua_setfield(L, -2, "open"); /* insert popen function */ - lua_getfield(L, -1, "IStream"); /* upvalue 1 = IStream */ - lua_getfield(L, -2, "OStream"); /* upvalue 2 = OStream */ - lua_pushcclosure(L, stream_popen, 2); + lua_getfield(L, -1, "Stream"); /* upvalue 1 = Stream */ + lua_pushcclosure(L, stream_popen, 1); lua_setfield(L, -2, "popen"); /* insert the connect function */ - lua_getfield(L, -1, "IStream"); /* upvalue 1 = IStream */ - lua_getfield(L, -2, "OStream"); /* upvalue 2 = OStream */ - lua_pushcclosure(L, tcp_connect, 2); + lua_getfield(L, -1, "Stream"); /* upvalue 1 = Stream */ + lua_pushcclosure(L, tcp_connect, 1); lua_setfield(L, -2, "tcp_connect"); /* insert the tcp4_listen function */ lua_getfield(L, -1, "Server"); /* upvalue 1 = Server */ @@ -290,9 +253,8 @@ luaopen_lem_io_core(lua_State *L) /* create metatable for the module */ lua_newtable(L); /* insert the index function */ - lua_getfield(L, -2, "IStream"); /* upvalue 1 = IStream */ - lua_getfield(L, -3, "OStream"); /* upvalue 2 = OStream */ - lua_pushcclosure(L, module_index, 2); + lua_getfield(L, -2, "Stream"); /* upvalue 1 = Stream */ + lua_pushcclosure(L, module_index, 1); lua_setfield(L, -2, "__index"); /* set the metatable */ diff --git a/lem/io/queue.lua b/lem/io/queue.lua index 63bda6d..70ee887 100644 --- a/lem/io/queue.lua +++ b/lem/io/queue.lua @@ -22,22 +22,18 @@ local setmetatable = setmetatable local thisthread, suspend, resume = utils.thisthread, utils.suspend, utils.resume -local QOStream = {} -QOStream.__index = QOStream +local Stream = {} +Stream.__index = Stream -function QOStream:closed(...) +function Stream:closed(...) return self.stream:closed(...) end -function QOStream:interrupt(...) - return self.stream:interrupt(...) -end - -function QOStream:close(...) +function Stream:close(...) return self.stream:close(...) end -function QOStream:write(...) +function Stream:write(...) local nxt = self.next if nxt == 0 then nxt = 1 @@ -67,11 +63,11 @@ end local function wrap(stream, ...) if not stream then return stream, ... end - return setmetatable({ stream = stream, next = 0 }, QOStream) + return setmetatable({ stream = stream, next = 0 }, Stream) end return { - QOStream = QOStream, + Stream = Stream, wrap = wrap, } diff --git a/lem/io/server.c b/lem/io/server.c index d318609..df63593 100644 --- a/lem/io/server.c +++ b/lem/io/server.c @@ -102,22 +102,23 @@ server_interrupt(lua_State *T) } static int -try_accept(lua_State *T, struct ev_io *w) +server__accept(lua_State *T, struct ev_io *w) { struct sockaddr client_addr; unsigned int client_addrlen; int sock; - struct istream *is; - struct ostream *os; sock = accept(w->fd, &client_addr, &client_addrlen); if (sock < 0) { if (errno == EAGAIN || errno == ECONNABORTED) return 0; + + close(w->fd); + w->fd = -1; lua_pushnil(T); lua_pushfstring(T, "error accepting connection: %s", strerror(errno)); - return -1; + return 2; } /* make the socket non-blocking */ @@ -126,43 +127,34 @@ try_accept(lua_State *T, struct ev_io *w) lua_pushnil(T); lua_pushfstring(T, "error making socket non-blocking: %s", strerror(errno)); - return -1; + return 2; } - is = istream_new(T, sock, lua_upvalueindex(1)); - os = ostream_new(T, sock, lua_upvalueindex(2)); - is->twin = os; - os->twin = is; - + stream_new(T, sock, lua_upvalueindex(1)); return 1; } static void -server_accept_handler(EV_P_ struct ev_io *w, int revents) +server_accept_cb(EV_P_ struct ev_io *w, int revents) { + int ret; + (void)revents; - switch (try_accept(w->data, w)) { - case 0: + ret = server__accept(w->data, w); + if (ret == 0) return; - case 1: - break; - - default: - close(w->fd); - w->fd = -1; - } - - ev_io_stop(EV_A_ w); - lem_queue(w->data, 2); w->data = NULL; + ev_io_stop(EV_A_ w); + lem_queue(w->data, ret); } static int server_accept(lua_State *T) { struct ev_io *w; + int ret; luaL_checktype(T, 1, LUA_TUSERDATA); w = lua_touserdata(T, 1); @@ -171,38 +163,25 @@ server_accept(lua_State *T) if (w->data != NULL) return io_busy(T); - switch (try_accept(T, w)) { - case 0: - w->cb = server_accept_handler; - w->data = T; - ev_io_start(LEM_ w); - - /* yield server object */ - lua_settop(T, 1); - return lua_yield(T, 1); - - case 1: - break; - - - default: - close(w->fd); - w->fd = -1; - } + ret = server__accept(T, w); + if (ret > 0) + return ret; - return 2; + lua_settop(T, 1); + w->cb = server_accept_cb; + w->data = T; + ev_io_start(LEM_ w); + return lua_yield(T, 1); } static void -server_autospawn_handler(EV_P_ struct ev_io *w, int revents) +server_autospawn_cb(EV_P_ struct ev_io *w, int revents) { lua_State *T = w->data; struct sockaddr client_addr; unsigned int client_addrlen; int sock; lua_State *S; - struct istream *is; - struct ostream *os; (void)revents; @@ -233,25 +212,20 @@ server_autospawn_handler(EV_P_ struct ev_io *w, int revents) lua_pushvalue(T, 2); lua_xmove(T, S, 1); - /* create streams */ - is = istream_new(T, sock, lua_upvalueindex(1)); - os = ostream_new(T, sock, lua_upvalueindex(2)); - is->twin = os; - os->twin = is; - - /* move streams to new thread */ - lua_xmove(T, S, 2); + /* create stream */ + stream_new(T, sock, lua_upvalueindex(1)); + /* move stream to new thread */ + lua_xmove(T, S, 1); - lem_queue(S, 2); + lem_queue(S, 1); return; error: ev_io_stop(EV_A_ w); close(w->fd); w->fd = -1; - - lem_queue(T, 2); w->data = NULL; + lem_queue(T, 2); } static int @@ -268,7 +242,7 @@ server_autospawn(lua_State *T) if (w->data != NULL) return io_busy(T); - w->cb = server_autospawn_handler; + w->cb = server_autospawn_cb; w->data = T; ev_io_start(LEM_ w); diff --git a/lem/io/stream.c b/lem/io/stream.c index 62a12fa..9c36484 100644 --- a/lem/io/stream.c +++ b/lem/io/stream.c @@ -16,235 +16,151 @@ * along with LEM. If not, see <http://www.gnu.org/licenses/>. */ -struct ostream; - -struct istream { - ev_io w; - struct ostream *twin; +struct stream { + struct ev_io r; + struct ev_io w; + const char *out; + size_t out_len; struct lem_parser *p; struct lem_inputbuf buf; }; -struct ostream { - ev_io w; - struct istream *twin; - const char *data; - size_t len; -}; +#define STREAM_FROM_WATCH(w, member)\ + (struct stream *)(((char *)w) - offsetof(struct stream, member)) -static struct istream * -istream_new(lua_State *T, int fd, int mt) +static struct stream * +stream_new(lua_State *T, int fd, int mt) { - struct istream *s; - /* create userdata and set the metatable */ - s = lua_newuserdata(T, sizeof(struct istream)); + struct stream *s = lua_newuserdata(T, sizeof(struct stream)); lua_pushvalue(T, mt); lua_setmetatable(T, -2); /* initialize userdata */ - ev_io_init(&s->w, NULL, fd, EV_READ); + ev_io_init(&s->r, NULL, fd, EV_READ); + ev_io_init(&s->w, NULL, fd, EV_WRITE); + s->r.data = NULL; s->w.data = NULL; - s->twin = NULL; s->buf.start = s->buf.end = 0; return s; } static int -stream_closed(lua_State *T) -{ - struct ev_io *w; - - luaL_checktype(T, 1, LUA_TUSERDATA); - w = lua_touserdata(T, 1); - lua_pushboolean(T, w->fd < 0); - return 1; -} - -static int -stream_busy(lua_State *T) -{ - struct ev_io *w; - - luaL_checktype(T, 1, LUA_TUSERDATA); - w = lua_touserdata(T, 1); - lua_pushboolean(T, w->data != NULL); - return 1; -} - -static int -stream_interrupt(lua_State *T) -{ - struct ev_io *w; - lua_State *S; - - luaL_checktype(T, 1, LUA_TUSERDATA); - w = lua_touserdata(T, 1); - S = w->data; - if (S == NULL) { - lua_pushnil(T); - lua_pushliteral(T, "not busy"); - return 2; - } - - lem_debug("interrupting io action"); - ev_io_stop(LEM_ w); - w->data = NULL; - lua_settop(S, 0); - lua_pushnil(S); - lua_pushliteral(S, "interrupted"); - lem_queue(S, 2); - - lua_pushboolean(T, 1); - return 1; -} - -static int -stream_close_check(lua_State *T, struct ev_io *w) +stream_gc(lua_State *T) { - if (w->fd < 0) - return io_closed(T); + struct stream *s = lua_touserdata(T, 1); - if (w->data != NULL) { - lua_State *S = w->data; - - lem_debug("interrupting io action"); - ev_io_stop(LEM_ w); - w->data = NULL; - lua_settop(S, 0); - lua_pushnil(S); - lua_pushliteral(S, "interrupted"); - lem_queue(S, 2); - } + if (s->r.fd >= 0) + close(s->r.fd); return 0; } static int -istream_close(lua_State *T) +stream_closed(lua_State *T) { - struct istream *s; + struct stream *s; luaL_checktype(T, 1, LUA_TUSERDATA); s = lua_touserdata(T, 1); - - lem_debug("collecting %d", s->w.fd); - if (stream_close_check(T, &s->w)) - return 2; - - if (s->twin) - s->twin->twin = NULL; - else if (close(s->w.fd)) { - s->w.fd = -1; - lua_pushnil(T); - lua_pushstring(T, strerror(errno)); - return 2; - } - - s->w.fd = -1; - lua_pushboolean(T, 1); + lua_pushboolean(T, s->r.fd < 0); return 1; } static int -ostream_close(lua_State *T) +stream_close(lua_State *T) { - struct ostream *s; + struct stream *s; + int ret; luaL_checktype(T, 1, LUA_TUSERDATA); s = lua_touserdata(T, 1); + if (s->r.fd < 0) + return io_closed(T); + if (s->r.data != NULL || s->w.data != NULL) + return io_busy(T); - lem_debug("collecting %d", s->w.fd); - if (stream_close_check(T, &s->w)) - return 2; - - if (s->twin) - s->twin->twin = NULL; - else if (close(s->w.fd)) { - s->w.fd = -1; + ret = close(s->r.fd); + s->r.fd = s->w.fd = -1; + if (ret) { lua_pushnil(T); lua_pushstring(T, strerror(errno)); return 2; } - s->w.fd = -1; lua_pushboolean(T, 1); return 1; } /* - * istream:readp() method + * stream:readp() method */ - -static void -stream_readp_handler(EV_P_ ev_io *w, int revents) +static int +stream__readp(lua_State *T, struct stream *s) { - struct istream *s = (struct istream *)w; - lua_State *T = s->w.data; ssize_t bytes; int ret; - enum lem_preason reason; - const char *msg; - - (void)revents; + int err; + enum lem_preason res; - while ((bytes = read(s->w.fd, s->buf.buf + s->buf.end, + while ((bytes = read(s->r.fd, s->buf.buf + s->buf.end, LEM_INPUTBUF_SIZE - s->buf.end)) > 0) { - lem_debug("read %ld bytes from %d", bytes, s->w.fd); + lem_debug("read %ld bytes from %d", bytes, s->r.fd); s->buf.end += bytes; ret = s->p->process(T, &s->buf); - if (ret > 0) { - ev_io_stop(EV_A_ &s->w); - s->w.data = NULL; - lem_queue(T, ret); - return; - } + if (ret > 0) + return ret; } - lem_debug("read %ld bytes from %d", bytes, s->w.fd); - - if (bytes < 0 && errno == EAGAIN) - return; + err = errno; + lem_debug("read %ld bytes from %d", bytes, s->r.fd); - ev_io_stop(EV_A_ &s->w); - s->w.data = NULL; + if (bytes < 0 && err == EAGAIN) + return 0; - if (s->twin) - s->twin->twin = NULL; + if (bytes == 0 || err == ECONNRESET || err == EPIPE) + res = LEM_PCLOSED; else - (void)close(s->w.fd); - s->w.fd = -1; + res = LEM_PERROR; - if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) { - reason = LEM_PCLOSED; - msg = "closed"; - } else { - reason = LEM_PERROR; - msg = strerror(errno); - } - - if (s->p->destroy && (ret = s->p->destroy(T, &s->buf, reason)) > 0) { - lem_queue(T, ret); - return; - } + if (s->p->destroy && (ret = s->p->destroy(T, &s->buf, res)) > 0) + return ret; lua_settop(T, 0); + if (res == LEM_PCLOSED) + return io_closed(T); + lua_pushnil(T); - lua_pushstring(T, msg); - lem_queue(T, 2); + lua_pushstring(T, strerror(err)); + return 2; +} + +static void +stream_readp_cb(EV_P_ struct ev_io *w, int revents) +{ + struct stream *s = STREAM_FROM_WATCH(w, r); + lua_State *T = s->r.data; + int ret; + + (void)revents; + + ret = stream__readp(T, s); + if (ret == 0) + return; + + ev_io_stop(EV_A_ &s->r); + s->r.data = NULL; + lem_queue(T, ret); } static int stream_readp(lua_State *T) { - struct istream *s; + struct stream *s; struct lem_parser *p; int ret; - ssize_t bytes; - enum lem_preason reason; - const char *msg; luaL_checktype(T, 1, LUA_TUSERDATA); ret = lua_type(T, 2); @@ -252,130 +168,91 @@ stream_readp(lua_State *T) return luaL_argerror(T, 2, "expected userdata"); s = lua_touserdata(T, 1); - if (s->w.fd < 0) + if (s->r.fd < 0) return io_closed(T); - if (s->w.data != NULL) + if (s->r.data != NULL) return io_busy(T); p = lua_touserdata(T, 2); if (p->init) p->init(T, &s->buf); -again: ret = p->process(T, &s->buf); if (ret > 0) return ret; - bytes = read(s->w.fd, s->buf.buf + s->buf.end, LEM_INPUTBUF_SIZE - s->buf.end); - lem_debug("read %ld bytes from %d", bytes, s->w.fd); - if (bytes > 0) { - s->buf.end += bytes; - goto again; - } - - if (bytes < 0 && errno == EAGAIN) { - s->p = p; - s->w.data = T; - s->w.cb = stream_readp_handler; - ev_io_start(LEM_ &s->w); - return lua_yield(T, lua_gettop(T)); - } - - if (s->twin) - s->twin->twin = NULL; - else - (void)close(s->w.fd); - s->w.fd = -1; - - if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) { - reason = LEM_PCLOSED; - msg = "closed"; - } else { - reason = LEM_PERROR; - msg = strerror(errno); - } - - if (p->destroy && (ret = p->destroy(T, &s->buf, reason)) > 0) + s->p = p; + ret = stream__readp(T, s); + if (ret > 0) return ret; - lua_settop(T, 0); - lua_pushnil(T); - lua_pushstring(T, msg); - return 2; + s->r.data = T; + s->r.cb = stream_readp_cb; + ev_io_start(LEM_ &s->r); + return lua_yield(T, lua_gettop(T)); } -static struct ostream * -ostream_new(lua_State *T, int fd, int mt) +/* + * stream:write() method + */ +static int +stream__write(lua_State *T, struct stream *s) { - struct ostream *s; + ssize_t bytes; + int err; - /* create userdata and set the metatable */ - s = lua_newuserdata(T, sizeof(struct ostream)); - lua_pushvalue(T, mt); - lua_setmetatable(T, -2); + while ((bytes = write(s->w.fd, s->out, s->out_len)) > 0) { + s->out_len -= bytes; + if (s->out_len == 0) { + lua_pushboolean(T, 1); + return 1; + } + s->out += bytes; + } + err = errno; - /* initialize userdata */ - ev_io_init(&s->w, NULL, fd, EV_WRITE); - s->w.data = NULL; - s->twin = NULL; + if (bytes < 0 && err == EAGAIN) + return 0; - return s; + close(s->w.fd); + s->w.fd = s->r.fd = -1; + + if (bytes == 0 || err == ECONNRESET || err == EPIPE) + return io_closed(T); + + lua_pushnil(T); + lua_pushstring(T, strerror(err)); + return 2; } static void -stream_write_handler(EV_P_ struct ev_io *w, int revents) +stream_write_cb(EV_P_ struct ev_io *w, int revents) { - struct ostream *s = (struct ostream *)w; + struct stream *s = STREAM_FROM_WATCH(w, w); lua_State *T = s->w.data; - ssize_t bytes; + int ret; (void)revents; -again: - bytes = write(s->w.fd, s->data, s->len); - if (bytes > 0) { - s->len -= bytes; - if (s->len > 0) { - s->data += bytes; - goto again; - } - - ev_io_stop(EV_A_ &s->w); - s->w.data = NULL; - - lua_pushboolean(T, 1); - lem_queue(T, 1); - return; - } - - if (bytes < 0 && errno == EAGAIN) + ret = stream__write(T, s); + if (ret == 0) return; ev_io_stop(EV_A_ &s->w); s->w.data = NULL; - - lua_pushnil(T); - if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) - lua_pushliteral(T, "closed"); - else - lua_pushstring(T, strerror(errno)); - - if (s->twin) - s->twin->twin = NULL; - else - (void)close(s->w.fd); - s->w.fd = -1; - lem_queue(T, 2); + lem_queue(T, ret); } static int stream_write(lua_State *T) { - struct ostream *s; - ssize_t bytes; + struct stream *s; + const char *out; + size_t out_len; + int ret; luaL_checktype(T, 1, LUA_TUSERDATA); - luaL_checktype(T, 2, LUA_TSTRING); + out = luaL_checklstring(T, 2, &out_len); s = lua_touserdata(T, 1); if (s->w.fd < 0) @@ -383,48 +260,20 @@ stream_write(lua_State *T) if (s->w.data != NULL) return io_busy(T); - s->data = lua_tolstring(T, 2, &s->len); - if (s->len == 0) { - lua_pushboolean(T, 1); - return 1; - } - -again: - bytes = write(s->w.fd, s->data, s->len); - if (bytes > 0) { - s->len -= bytes; - if (s->len > 0) { - s->data += bytes; - goto again; - } - - lua_pushboolean(T, 1); - return 1; - } - - if (bytes < 0 && errno == EAGAIN) { - lua_settop(T, 1); - s->w.data = T; - s->w.cb = stream_write_handler; - ev_io_start(LEM_ &s->w); - return lua_yield(T, 1); - } + lua_settop(T, 2); - lua_pushnil(T); - if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) - lua_pushliteral(T, "closed"); - else - lua_pushstring(T, strerror(errno)); + s->out = out; + s->out_len = out_len; + ret = stream__write(T, s); + if (ret > 0) + return ret; - if (s->twin) - s->twin->twin = NULL; - else - (void)close(s->w.fd); - s->w.fd = -1; - return 2; + s->w.data = T; + s->w.cb = stream_write_cb; + ev_io_start(LEM_ &s->w); + return lua_yield(T, 2); } - #ifndef TCP_CORK #define TCP_CORK TCP_NOPUSH #endif @@ -432,7 +281,7 @@ again: static int stream_setcork(lua_State *T, int state) { - struct ostream *s; + struct stream *s; luaL_checktype(T, 1, LUA_TUSERDATA); s = lua_touserdata(T, 1); @@ -469,7 +318,7 @@ struct sendfile { }; static int -try_sendfile(lua_State *T, struct ostream *s, struct sendfile *sf) +stream__sendfile(lua_State *T, struct stream *s, struct sendfile *sf) { #ifdef __FreeBSD__ int ret; @@ -539,36 +388,28 @@ try_sendfile(lua_State *T, struct ostream *s, struct sendfile *sf) } static void -sendfile_handler(EV_P_ struct ev_io *w, int revents) +stream_sendfile_cb(EV_P_ struct ev_io *w, int revents) { - struct ostream *s = (struct ostream *)w; + struct stream *s = STREAM_FROM_WATCH(w, w); lua_State *T = s->w.data; struct sendfile *sf = lua_touserdata(T, 3); int ret; (void)revents; - ret = try_sendfile(T, s, sf); + ret = stream__sendfile(T, s, sf); if (ret == 0) return; ev_io_stop(EV_A_ &s->w); - if (ret == 2) { - if (s->twin) - s->twin->twin = NULL; - else - (void)close(s->w.fd); - s->w.fd = -1; - } - - lem_queue(T, ret); s->w.data = NULL; + lem_queue(T, ret); } static int stream_sendfile(lua_State *T) { - struct ostream *s; + struct stream *s; struct lem_sendfile *f; struct sendfile *sf; off_t offset; @@ -599,21 +440,13 @@ stream_sendfile(lua_State *T) sf->file = f; sf->offset = offset; - ret = try_sendfile(T, s, sf); - if (ret > 0) { - if (ret == 2) { - if (s->twin) - s->twin->twin = NULL; - else - (void)close(s->w.fd); - s->w.fd = -1; - } + ret = stream__sendfile(T, s, sf); + if (ret > 0) return ret; - } lem_debug("yielding"); s->w.data = T; - s->w.cb = sendfile_handler; + s->w.cb = stream_sendfile_cb; ev_io_start(LEM_ &s->w); return lua_yield(T, 3); } @@ -671,10 +504,7 @@ stream_open_reap(struct lem_async *a) struct open *o = (struct open *)a; lua_State *T = o->a.T; int fd = o->fd; - int flags = o->flags; int ret = o->type; - struct istream *is; - struct ostream *os; lem_debug("o->type = %d", ret); free(o); @@ -705,30 +535,12 @@ stream_open_reap(struct lem_async *a) return; } - if (ret == 0) { + if (ret == 0) file_new(T, fd, lua_upvalueindex(3)); - lem_queue(T, 1); - return; - } - - if ((flags & O_WRONLY) == 0) - is = istream_new(T, fd, lua_upvalueindex(1)); - else - is = NULL; - - if (flags & (O_RDWR | O_WRONLY)) - os = ostream_new(T, fd, lua_upvalueindex(2)); else - os = NULL; + stream_new(T, fd, lua_upvalueindex(1)); - if (is && os) { - is->twin = os; - os->twin = is; - ret = 2; - } else - ret = 1; - - lem_queue(T, ret); + lem_queue(T, 1); } static int @@ -806,39 +618,45 @@ stream_popen(lua_State *T) const char *cmd = luaL_checkstring(T, 1); const char *mode = luaL_optstring(T, 2, "r"); int fd[2]; + int err; if (mode[0] != 'r' && mode[0] != 'w') return luaL_error(T, "invalid mode string"); - if (pipe(fd)) + if (pipe(fd)) { + err = errno; goto error; + } switch (fork()) { case -1: /* error */ - (void)close(fd[0]); - (void)close(fd[1]); + err = errno; + close(fd[0]); + close(fd[1]); goto error; case 0: /* child */ if (mode[0] == 'r') { - (void)close(fd[0]); - (void)dup2(fd[1], 1); + close(fd[0]); + dup2(fd[1], 1); } else { - (void)close(fd[1]); - (void)dup2(fd[0], 0); + close(fd[1]); + dup2(fd[0], 0); } - (void)execl("/bin/sh", "/bin/sh", "-c", cmd, NULL); + execl("/bin/sh", "/bin/sh", "-c", cmd, NULL); exit(EXIT_FAILURE); } if (mode[0] == 'r') { if (close(fd[1])) { - (void)close(fd[0]); + err = errno; + close(fd[0]); goto error; } } else { if (close(fd[0])) { - (void)close(fd[1]); + err = errno; + close(fd[1]); goto error; } fd[0] = fd[1]; @@ -846,17 +664,15 @@ stream_popen(lua_State *T) /* make the pipe non-blocking */ if (fcntl(fd[0], F_SETFL, O_NONBLOCK) < 0) { - (void)close(fd[0]); + err = errno; + close(fd[0]); goto error; } - if (mode[0] == 'r') - (void)istream_new(T, fd[0], lua_upvalueindex(1)); - else - (void)ostream_new(T, fd[0], lua_upvalueindex(2)); + stream_new(T, fd[0], lua_upvalueindex(1)); return 1; error: lua_pushnil(T); - lua_pushstring(T, strerror(errno)); + lua_pushstring(T, strerror(err)); return 2; } diff --git a/lem/io/tcp.c b/lem/io/tcp.c index fd13760..23fea2a 100644 --- a/lem/io/tcp.c +++ b/lem/io/tcp.c @@ -17,13 +17,13 @@ */ static void -connect_handler(EV_P_ struct ev_io *w, int revents) +tcp_connect_cb(EV_P_ struct ev_io *w, int revents) { (void)revents; lem_debug("connection established"); ev_io_stop(EV_A_ w); - lem_queue(w->data, 2); + lem_queue(w->data, 1); w->data = NULL; } @@ -45,8 +45,7 @@ tcp_connect(lua_State *T) struct addrinfo *ainfo; int sock; int ret; - struct istream *is; - struct ostream *os; + struct stream *s; /* lookup name */ ret = getaddrinfo(addr, NULL, &hints, &ainfo); @@ -99,10 +98,7 @@ tcp_connect(lua_State *T) } lua_settop(T, 1); - is = istream_new(T, sock, lua_upvalueindex(1)); - os = ostream_new(T, sock, lua_upvalueindex(2)); - is->twin = os; - os->twin = is; + s = stream_new(T, sock, lua_upvalueindex(1)); /* connect */ ret = connect(sock, ainfo->ai_addr, ainfo->ai_addrlen); @@ -114,10 +110,10 @@ tcp_connect(lua_State *T) if (errno == EINPROGRESS) { lem_debug("EINPROGRESS"); - os->w.data = T; - os->w.cb = connect_handler; - ev_io_start(LEM_ &os->w); - return lua_yield(T, 3); + s->w.data = T; + s->w.cb = tcp_connect_cb; + ev_io_start(LEM_ &s->w); + return lua_yield(T, 2); } close(sock); @@ -128,7 +124,7 @@ tcp_connect(lua_State *T) } static int -common_listen(lua_State *T, struct sockaddr *address, socklen_t alen, +tcp_listen(lua_State *T, struct sockaddr *address, socklen_t alen, int sock, int backlog) { struct ev_io *w; @@ -212,7 +208,7 @@ tcp4_listen(lua_State *T) return 2; } - return common_listen(T, (struct sockaddr *)&address, + return tcp_listen(T, (struct sockaddr *)&address, sizeof(struct sockaddr_in), sock, backlog); } @@ -246,6 +242,6 @@ tcp6_listen(lua_State *T) return 2; } - return common_listen(T, (struct sockaddr *)&address, + return tcp_listen(T, (struct sockaddr *)&address, sizeof(struct sockaddr_in6), sock, backlog); } diff --git a/test/ctest.lua b/test/ctest.lua index e33b372..399fd3a 100755 --- a/test/ctest.lua +++ b/test/ctest.lua @@ -22,12 +22,12 @@ print('Entered ' .. arg[0]) local utils = require 'lem.utils' local io = require 'lem.io' -local iconn, oconn = assert(io.tcp_connect('127.0.0.1', arg[1] or 8080)) +local conn = assert(io.tcp_connect('127.0.0.1', arg[1] or 8080)) for i = 1, 10 do - assert(oconn:write('ping\n')) + assert(conn:write('ping\n')) - local line, err = iconn:read('*l') + local line, err = conn:read('*l') if not line then if err == 'closed' then print("Server closed connection") @@ -40,7 +40,7 @@ for i = 1, 10 do print("Server answered: '" .. line .. "'") end -oconn:write('quit\n') +conn:write('quit\n') print('Exiting ' .. arg[0]) diff --git a/test/httptest.lua b/test/httptest.lua index 85d21d1..1e88f28 100755 --- a/test/httptest.lua +++ b/test/httptest.lua @@ -27,22 +27,23 @@ local concat = table.concat local done = false utils.spawn(function() - --local istream, ostream = assert(io.tcp_connect('www.google.dk', 80)) - local istream, ostream = assert(io.tcp_connect('127.0.0.1', 8080)) + local conn = assert(io.tcp_connect('www.google.dk', 80)) + --local conn = assert(io.tcp_connect('127.0.0.1', 8080)) print('\nConnected.') for i = 1, 2 do - --assert(ostream:write('GET / HTTP/1.1\r\nHost: www.google.dk\r\nConnection: close\r\n\r\n')) - assert(ostream:write('GET / HTTP/1.1\r\nHost: www.google.dk\r\n\r\n')) + --assert(conn:write('GET / HTTP/1.1\r\nHost: www.google.dk\r\nConnection: close\r\n\r\n')) + assert(conn:write('GET / HTTP/1.1\r\nHost: www.google.dk\r\n\r\n')) - local res = assert(istream:read('HTTPResponse')) + local res = assert(conn:read('HTTPResponse')) print(format('\nHTTP/%s %d %s', res.version, res.status, res.text)) for k, v in pairs(res.headers) do print(format('%s: %s', k, v)) end + --print(format('\n#body = %d', #assert(conn:read('*a')))) print(format('\n#body = %d', #assert(res:body()))) end @@ -50,9 +51,11 @@ utils.spawn(function() end) local write, yield = io.write, utils.yield +local sleeper = utils.newsleeper() repeat write('.') - yield() + --yield() + sleeper:sleep(0.001) until done -- vim: set ts=2 sw=2 noet: diff --git a/test/multiplexing.lua b/test/multiplexing.lua index fc421cb..d66d710 100755 --- a/test/multiplexing.lua +++ b/test/multiplexing.lua @@ -27,7 +27,7 @@ local stdout = queue.wrap(io.stdout) do local format = string.format - function queue.QOStream:printf(...) + function queue.Stream:printf(...) return self:write(format(...)) end end diff --git a/test/stest.lua b/test/stest.lua index b791bc8..60ea03d 100755 --- a/test/stest.lua +++ b/test/stest.lua @@ -31,12 +31,12 @@ utils.spawn(function() server:close() end) -local ok, err = server:autospawn(function(i, o) +local ok, err = server:autospawn(function(client) print 'Accepted a connection' local sleeper = utils.newsleeper() while true do - local line, err = i:read('*l') + local line, err = client:read('*l') if not line then if err == 'closed' then print("Client closed connection") @@ -50,12 +50,11 @@ local ok, err = server:autospawn(function(i, o) if line ~= 'ping' then break end sleeper:sleep(0.4) - assert(o:write('pong\n')) + assert(client:write('pong\n')) end print "Ok, I'm out" - assert(i:close()) - assert(o:close()) + assert(client:close()) end) if not ok and err ~= 'interrupted' then error(err) end |