diff options
Diffstat (limited to 'lem/streams/stream.c')
-rw-r--r-- | lem/streams/stream.c | 802 |
1 files changed, 802 insertions, 0 deletions
diff --git a/lem/streams/stream.c b/lem/streams/stream.c new file mode 100644 index 0000000..50ffc50 --- /dev/null +++ b/lem/streams/stream.c @@ -0,0 +1,802 @@ +/* + * This file is part of LEM, a Lua Event Machine. + * Copyright 2011-2012 Emil Renner Berthing + * + * LEM is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * LEM is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with LEM. If not, see <http://www.gnu.org/licenses/>. + */ + +struct ostream; + +struct istream { + ev_io w; + struct ostream *twin; + struct lem_parser *p; + struct lem_inputbuf buf; +}; + +struct ostream { + ev_io w; + struct istream *twin; + const char *data; + size_t len; +}; + +static struct istream * +istream_new(lua_State *T, int fd, int mt) +{ + struct istream *s; + + /* create userdata and set the metatable */ + s = lua_newuserdata(T, sizeof(struct istream)); + lua_pushvalue(T, mt); + lua_setmetatable(T, -2); + + /* initialize userdata */ + ev_io_init(&s->w, NULL, fd, EV_READ); + s->w.data = NULL; + s->twin = NULL; + s->buf.start = s->buf.end = 0; + + return s; +} + +static int +stream_closed(lua_State *T) +{ + struct ev_io *w; + + luaL_checktype(T, 1, LUA_TUSERDATA); + w = lua_touserdata(T, 1); + lua_pushboolean(T, w->fd < 0); + return 1; +} + +static int +stream_busy(lua_State *T) +{ + struct ev_io *w; + + luaL_checktype(T, 1, LUA_TUSERDATA); + w = lua_touserdata(T, 1); + lua_pushboolean(T, w->data != NULL); + return 1; +} + +static int +stream_interrupt(lua_State *T) +{ + struct ev_io *w; + lua_State *S; + + luaL_checktype(T, 1, LUA_TUSERDATA); + w = lua_touserdata(T, 1); + S = w->data; + if (S == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "not busy"); + return 2; + } + + lem_debug("interrupting io action"); + ev_io_stop(LEM_ w); + w->data = NULL; + lua_settop(S, 0); + lua_pushnil(S); + lua_pushliteral(S, "interrupted"); + lem_queue(S, 2); + + lua_pushboolean(T, 1); + return 1; +} + +static int +stream_close_check(lua_State *T, struct ev_io *w) +{ + if (w->fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "already closed"); + return 2; + } + + if (w->data != NULL) { + lua_State *S = w->data; + + lem_debug("interrupting io action"); + ev_io_stop(LEM_ w); + w->data = NULL; + lua_settop(S, 0); + lua_pushnil(S); + lua_pushliteral(S, "interrupted"); + lem_queue(S, 2); + } + + return 0; +} + +static int +istream_close(lua_State *T) +{ + struct istream *s; + + luaL_checktype(T, 1, LUA_TUSERDATA); + s = lua_touserdata(T, 1); + + lem_debug("collecting %d", s->w.fd); + if (stream_close_check(T, &s->w)) + return 2; + + if (s->twin) + s->twin->twin = NULL; + else if (close(s->w.fd)) { + s->w.fd = -1; + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + return 2; + } + + s->w.fd = -1; + lua_pushboolean(T, 1); + return 1; +} + +static int +ostream_close(lua_State *T) +{ + struct ostream *s; + + luaL_checktype(T, 1, LUA_TUSERDATA); + s = lua_touserdata(T, 1); + + lem_debug("collecting %d", s->w.fd); + if (stream_close_check(T, &s->w)) + return 2; + + if (s->twin) + s->twin->twin = NULL; + else if (close(s->w.fd)) { + s->w.fd = -1; + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + return 2; + } + + s->w.fd = -1; + lua_pushboolean(T, 1); + return 1; +} + +/* + * istream:readp() method + */ + +static void +stream_readp_handler(EV_P_ ev_io *w, int revents) +{ + struct istream *s = (struct istream *)w; + lua_State *T = s->w.data; + ssize_t bytes; + int ret; + enum lem_preason reason; + const char *msg; + + (void)revents; + + while ((bytes = read(s->w.fd, s->buf.buf + s->buf.end, + LEM_INPUTBUF_SIZE - s->buf.end)) > 0) { + lem_debug("read %ld bytes from %d", bytes, s->w.fd); + + s->buf.end += bytes; + + ret = s->p->process(T, &s->buf); + if (ret > 0) { + ev_io_stop(EV_A_ &s->w); + s->w.data = NULL; + lem_queue(T, ret); + return; + } + } + lem_debug("read %ld bytes from %d", bytes, s->w.fd); + + if (bytes < 0 && errno == EAGAIN) + return; + + ev_io_stop(EV_A_ &s->w); + s->w.data = NULL; + + if (s->twin) + s->twin->twin = NULL; + else + (void)close(s->w.fd); + s->w.fd = -1; + + if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) { + reason = LEM_PCLOSED; + msg = "closed"; + } else { + reason = LEM_PERROR; + msg = strerror(errno); + } + + if (s->p->destroy && (ret = s->p->destroy(T, &s->buf, reason)) > 0) { + lem_queue(T, ret); + return; + } + + lua_settop(T, 0); + lua_pushnil(T); + lua_pushstring(T, msg); + lem_queue(T, 2); +} + +static int +stream_readp(lua_State *T) +{ + struct istream *s; + struct lem_parser *p; + int ret; + ssize_t bytes; + enum lem_preason reason; + const char *msg; + + luaL_checktype(T, 1, LUA_TUSERDATA); + ret = lua_type(T, 2); + if (ret != LUA_TUSERDATA && ret != LUA_TLIGHTUSERDATA) + return luaL_argerror(T, 2, "expected userdata"); + + s = lua_touserdata(T, 1); + if (s->w.fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (s->w.data != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + p = lua_touserdata(T, 2); + if (p->init) + p->init(T, &s->buf); + +again: + ret = p->process(T, &s->buf); + if (ret > 0) + return ret; + + bytes = read(s->w.fd, s->buf.buf + s->buf.end, LEM_INPUTBUF_SIZE - s->buf.end); + lem_debug("read %ld bytes from %d", bytes, s->w.fd); + if (bytes > 0) { + s->buf.end += bytes; + goto again; + } + + if (bytes < 0 && errno == EAGAIN) { + s->p = p; + s->w.data = T; + s->w.cb = stream_readp_handler; + ev_io_start(LEM_ &s->w); + return lua_yield(T, lua_gettop(T)); + } + + if (s->twin) + s->twin->twin = NULL; + else + (void)close(s->w.fd); + s->w.fd = -1; + + if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) { + reason = LEM_PCLOSED; + msg = "closed"; + } else { + reason = LEM_PERROR; + msg = strerror(errno); + } + + if (p->destroy && (ret = p->destroy(T, &s->buf, reason)) > 0) + return ret; + + lua_settop(T, 0); + lua_pushnil(T); + lua_pushstring(T, msg); + return 2; +} + +static struct ostream * +ostream_new(lua_State *T, int fd, int mt) +{ + struct ostream *s; + + /* create userdata and set the metatable */ + s = lua_newuserdata(T, sizeof(struct ostream)); + lua_pushvalue(T, mt); + lua_setmetatable(T, -2); + + /* initialize userdata */ + ev_io_init(&s->w, NULL, fd, EV_WRITE); + s->w.data = NULL; + s->twin = NULL; + + return s; +} + +static void +stream_write_handler(EV_P_ struct ev_io *w, int revents) +{ + struct ostream *s = (struct ostream *)w; + lua_State *T = s->w.data; + ssize_t bytes; + + (void)revents; + +again: + bytes = write(s->w.fd, s->data, s->len); + if (bytes > 0) { + s->len -= bytes; + if (s->len > 0) { + s->data += bytes; + goto again; + } + + ev_io_stop(EV_A_ &s->w); + s->w.data = NULL; + + lua_pushboolean(T, 1); + lem_queue(T, 1); + return; + } + + if (bytes < 0 && errno == EAGAIN) + return; + + ev_io_stop(EV_A_ &s->w); + s->w.data = NULL; + + lua_pushnil(T); + if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) + lua_pushliteral(T, "closed"); + else + lua_pushstring(T, strerror(errno)); + + if (s->twin) + s->twin->twin = NULL; + else + (void)close(s->w.fd); + s->w.fd = -1; + lem_queue(T, 2); +} + +static int +stream_write(lua_State *T) +{ + struct ostream *s; + ssize_t bytes; + + luaL_checktype(T, 1, LUA_TUSERDATA); + luaL_checktype(T, 2, LUA_TSTRING); + + s = lua_touserdata(T, 1); + if (s->w.fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (s->w.data != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + s->data = lua_tolstring(T, 2, &s->len); + if (s->len == 0) { + lua_pushboolean(T, 1); + return 1; + } + +again: + bytes = write(s->w.fd, s->data, s->len); + if (bytes > 0) { + s->len -= bytes; + if (s->len > 0) { + s->data += bytes; + goto again; + } + + lua_pushboolean(T, 1); + return 1; + } + + if (bytes < 0 && errno == EAGAIN) { + lua_settop(T, 1); + s->w.data = T; + s->w.cb = stream_write_handler; + ev_io_start(LEM_ &s->w); + return lua_yield(T, 1); + } + + lua_pushnil(T); + if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) + lua_pushliteral(T, "closed"); + else + lua_pushstring(T, strerror(errno)); + + if (s->twin) + s->twin->twin = NULL; + else + (void)close(s->w.fd); + s->w.fd = -1; + return 2; +} + + +#ifndef TCP_CORK +#define TCP_CORK TCP_NOPUSH +#endif + +static int +stream_setcork(lua_State *T, int state) +{ + struct ostream *s; + + luaL_checktype(T, 1, LUA_TUSERDATA); + s = lua_touserdata(T, 1); + if (s->w.fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (s->w.data != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (setsockopt(s->w.fd, IPPROTO_TCP, TCP_CORK, &state, sizeof(int))) { + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + return 2; + } + + lua_pushboolean(T, 1); + return 1; +} + +static int +stream_cork(lua_State *T) +{ + return stream_setcork(T, 1); +} + +static int +stream_uncork(lua_State *T) +{ + return stream_setcork(T, 0); +} + +struct sendfile { + struct lem_sendfile *file; + off_t offset; +}; + +static int +try_sendfile(lua_State *T, struct ostream *s, struct sendfile *sf) +{ +#ifdef __FreeBSD__ + int ret; + + do { + size_t count; + off_t written = 0; + + count = sf->file->size - sf->offset; + + if (count == 0) { + lua_settop(T, 0); + lua_pushboolean(T, 1); + return 1; + } + + ret = sendfile(sf->file->fd, s->w.fd, + sf->offset, count, + NULL, &written, 0); + lem_debug("wrote = %ld bytes", written); + sf->offset += written; + } while (ret >= 0); +#else +#ifdef __APPLE__ + int ret; + + do { + off_t count = sf->file->size - sf->offset; + + if (count == 0) { + lua_settop(T, 0); + lua_pushboolean(T, 1); + return 1; + } + + ret = sendfile(sf->file->fd, s->w.fd, + sf->offset, &count, NULL, 0); + lem_debug("wrote = %lld bytes", count); + sf->offset += count; + } while (ret >= 0); +#else + ssize_t ret; + + do { + size_t count = sf->file->size - sf->offset; + + if (count == 0) { + lua_settop(T, 0); + lua_pushboolean(T, 1); + return 1; + } + + ret = sendfile(s->w.fd, sf->file->fd, + &sf->offset, + count); + lem_debug("wrote = %ld bytes", ret); + } while (ret >= 0); +#endif +#endif + + if (errno == EAGAIN) + return 0; + + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + return 2; +} + +static void +sendfile_handler(EV_P_ struct ev_io *w, int revents) +{ + struct ostream *s = (struct ostream *)w; + lua_State *T = s->w.data; + struct sendfile *sf = lua_touserdata(T, 3); + int ret; + + (void)revents; + + ret = try_sendfile(T, s, sf); + if (ret == 0) + return; + + ev_io_stop(EV_A_ &s->w); + if (ret == 2) { + if (s->twin) + s->twin->twin = NULL; + else + (void)close(s->w.fd); + s->w.fd = -1; + } + + lem_queue(T, ret); + s->w.data = NULL; +} + +static int +stream_sendfile(lua_State *T) +{ + struct ostream *s; + struct lem_sendfile *f; + struct sendfile *sf; + off_t offset; + int ret; + + luaL_checktype(T, 1, LUA_TUSERDATA); + luaL_checktype(T, 2, LUA_TUSERDATA); + offset = (off_t)luaL_optnumber(T, 3, 0); + + s = lua_touserdata(T, 1); + if (s->w.fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (s->w.data != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + f = lua_touserdata(T, 2); + if (f->fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "file closed"); + return 2; + } + + if (offset > f->size) + return luaL_error(T, "offset too big"); + + lua_settop(T, 2); + sf = lua_newuserdata(T, sizeof(struct sendfile)); + sf->file = f; + sf->offset = offset; + + ret = try_sendfile(T, s, sf); + if (ret > 0) { + if (ret == 2) { + if (s->twin) + s->twin->twin = NULL; + else + (void)close(s->w.fd); + s->w.fd = -1; + } + return ret; + } + + lem_debug("yielding"); + s->w.data = T; + s->w.cb = sendfile_handler; + ev_io_start(LEM_ &s->w); + return lua_yield(T, 3); +} + +static int +mode_to_flags(const char *mode) +{ + int omode; + int oflags; + + switch (*mode++) { + case 'r': + omode = O_RDONLY; + oflags = 0; + break; + case 'w': + omode = O_WRONLY; + oflags = O_CREAT | O_TRUNC; + break; + case 'a': + omode = O_WRONLY; + oflags = O_CREAT | O_APPEND; + break; + default: + return -1; + } + +next: + switch (*mode++) { + case '\0': + break; + case '+': + omode = O_RDWR; + goto next; + case 'b': + /* this does nothing on *nix, but + * don't treat it as an error */ + goto next; + case 'x': + oflags |= O_EXCL; + goto next; + default: + return -1; + } + + return omode | oflags; +} + +static int +stream_open(lua_State *T) +{ + const char *path = luaL_checkstring(T, 1); + int flags = mode_to_flags(luaL_optstring(T, 2, "r")); + int fd; + struct istream *is; + struct ostream *os; + + if (flags < 0) + return luaL_error(T, "invalid mode string"); + + fd = open(path, flags | O_NONBLOCK); + if (fd < 0) { + lua_pushnil(T); + switch (errno) { + case ENOENT: + lua_pushliteral(T, "not found"); + break; + case EACCES: + lua_pushliteral(T, "permission denied"); + break; + default: + lua_pushstring(T, strerror(errno)); + } + return 2; + } + + if ((flags & O_WRONLY) == 0) + is = istream_new(T, fd, lua_upvalueindex(1)); + else + is = NULL; + + if (flags & (O_RDWR | O_WRONLY)) + os = ostream_new(T, fd, lua_upvalueindex(2)); + else + os = NULL; + + if (is && os) { + is->twin = os; + os->twin = is; + return 2; + } + + return 1; +} + +static int +stream_popen(lua_State *T) +{ + const char *cmd = luaL_checkstring(T, 1); + const char *mode = luaL_optstring(T, 2, "r"); + int fd[2]; + + if (mode[0] != 'r' && mode[0] != 'w') + return luaL_error(T, "invalid mode string"); + + if (pipe(fd)) + goto error; + + switch (fork()) { + case -1: /* error */ + (void)close(fd[0]); + (void)close(fd[1]); + goto error; + case 0: /* child */ + if (mode[0] == 'r') { + (void)close(fd[0]); + (void)dup2(fd[1], 1); + } else { + (void)close(fd[1]); + (void)dup2(fd[0], 0); + } + + (void)execl("/bin/sh", "/bin/sh", "-c", cmd, NULL); + exit(EXIT_FAILURE); + } + + if (mode[0] == 'r') { + if (close(fd[1])) { + (void)close(fd[0]); + goto error; + } + } else { + if (close(fd[0])) { + (void)close(fd[1]); + goto error; + } + fd[0] = fd[1]; + } + + /* make the pipe non-blocking */ + if (fcntl(fd[0], F_SETFL, O_NONBLOCK) < 0) { + (void)close(fd[0]); + goto error; + } + + if (mode[0] == 'r') + (void)istream_new(T, fd[0], lua_upvalueindex(1)); + else + (void)ostream_new(T, fd[0], lua_upvalueindex(2)); + return 1; +error: + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + return 2; +} |