/* * This file is part of LEM, a Lua Event Machine. * Copyright 2011-2013 Emil Renner Berthing * * LEM is free software: you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * LEM is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with LEM. If not, see . */ struct stream { struct ev_io r; struct ev_io w; unsigned int open; int idx; const char *out; size_t out_len; struct lem_parser *p; struct lem_inputbuf buf; }; #define STREAM_FROM_WATCH(w, member)\ (struct stream *)(((char *)w) - offsetof(struct stream, member)) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wstrict-aliasing" static inline void stream_watch_init(struct stream *s, int fd) { ev_io_init(&s->r, NULL, fd, EV_READ); ev_io_init(&s->w, NULL, fd, EV_WRITE); } #pragma GCC diagnostic pop static struct stream * stream_new(lua_State *T, int fd, int mt) { /* create userdata and set the metatable */ struct stream *s = lua_newuserdata(T, sizeof(struct stream)); lua_pushvalue(T, mt); lua_setmetatable(T, -2); /* initialize userdata */ stream_watch_init(s, fd); s->open = 1; s->r.data = NULL; s->w.data = NULL; lem_inputbuf_init(&s->buf); return s; } static int stream_gc(lua_State *T) { struct stream *s = lua_touserdata(T, 1); if (s->open & 1) close(s->r.fd); if (s->open & 2) fcntl(s->w.fd, F_SETFL, 0); return 0; } static int stream_closed(lua_State *T) { struct stream *s; luaL_checktype(T, 1, LUA_TUSERDATA); s = lua_touserdata(T, 1); lua_pushboolean(T, !s->open); return 1; } static int stream_close(lua_State *T) { struct stream *s; luaL_checktype(T, 1, LUA_TUSERDATA); s = lua_touserdata(T, 1); if (!s->open) return io_closed(T); if (s->r.data != NULL) { ev_io_stop(LEM_ &s->r); lem_queue(s->r.data, io_closed(s->r.data)); s->r.data = NULL; } if (s->w.data != NULL) { ev_io_stop(LEM_ &s->w); lem_queue(s->w.data, io_closed(s->w.data)); s->w.data = NULL; } s->open = 0; if (close(s->r.fd)) return io_strerror(T, errno); lua_pushboolean(T, 1); return 1; } /* * stream:readp() method */ static int stream__readp(lua_State *T, struct stream *s) { ssize_t bytes; int ret; int err; enum lem_preason res; while ((bytes = read(s->r.fd, s->buf.buf + s->buf.end, LEM_INPUTBUF_SIZE - s->buf.end)) > 0) { lem_debug("read %ld bytes from %d", bytes, s->r.fd); s->buf.end += bytes; ret = s->p->process(T, &s->buf); if (ret > 0) return ret; } err = errno; lem_debug("read %ld bytes from %d", bytes, s->r.fd); if (bytes < 0 && (err == EAGAIN || err == EINTR)) return 0; if (bytes == 0 || err == ECONNRESET || err == EPIPE) res = LEM_PCLOSED; else res = LEM_PERROR; s->open = 0; close(s->r.fd); if (s->p->destroy && (ret = s->p->destroy(T, &s->buf, res)) > 0) return ret; lua_settop(T, 0); if (res == LEM_PCLOSED) return io_closed(T); return io_strerror(T, err); } static void stream_readp_cb(EV_P_ struct ev_io *w, int revents) { struct stream *s = STREAM_FROM_WATCH(w, r); lua_State *T = s->r.data; int ret; (void)revents; if (!s->open) { ret = 0; if (s->p->destroy) ret = s->p->destroy(T, &s->buf, LEM_PCLOSED); if (ret <= 0) ret = io_closed(T); } else { ret = stream__readp(T, s); if (ret == 0) return; } ev_io_stop(EV_A_ &s->r); s->r.data = NULL; lem_queue(T, ret); } static int stream_readp(lua_State *T) { struct stream *s; struct lem_parser *p; int ret; luaL_checktype(T, 1, LUA_TUSERDATA); ret = lua_type(T, 2); if (ret != LUA_TUSERDATA && ret != LUA_TLIGHTUSERDATA) return luaL_argerror(T, 2, "expected userdata"); s = lua_touserdata(T, 1); if (!s->open) return io_closed(T); if (s->r.data != NULL) return io_busy(T); p = lua_touserdata(T, 2); if (p->init) p->init(T, &s->buf); ret = p->process(T, &s->buf); if (ret > 0) return ret; s->p = p; ret = stream__readp(T, s); if (ret > 0) return ret; s->r.data = T; s->r.cb = stream_readp_cb; ev_io_start(LEM_ &s->r); return lua_yield(T, lua_gettop(T)); } /* * stream:write() method */ static int stream__write(lua_State *T, struct stream *s) { ssize_t bytes; int err; while ((bytes = write(s->w.fd, s->out, s->out_len)) > 0) { lem_debug("wrote %ld bytes to fd %d", bytes, s->w.fd); s->out += bytes; s->out_len -= bytes; while (s->out_len == 0) { if (s->idx == lua_gettop(T)) { lua_pushboolean(T, 1); return 1; } s->out = lua_tolstring(T, ++s->idx, &s->out_len); } } err = errno; lem_debug("wrote %ld bytes to fd %d", bytes, s->w.fd); if (bytes < 0 && (err == EAGAIN || err == EINTR)) return 0; s->open = 0; close(s->w.fd); if (bytes == 0 || err == ECONNRESET || err == EPIPE) return io_closed(T); return io_strerror(T, err); } static void stream_write_cb(EV_P_ struct ev_io *w, int revents) { struct stream *s = STREAM_FROM_WATCH(w, w); lua_State *T = s->w.data; int ret; (void)revents; if (!s->open) ret = io_closed(T); else { ret = stream__write(T, s); if (ret == 0) return; } ev_io_stop(EV_A_ &s->w); s->w.data = NULL; lem_queue(T, ret); } static int stream_write(lua_State *T) { struct stream *s; const char *out; size_t out_len; int idx; int i; int top; int ret; luaL_checktype(T, 1, LUA_TUSERDATA); top = lua_gettop(T); idx = 1; do { out = luaL_checklstring(T, ++idx, &out_len); } while (out_len == 0 && idx < top); for (i = idx+1; i <= top; i++) (void)luaL_checkstring(T, i); s = lua_touserdata(T, 1); if (!s->open) return io_closed(T); if (s->w.data != NULL) return io_busy(T); if (out_len == 0) { lua_pushboolean(T, 1); return 1; } s->out = out; s->out_len = out_len; s->idx = idx; ret = stream__write(T, s); if (ret > 0) return ret; s->w.data = T; s->w.cb = stream_write_cb; ev_io_start(LEM_ &s->w); return lua_yield(T, top); } #ifdef TCP_CORK static int stream_setcork(lua_State *T, int state) { struct stream *s; luaL_checktype(T, 1, LUA_TUSERDATA); s = lua_touserdata(T, 1); if (!s->open) return io_closed(T); if (s->w.data != NULL) return io_busy(T); if (setsockopt(s->w.fd, IPPROTO_TCP, TCP_CORK, &state, sizeof(int))) return io_strerror(T, errno); lua_pushboolean(T, 1); return 1; } static int stream_cork(lua_State *T) { return stream_setcork(T, 1); } static int stream_uncork(lua_State *T) { return stream_setcork(T, 0); } #endif static int stream_getpeer(lua_State *T) { struct stream *s; union { struct sockaddr all; struct sockaddr_in in; struct sockaddr_in6 in6; } addr; socklen_t len; luaL_checktype(T, 1, LUA_TUSERDATA); s = lua_touserdata(T, 1); if (!s->open) return io_closed(T); len = sizeof(addr); if (getpeername(s->r.fd, &addr.all, &len)) return io_strerror(T, errno); switch (addr.all.sa_family) { case AF_UNIX: { #if defined(__FreeBSD__) || defined(__APPLE__) struct xucred cred; len = sizeof(struct xucred); if (getsockopt(s->r.fd, 0, LOCAL_PEERCRED, &cred, &len)) return io_strerror(T, errno); if (len != sizeof(struct xucred) || cred.cr_version != XUCRED_VERSION) { lua_pushnil(T); lua_pushliteral(T, "version mismatch"); return 2; } lua_pushliteral(T, "*unix"); lua_pushnumber(T, cred.cr_uid); lua_pushnumber(T, cred.cr_gid); #else struct ucred cred; len = sizeof(struct ucred); if (getsockopt(s->r.fd, SOL_SOCKET, SO_PEERCRED, &cred, &len)) return io_strerror(T, errno); lua_pushliteral(T, "*unix"); lua_pushnumber(T, cred.uid); lua_pushnumber(T, cred.gid); #endif return 3; } case AF_INET: { char buf[INET_ADDRSTRLEN]; if (inet_ntop(addr.in.sin_family, &addr.in.sin_addr, buf, sizeof(buf)) == NULL) return io_strerror(T, errno); lua_pushstring(T, buf); lua_pushnumber(T, ntohs(addr.in.sin_port)); return 2; } case AF_INET6: { char buf[INET6_ADDRSTRLEN]; if (inet_ntop(addr.in6.sin6_family, &addr.in6.sin6_addr, buf, sizeof(buf)) == NULL) return io_strerror(T, errno); lua_pushstring(T, buf); lua_pushnumber(T, ntohs(addr.in6.sin6_port)); return 2; } } return io_strerror(T, EINVAL); } struct sfhandle { struct lem_async a; lua_State *T; struct stream *s; off_t size; off_t offset; int fd; int ret; }; static void stream_sendfile_work(struct lem_async *a) { struct sfhandle *sf = (struct sfhandle *)a; struct stream *s = sf->s; /* make socket blocking */ if (fcntl(s->w.fd, F_SETFL, 0) == -1) { sf->ret = errno; return; } #ifdef __FreeBSD__ off_t written; int ret = sendfile(sf->fd, s->w.fd, sf->offset, sf->size, NULL, &written, SF_SYNC); if (ret == 0) { sf->ret = 0; sf->size = written; } else sf->ret = errno; lem_debug("wrote = %ld bytes", written); #else #ifdef __APPLE__ int ret = sendfile(sf->fd, s->w.fd, sf->offset, &sf->size, NULL, 0); if (ret == 0) sf->ret = 0; else sf->ret = errno; lem_debug("wrote = %lld bytes", sf->size); #else ssize_t ret = sendfile(s->w.fd, sf->fd, &sf->offset, sf->size); if (ret >= 0) { sf->ret = 0; sf->size = ret; } else sf->ret = errno; lem_debug("wrote = %ld bytes", ret); #endif #endif /* make socket non-blocking again */ if (fcntl(s->w.fd, F_SETFL, O_NONBLOCK) == -1) { sf->ret = errno; return; } } static void stream_sendfile_reap(struct lem_async *a) { struct sfhandle *sf = (struct sfhandle *)a; lua_State *T = sf->T; struct stream *s = sf->s; int ret; if (sf->ret == 0) { lua_pushnumber(T, sf->size); ret = 1; } else { if (s->open) close(s->w.fd); s->open = 0; ret = io_strerror(T, sf->ret); } free(sf); s->w.data = NULL; lem_queue(T, ret); } static int stream_sendfile(lua_State *T) { struct stream *s; struct file *f; off_t size; off_t offset; struct sfhandle *sf; luaL_checktype(T, 1, LUA_TUSERDATA); luaL_checktype(T, 2, LUA_TUSERDATA); size = (off_t)luaL_checknumber(T, 3); offset = (off_t)luaL_optnumber(T, 4, 0); s = lua_touserdata(T, 1); if (!s->open) return io_closed(T); if (s->w.data != NULL) return io_busy(T); f = lua_touserdata(T, 2); if (f->fd < 0) { lua_pushnil(T); lua_pushliteral(T, "file closed"); return 2; } s->w.data = T; sf = lem_xmalloc(sizeof(struct sfhandle)); sf->T = T; sf->s = s; sf->size = size; sf->offset = offset; sf->fd = f->fd; lem_async_do(&sf->a, stream_sendfile_work, stream_sendfile_reap); lua_settop(T, 2); return lua_yield(T, 2); }