summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEmil Renner Berthing <esmil@mailme.dk>2012-12-11 23:59:37 +0100
committerEmil Renner Berthing <esmil@mailme.dk>2012-12-17 10:11:06 +0100
commitf25118928aa35861b370a6529c651a28f5c8859b (patch)
tree63f07f33cd207f947ea43bef5d92f5311b2e758b
parentb738f523cc7a56602e07ff54bd11203355a64af9 (diff)
downloadlem-f25118928aa35861b370a6529c651a28f5c8859b.tar.gz
lem-f25118928aa35861b370a6529c651a28f5c8859b.tar.xz
lem-f25118928aa35861b370a6529c651a28f5c8859b.zip
io: rework streams
-rw-r--r--lem/hathaway.lua18
-rw-r--r--lem/http.lua16
-rw-r--r--lem/http/core.c2
-rw-r--r--lem/io.lua4
-rw-r--r--lem/io/core.c122
-rw-r--r--lem/io/queue.lua18
-rw-r--r--lem/io/server.c88
-rw-r--r--lem/io/stream.c508
-rw-r--r--lem/io/tcp.c26
-rwxr-xr-xtest/ctest.lua8
-rwxr-xr-xtest/httptest.lua15
-rwxr-xr-xtest/multiplexing.lua2
-rwxr-xr-xtest/stest.lua9
13 files changed, 290 insertions, 546 deletions
diff --git a/lem/hathaway.lua b/lem/hathaway.lua
index bdc4535..2cde2ca 100644
--- a/lem/hathaway.lua
+++ b/lem/hathaway.lua
@@ -226,14 +226,13 @@ do
return true
end
- local function handler(istream, ostream)
+ local function handler(client)
repeat
- local req, err = istream:read('HTTPRequest')
+ local req, err = client:read('HTTPRequest')
if not req then M.debug(err) break end
local method, uri, version = req.method, req.uri, req.version
M.debug(format("%s %s HTTP/%s", method, uri, version))
- req.ostream = ostream
local res = new_response(req)
if version ~= '1.0' and version ~= '1.1' then
@@ -322,31 +321,30 @@ do
i = i + 1
robe[i] = '\r\n'
- local ok, err = ostream:cork()
+ local ok, err = client:cork()
if not ok then M.debug(err) break end
- local ok, err = ostream:write(concat(robe))
+ local ok, err = client:write(concat(robe))
if not ok then M.debug(err) break end
if method ~= 'HEAD' then
if file then
- ok, err = ostream:sendfile(file)
+ ok, err = client:sendfile(file)
if close then file:close() end
else
- ok, err = ostream:write(concat(res))
+ ok, err = client:write(concat(res))
end
if not ok then M.debug(err) break end
end
- local ok, err = ostream:uncork()
+ local ok, err = client:uncork()
if not ok then M.debug(err) break end
until version == '1.0'
or req.headers['Connection'] == 'close'
or headers['Connection'] == 'close'
- istream:close()
- ostream:close()
+ client:close()
end
function M.Hathaway(address, port)
diff --git a/lem/http.lua b/lem/http.lua
index 6dd25b9..8a14961 100644
--- a/lem/http.lua
+++ b/lem/http.lua
@@ -35,40 +35,40 @@ function http.Request:body()
if len <= 0 then return body end
if self.headers['Expect'] == '100-continue' then
- local ok, err = self.ostream:send('HTTP/1.1 100 Continue\r\n\r\n')
+ local ok, err = self.client:send('HTTP/1.1 100 Continue\r\n\r\n')
if not ok then return nil, err end
end
local err
- body, err = self.istream:read(len)
+ body, err = self.client:read(len)
if not body then return nil, err end
return body
end
function http.Response:body_chunked()
- local istream = self.istream
+ local client = self.client
local t, n = {}, 0
local line, err
while true do
- line, err = istream:read('*l')
+ line, err = client:read('*l')
if not line then return nil, err end
local num = tonumber(line, 16)
if not num then return nil, 'expectation failed' end
if num == 0 then break end
- local data, err = istream:read(num)
+ local data, err = client:read(num)
if not data then return nil, err end
n = n + 1
t[n] = data
- line, err = istream:read('*l')
+ line, err = client:read('*l')
if not line then return nil, err end
end
- line, err = istream:read('*l')
+ line, err = client:read('*l')
if not line then return nil, err end
return t
@@ -85,7 +85,7 @@ function http.Response:body()
num = tonumber(num)
if not num then return nil, 'invalid content length' end
- return self.istream:read(num)
+ return self.client:read(num)
end
return http
diff --git a/lem/http/core.c b/lem/http/core.c
index 3c2d6b3..bfaa99b 100644
--- a/lem/http/core.c
+++ b/lem/http/core.c
@@ -165,7 +165,7 @@ parse_http_init(lua_State *T)
lua_settop(T, 2);
lua_createtable(T, 0, 5);
lua_pushvalue(T, 1);
- lua_setfield(T, -2, "istream");
+ lua_setfield(T, -2, "client");
}
static void
diff --git a/lem/io.lua b/lem/io.lua
index 273a8ad..517e61e 100644
--- a/lem/io.lua
+++ b/lem/io.lua
@@ -42,12 +42,12 @@ do
end
end
- io.IStream.read = io.reader(io.IStream.readp)
+ io.Stream.read = io.reader(io.Stream.readp)
io.File.read = io.reader(io.File.readp)
end
do
- local _write, stdout = io.OStream.write, io.stdout
+ local _write, stdout = io.Stream.write, io.stdout
function io.write(str)
return _write(stdout, str)
diff --git a/lem/io/core.c b/lem/io/core.c
index da3241e..5f964be 100644
--- a/lem/io/core.c
+++ b/lem/io/core.c
@@ -20,6 +20,7 @@
#include <unistd.h>
#include <errno.h>
#include <string.h>
+#include <assert.h>
#include <fcntl.h>
#include <sys/stat.h>
@@ -83,16 +84,12 @@ module_index(lua_State *T)
return 2;
}
- if (fd == 0)
- (void)istream_new(T, fd, lua_upvalueindex(1));
- else
- (void)ostream_new(T, fd, lua_upvalueindex(2));
+ stream_new(T, fd, lua_upvalueindex(1));
/* save this object so we don't initialize it again */
lua_pushvalue(T, 2);
lua_pushvalue(T, -2);
lua_rawset(T, 1);
-
return 1;
}
@@ -124,52 +121,49 @@ luaopen_lem_io_core(lua_State *L)
lua_pushcclosure(L, sendfile_open, 1);
lua_setfield(L, -2, "sendfile");
- /* create metatable for input stream objects */
+ /* create File metatable */
lua_newtable(L);
/* mt.__index = mt */
lua_pushvalue(L, -1);
lua_setfield(L, -2, "__index");
- /* mt.__gc = <stream_close> */
- lua_pushcfunction(L, istream_close);
+ /* mt.__gc = <file_gc> */
+ lua_pushcfunction(L, file_gc);
lua_setfield(L, -2, "__gc");
- /* mt.closed = <stream_closed> */
- lua_pushcfunction(L, stream_closed);
+ /* mt.closed = <file_closed> */
+ lua_pushcfunction(L, file_closed);
lua_setfield(L, -2, "closed");
- /* mt.busy = <stream_busy> */
- lua_pushcfunction(L, stream_busy);
- lua_setfield(L, -2, "busy");
- /* mt.interrupt = <stream_interrupt> */
- lua_pushcfunction(L, stream_interrupt);
- lua_setfield(L, -2, "interrupt");
- /* mt.close = <istream_close> */
- lua_pushcfunction(L, istream_close);
+ /* mt.close = <file_close> */
+ lua_pushcfunction(L, file_close);
lua_setfield(L, -2, "close");
- /* mt.readp = <stream_readp> */
- lua_pushcfunction(L, stream_readp);
+ /* mt.readp = <file_readp> */
+ lua_pushcfunction(L, file_readp);
lua_setfield(L, -2, "readp");
+ /* mt.write = <file_write> */
+ lua_pushcfunction(L, file_write);
+ lua_setfield(L, -2, "write");
+ /* mt.seek = <file_seek> */
+ lua_pushcfunction(L, file_seek);
+ lua_setfield(L, -2, "seek");
/* insert table */
- lua_setfield(L, -2, "IStream");
+ lua_setfield(L, -2, "File");
- /* create metatable for output stream objects */
+ /* create Stream metatable */
lua_newtable(L);
/* mt.__index = mt */
lua_pushvalue(L, -1);
lua_setfield(L, -2, "__index");
- /* mt.__gc = <ostream_close> */
- lua_pushcfunction(L, ostream_close);
+ /* mt.__gc = <stream_gc> */
+ lua_pushcfunction(L, stream_gc);
lua_setfield(L, -2, "__gc");
/* mt.closed = <stream_closed> */
lua_pushcfunction(L, stream_closed);
lua_setfield(L, -2, "closed");
- /* mt.busy = <stream_busy> */
- lua_pushcfunction(L, stream_busy);
- lua_setfield(L, -2, "busy");
- /* mt.interrupt = <stream_interrupt> */
- lua_pushcfunction(L, stream_interrupt);
- lua_setfield(L, -2, "interrupt");
- /* mt.close = <ostream_close> */
- lua_pushcfunction(L, ostream_close);
+ /* mt.close = <stream_close> */
+ lua_pushcfunction(L, stream_close);
lua_setfield(L, -2, "close");
+ /* mt.readp = <stream_readp> */
+ lua_pushcfunction(L, stream_readp);
+ lua_setfield(L, -2, "readp");
/* mt.write = <stream_write> */
lua_pushcfunction(L, stream_write);
lua_setfield(L, -2, "write");
@@ -179,39 +173,13 @@ luaopen_lem_io_core(lua_State *L)
/* mt.uncork = <stream_uncork> */
lua_pushcfunction(L, stream_uncork);
lua_setfield(L, -2, "uncork");
- /* mt.sendfile = <ostream_sendfile> */
+ /* mt.sendfile = <stream_sendfile> */
lua_pushcfunction(L, stream_sendfile);
lua_setfield(L, -2, "sendfile");
/* insert table */
- lua_setfield(L, -2, "OStream");
-
- /* create File metatable */
- lua_newtable(L);
- /* mt.__index = mt */
- lua_pushvalue(L, -1);
- lua_setfield(L, -2, "__index");
- /* mt.__gc = <file_gc> */
- lua_pushcfunction(L, file_gc);
- lua_setfield(L, -2, "__gc");
- /* mt.closed = <file_closed> */
- lua_pushcfunction(L, file_closed);
- lua_setfield(L, -2, "closed");
- /* mt.close = <file_close> */
- lua_pushcfunction(L, file_close);
- lua_setfield(L, -2, "close");
- /* mt.readp = <file_readp> */
- lua_pushcfunction(L, file_readp);
- lua_setfield(L, -2, "readp");
- /* mt.write = <file_write> */
- lua_pushcfunction(L, file_write);
- lua_setfield(L, -2, "write");
- /* mt.seek = <file_seek> */
- lua_pushcfunction(L, file_seek);
- lua_setfield(L, -2, "seek");
- /* insert table */
- lua_setfield(L, -2, "File");
+ lua_setfield(L, -2, "Stream");
- /* create metatable for server objects */
+ /* create Server metatable */
lua_newtable(L);
/* mt.__index = mt */
lua_pushvalue(L, -1);
@@ -232,34 +200,29 @@ luaopen_lem_io_core(lua_State *L)
lua_pushcfunction(L, server_interrupt);
lua_setfield(L, -2, "interrupt");
/* mt.accept = <server_accept> */
- lua_getfield(L, -2, "IStream"); /* upvalue 1 = IStream */
- lua_getfield(L, -3, "OStream"); /* upvalue 2 = OStream */
- lua_pushcclosure(L, server_accept, 2);
+ lua_getfield(L, -2, "Stream"); /* upvalue 1 = Stream */
+ lua_pushcclosure(L, server_accept, 1);
lua_setfield(L, -2, "accept");
/* mt.autospawn = <server_autospawn> */
- lua_getfield(L, -2, "IStream"); /* upvalue 1 = IStream */
- lua_getfield(L, -3, "OStream"); /* upvalue 2 = OStream */
- lua_pushcclosure(L, server_autospawn, 2);
+ lua_getfield(L, -2, "Stream"); /* upvalue 1 = Stream */
+ lua_pushcclosure(L, server_autospawn, 1);
lua_setfield(L, -2, "autospawn");
/* insert table */
lua_setfield(L, -2, "Server");
/* insert open function */
- lua_getfield(L, -1, "IStream"); /* upvalue 1 = IStream */
- lua_getfield(L, -2, "OStream"); /* upvalue 2 = OStream */
- lua_getfield(L, -3, "File"); /* upvalue 3 = File */
- lua_pushcclosure(L, stream_open, 3);
+ lua_getfield(L, -1, "File"); /* upvalue 1 = File */
+ lua_getfield(L, -2, "Stream"); /* upvalue 2 = Stream */
+ lua_pushcclosure(L, stream_open, 2);
lua_setfield(L, -2, "open");
/* insert popen function */
- lua_getfield(L, -1, "IStream"); /* upvalue 1 = IStream */
- lua_getfield(L, -2, "OStream"); /* upvalue 2 = OStream */
- lua_pushcclosure(L, stream_popen, 2);
+ lua_getfield(L, -1, "Stream"); /* upvalue 1 = Stream */
+ lua_pushcclosure(L, stream_popen, 1);
lua_setfield(L, -2, "popen");
/* insert the connect function */
- lua_getfield(L, -1, "IStream"); /* upvalue 1 = IStream */
- lua_getfield(L, -2, "OStream"); /* upvalue 2 = OStream */
- lua_pushcclosure(L, tcp_connect, 2);
+ lua_getfield(L, -1, "Stream"); /* upvalue 1 = Stream */
+ lua_pushcclosure(L, tcp_connect, 1);
lua_setfield(L, -2, "tcp_connect");
/* insert the tcp4_listen function */
lua_getfield(L, -1, "Server"); /* upvalue 1 = Server */
@@ -290,9 +253,8 @@ luaopen_lem_io_core(lua_State *L)
/* create metatable for the module */
lua_newtable(L);
/* insert the index function */
- lua_getfield(L, -2, "IStream"); /* upvalue 1 = IStream */
- lua_getfield(L, -3, "OStream"); /* upvalue 2 = OStream */
- lua_pushcclosure(L, module_index, 2);
+ lua_getfield(L, -2, "Stream"); /* upvalue 1 = Stream */
+ lua_pushcclosure(L, module_index, 1);
lua_setfield(L, -2, "__index");
/* set the metatable */
diff --git a/lem/io/queue.lua b/lem/io/queue.lua
index 63bda6d..70ee887 100644
--- a/lem/io/queue.lua
+++ b/lem/io/queue.lua
@@ -22,22 +22,18 @@ local setmetatable = setmetatable
local thisthread, suspend, resume
= utils.thisthread, utils.suspend, utils.resume
-local QOStream = {}
-QOStream.__index = QOStream
+local Stream = {}
+Stream.__index = Stream
-function QOStream:closed(...)
+function Stream:closed(...)
return self.stream:closed(...)
end
-function QOStream:interrupt(...)
- return self.stream:interrupt(...)
-end
-
-function QOStream:close(...)
+function Stream:close(...)
return self.stream:close(...)
end
-function QOStream:write(...)
+function Stream:write(...)
local nxt = self.next
if nxt == 0 then
nxt = 1
@@ -67,11 +63,11 @@ end
local function wrap(stream, ...)
if not stream then return stream, ... end
- return setmetatable({ stream = stream, next = 0 }, QOStream)
+ return setmetatable({ stream = stream, next = 0 }, Stream)
end
return {
- QOStream = QOStream,
+ Stream = Stream,
wrap = wrap,
}
diff --git a/lem/io/server.c b/lem/io/server.c
index d318609..df63593 100644
--- a/lem/io/server.c
+++ b/lem/io/server.c
@@ -102,22 +102,23 @@ server_interrupt(lua_State *T)
}
static int
-try_accept(lua_State *T, struct ev_io *w)
+server__accept(lua_State *T, struct ev_io *w)
{
struct sockaddr client_addr;
unsigned int client_addrlen;
int sock;
- struct istream *is;
- struct ostream *os;
sock = accept(w->fd, &client_addr, &client_addrlen);
if (sock < 0) {
if (errno == EAGAIN || errno == ECONNABORTED)
return 0;
+
+ close(w->fd);
+ w->fd = -1;
lua_pushnil(T);
lua_pushfstring(T, "error accepting connection: %s",
strerror(errno));
- return -1;
+ return 2;
}
/* make the socket non-blocking */
@@ -126,43 +127,34 @@ try_accept(lua_State *T, struct ev_io *w)
lua_pushnil(T);
lua_pushfstring(T, "error making socket non-blocking: %s",
strerror(errno));
- return -1;
+ return 2;
}
- is = istream_new(T, sock, lua_upvalueindex(1));
- os = ostream_new(T, sock, lua_upvalueindex(2));
- is->twin = os;
- os->twin = is;
-
+ stream_new(T, sock, lua_upvalueindex(1));
return 1;
}
static void
-server_accept_handler(EV_P_ struct ev_io *w, int revents)
+server_accept_cb(EV_P_ struct ev_io *w, int revents)
{
+ int ret;
+
(void)revents;
- switch (try_accept(w->data, w)) {
- case 0:
+ ret = server__accept(w->data, w);
+ if (ret == 0)
return;
- case 1:
- break;
-
- default:
- close(w->fd);
- w->fd = -1;
- }
-
- ev_io_stop(EV_A_ w);
- lem_queue(w->data, 2);
w->data = NULL;
+ ev_io_stop(EV_A_ w);
+ lem_queue(w->data, ret);
}
static int
server_accept(lua_State *T)
{
struct ev_io *w;
+ int ret;
luaL_checktype(T, 1, LUA_TUSERDATA);
w = lua_touserdata(T, 1);
@@ -171,38 +163,25 @@ server_accept(lua_State *T)
if (w->data != NULL)
return io_busy(T);
- switch (try_accept(T, w)) {
- case 0:
- w->cb = server_accept_handler;
- w->data = T;
- ev_io_start(LEM_ w);
-
- /* yield server object */
- lua_settop(T, 1);
- return lua_yield(T, 1);
-
- case 1:
- break;
-
-
- default:
- close(w->fd);
- w->fd = -1;
- }
+ ret = server__accept(T, w);
+ if (ret > 0)
+ return ret;
- return 2;
+ lua_settop(T, 1);
+ w->cb = server_accept_cb;
+ w->data = T;
+ ev_io_start(LEM_ w);
+ return lua_yield(T, 1);
}
static void
-server_autospawn_handler(EV_P_ struct ev_io *w, int revents)
+server_autospawn_cb(EV_P_ struct ev_io *w, int revents)
{
lua_State *T = w->data;
struct sockaddr client_addr;
unsigned int client_addrlen;
int sock;
lua_State *S;
- struct istream *is;
- struct ostream *os;
(void)revents;
@@ -233,25 +212,20 @@ server_autospawn_handler(EV_P_ struct ev_io *w, int revents)
lua_pushvalue(T, 2);
lua_xmove(T, S, 1);
- /* create streams */
- is = istream_new(T, sock, lua_upvalueindex(1));
- os = ostream_new(T, sock, lua_upvalueindex(2));
- is->twin = os;
- os->twin = is;
-
- /* move streams to new thread */
- lua_xmove(T, S, 2);
+ /* create stream */
+ stream_new(T, sock, lua_upvalueindex(1));
+ /* move stream to new thread */
+ lua_xmove(T, S, 1);
- lem_queue(S, 2);
+ lem_queue(S, 1);
return;
error:
ev_io_stop(EV_A_ w);
close(w->fd);
w->fd = -1;
-
- lem_queue(T, 2);
w->data = NULL;
+ lem_queue(T, 2);
}
static int
@@ -268,7 +242,7 @@ server_autospawn(lua_State *T)
if (w->data != NULL)
return io_busy(T);
- w->cb = server_autospawn_handler;
+ w->cb = server_autospawn_cb;
w->data = T;
ev_io_start(LEM_ w);
diff --git a/lem/io/stream.c b/lem/io/stream.c
index 62a12fa..9c36484 100644
--- a/lem/io/stream.c
+++ b/lem/io/stream.c
@@ -16,235 +16,151 @@
* along with LEM. If not, see <http://www.gnu.org/licenses/>.
*/
-struct ostream;
-
-struct istream {
- ev_io w;
- struct ostream *twin;
+struct stream {
+ struct ev_io r;
+ struct ev_io w;
+ const char *out;
+ size_t out_len;
struct lem_parser *p;
struct lem_inputbuf buf;
};
-struct ostream {
- ev_io w;
- struct istream *twin;
- const char *data;
- size_t len;
-};
+#define STREAM_FROM_WATCH(w, member)\
+ (struct stream *)(((char *)w) - offsetof(struct stream, member))
-static struct istream *
-istream_new(lua_State *T, int fd, int mt)
+static struct stream *
+stream_new(lua_State *T, int fd, int mt)
{
- struct istream *s;
-
/* create userdata and set the metatable */
- s = lua_newuserdata(T, sizeof(struct istream));
+ struct stream *s = lua_newuserdata(T, sizeof(struct stream));
lua_pushvalue(T, mt);
lua_setmetatable(T, -2);
/* initialize userdata */
- ev_io_init(&s->w, NULL, fd, EV_READ);
+ ev_io_init(&s->r, NULL, fd, EV_READ);
+ ev_io_init(&s->w, NULL, fd, EV_WRITE);
+ s->r.data = NULL;
s->w.data = NULL;
- s->twin = NULL;
s->buf.start = s->buf.end = 0;
return s;
}
static int
-stream_closed(lua_State *T)
-{
- struct ev_io *w;
-
- luaL_checktype(T, 1, LUA_TUSERDATA);
- w = lua_touserdata(T, 1);
- lua_pushboolean(T, w->fd < 0);
- return 1;
-}
-
-static int
-stream_busy(lua_State *T)
-{
- struct ev_io *w;
-
- luaL_checktype(T, 1, LUA_TUSERDATA);
- w = lua_touserdata(T, 1);
- lua_pushboolean(T, w->data != NULL);
- return 1;
-}
-
-static int
-stream_interrupt(lua_State *T)
-{
- struct ev_io *w;
- lua_State *S;
-
- luaL_checktype(T, 1, LUA_TUSERDATA);
- w = lua_touserdata(T, 1);
- S = w->data;
- if (S == NULL) {
- lua_pushnil(T);
- lua_pushliteral(T, "not busy");
- return 2;
- }
-
- lem_debug("interrupting io action");
- ev_io_stop(LEM_ w);
- w->data = NULL;
- lua_settop(S, 0);
- lua_pushnil(S);
- lua_pushliteral(S, "interrupted");
- lem_queue(S, 2);
-
- lua_pushboolean(T, 1);
- return 1;
-}
-
-static int
-stream_close_check(lua_State *T, struct ev_io *w)
+stream_gc(lua_State *T)
{
- if (w->fd < 0)
- return io_closed(T);
+ struct stream *s = lua_touserdata(T, 1);
- if (w->data != NULL) {
- lua_State *S = w->data;
-
- lem_debug("interrupting io action");
- ev_io_stop(LEM_ w);
- w->data = NULL;
- lua_settop(S, 0);
- lua_pushnil(S);
- lua_pushliteral(S, "interrupted");
- lem_queue(S, 2);
- }
+ if (s->r.fd >= 0)
+ close(s->r.fd);
return 0;
}
static int
-istream_close(lua_State *T)
+stream_closed(lua_State *T)
{
- struct istream *s;
+ struct stream *s;
luaL_checktype(T, 1, LUA_TUSERDATA);
s = lua_touserdata(T, 1);
-
- lem_debug("collecting %d", s->w.fd);
- if (stream_close_check(T, &s->w))
- return 2;
-
- if (s->twin)
- s->twin->twin = NULL;
- else if (close(s->w.fd)) {
- s->w.fd = -1;
- lua_pushnil(T);
- lua_pushstring(T, strerror(errno));
- return 2;
- }
-
- s->w.fd = -1;
- lua_pushboolean(T, 1);
+ lua_pushboolean(T, s->r.fd < 0);
return 1;
}
static int
-ostream_close(lua_State *T)
+stream_close(lua_State *T)
{
- struct ostream *s;
+ struct stream *s;
+ int ret;
luaL_checktype(T, 1, LUA_TUSERDATA);
s = lua_touserdata(T, 1);
+ if (s->r.fd < 0)
+ return io_closed(T);
+ if (s->r.data != NULL || s->w.data != NULL)
+ return io_busy(T);
- lem_debug("collecting %d", s->w.fd);
- if (stream_close_check(T, &s->w))
- return 2;
-
- if (s->twin)
- s->twin->twin = NULL;
- else if (close(s->w.fd)) {
- s->w.fd = -1;
+ ret = close(s->r.fd);
+ s->r.fd = s->w.fd = -1;
+ if (ret) {
lua_pushnil(T);
lua_pushstring(T, strerror(errno));
return 2;
}
- s->w.fd = -1;
lua_pushboolean(T, 1);
return 1;
}
/*
- * istream:readp() method
+ * stream:readp() method
*/
-
-static void
-stream_readp_handler(EV_P_ ev_io *w, int revents)
+static int
+stream__readp(lua_State *T, struct stream *s)
{
- struct istream *s = (struct istream *)w;
- lua_State *T = s->w.data;
ssize_t bytes;
int ret;
- enum lem_preason reason;
- const char *msg;
-
- (void)revents;
+ int err;
+ enum lem_preason res;
- while ((bytes = read(s->w.fd, s->buf.buf + s->buf.end,
+ while ((bytes = read(s->r.fd, s->buf.buf + s->buf.end,
LEM_INPUTBUF_SIZE - s->buf.end)) > 0) {
- lem_debug("read %ld bytes from %d", bytes, s->w.fd);
+ lem_debug("read %ld bytes from %d", bytes, s->r.fd);
s->buf.end += bytes;
ret = s->p->process(T, &s->buf);
- if (ret > 0) {
- ev_io_stop(EV_A_ &s->w);
- s->w.data = NULL;
- lem_queue(T, ret);
- return;
- }
+ if (ret > 0)
+ return ret;
}
- lem_debug("read %ld bytes from %d", bytes, s->w.fd);
-
- if (bytes < 0 && errno == EAGAIN)
- return;
+ err = errno;
+ lem_debug("read %ld bytes from %d", bytes, s->r.fd);
- ev_io_stop(EV_A_ &s->w);
- s->w.data = NULL;
+ if (bytes < 0 && err == EAGAIN)
+ return 0;
- if (s->twin)
- s->twin->twin = NULL;
+ if (bytes == 0 || err == ECONNRESET || err == EPIPE)
+ res = LEM_PCLOSED;
else
- (void)close(s->w.fd);
- s->w.fd = -1;
+ res = LEM_PERROR;
- if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) {
- reason = LEM_PCLOSED;
- msg = "closed";
- } else {
- reason = LEM_PERROR;
- msg = strerror(errno);
- }
-
- if (s->p->destroy && (ret = s->p->destroy(T, &s->buf, reason)) > 0) {
- lem_queue(T, ret);
- return;
- }
+ if (s->p->destroy && (ret = s->p->destroy(T, &s->buf, res)) > 0)
+ return ret;
lua_settop(T, 0);
+ if (res == LEM_PCLOSED)
+ return io_closed(T);
+
lua_pushnil(T);
- lua_pushstring(T, msg);
- lem_queue(T, 2);
+ lua_pushstring(T, strerror(err));
+ return 2;
+}
+
+static void
+stream_readp_cb(EV_P_ struct ev_io *w, int revents)
+{
+ struct stream *s = STREAM_FROM_WATCH(w, r);
+ lua_State *T = s->r.data;
+ int ret;
+
+ (void)revents;
+
+ ret = stream__readp(T, s);
+ if (ret == 0)
+ return;
+
+ ev_io_stop(EV_A_ &s->r);
+ s->r.data = NULL;
+ lem_queue(T, ret);
}
static int
stream_readp(lua_State *T)
{
- struct istream *s;
+ struct stream *s;
struct lem_parser *p;
int ret;
- ssize_t bytes;
- enum lem_preason reason;
- const char *msg;
luaL_checktype(T, 1, LUA_TUSERDATA);
ret = lua_type(T, 2);
@@ -252,130 +168,91 @@ stream_readp(lua_State *T)
return luaL_argerror(T, 2, "expected userdata");
s = lua_touserdata(T, 1);
- if (s->w.fd < 0)
+ if (s->r.fd < 0)
return io_closed(T);
- if (s->w.data != NULL)
+ if (s->r.data != NULL)
return io_busy(T);
p = lua_touserdata(T, 2);
if (p->init)
p->init(T, &s->buf);
-again:
ret = p->process(T, &s->buf);
if (ret > 0)
return ret;
- bytes = read(s->w.fd, s->buf.buf + s->buf.end, LEM_INPUTBUF_SIZE - s->buf.end);
- lem_debug("read %ld bytes from %d", bytes, s->w.fd);
- if (bytes > 0) {
- s->buf.end += bytes;
- goto again;
- }
-
- if (bytes < 0 && errno == EAGAIN) {
- s->p = p;
- s->w.data = T;
- s->w.cb = stream_readp_handler;
- ev_io_start(LEM_ &s->w);
- return lua_yield(T, lua_gettop(T));
- }
-
- if (s->twin)
- s->twin->twin = NULL;
- else
- (void)close(s->w.fd);
- s->w.fd = -1;
-
- if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) {
- reason = LEM_PCLOSED;
- msg = "closed";
- } else {
- reason = LEM_PERROR;
- msg = strerror(errno);
- }
-
- if (p->destroy && (ret = p->destroy(T, &s->buf, reason)) > 0)
+ s->p = p;
+ ret = stream__readp(T, s);
+ if (ret > 0)
return ret;
- lua_settop(T, 0);
- lua_pushnil(T);
- lua_pushstring(T, msg);
- return 2;
+ s->r.data = T;
+ s->r.cb = stream_readp_cb;
+ ev_io_start(LEM_ &s->r);
+ return lua_yield(T, lua_gettop(T));
}
-static struct ostream *
-ostream_new(lua_State *T, int fd, int mt)
+/*
+ * stream:write() method
+ */
+static int
+stream__write(lua_State *T, struct stream *s)
{
- struct ostream *s;
+ ssize_t bytes;
+ int err;
- /* create userdata and set the metatable */
- s = lua_newuserdata(T, sizeof(struct ostream));
- lua_pushvalue(T, mt);
- lua_setmetatable(T, -2);
+ while ((bytes = write(s->w.fd, s->out, s->out_len)) > 0) {
+ s->out_len -= bytes;
+ if (s->out_len == 0) {
+ lua_pushboolean(T, 1);
+ return 1;
+ }
+ s->out += bytes;
+ }
+ err = errno;
- /* initialize userdata */
- ev_io_init(&s->w, NULL, fd, EV_WRITE);
- s->w.data = NULL;
- s->twin = NULL;
+ if (bytes < 0 && err == EAGAIN)
+ return 0;
- return s;
+ close(s->w.fd);
+ s->w.fd = s->r.fd = -1;
+
+ if (bytes == 0 || err == ECONNRESET || err == EPIPE)
+ return io_closed(T);
+
+ lua_pushnil(T);
+ lua_pushstring(T, strerror(err));
+ return 2;
}
static void
-stream_write_handler(EV_P_ struct ev_io *w, int revents)
+stream_write_cb(EV_P_ struct ev_io *w, int revents)
{
- struct ostream *s = (struct ostream *)w;
+ struct stream *s = STREAM_FROM_WATCH(w, w);
lua_State *T = s->w.data;
- ssize_t bytes;
+ int ret;
(void)revents;
-again:
- bytes = write(s->w.fd, s->data, s->len);
- if (bytes > 0) {
- s->len -= bytes;
- if (s->len > 0) {
- s->data += bytes;
- goto again;
- }
-
- ev_io_stop(EV_A_ &s->w);
- s->w.data = NULL;
-
- lua_pushboolean(T, 1);
- lem_queue(T, 1);
- return;
- }
-
- if (bytes < 0 && errno == EAGAIN)
+ ret = stream__write(T, s);
+ if (ret == 0)
return;
ev_io_stop(EV_A_ &s->w);
s->w.data = NULL;
-
- lua_pushnil(T);
- if (bytes == 0 || errno == ECONNRESET || errno == EPIPE)
- lua_pushliteral(T, "closed");
- else
- lua_pushstring(T, strerror(errno));
-
- if (s->twin)
- s->twin->twin = NULL;
- else
- (void)close(s->w.fd);
- s->w.fd = -1;
- lem_queue(T, 2);
+ lem_queue(T, ret);
}
static int
stream_write(lua_State *T)
{
- struct ostream *s;
- ssize_t bytes;
+ struct stream *s;
+ const char *out;
+ size_t out_len;
+ int ret;
luaL_checktype(T, 1, LUA_TUSERDATA);
- luaL_checktype(T, 2, LUA_TSTRING);
+ out = luaL_checklstring(T, 2, &out_len);
s = lua_touserdata(T, 1);
if (s->w.fd < 0)
@@ -383,48 +260,20 @@ stream_write(lua_State *T)
if (s->w.data != NULL)
return io_busy(T);
- s->data = lua_tolstring(T, 2, &s->len);
- if (s->len == 0) {
- lua_pushboolean(T, 1);
- return 1;
- }
-
-again:
- bytes = write(s->w.fd, s->data, s->len);
- if (bytes > 0) {
- s->len -= bytes;
- if (s->len > 0) {
- s->data += bytes;
- goto again;
- }
-
- lua_pushboolean(T, 1);
- return 1;
- }
-
- if (bytes < 0 && errno == EAGAIN) {
- lua_settop(T, 1);
- s->w.data = T;
- s->w.cb = stream_write_handler;
- ev_io_start(LEM_ &s->w);
- return lua_yield(T, 1);
- }
+ lua_settop(T, 2);
- lua_pushnil(T);
- if (bytes == 0 || errno == ECONNRESET || errno == EPIPE)
- lua_pushliteral(T, "closed");
- else
- lua_pushstring(T, strerror(errno));
+ s->out = out;
+ s->out_len = out_len;
+ ret = stream__write(T, s);
+ if (ret > 0)
+ return ret;
- if (s->twin)
- s->twin->twin = NULL;
- else
- (void)close(s->w.fd);
- s->w.fd = -1;
- return 2;
+ s->w.data = T;
+ s->w.cb = stream_write_cb;
+ ev_io_start(LEM_ &s->w);
+ return lua_yield(T, 2);
}
-
#ifndef TCP_CORK
#define TCP_CORK TCP_NOPUSH
#endif
@@ -432,7 +281,7 @@ again:
static int
stream_setcork(lua_State *T, int state)
{
- struct ostream *s;
+ struct stream *s;
luaL_checktype(T, 1, LUA_TUSERDATA);
s = lua_touserdata(T, 1);
@@ -469,7 +318,7 @@ struct sendfile {
};
static int
-try_sendfile(lua_State *T, struct ostream *s, struct sendfile *sf)
+stream__sendfile(lua_State *T, struct stream *s, struct sendfile *sf)
{
#ifdef __FreeBSD__
int ret;
@@ -539,36 +388,28 @@ try_sendfile(lua_State *T, struct ostream *s, struct sendfile *sf)
}
static void
-sendfile_handler(EV_P_ struct ev_io *w, int revents)
+stream_sendfile_cb(EV_P_ struct ev_io *w, int revents)
{
- struct ostream *s = (struct ostream *)w;
+ struct stream *s = STREAM_FROM_WATCH(w, w);
lua_State *T = s->w.data;
struct sendfile *sf = lua_touserdata(T, 3);
int ret;
(void)revents;
- ret = try_sendfile(T, s, sf);
+ ret = stream__sendfile(T, s, sf);
if (ret == 0)
return;
ev_io_stop(EV_A_ &s->w);
- if (ret == 2) {
- if (s->twin)
- s->twin->twin = NULL;
- else
- (void)close(s->w.fd);
- s->w.fd = -1;
- }
-
- lem_queue(T, ret);
s->w.data = NULL;
+ lem_queue(T, ret);
}
static int
stream_sendfile(lua_State *T)
{
- struct ostream *s;
+ struct stream *s;
struct lem_sendfile *f;
struct sendfile *sf;
off_t offset;
@@ -599,21 +440,13 @@ stream_sendfile(lua_State *T)
sf->file = f;
sf->offset = offset;
- ret = try_sendfile(T, s, sf);
- if (ret > 0) {
- if (ret == 2) {
- if (s->twin)
- s->twin->twin = NULL;
- else
- (void)close(s->w.fd);
- s->w.fd = -1;
- }
+ ret = stream__sendfile(T, s, sf);
+ if (ret > 0)
return ret;
- }
lem_debug("yielding");
s->w.data = T;
- s->w.cb = sendfile_handler;
+ s->w.cb = stream_sendfile_cb;
ev_io_start(LEM_ &s->w);
return lua_yield(T, 3);
}
@@ -671,10 +504,7 @@ stream_open_reap(struct lem_async *a)
struct open *o = (struct open *)a;
lua_State *T = o->a.T;
int fd = o->fd;
- int flags = o->flags;
int ret = o->type;
- struct istream *is;
- struct ostream *os;
lem_debug("o->type = %d", ret);
free(o);
@@ -705,30 +535,12 @@ stream_open_reap(struct lem_async *a)
return;
}
- if (ret == 0) {
+ if (ret == 0)
file_new(T, fd, lua_upvalueindex(3));
- lem_queue(T, 1);
- return;
- }
-
- if ((flags & O_WRONLY) == 0)
- is = istream_new(T, fd, lua_upvalueindex(1));
- else
- is = NULL;
-
- if (flags & (O_RDWR | O_WRONLY))
- os = ostream_new(T, fd, lua_upvalueindex(2));
else
- os = NULL;
+ stream_new(T, fd, lua_upvalueindex(1));
- if (is && os) {
- is->twin = os;
- os->twin = is;
- ret = 2;
- } else
- ret = 1;
-
- lem_queue(T, ret);
+ lem_queue(T, 1);
}
static int
@@ -806,39 +618,45 @@ stream_popen(lua_State *T)
const char *cmd = luaL_checkstring(T, 1);
const char *mode = luaL_optstring(T, 2, "r");
int fd[2];
+ int err;
if (mode[0] != 'r' && mode[0] != 'w')
return luaL_error(T, "invalid mode string");
- if (pipe(fd))
+ if (pipe(fd)) {
+ err = errno;
goto error;
+ }
switch (fork()) {
case -1: /* error */
- (void)close(fd[0]);
- (void)close(fd[1]);
+ err = errno;
+ close(fd[0]);
+ close(fd[1]);
goto error;
case 0: /* child */
if (mode[0] == 'r') {
- (void)close(fd[0]);
- (void)dup2(fd[1], 1);
+ close(fd[0]);
+ dup2(fd[1], 1);
} else {
- (void)close(fd[1]);
- (void)dup2(fd[0], 0);
+ close(fd[1]);
+ dup2(fd[0], 0);
}
- (void)execl("/bin/sh", "/bin/sh", "-c", cmd, NULL);
+ execl("/bin/sh", "/bin/sh", "-c", cmd, NULL);
exit(EXIT_FAILURE);
}
if (mode[0] == 'r') {
if (close(fd[1])) {
- (void)close(fd[0]);
+ err = errno;
+ close(fd[0]);
goto error;
}
} else {
if (close(fd[0])) {
- (void)close(fd[1]);
+ err = errno;
+ close(fd[1]);
goto error;
}
fd[0] = fd[1];
@@ -846,17 +664,15 @@ stream_popen(lua_State *T)
/* make the pipe non-blocking */
if (fcntl(fd[0], F_SETFL, O_NONBLOCK) < 0) {
- (void)close(fd[0]);
+ err = errno;
+ close(fd[0]);
goto error;
}
- if (mode[0] == 'r')
- (void)istream_new(T, fd[0], lua_upvalueindex(1));
- else
- (void)ostream_new(T, fd[0], lua_upvalueindex(2));
+ stream_new(T, fd[0], lua_upvalueindex(1));
return 1;
error:
lua_pushnil(T);
- lua_pushstring(T, strerror(errno));
+ lua_pushstring(T, strerror(err));
return 2;
}
diff --git a/lem/io/tcp.c b/lem/io/tcp.c
index fd13760..23fea2a 100644
--- a/lem/io/tcp.c
+++ b/lem/io/tcp.c
@@ -17,13 +17,13 @@
*/
static void
-connect_handler(EV_P_ struct ev_io *w, int revents)
+tcp_connect_cb(EV_P_ struct ev_io *w, int revents)
{
(void)revents;
lem_debug("connection established");
ev_io_stop(EV_A_ w);
- lem_queue(w->data, 2);
+ lem_queue(w->data, 1);
w->data = NULL;
}
@@ -45,8 +45,7 @@ tcp_connect(lua_State *T)
struct addrinfo *ainfo;
int sock;
int ret;
- struct istream *is;
- struct ostream *os;
+ struct stream *s;
/* lookup name */
ret = getaddrinfo(addr, NULL, &hints, &ainfo);
@@ -99,10 +98,7 @@ tcp_connect(lua_State *T)
}
lua_settop(T, 1);
- is = istream_new(T, sock, lua_upvalueindex(1));
- os = ostream_new(T, sock, lua_upvalueindex(2));
- is->twin = os;
- os->twin = is;
+ s = stream_new(T, sock, lua_upvalueindex(1));
/* connect */
ret = connect(sock, ainfo->ai_addr, ainfo->ai_addrlen);
@@ -114,10 +110,10 @@ tcp_connect(lua_State *T)
if (errno == EINPROGRESS) {
lem_debug("EINPROGRESS");
- os->w.data = T;
- os->w.cb = connect_handler;
- ev_io_start(LEM_ &os->w);
- return lua_yield(T, 3);
+ s->w.data = T;
+ s->w.cb = tcp_connect_cb;
+ ev_io_start(LEM_ &s->w);
+ return lua_yield(T, 2);
}
close(sock);
@@ -128,7 +124,7 @@ tcp_connect(lua_State *T)
}
static int
-common_listen(lua_State *T, struct sockaddr *address, socklen_t alen,
+tcp_listen(lua_State *T, struct sockaddr *address, socklen_t alen,
int sock, int backlog)
{
struct ev_io *w;
@@ -212,7 +208,7 @@ tcp4_listen(lua_State *T)
return 2;
}
- return common_listen(T, (struct sockaddr *)&address,
+ return tcp_listen(T, (struct sockaddr *)&address,
sizeof(struct sockaddr_in), sock, backlog);
}
@@ -246,6 +242,6 @@ tcp6_listen(lua_State *T)
return 2;
}
- return common_listen(T, (struct sockaddr *)&address,
+ return tcp_listen(T, (struct sockaddr *)&address,
sizeof(struct sockaddr_in6), sock, backlog);
}
diff --git a/test/ctest.lua b/test/ctest.lua
index e33b372..399fd3a 100755
--- a/test/ctest.lua
+++ b/test/ctest.lua
@@ -22,12 +22,12 @@ print('Entered ' .. arg[0])
local utils = require 'lem.utils'
local io = require 'lem.io'
-local iconn, oconn = assert(io.tcp_connect('127.0.0.1', arg[1] or 8080))
+local conn = assert(io.tcp_connect('127.0.0.1', arg[1] or 8080))
for i = 1, 10 do
- assert(oconn:write('ping\n'))
+ assert(conn:write('ping\n'))
- local line, err = iconn:read('*l')
+ local line, err = conn:read('*l')
if not line then
if err == 'closed' then
print("Server closed connection")
@@ -40,7 +40,7 @@ for i = 1, 10 do
print("Server answered: '" .. line .. "'")
end
-oconn:write('quit\n')
+conn:write('quit\n')
print('Exiting ' .. arg[0])
diff --git a/test/httptest.lua b/test/httptest.lua
index 85d21d1..1e88f28 100755
--- a/test/httptest.lua
+++ b/test/httptest.lua
@@ -27,22 +27,23 @@ local concat = table.concat
local done = false
utils.spawn(function()
- --local istream, ostream = assert(io.tcp_connect('www.google.dk', 80))
- local istream, ostream = assert(io.tcp_connect('127.0.0.1', 8080))
+ local conn = assert(io.tcp_connect('www.google.dk', 80))
+ --local conn = assert(io.tcp_connect('127.0.0.1', 8080))
print('\nConnected.')
for i = 1, 2 do
- --assert(ostream:write('GET / HTTP/1.1\r\nHost: www.google.dk\r\nConnection: close\r\n\r\n'))
- assert(ostream:write('GET / HTTP/1.1\r\nHost: www.google.dk\r\n\r\n'))
+ --assert(conn:write('GET / HTTP/1.1\r\nHost: www.google.dk\r\nConnection: close\r\n\r\n'))
+ assert(conn:write('GET / HTTP/1.1\r\nHost: www.google.dk\r\n\r\n'))
- local res = assert(istream:read('HTTPResponse'))
+ local res = assert(conn:read('HTTPResponse'))
print(format('\nHTTP/%s %d %s', res.version, res.status, res.text))
for k, v in pairs(res.headers) do
print(format('%s: %s', k, v))
end
+ --print(format('\n#body = %d', #assert(conn:read('*a'))))
print(format('\n#body = %d', #assert(res:body())))
end
@@ -50,9 +51,11 @@ utils.spawn(function()
end)
local write, yield = io.write, utils.yield
+local sleeper = utils.newsleeper()
repeat
write('.')
- yield()
+ --yield()
+ sleeper:sleep(0.001)
until done
-- vim: set ts=2 sw=2 noet:
diff --git a/test/multiplexing.lua b/test/multiplexing.lua
index fc421cb..d66d710 100755
--- a/test/multiplexing.lua
+++ b/test/multiplexing.lua
@@ -27,7 +27,7 @@ local stdout = queue.wrap(io.stdout)
do
local format = string.format
- function queue.QOStream:printf(...)
+ function queue.Stream:printf(...)
return self:write(format(...))
end
end
diff --git a/test/stest.lua b/test/stest.lua
index b791bc8..60ea03d 100755
--- a/test/stest.lua
+++ b/test/stest.lua
@@ -31,12 +31,12 @@ utils.spawn(function()
server:close()
end)
-local ok, err = server:autospawn(function(i, o)
+local ok, err = server:autospawn(function(client)
print 'Accepted a connection'
local sleeper = utils.newsleeper()
while true do
- local line, err = i:read('*l')
+ local line, err = client:read('*l')
if not line then
if err == 'closed' then
print("Client closed connection")
@@ -50,12 +50,11 @@ local ok, err = server:autospawn(function(i, o)
if line ~= 'ping' then break end
sleeper:sleep(0.4)
- assert(o:write('pong\n'))
+ assert(client:write('pong\n'))
end
print "Ok, I'm out"
- assert(i:close())
- assert(o:close())
+ assert(client:close())
end)
if not ok and err ~= 'interrupted' then error(err) end