diff options
author | Emil Renner Berthing <esmil@mailme.dk> | 2012-07-15 19:58:28 +0200 |
---|---|---|
committer | Emil Renner Berthing <esmil@mailme.dk> | 2012-07-31 18:47:58 +0200 |
commit | 9db0a1b8d4539fe5f7aa678e0ef8fb634fb48bf7 (patch) | |
tree | 7c4c361ec7b8de44868036902256b3afe2d2a67b | |
parent | 78e6e89431e4ef9419a716f246f1f3cfaf7d9dfe (diff) | |
download | lem-9db0a1b8d4539fe5f7aa678e0ef8fb634fb48bf7.tar.gz lem-9db0a1b8d4539fe5f7aa678e0ef8fb634fb48bf7.tar.xz lem-9db0a1b8d4539fe5f7aa678e0ef8fb634fb48bf7.zip |
add more libraries
-rw-r--r-- | .gitignore | 9 | ||||
-rw-r--r-- | Makefile.in | 112 | ||||
-rw-r--r-- | README.markdown | 226 | ||||
-rwxr-xr-x | bin/lem-repl (renamed from lem-repl) | 0 | ||||
-rw-r--r-- | bin/lem.c (renamed from lem.c) | 3 | ||||
-rw-r--r-- | bin/libev.c (renamed from libev.c) | 4 | ||||
-rw-r--r-- | bin/lua.c | 52 | ||||
-rw-r--r-- | configure.ac | 17 | ||||
-rw-r--r-- | include/lem.h (renamed from lem.h) | 0 | ||||
-rw-r--r-- | include/streams.h | 52 | ||||
-rw-r--r-- | lem/hathaway.lua | 385 | ||||
-rw-r--r-- | lem/http.lua | 93 | ||||
-rw-r--r-- | lem/http/core.c | 366 | ||||
-rw-r--r-- | lem/repl.lua (renamed from repl.lua) | 0 | ||||
-rw-r--r-- | lem/streams.lua | 50 | ||||
-rw-r--r-- | lem/streams/core.c | 258 | ||||
-rw-r--r-- | lem/streams/parsers.c | 175 | ||||
-rw-r--r-- | lem/streams/queue.lua | 78 | ||||
-rw-r--r-- | lem/streams/sendfile.c | 120 | ||||
-rw-r--r-- | lem/streams/server.c | 298 | ||||
-rw-r--r-- | lem/streams/stream.c | 802 | ||||
-rw-r--r-- | lem/streams/tcp.c | 251 | ||||
-rw-r--r-- | lem/utils.c (renamed from utils.c) | 0 | ||||
-rw-r--r-- | lua.c | 52 | ||||
-rw-r--r-- | lua/luaconf.h.in | 4 | ||||
-rwxr-xr-x | test/ctest.lua | 48 | ||||
-rwxr-xr-x | test/htest.lua | 148 | ||||
-rwxr-xr-x | test/httptest.lua | 58 | ||||
-rwxr-xr-x | test/multiplexing.lua | 79 | ||||
-rwxr-xr-x | test/sleep.lua | 46 | ||||
-rwxr-xr-x | test/stest.lua | 64 | ||||
-rwxr-xr-x | test/test.lua (renamed from test.lua) | 5 | ||||
-rwxr-xr-x | test/test2.lua (renamed from test2.lua) | 5 |
33 files changed, 3728 insertions, 132 deletions
@@ -1,13 +1,12 @@ *.o -*.lo *.so -config.status -config.log -Makefile +bin/lem lua/luaconf.h libev/ev-config.h lem.pc -lem +config.status +config.log +Makefile aclocal.m4 autom4te.cache *.pkg.tar.* diff --git a/Makefile.in b/Makefile.in index e374abe..6caee46 100644 --- a/Makefile.in +++ b/Makefile.in @@ -1,6 +1,6 @@ CC = @CC@ CFLAGS ?= @CFLAGS@ -CFLAGS += -I. @CPPFLAGS@ +CFLAGS += -Iinclude @CPPFLAGS@ SHARED = @SHARED@ PKG_CONFIG = @PKG_CONFIG@ @@ -8,14 +8,6 @@ STRIP = @STRIP@ INSTALL = @INSTALL@ SED = @SED@ -headers = @headers@ -programs = lem utils.so -scripts = repl.lua lem-repl - -objects = @objects@ - -LIBS = @LIBS@ - prefix = @prefix@ exec_prefix = @exec_prefix@ bindir = @bindir@ @@ -26,8 +18,22 @@ pkgconfigdir = @pkgconfigdir@ lmoddir = @lmoddir@ cmoddir = @cmoddir@ -installdirs = $(bindir) $(includedir)/lem \ - $(lmoddir)/lem $(cmoddir)/lem $(pkgconfigdir) +objects = @objects@ +LIBS = @LIBS@ + +headers = @headers@ + +llibs = \ + lem/repl.lua \ + lem/streams.lua \ + lem/streams/queue.lua \ + lem/http.lua \ + lem/hathaway.lua + +clibs = \ + lem/utils.so \ + lem/streams/core.so \ + lem/http/core.so ifdef V E=@\# @@ -37,19 +43,18 @@ E=@echo Q=@ endif -.PHONY: all strip install clean $(installdirs) -.PRECIOUS: %.lo +.PHONY: all strip install clean -all: $(programs) lem.pc +all: bin/lem lem.pc $(clibs) -libev.o: CFLAGS += -w -lua.o: lua/luaconf.h +bin/libev.o: CFLAGS += -w +bin/lua.o: lua/luaconf.h %.o: %.c $E ' CC $@' $Q$(CC) $(CFLAGS) -c $< -o $@ -lem: $(objects) +bin/lem: $(objects) $E ' LD $@' $Q$(CC) $^ -o $@ -rdynamic $(LDFLAGS) $(LIBS) @@ -60,11 +65,11 @@ lem: $(objects) lua/luaconf.h: lua/luaconf.h.in $E ' SED > $@' $Q$(SED) \ - -e 's|@path@|$(lua_path)|' \ - -e 's|@cpath@|$(lua_cpath)|' \ + -e 's|@lmoddir[@]|$(lmoddir)|' \ + -e 's|@cmoddir[@]|$(cmoddir)|' \ $< > $@ -%.pc: %.pc.in +lem.pc: lem.pc.in $E ' SED > $@' $Q$(SED) \ -e 's|@lmoddir[@]|$(lmoddir)|' \ @@ -76,37 +81,40 @@ lua/luaconf.h: lua/luaconf.h.in $E ' STRIP $<' $Q$(STRIP) $(STRIP_ARGS) $< -strip: $(programs:%=%-strip) - -$(installdirs): - $E ' INSTALL -d $@' - $Q$(INSTALL) -dm755 $(DESTDIR)$@ - -lem-install: lem | $(bindir) - $E ' INSTALL $<' - $Q$(INSTALL) $< $(DESTDIR)$(bindir)/$< - -lem-repl-install: lem-repl | $(bindir) - $E ' INSTALL $<' - $Q$(INSTALL) $< $(DESTDIR)$(bindir)/$< - -%.h-install: %.h | $(includedir)/lem - $E ' INSTALL $(notdir $<)' - $Q$(INSTALL) -m644 $< $(DESTDIR)$(includedir)/lem/$(notdir $<) - -%.lua-install: %.lua | $(lmoddir)/lem - $E ' INSTALL $<' - $Q$(INSTALL) -m644 $< $(DESTDIR)$(lmoddir)/lem/$< - -%.so-install: %.so | $(cmoddir)/lem - $E ' INSTALL $<' - $Q$(INSTALL) $< $(DESTDIR)$(cmoddir)/lem/$< - -%.pc-install: %.pc | $(pkgconfigdir) - $E ' INSTALL $<' - $Q$(INSTALL) -m644 $< $(DESTDIR)$(pkgconfigdir) - -install: lem.pc-install $(headers:%=%-install) $(programs:%=%-install) $(scripts:%=%-install) +strip: bin/lem-strip $(clibs:%=%-strip) + +$(DESTDIR)$(bindir)/%: bin/% + $E ' INSTALL $@' + $Q$(INSTALL) -d $(dir $@) + $Q$(INSTALL) -m 755 $< $@ + +$(DESTDIR)$(includedir)/lem/%: lua/% + $E ' INSTALL $@' + $Q$(INSTALL) -d $(dir $@) + $Q$(INSTALL) -m 644 $< $@ + +$(DESTDIR)$(includedir)/lem/%: libev/% + $E ' INSTALL $@' + $Q$(INSTALL) -d $(dir $@) + $Q$(INSTALL) -m 644 $< $@ + +$(DESTDIR)$(includedir)/lem/%: include/% + $E ' INSTALL $@' + $Q$(INSTALL) -d $(dir $@) + $Q$(INSTALL) -m 644 $< $@ + +$(DESTDIR)$(lmoddir)/% $(DESTDIR)$(cmoddir)/% $(DESTDIR)$(pkgconfigdir)/%: % + $E ' INSTALL $@' + $Q$(INSTALL) -d $(dir $@) + $Q$(INSTALL) -m 644 $< $@ + +install: \ + $(DESTDIR)$(pkgconfigdir)/lem.pc \ + $(DESTDIR)$(bindir)/lem \ + $(DESTDIR)$(bindir)/lem-repl \ + $(headers:%=$(DESTDIR)$(includedir)/lem/%) \ + $(llibs:%=$(DESTDIR)$(lmoddir)/%) \ + $(clibs:%=$(DESTDIR)$(cmoddir)/%) clean: - rm -f lem lua/luaconf.h lem.pc *.o *.so + rm -f bin/lem bin/*.o $(clibs) lua/luaconf.h lem.pc diff --git a/README.markdown b/README.markdown index f01fa6e..a7488a0 100644 --- a/README.markdown +++ b/README.markdown @@ -158,6 +158,232 @@ This sets `utils` to a table with the following functions. been scheduled to run. +The Stream Library +------------------ + +Import the module using something like + + local streams = require 'lem.streams' + +This sets `streams` to a table with the following functions. + +* __streams.open(path, [mode])__ + + Opens a given `path` and returns a new stream object. + This should be a path to a device, named pipe or named socket. + The function does work on regular files, but non-blocking IO doesn't, + so you might as well use `io.open()`. + This is a property of POSIX, sorry. + + The `mode` argument supports the same strings as `io.open()`. + If `mode` is not specified it defaults to `"r"`. + +* __streams.popen(cmd, [mode])__ + + This function works just like `io.popen()` except it returns a + stream object (which can be read or written to with blocking the + main loop). + +* __streams.tcp_connect(address, port)__ + + This function connects to the specified address and port + over TCP and if successful returns a new stream object. + +* __streams.tcp_listen(address, port)__ + + This function creates a new server object which can be used to receive + incoming TCP connections on the specified address and port. + +* __streams.sendfile(path)__ + + This function opens a file and returns a new object to be used by the + `streams:sendfile()` method. The path should point to a regular file or + `streams:sendfile()` will fail. + +The above functions return either a new stream object, a new server object, +a new sendfile object or `nil` and an error message indicating what went wrong. +The metatable of those objects can be found under __streams.Stream__, +__streams.Server__ and __streams.SendFile__ respectively. + +The following methods are available on streams. + +* __stream:closed()__ + + Returns `true` when the stream is closed, `false` otherwise. + +* __stream:busy()__ + + Returns `true` when another coroutine is waiting for IO on this stream, + `false` otherwise. + +* __stream:close()__ + + Closes the stream. If the stream is busy, this also interrupts the IO + action on the stream. + + Returns `true` on succes or otherwise `nil` followed by an error message. + If the stream is already closed the error message will be `'already closed'`. + +* __stream:interrupt()__ + + Interrupt any coroutine waiting for IO on the stream. + + Returns `true` on success and `nil, 'not busy'` if no coroutine is waiting + for connections on the server object. + +* __stream:read([mode])__ + + Read data from the stream. The `mode` argument can be one of the following: + + - a number: read the given number of bytes from the stream + - "\*a": read all data from stream until the stream is closed + - "\*l": read a line (read up to and including the next '\n' character) + + If there is not enough data immediately available the current coroutine will + be suspended until there is. + + However if the method is called without the mode argument, it will return + what is immediately available on the stream (up to a certain size limit). + Only if there is no data immediately available will the current coroutine + be suspended until there is. + + On success this method will return the data read from stream in a Lua string. + Otherwise it will return `nil` followed by an error message. + If another coroutine is waiting for IO on the stream the error message + will be `'busy'`. + If the stream was interrupted (eg. by another coroutine calling + `stream:interrupt()`, or `stream:close()`) the error message will be + `'interrupted'`. + If the stream is closed either before calling the method or closed + from the other end during the read the error message will be `'closed'`. + +* __stream:write(data)__ + + Write the given data, which must be a Lua string, to the stream. + If the data cannot be immediately written to the stream the current + coroutine will be suspended until all data is written. + + Returns `true` on success or otherwise `nil` followed by an error message. + If another coroutine is waiting for IO on the stream the error message + will be `'busy'`. + If the stream was interrupted (eg. by another coroutine calling + `stream:interrupt()`, or `stream:close()`) the error message will be + `'interrupted'`. + If the stream is closed either before calling the method or closed + from the other end during the write the error message will be `'closed'`. + +* __stream:sendfile(file, [offset])__ + + Write the given file to the stream. This is more effektive than reading + from a file and writing to the socket since the data doesn't have to go + through userspace. It only works on socket streams though. + + The file must be a sendfile object as returned by `streams.sendfile()`. + + If the offset argument is given the transfer will begin at the given + offset into the file. + + Returns `true` on success or otherwise `nil` followed by an error message. + If another coroutine is waiting for IO on the stream the error message + will be `'busy'`. + If the stream was interrupted (eg. by another coroutine calling + `stream:interrupt()`, or `stream:close()`) the error message will be + `'interrupted'`. + If the stream is closed either before calling the method or closed + from the other end during the write the error message will be `'closed'`. + If the file is closed the error message will be `'file closed'`. + +* __stream:cork()__ + + Sets the `TCP_CORK` attribute on the stream. This will of course fail + on streams which are not TCP connections. + + Returns `true` on success or otherwise `nil` followed by an error message. + If another coroutine is waiting for IO on the stream the error message + will be `'busy'`. + If the stream is closed the error message will be `'closed'`. + +* __stream:uncork()__ + + Removes the `TCP_CORK` attribute on the stream. This will of course fail + on streams which are not TCP connections. + + Returns `true` on success or otherwise `nil` followed by an error message. + If another coroutine is waiting for IO on the stream the error message + will be `'busy'`. + If the stream is closed the error message will be `'closed'`. + + +The following methods are available on server objects. + +* __server:closed()__ + + Returns `true` if the server is closed, `false` otherwise. + +* __server:busy()__ + + Returns `true` if another coroutine is listening on the server object, + `false` otherwise. + +* __server:close()__ + + Closes the server object. If another coroutine is waiting for connections + on the object it will be interrupted. + + Returns `true` on succes or otherwise `nil` followed by an error message. + If the server is already closed the error message will be `'already closed'`. + +* __server:interrupt()__ + + Interrupt any coroutine waiting for new connections on the server object. + + Returns `true` on success and `nil, 'not busy'` if no coroutine is waiting + for connections on the server object. + +* __server:accept()__ + + This method will get a stream object of a new incoming connection. + If there are no incoming connections immediately available, + the current coroutine will be suspended until there is. + + Returns a new stream object on succes or otherwise `nil` followed by an + error message. + If another coroutine is already waiting for new connections on the server + object the error message will be `'busy'`. + If the server was interrupted (eg. by another coroutine calling + `server:interrupt()`, or `server:close()`) the error message will be + `'interrupted'`. + If the server object is closed the error message will be `'closed'`. + +* __server:autospawn(handler)__ + + This method will suspend the currently running coroutine while + listening for new connections to the server object. + When a new client connects it will automatically spawn a new + coroutine running the `handler` function with a stream object + for the new connection as first argument. + + If an error occurs the method will return `nil` followed by an error message. + If another coroutine is already waiting for new connections on the server + object the error message will be `'busy'`. + If the server was interrupted (eg. by another coroutine calling + `server:interrupt()`, or `server:close()`) the error message will be + `'interrupted'`. + If the server object is closed the error message will be `'closed'`. + +The following methods are available on sendfile objects. + +* __file:close()__ + + Closes the gives file. + + Returns `true` on success or otherwise `nil` followed by an error message. + +* __file:size()__ + + Returns the size of the file. + + License ------- @@ -23,8 +23,7 @@ #include <stdio.h> #include <assert.h> -#include "lem.h" - +#include <lem.h> #include <lualib.h> #if EV_USE_KQUEUE @@ -16,5 +16,5 @@ * along with LEM. If not, see <http://www.gnu.org/licenses/>. */ -#include "libev/ev-config.h" -#include "libev/ev.c" +#include "../libev/ev-config.h" +#include "../libev/ev.c" diff --git a/bin/lua.c b/bin/lua.c new file mode 100644 index 0000000..ec5c936 --- /dev/null +++ b/bin/lua.c @@ -0,0 +1,52 @@ +/* setup for luaconf.h */ +#define LUA_CORE +#define LUA_LIB +#define ltable_c +#define lvm_c +#include <luaconf.h> + +/* do not export internal symbols */ +#undef LUAI_FUNC +#undef LUAI_DDEC +#undef LUAI_DDEF +#define LUAI_FUNC static +#define LUAI_DDEC static +#define LUAI_DDEF static + +/* core */ +#include "../lua/lapi.c" +#include "../lua/lcode.c" +#include "../lua/lctype.c" +#include "../lua/ldebug.c" +#include "../lua/ldo.c" +#include "../lua/ldump.c" +#include "../lua/lfunc.c" +#include "../lua/lgc.c" +#include "../lua/llex.c" +#include "../lua/lmem.c" +#include "../lua/lobject.c" +#include "../lua/lopcodes.c" +#include "../lua/lparser.c" +#include "../lua/lstate.c" +#include "../lua/lstring.c" +#include "../lua/ltable.c" +#include "../lua/ltm.c" +#include "../lua/lundump.c" +#include "../lua/lvm.c" +#include "../lua/lzio.c" + +/* auxiliary library */ +#include "../lua/lauxlib.c" + +/* standard library */ +#include "../lua/lbaselib.c" +#include "../lua/lbitlib.c" +#include "../lua/lcorolib.c" +#include "../lua/ldblib.c" +#include "../lua/liolib.c" +#include "../lua/lmathlib.c" +#include "../lua/loadlib.c" +#include "../lua/loslib.c" +#include "../lua/lstrlib.c" +#include "../lua/ltablib.c" +#include "../lua/linit.c" diff --git a/configure.ac b/configure.ac index a62d657..7bd1c9f 100644 --- a/configure.ac +++ b/configure.ac @@ -5,9 +5,9 @@ builtin_lua_version='5.2' AC_LANG(C) AC_CONFIG_HEADERS([libev/ev-config.h:ev-config.h.in]) -AC_SUBST([headers], ['lem.h']) -AC_SUBST([objects], ['lem.o']) -AC_SUBST([SHARED]) +AC_SUBST([headers], ['lem.h streams.h']) +AC_SUBST([objects], ['bin/lem.o']) +AC_SUBST([SHARED], ['-shared']) AC_ARG_WITH([debug], [AS_HELP_STRING([--with-debug], @@ -49,8 +49,7 @@ AS_CASE(["x$target_os"], [xdarwin*], [ac_cv_func_kqueue=no] # kqueue seems to be broken on OSX [SHARED='-dynamiclib -Wl,-undefined,dynamic_lookup'] - [STRIP="$STRIP -x"], - [SHARED='-shared']) + [STRIP="$STRIP -x"]) # Checks for Lua. AS_CASE(["x$with_lua"], @@ -90,8 +89,8 @@ AS_CASE(["x$with_lua"], AS_IF([test "x$with_lua" = 'xbuiltin'], [AC_CHECK_LIB([m], [sin])] [AC_SEARCH_LIBS([dlopen], [dl])] - [objects="lua.o $objects"] - [headers="lua/luaconf.h lua/lua.h lua/lauxlib.h $headers"] + [objects="bin/lua.o $objects"] + [headers="luaconf.h lua.h lauxlib.h $headers"] [CPPFLAGS="$CPPFLAGS -Ilua"] [AS_IF([test "x$lmoddir" = 'x'], [lmoddir="\${datarootdir}/lua/$builtin_lua_version"])] [AS_IF([test "x$cmoddir" = 'x'], [cmoddir="\${libdir}/lua/$builtin_lua_version"])]) @@ -109,8 +108,8 @@ AC_CHECK_HEADERS([sys/eventfd.h sys/epoll.h sys/event.h]) #])], # [echo "conftest returned: `cat conftest.out`"],[])]) -objects="libev.o $objects" -headers="libev/ev-config.h libev/ev.h $headers" +objects="bin/libev.o $objects" +headers="ev-config.h ev.h $headers" CPPFLAGS="$CPPFLAGS -Ilibev" AS_IF([test "x$ac_cv_header_sys_eventfd_h" = 'xyes'], diff --git a/include/streams.h b/include/streams.h new file mode 100644 index 0000000..e139138 --- /dev/null +++ b/include/streams.h @@ -0,0 +1,52 @@ +/* + * This file is part of LEM, a Lua Event Machine. + * Copyright 2011-2012 Emil Renner Berthing + * + * LEM is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * LEM is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with LEM. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef _LEM_STREAMS_H +#define _LEM_STREAMS_H + +#include <lem.h> + +#define LEM_INPUTBUF_SIZE 1024 + +struct lem_inputbuf { + unsigned int start; + unsigned int end; + union { + void *p; + unsigned long u; + }; + int parts; + char buf[LEM_INPUTBUF_SIZE]; +}; + +enum lem_presult { + LEM_PMORE = 0, +}; + +enum lem_preason { + LEM_PCLOSED, + LEM_PERROR, +}; + +struct lem_parser { + void (*init)(lua_State *T, struct lem_inputbuf *b); + int (*process)(lua_State *T, struct lem_inputbuf *b); + int (*destroy)(lua_State *T, struct lem_inputbuf *b, enum lem_preason reason); +}; + +#endif diff --git a/lem/hathaway.lua b/lem/hathaway.lua new file mode 100644 index 0000000..2497232 --- /dev/null +++ b/lem/hathaway.lua @@ -0,0 +1,385 @@ +-- +-- This file is part of LEM, a Lua Event Machine. +-- Copyright 2011-2012 Emil Renner Berthing +-- +-- LEM is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- LEM is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with LEM. If not, see <http://www.gnu.org/licenses/>. +-- + +local setmetatable = setmetatable +local tostring = tostring +local tonumber = tonumber +local pairs = pairs +local type = type +local date = os.date +local format = string.format +local concat = table.concat +local remove = table.remove + +local streams = require 'lem.streams' +require 'lem.http' + +local M = {} + +local status_string = { + [100] = '100 Continue', + [101] = '101 Switching Protocols', + [102] = '102 Processing', -- WebDAV + + [200] = '200 OK', + [201] = '201 Created', + [202] = '202 Accepted', + [203] = '203 Non-Authoritative Information', + [204] = '204 No Content', + [205] = '205 Reset Content', + [206] = '206 Partial Content', + [207] = '207 Multi-Status', -- WebDAV + + [300] = '300 Multiple Choices', + [301] = '301 Moved Permanently', + [302] = '302 Found', + [303] = '303 See Other', + [304] = '304 Not Modified', + [305] = '305 Use Proxy', + [306] = '306 Switch Proxy', + [307] = '307 Temporary Redirect', + + [400] = '400 Bad Request', + [401] = '401 Unauthorized', + [402] = '402 Payment Required', + [403] = '403 Forbidden', + [404] = '404 Not Found', + [405] = '405 Method Not Allowed', + -- ... + [417] = '417 Expectation Failed', + + [500] = '500 Internal Server Error', + [501] = '501 Not Implemented', + -- ... + [505] = '505 HTTP Version Not Supported', + -- ... +} +M.status_string = status_string + +function M.not_found(req, res) + if req.headers['Expect'] ~= '100-continue' then + req:body() + end + + res.status = 404 + res.headers['Content-Type'] = 'text/html; charset=UTF-8' + res:add([[ +<?xml version="1.0" encoding="UTF-8"?> +<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"> +<head> +<title>Not Found</title> +</head> +<body> +<h1>Not found</h1> +</body> +</html> +]]) +end + +do + local function htmlerror(num, text) + local str = format([[ +<?xml version="1.0" encoding="UTF-8"?> +<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"> +<head> +<title>%s</title> +</head> +<body> +<h1>%s</h1> +</body> +</html> +]], text, text) + return function(req, res) + res.status = num + res.headers['Content-Type'] = 'text/html; charset=UTF-8' + res.headers['Connection'] = 'close' + res:add(str) + end + end + + M.method_not_allowed = htmlerror(405, 'Method Not Allowed') + M.expectation_failed = htmlerror(417, 'Expectation Failed') + M.version_not_supported = htmlerror(505, 'HTTP Version Not Supported') +end + +function M.debug() end + +do + local lookup = {} + M.lookup = lookup + + function M.GET(uri, handler) + local path = lookup[uri] + if path then + path['HEAD'] = handler + path['GET'] = handler + else + path = { + ['HEAD'] = handler, + ['GET'] = handler, + } + lookup[uri] = path + end + end + + do + local function static_setter(method) + return function(uri, handler) + local path = lookup[uri] + if path then + path[method] = handler + else + lookup[uri] = { [method] = handler } + end + end + end + + M.POST = static_setter('POST') + M.PUT = static_setter('PUT') + M.DELETE = static_setter('DELETE') + end + + function M.GETM(pattern, handler) + local i = 1 + while true do + local entry = lookup[i] + if entry == nil then + lookup[i] = { pattern, + ['GET'] = handler, + ['HEAD'] = handler + } + break + end + if entry[1] == pattern then + entry['GET'] = handler + entry['HEAD'] = handler + break + end + i = i + 1 + end + end + + do + local function match_setter(method) + return function(pattern, handler) + local i = 1 + while true do + local entry = lookup[i] + if entry == nil then + lookup[i] = { pattern, [method] = handler } + break + end + if entry[1] == pattern then + entry[method] = handler + break + end + i = i + 1 + end + end + end + + M.POSTM = match_setter('POST') + M.PUTM = match_setter('PUT') + M.DELETEM = match_setter('DELETE') + end + + local Response = {} + Response.__index = Response + M.Response = Response + + function new_response(req) + local n = 0 + return setmetatable({ + headers = {}, + status = 200, + version = req.version, + add = function(self, ...) + n = n + 1 + self[n] = format(...) + end + }, Response) + end + + local function check_match(entry, req, res, ok, ...) + if not ok then return false end + local handler = entry[req.method] + if handler then + handler(req, res, ok, ...) + else + M.method_not_allowed(req, res) + end + return true + end + + local function handler(istream, ostream) + repeat + local req, err = istream:read('HTTPRequest') + if not req then M.debug(err) break end + local method, uri, version = req.method, req.uri, req.version + M.debug(format("%s %s HTTP/%s", method, uri, version)) + + req.ostream = ostream + local res = new_response(req) + + if version ~= '1.0' and version ~= '1.1' then + M.version_not_supported(req, res) + version = '1.1' + else + local expect = req.headers['Expect'] + if expect and expect ~= '100-continue' then + M.expectation_failed(req, res) + else + local path = lookup[uri] + if path then + local handler = path[method] + if handler then + handler(req, res) + else + M.method_not_allowed(req, res) + end + else + local i = 0 + repeat + i = i + 1 + local entry = lookup[i] + if not entry then + M.not_found(req, res) + break + end + until check_match(entry, req, res, uri:match(entry[1])) + end + end + end + + local headers = res.headers + local file, close = res.file, false + if type(file) == 'string' then + file, err = streams.sendfile(file) + if file then + close = true + else + M.debug(err) + res = new_response(req) + headers = res.headers + M.not_found(req, res) + end + end + + if res.status == 200 and #res == 0 and res.file == nil then + res.status = 204 + elseif headers['Content-Length'] == nil then + local len + if file then + len = file:size() + else + len = 0 + for i = 1, #res do + len = len + #res[i] + end + end + + headers['Content-Length'] = len + end + + if headers['Date'] == nil then + headers['Date'] = date('!%a, %d %b %Y %T GMT') + end + + if headers['Server'] == nil then + headers['Server'] = 'Hathaway/0.1 LEM/0.1' + end + + local robe, i = {}, 1 + do + local status = res.status + if type(status) == 'number' then + status = status_string[status] + end + + robe[1] = format('HTTP/%s %s\r\n', version, status) + end + + for k, v in pairs(headers) do + i = i + 1 + robe[i] = format('%s: %s\r\n', k, tostring(v)) + end + + i = i + 1 + robe[i] = '\r\n' + + local ok, err = ostream:cork() + if not ok then M.debug(err) break end + + local ok, err = ostream:write(concat(robe)) + if not ok then M.debug(err) break end + + if method ~= 'HEAD' then + if file then + ok, err = ostream:sendfile(file) + if close then file:close() end + else + ok, err = ostream:write(concat(res)) + end + if not ok then M.debug(err) break end + end + + local ok, err = ostream:uncork() + if not ok then M.debug(err) break end + + until version == '1.0' + or req.headers['Connection'] == 'close' + or headers['Connection'] == 'close' + + istream:close() + ostream:close() + end + + function M.Hathaway(address, port) + local server, err = streams.tcp4_listen(address, port) + if not server then M.debug(err) return nil, err end + + M.server = server + + local ok, err = server:autospawn(handler) + if not ok and err ~= 'interrupted' then + M.debug(err) + return nil, err + end + return true + end +end + +function M.import(env) + if not env then + env = _G + end + + env.GET = M.GET + env.POST = M.POST + env.PUT = M.PUT + env.DELETE = M.DELETE + env.GETM = M.GETM + env.POSTM = M.POSTM + env.PUTM = M.PUTM + env.DELETEM = M.DELETEM + env.Hathaway = M.Hathaway +end + +return M + +-- vim: ts=2 sw=2 noet: diff --git a/lem/http.lua b/lem/http.lua new file mode 100644 index 0000000..15f4125 --- /dev/null +++ b/lem/http.lua @@ -0,0 +1,93 @@ +-- +-- This file is part of LEM, a Lua Event Machine. +-- Copyright 2011-2012 Emil Renner Berthing +-- +-- LEM is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- LEM is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with LEM. If not, see <http://www.gnu.org/licenses/>. +-- + +local streams = require 'lem.streams' +local M = require 'lem.http.core' + +streams.parsers['HTTPRequest'] = M.HTTPRequest +M.HTTPRequest = nil +streams.parsers['HTTPResponse'] = M.HTTPResponse +M.HTTPResponse = nil + +local tonumber = tonumber +local concat = table.concat + +function M.Request:body() + local len, body = self.headers['Content-Length'], '' + if not len then return body end + + len = tonumber(len) + if len <= 0 then return body end + + if self.headers['Expect'] == '100-continue' then + local ok, err = self.ostream:send('HTTP/1.1 100 Continue\r\n\r\n') + if not ok then return nil, err end + end + + local err + body, err = self.istream:read(len) + if not body then return nil, err end + + return body +end + +function M.Response:body_chunked() + local istream = self.istream + local t, n = {}, 0 + local line, err + while true do + line, err = istream:read('*l') + if not line then return nil, err end + + local num = tonumber(line, 16) + if not num then return nil, 'expectation failed' end + if num == 0 then break end + + local data, err = istream:read(num) + if not data then return nil, err end + + n = n + 1 + t[n] = data + + line, err = istream:read('*l') + if not line then return nil, err end + end + + line, err = istream:read('*l') + if not line then return nil, err end + + return t +end + +function M.Response:body() + if self.headers['Transfer-Encoding'] == 'chunked' then + return concat(self:body_chunked()) + end + + local num = self.headers['Content-Length'] + if not num then return nil, 'no content length specified' end + + num = tonumber(num) + if not num then return nil, 'invalid content length' end + + return self.istream:read(num) +end + +return M + +-- vim: ts=2 sw=2 noet: diff --git a/lem/http/core.c b/lem/http/core.c new file mode 100644 index 0000000..ed1dce0 --- /dev/null +++ b/lem/http/core.c @@ -0,0 +1,366 @@ +/* + * This file is part of LEM, a Lua Event Machine. + * Copyright 2011-2012 Emil Renner Berthing + * + * LEM is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * LEM is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with LEM. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <streams.h> + +#if !(LUA_VERSION_NUM >= 502) +#define lua_getuservalue lua_getfenv +#define lua_setuservalue lua_setfenv +#endif + +enum classes { + C_CTL, /* control characters */ + C_LF, /* \n */ + C_CR, /* \r */ + C_LWS, /* space or \t */ + C_TSPCL, /* tspecials */ + C_SLASH, /* / */ + C_COLON, /* : */ + C_DOT, /* . */ + C_NUM, /* 0-9 */ + C_H, /* H */ + C_T, /* T */ + C_P, /* P */ + C_ETC, /* the rest */ + C_MAX +}; + +/* + * This array maps the first 128 ASCII characters into character classes + * The remaining characters should be mapped to C_ETC + */ +static const unsigned char ascii_class[128] = { + C_CTL, C_CTL, C_CTL, C_CTL, C_CTL, C_CTL, C_CTL, C_CTL, + C_CTL, C_LWS, C_LF, C_CTL, C_CTL, C_CR, C_CTL, C_CTL, + C_CTL, C_CTL, C_CTL, C_CTL, C_CTL, C_CTL, C_CTL, C_CTL, + C_CTL, C_CTL, C_CTL, C_CTL, C_CTL, C_CTL, C_CTL, C_CTL, + + C_LWS, C_ETC, C_TSPCL, C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, + C_TSPCL, C_TSPCL, C_ETC, C_ETC, C_TSPCL, C_ETC, C_DOT, C_SLASH, + C_NUM, C_NUM, C_NUM, C_NUM, C_NUM, C_NUM, C_NUM, C_NUM, + C_NUM, C_NUM, C_COLON, C_TSPCL, C_TSPCL, C_TSPCL, C_TSPCL, C_TSPCL, + + C_TSPCL, C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, + C_H, C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, + C_P, C_ETC, C_ETC, C_ETC, C_T, C_ETC, C_ETC, C_ETC, + C_ETC, C_ETC, C_ETC, C_TSPCL, C_TSPCL, C_TSPCL, C_ETC, C_ETC, + + C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, + C_H, C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, C_ETC, + C_P, C_ETC, C_ETC, C_ETC, C_T, C_ETC, C_ETC, C_ETC, + C_ETC, C_ETC, C_ETC, C_TSPCL, C_ETC, C_TSPCL, C_ETC, C_CTL +}; + +enum states { + S_GO, + SMTD, + SMUS, + SURI, + SUHS, + SH__, + SHT1, + SHT2, + SHTP, + SSLH, + SMAV, + SDOT, + SMIV, + C_GO, + CH__, + CHT1, + CHT2, + CHTP, + CSLH, + CMAV, + CDOT, + CMIV, + CVNS, + CNUM, + CNTS, + CTXT, + SRE1, + SRE2, + SKEY, + SCOL, + SVAL, + SLWS, + SCN1, + SCN2, + SNL1, + SNL2, + SEND, + SMAX, + X___ = SMAX, + XMUS, + XUHS, + XVNS, + XNTS, + XRE1, + XKEY, + XCOL, + XVAL, + XEND +}; + +static const unsigned char state_table[SMAX][C_MAX] = { +/* ctl \n \r lws tsp / : . num H T P etc */ +/* S_GO*/ { X___,X___,X___,X___,X___,X___,X___,SMTD,SMTD,SMTD,SMTD,SMTD,SMTD }, +/* SMTD*/ { X___,X___,X___,XMUS,X___,X___,X___,SMTD,SMTD,SMTD,SMTD,SMTD,SMTD }, +/* SMUS*/ { X___,X___,X___,SMUS,SURI,SURI,SURI,SURI,SURI,SURI,SURI,SURI,SURI }, +/* SURI*/ { X___,X___,X___,XUHS,SURI,SURI,SURI,SURI,SURI,SURI,SURI,SURI,SURI }, +/* SUHS*/ { X___,X___,X___,SUHS,X___,X___,X___,X___,X___,SH__,X___,X___,X___ }, +/* SH__*/ { X___,X___,X___,X___,X___,X___,X___,X___,X___,X___,SHT1,X___,X___ }, +/* SHT1*/ { X___,X___,X___,X___,X___,X___,X___,X___,X___,X___,SHT2,X___,X___ }, +/* SHT2*/ { X___,X___,X___,X___,X___,X___,X___,X___,X___,X___,X___,SHTP,X___ }, +/* SHTP*/ { X___,X___,X___,X___,X___,SSLH,X___,X___,X___,X___,X___,X___,X___ }, +/* SSLH*/ { X___,X___,X___,X___,X___,X___,X___,X___,SMAV,X___,X___,X___,X___ }, +/* SMAV*/ { X___,X___,X___,X___,X___,X___,X___,SDOT,SMAV,X___,X___,X___,X___ }, +/* SDOT*/ { X___,X___,X___,X___,X___,X___,X___,X___,SMIV,X___,X___,X___,X___ }, +/* SMIV*/ { X___,X___,SRE1,X___,X___,X___,X___,X___,SMIV,X___,X___,X___,X___ }, +/* C_GO*/ { X___,X___,X___,X___,X___,X___,X___,X___,X___,CH__,X___,X___,X___ }, +/* CH__*/ { X___,X___,X___,X___,X___,X___,X___,X___,X___,X___,CHT1,X___,X___ }, +/* CHT1*/ { X___,X___,X___,X___,X___,X___,X___,X___,X___,X___,CHT2,X___,X___ }, +/* CHT2*/ { X___,X___,X___,X___,X___,X___,X___,X___,X___,X___,X___,CHTP,X___ }, +/* CHTP*/ { X___,X___,X___,X___,X___,CSLH,X___,X___,X___,X___,X___,X___,X___ }, +/* CSLH*/ { X___,X___,X___,X___,X___,X___,X___,X___,CMAV,X___,X___,X___,X___ }, +/* CMAV*/ { X___,X___,X___,X___,X___,X___,X___,CDOT,CMAV,X___,X___,X___,X___ }, +/* CDOT*/ { X___,X___,X___,X___,X___,X___,X___,X___,CMIV,X___,X___,X___,X___ }, +/* CMIV*/ { X___,X___,X___,XVNS,X___,X___,X___,X___,CMIV,X___,X___,X___,X___ }, +/* CVNS*/ { X___,X___,X___,CVNS,X___,X___,X___,X___,CNUM,X___,X___,X___,X___ }, +/* CNUM*/ { X___,X___,X___,XNTS,X___,X___,X___,X___,CNUM,X___,X___,X___,X___ }, +/* CNTS*/ { X___,X___,X___,CNTS,CTXT,X___,X___,X___,CTXT,CTXT,CTXT,CTXT,CTXT }, +/* CTXT*/ { X___,X___,XRE1,CTXT,CTXT,X___,X___,X___,CTXT,CTXT,CTXT,CTXT,CTXT }, +/* SRE1*/ { X___,SRE2,X___,X___,X___,X___,X___,X___,X___,X___,X___,X___,X___ }, +/* SRE2*/ { X___,X___,SEND,X___,X___,X___,X___,SKEY,SKEY,SKEY,SKEY,SKEY,SKEY }, +/* SKEY*/ { X___,X___,X___,X___,X___,X___,XCOL,SKEY,SKEY,SKEY,SKEY,SKEY,SKEY }, +/* SCOL*/ { X___,X___,SCN1,SCOL,SVAL,SVAL,SVAL,SVAL,SVAL,SVAL,SVAL,SVAL,SVAL }, +/* SVAL*/ { X___,X___,SNL1,SLWS,SVAL,SVAL,SVAL,SVAL,SVAL,SVAL,SVAL,SVAL,SVAL }, +/* SLWS*/ { X___,X___,SNL1,SLWS,XVAL,XVAL,XVAL,XVAL,XVAL,XVAL,XVAL,XVAL,XVAL }, +/* SCN1*/ { X___,SCN2,X___,X___,X___,X___,X___,X___,X___,X___,X___,X___,X___ }, +/* SCN2*/ { X___,X___,SEND,SCOL,X___,X___,X___,XKEY,XKEY,XKEY,XKEY,XKEY,XKEY }, +/* SNL1*/ { X___,SNL2,X___,X___,X___,X___,X___,X___,X___,X___,X___,X___,X___ }, +/* SNL2*/ { X___,X___,SEND,SLWS,X___,X___,X___,XKEY,XKEY,XKEY,XKEY,XKEY,XKEY }, +/* SEND*/ { X___,XEND,X___,X___,X___,X___,X___,X___,X___,X___,X___,X___,X___ }, +}; + +static void +parse_http_init(lua_State *T) +{ + /* create result table */ + lua_settop(T, 2); + lua_createtable(T, 0, 5); + lua_pushvalue(T, 1); + lua_setfield(T, -2, "istream"); +} + +static void +parse_http_req_init(lua_State *T, struct lem_inputbuf *b) +{ + b->u = S_GO; + parse_http_init(T); +} + +static void +parse_http_res_init(lua_State *T, struct lem_inputbuf *b) +{ + b->u = C_GO; + parse_http_init(T); +} + +static int +parse_http_process(lua_State *T, struct lem_inputbuf *b) +{ + unsigned char state = b->u & 0xFF; + unsigned int w = b->u >> 8; + unsigned int r = b->start; + unsigned int end = b->end; + + while (r < end) { + unsigned char ch = b->buf[r++]; + + state = state_table[state][ch > 127 ? C_ETC : ascii_class[ch]]; + /*lem_debug("char = %c (%hhu), state = %hhu", ch, ch, state);*/ + switch (state) { + case SMTD: + case SURI: + case SMAV: + case SDOT: + case SMIV: + case CMAV: + case CDOT: + case CMIV: + case CNUM: + case CTXT: + case SKEY: + case SVAL: + b->buf[w++] = ch; + break; + + case SRE1: + lua_pushlstring(T, b->buf, w); + lua_setfield(T, -2, "version"); + w = 0; + lua_newtable(T); + break; + + case X___: + lem_debug("HTTP parse error"); + lua_settop(T, 0); + lua_pushnil(T); + lua_pushliteral(T, "parse error"); + return 2; + + case XMUS: + state = SMUS; + lua_pushlstring(T, b->buf, w); + lua_setfield(T, -2, "method"); + w = 0; + break; + + case XUHS: + state = SUHS; + lua_pushlstring(T, b->buf, w); + lua_setfield(T, -2, "uri"); + w = 0; + break; + + case XVNS: + state = CVNS; + lua_pushlstring(T, b->buf, w); + lua_setfield(T, -2, "version"); + w = 0; + break; + + case XNTS: + state = CNTS; + { + unsigned int n = 0; + unsigned int k; + + for (k = 0; k < w; k++) { + n *= 10; + n += b->buf[k] - '0'; + } + + lua_pushnumber(T, n); + } + lua_setfield(T, -2, "status"); + w = 0; + break; + + case XRE1: + state = SRE1; + lua_pushlstring(T, b->buf, w); + lua_setfield(T, -2, "text"); + w = 0; + lua_newtable(T); + break; + + case XCOL: + state = SCOL; + lua_pushlstring(T, b->buf, w); + w = 0; + break; + + case XVAL: + state = SVAL; + b->buf[w++] = ' '; + b->buf[w++] = ch; + break; + + case XKEY: + state = SKEY; + lua_pushlstring(T, b->buf, w); + lua_rawset(T, -3); + w = 0; + b->buf[w++] = ch; + break; + + case XEND: + /* in case there are no headers this is false */ + if (lua_type(T, -1) == LUA_TSTRING) { + lua_pushlstring(T, b->buf, w); + lua_rawset(T, -3); + } + lua_setfield(T, -2, "headers"); + + /* set metatable */ + lua_getuservalue(T, 2); + lua_setmetatable(T, -2); + + if (r == end) + b->start = b->end = 0; + else + b->start = r; + return 1; + } + } + + if (w == LEM_INPUTBUF_SIZE - 1) { + b->start = b->end = 0; + lua_settop(T, 0); + lua_pushnil(T); + lua_pushliteral(T, "out of buffer space"); + return 2; + } + + b->start = b->end = w + 1; + b->u = (w << 8) | state; + return LEM_PMORE; +} + +int +luaopen_lem_http_core(lua_State *L) +{ + struct lem_parser *p; + + /* create module table M */ + lua_newtable(L); + + /* create Request metatable */ + lua_newtable(L); + lua_pushvalue(L, -1); + lua_setfield(L, -2, "__index"); + /* insert Request metatable */ + lua_setfield(L, -2, "Request"); + + /* create Response metatable */ + lua_newtable(L); + lua_pushvalue(L, -1); + lua_setfield(L, -2, "__index"); + /* insert Request metatable */ + lua_setfield(L, -2, "Response"); + + p = lua_newuserdata(L, sizeof(struct lem_parser)); + p->init = parse_http_req_init; + p->process = parse_http_process; + p->destroy = NULL; + lua_getfield(L, -2, "Request"); + lua_setuservalue(L, -2); + lua_setfield(L, -2, "HTTPRequest"); + + p = lua_newuserdata(L, sizeof(struct lem_parser)); + p->init = parse_http_res_init; + p->process = parse_http_process; + p->destroy = NULL; + lua_getfield(L, -2, "Response"); + lua_setuservalue(L, -2); + lua_setfield(L, -2, "HTTPResponse"); + + return 1; +} diff --git a/lem/streams.lua b/lem/streams.lua new file mode 100644 index 0000000..f4f9d4f --- /dev/null +++ b/lem/streams.lua @@ -0,0 +1,50 @@ +-- +-- This file is part of LEM, a Lua Event Machine. +-- Copyright 2011-2012 Emil Renner Berthing +-- +-- LEM is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- LEM is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with LEM. If not, see <http://www.gnu.org/licenses/>. +-- + +local M = require 'lem.streams.core' + +do + local type = type + local parsers = M.parsers + local parser_available = parsers.available + parsers.available = nil + local parser_target = parsers.target + parsers.target = nil + + function M.reader(readp) + return function(self, fmt, ...) + if fmt == nil then + return readp(self, parser_available) + end + if type(fmt) == 'number' then + return readp(self, parser_target, fmt) + end + local parser = parsers[fmt] + if parser == nil then + error('invalid format', 2) + end + return readp(self, parser, ...) + end + end + + M.IStream.read = M.reader(M.IStream.readp) +end + +return M + +-- vim: ts=2 sw=2 noet: diff --git a/lem/streams/core.c b/lem/streams/core.c new file mode 100644 index 0000000..e9ed9f4 --- /dev/null +++ b/lem/streams/core.c @@ -0,0 +1,258 @@ +/* + * This file is part of LEM, a Lua Event Machine. + * Copyright 2011-2012 Emil Renner Berthing + * + * LEM is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * LEM is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with LEM. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <stdlib.h> +#include <unistd.h> +#include <errno.h> +#include <string.h> + +#include <fcntl.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include <netinet/tcp.h> +#include <netdb.h> + +#if defined(__FreeBSD__) || defined(__APPLE__) +#include <netinet/in.h> +#else +#include <sys/sendfile.h> +#endif + +#include <streams.h> + +#include "sendfile.c" +#include "stream.c" +#include "server.c" +#include "tcp.c" +#include "parsers.c" + +static int +module_index(lua_State *T) +{ + const char *key = lua_tostring(T, 2); + int fd; + + if (strcmp(key, "stdin") == 0) + fd = 0; + else if (strcmp(key, "stdout") == 0) + fd = 1; + else if (strcmp(key, "stderr") == 0) + fd = 2; + else + return 0; + + /* make the socket non-blocking */ + if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) { + lua_pushnil(T); + lua_pushfstring(T, "error making filedescriptor non-blocking: %s", + strerror(errno)); + return 2; + } + + if (fd == 0) + (void)istream_new(T, fd, lua_upvalueindex(1)); + else + (void)ostream_new(T, fd, lua_upvalueindex(2)); + + /* save this object so we don't initialize it again */ + lua_pushvalue(T, 2); + lua_pushvalue(T, -2); + lua_rawset(T, 1); + + return 1; +} + +int +luaopen_lem_streams_core(lua_State *L) +{ + /* create module table */ + lua_newtable(L); + + /* create metatable for sendfile objects */ + lua_newtable(L); + /* mt.__index = mt */ + lua_pushvalue(L, -1); + lua_setfield(L, -2, "__index"); + /* mt.__gc = <sendfile_gc> */ + lua_pushcfunction(L, sendfile_gc); + lua_setfield(L, -2, "__gc"); + /* mt.close = <sendfile_close> */ + lua_pushcfunction(L, sendfile_close); + lua_setfield(L, -2, "close"); + /* mt.size = <sendfile_size> */ + lua_pushcfunction(L, sendfile_size); + lua_setfield(L, -2, "size"); + /* insert table */ + lua_setfield(L, -2, "SendFile"); + + /* insert sendfile function */ + lua_getfield(L, -1, "SendFile"); /* upvalue 1 = SendFile */ + lua_pushcclosure(L, sendfile_open, 1); + lua_setfield(L, -2, "sendfile"); + + /* create metatable for input stream objects */ + lua_newtable(L); + /* mt.__index = mt */ + lua_pushvalue(L, -1); + lua_setfield(L, -2, "__index"); + /* mt.__gc = <stream_close> */ + lua_pushcfunction(L, istream_close); + lua_setfield(L, -2, "__gc"); + /* mt.closed = <stream_closed> */ + lua_pushcfunction(L, stream_closed); + lua_setfield(L, -2, "closed"); + /* mt.busy = <stream_busy> */ + lua_pushcfunction(L, stream_busy); + lua_setfield(L, -2, "busy"); + /* mt.interrupt = <stream_interrupt> */ + lua_pushcfunction(L, stream_interrupt); + lua_setfield(L, -2, "interrupt"); + /* mt.close = <istream_close> */ + lua_pushcfunction(L, istream_close); + lua_setfield(L, -2, "close"); + /* mt.readp = <stream_readp> */ + lua_pushcfunction(L, stream_readp); + lua_setfield(L, -2, "readp"); + /* insert table */ + lua_setfield(L, -2, "IStream"); + + /* create metatable for output stream objects */ + lua_newtable(L); + /* mt.__index = mt */ + lua_pushvalue(L, -1); + lua_setfield(L, -2, "__index"); + /* mt.__gc = <ostream_close> */ + lua_pushcfunction(L, ostream_close); + lua_setfield(L, -2, "__gc"); + /* mt.closed = <stream_closed> */ + lua_pushcfunction(L, stream_closed); + lua_setfield(L, -2, "closed"); + /* mt.busy = <stream_busy> */ + lua_pushcfunction(L, stream_busy); + lua_setfield(L, -2, "busy"); + /* mt.interrupt = <stream_interrupt> */ + lua_pushcfunction(L, stream_interrupt); + lua_setfield(L, -2, "interrupt"); + /* mt.close = <ostream_close> */ + lua_pushcfunction(L, ostream_close); + lua_setfield(L, -2, "close"); + /* mt.write = <stream_write> */ + lua_pushcfunction(L, stream_write); + lua_setfield(L, -2, "write"); + /* mt.cork = <stream_cork> */ + lua_pushcfunction(L, stream_cork); + lua_setfield(L, -2, "cork"); + /* mt.uncork = <stream_uncork> */ + lua_pushcfunction(L, stream_uncork); + lua_setfield(L, -2, "uncork"); + /* mt.sendfile = <ostream_sendfile> */ + lua_pushcfunction(L, stream_sendfile); + lua_setfield(L, -2, "sendfile"); + /* insert table */ + lua_setfield(L, -2, "OStream"); + + /* create metatable for server objects */ + lua_newtable(L); + /* mt.__index = mt */ + lua_pushvalue(L, -1); + lua_setfield(L, -2, "__index"); + /* mt.__gc = <server_close> */ + lua_pushcfunction(L, server_close); + lua_setfield(L, -2, "__gc"); + /* mt.closed = <server_closed> */ + lua_pushcfunction(L, server_closed); + lua_setfield(L, -2, "closed"); + /* mt.busy = <server_busy> */ + lua_pushcfunction(L, server_busy); + lua_setfield(L, -2, "busy"); + /* mt.close = <server_close> */ + lua_pushcfunction(L, server_close); + lua_setfield(L, -2, "close"); + /* mt.interrupt = <server_interrupt> */ + lua_pushcfunction(L, server_interrupt); + lua_setfield(L, -2, "interrupt"); + /* mt.accept = <server_accept> */ + lua_getfield(L, -2, "IStream"); /* upvalue 1 = IStream */ + lua_getfield(L, -3, "OStream"); /* upvalue 2 = OStream */ + lua_pushcclosure(L, server_accept, 2); + lua_setfield(L, -2, "accept"); + /* mt.autospawn = <server_autospawn> */ + lua_getfield(L, -2, "IStream"); /* upvalue 1 = IStream */ + lua_getfield(L, -3, "OStream"); /* upvalue 2 = OStream */ + lua_pushcclosure(L, server_autospawn, 2); + lua_setfield(L, -2, "autospawn"); + /* insert table */ + lua_setfield(L, -2, "Server"); + + /* insert open function */ + lua_getfield(L, -1, "IStream"); /* upvalue 1 = IStream */ + lua_getfield(L, -2, "OStream"); /* upvalue 2 = OStream */ + lua_pushcclosure(L, stream_open, 2); + lua_setfield(L, -2, "open"); + /* insert popen function */ + lua_getfield(L, -1, "IStream"); /* upvalue 1 = IStream */ + lua_getfield(L, -2, "OStream"); /* upvalue 2 = OStream */ + lua_pushcclosure(L, stream_popen, 2); + lua_setfield(L, -2, "popen"); + + /* insert the connect function */ + lua_getfield(L, -1, "IStream"); /* upvalue 1 = IStream */ + lua_getfield(L, -2, "OStream"); /* upvalue 2 = OStream */ + lua_pushcclosure(L, tcp_connect, 2); + lua_setfield(L, -2, "tcp_connect"); + /* insert the tcp4_listen function */ + lua_getfield(L, -1, "Server"); /* upvalue 1 = Server */ + lua_pushcclosure(L, tcp4_listen, 1); + lua_setfield(L, -2, "tcp4_listen"); + /* insert the tcp6_listen function */ + lua_getfield(L, -1, "Server"); /* upvalue 1 = Server */ + lua_pushcclosure(L, tcp6_listen, 1); + lua_setfield(L, -2, "tcp6_listen"); + + /* create parser table */ + lua_createtable(L, 0, 4); + /* push parser_line */ + lua_pushlightuserdata(L, (void *)&parser_available); + lua_setfield(L, -2, "available"); + /* push parser_target */ + lua_pushlightuserdata(L, (void *)&parser_target); + lua_setfield(L, -2, "target"); + /* push parser_all */ + lua_pushlightuserdata(L, (void *)&parser_all); + lua_setfield(L, -2, "*a"); + /* push parser_line */ + lua_pushlightuserdata(L, (void *)&parser_line); + lua_setfield(L, -2, "*l"); + /* insert parser table */ + lua_setfield(L, -2, "parsers"); + + /* create metatable for the module */ + lua_newtable(L); + /* insert the index function */ + lua_getfield(L, -2, "IStream"); /* upvalue 1 = IStream */ + lua_getfield(L, -3, "OStream"); /* upvalue 2 = OStream */ + lua_pushcclosure(L, module_index, 2); + lua_setfield(L, -2, "__index"); + + /* set the metatable */ + lua_setmetatable(L, -2); + + return 1; +} diff --git a/lem/streams/parsers.c b/lem/streams/parsers.c new file mode 100644 index 0000000..72cd671 --- /dev/null +++ b/lem/streams/parsers.c @@ -0,0 +1,175 @@ +/* + * This file is part of LEM, a Lua Event Machine. + * Copyright 2011-2012 Emil Renner Berthing + * + * LEM is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * LEM is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with LEM. If not, see <http://www.gnu.org/licenses/>. + */ + +/* + * read available data + */ +static int +parse_available_process(lua_State *T, struct lem_inputbuf *b) +{ + size_t size = b->end - b->start; + + if (size == 0) + return LEM_PMORE; + + lua_pushlstring(T, b->buf + b->start, size); + b->start = b->end = 0; + return 1; +} + +static const struct lem_parser parser_available = { + .process = parse_available_process, +}; + +/* + * read a specified number of bytes + */ +static void +parse_target_init(lua_State *T, struct lem_inputbuf *b) +{ + b->u = luaL_checknumber(T, 3); + b->parts = 1; + lua_settop(T, 2); +} + +static int +parse_target_process(lua_State *T, struct lem_inputbuf *b) +{ + unsigned int target = b->u; + unsigned int size = b->end - b->start; + + if (size >= target) { + lua_pushlstring(T, b->buf + b->start, target); + lua_concat(T, b->parts); + b->start += target; + if (b->start == b->end) + b->start = b->end = 0; + + return 1; + } + + if (b->end == LEM_INPUTBUF_SIZE) { + lua_pushlstring(T, b->buf + b->start, size); + b->parts++; + b->start = b->end = 0; + b->u = target - size; + } + + return LEM_PMORE; +} + +static const struct lem_parser parser_target = { + .init = parse_target_init, + .process = parse_target_process, +}; + +/* + * read all data until stream closes + */ +static void +parse_all_init(lua_State *T, struct lem_inputbuf *b) +{ + b->parts = 0; + lua_settop(T, 2); +} + +static int +parse_all_process(lua_State *T, struct lem_inputbuf *b) +{ + if (b->end == LEM_INPUTBUF_SIZE) { + lua_pushlstring(T, b->buf + b->start, + LEM_INPUTBUF_SIZE - b->start); + b->start = b->end = 0; + b->parts++; + } + + return LEM_PMORE; +} + +static int +parse_all_destroy(lua_State *T, struct lem_inputbuf *b, enum lem_preason reason) +{ + unsigned int size; + + if (reason != LEM_PCLOSED) + return LEM_PMORE; + + size = b->end - b->start; + if (size > 0) { + lua_pushlstring(T, b->buf + b->start, size); + b->start = b->end = 0; + b->parts++; + } + + lua_concat(T, b->parts); + return 1; +} + +static const struct lem_parser parser_all = { + .init = parse_all_init, + .process = parse_all_process, + .destroy = parse_all_destroy, +}; + +/* + * read a line + */ +static void +parse_line_init(lua_State *T, struct lem_inputbuf *b) +{ + const char *stopbyte = luaL_optstring(T, 3, "\n"); + + b->u = (b->start << 8) | stopbyte[0]; + b->parts = 1; + lua_settop(T, 2); +} + +static int +parse_line_process(lua_State *T, struct lem_inputbuf *b) +{ + unsigned int i; + unsigned char stopbyte = b->u & 0xFF; + + for (i = b->u >> 8; i < b->end; i++) { + if (b->buf[i] == stopbyte) { + lua_pushlstring(T, b->buf + b->start, i - b->start); + lua_concat(T, b->parts); + i++; + if (i == b->end) + b->start = b->end = 0; + else + b->start = i; + return 1; + } + } + + if (b->end == LEM_INPUTBUF_SIZE) { + lua_pushlstring(T, b->buf + b->start, b->end - b->start); + b->parts++; + b->start = b->end = 0; + b->u = stopbyte; + } else + b->u = (i << 8) | stopbyte; + + return LEM_PMORE; +} + +static const struct lem_parser parser_line = { + .init = parse_line_init, + .process = parse_line_process, +}; diff --git a/lem/streams/queue.lua b/lem/streams/queue.lua new file mode 100644 index 0000000..7672798 --- /dev/null +++ b/lem/streams/queue.lua @@ -0,0 +1,78 @@ +-- +-- This file is part of lem-streams. +-- Copyright 2011 Emil Renner Berthing +-- +-- lem-streams is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- lem-streams is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with lem-streams. If not, see <http://www.gnu.org/licenses/>. +-- + +local utils = require 'lem.utils' + +local setmetatable = setmetatable +local thisthread, suspend, resume + = utils.thisthread, utils.suspend, utils.resume + +local QOStream = {} +QOStream.__index = QOStream + +function QOStream:closed(...) + return self.stream:closed(...) +end + +function QOStream:interrupt(...) + return self.stream:interrupt(...) +end + +function QOStream:close(...) + return self.stream:close(...) +end + +function QOStream:write(...) + local nxt = self.next + if nxt == 0 then + nxt = 1 + self.next = 1 + else + local me = nxt + + self[me] = thisthread() + nxt = #self+1 + self.next = nxt + suspend() + self[me] = nil + end + + local ok, err = self.stream:write(...) + + nxt = self[nxt] + if nxt then + resume(nxt) + else + self.next = 0 + end + + if not ok then return nil, err end + return ok +end + +local function wrap(stream, ...) + if not stream then return stream, ... end + return setmetatable({ stream = stream, next = 0 }, QOStream) +end + +return { + QOStream = QOStream, + wrap = wrap, +} + +-- vim: set ts=2 sw=2 noet: diff --git a/lem/streams/sendfile.c b/lem/streams/sendfile.c new file mode 100644 index 0000000..c3395e3 --- /dev/null +++ b/lem/streams/sendfile.c @@ -0,0 +1,120 @@ +/* + * This file is part of LEM, a Lua Event Machine. + * Copyright 2011-2012 Emil Renner Berthing + * + * LEM is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * LEM is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with LEM. If not, see <http://www.gnu.org/licenses/>. + */ + +struct lem_sendfile { + int fd; + off_t size; +}; + +static int +sendfile_open(lua_State *T) +{ + const char *path = luaL_checkstring(T, 1); + int fd; + struct stat buf; + struct lem_sendfile *f; + + fd = open(path, O_RDONLY | O_NONBLOCK); + if (fd < 0) { + int err = errno; + + lua_pushnil(T); + switch (err) { + case ENOENT: + lua_pushliteral(T, "not found"); + break; + case EACCES: + lua_pushliteral(T, "permission denied"); + break; + default: + lua_pushstring(T, strerror(err)); + } + return 2; + } + + if (fstat(fd, &buf)) { + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + (void)close(fd); + return 2; + } + + /* create userdata and set the metatable */ + f = lua_newuserdata(T, sizeof(struct lem_sendfile)); + lua_pushvalue(T, lua_upvalueindex(1)); + lua_setmetatable(T, -2); + + /* initialize userdata */ + f->fd = fd; + f->size = buf.st_size; + + return 1; +} + +static int +sendfile_gc(lua_State *T) +{ + struct lem_sendfile *f = lua_touserdata(T, 1); + + if (f->fd < 0) + return 0; + + (void)close(f->fd); + return 0; +} + +static int +sendfile_close(lua_State *T) +{ + struct lem_sendfile *f; + + luaL_checktype(T, 1, LUA_TUSERDATA); + f = lua_touserdata(T, 1); + if (f->fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "already closed"); + return 2; + } + + if (close(f->fd)) { + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + return 2; + } + + f->fd = -1; + lua_pushboolean(T, 1); + return 1; +} + +static int +sendfile_size(lua_State *T) +{ + struct lem_sendfile *f; + + luaL_checktype(T, 1, LUA_TUSERDATA); + f = lua_touserdata(T, 1); + if (f->fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + lua_pushnumber(T, (lua_Number)f->size); + return 1; +} diff --git a/lem/streams/server.c b/lem/streams/server.c new file mode 100644 index 0000000..c2c03cf --- /dev/null +++ b/lem/streams/server.c @@ -0,0 +1,298 @@ +/* + * This file is part of LEM, a Lua Event Machine. + * Copyright 2011-2012 Emil Renner Berthing + * + * LEM is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * LEM is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with LEM. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef MAXPENDING +#define MAXPENDING 50 +#endif + +static int +server_closed(lua_State *T) +{ + struct ev_io *w; + + luaL_checktype(T, 1, LUA_TUSERDATA); + w = lua_touserdata(T, 1); + lua_pushboolean(T, w->fd < 0); + return 1; +} + +static int +server_busy(lua_State *T) +{ + struct ev_io *w; + + luaL_checktype(T, 1, LUA_TUSERDATA); + w = lua_touserdata(T, 1); + lua_pushboolean(T, w->data != NULL); + return 1; +} + +static int +server_close(lua_State *T) +{ + struct ev_io *w; + int ret; + + luaL_checktype(T, 1, LUA_TUSERDATA); + w = lua_touserdata(T, 1); + if (w->fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "already closed"); + return 2; + } + + if (w->data != NULL) { + lem_debug("interrupting listen"); + ev_io_stop(LEM_ w); + lua_pushnil(w->data); + lua_pushliteral(w->data, "interrupted"); + lem_queue(w->data, 2); + w->data = NULL; + } + + lem_debug("closing server.."); + + if (close(w->fd)) { + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + ret = 2; + } else { + lua_pushboolean(T, 1); + ret = 1; + } + + w->fd = -1; + return ret; +} + +static int +server_interrupt(lua_State *T) +{ + struct ev_io *w; + + luaL_checktype(T, 1, LUA_TUSERDATA); + w = lua_touserdata(T, 1); + if (w->data == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "not busy"); + return 2; + } + + lem_debug("interrupting listening"); + ev_io_stop(LEM_ w); + lua_pushnil(w->data); + lua_pushliteral(w->data, "interrupted"); + lem_queue(w->data, 2); + w->data = NULL; + + lua_pushboolean(T, 1); + return 1; +} + +static int +try_accept(lua_State *T, struct ev_io *w) +{ + struct sockaddr client_addr; + unsigned int client_addrlen; + int sock; + struct istream *is; + struct ostream *os; + + sock = accept(w->fd, &client_addr, &client_addrlen); + if (sock < 0) { + if (errno == EAGAIN || errno == ECONNABORTED) + return 0; + lua_pushnil(T); + lua_pushfstring(T, "error accepting connection: %s", + strerror(errno)); + return -1; + } + + /* make the socket non-blocking */ + if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) { + close(sock); + lua_pushnil(T); + lua_pushfstring(T, "error making socket non-blocking: %s", + strerror(errno)); + return -1; + } + + is = istream_new(T, sock, lua_upvalueindex(1)); + os = ostream_new(T, sock, lua_upvalueindex(2)); + is->twin = os; + os->twin = is; + + return 1; +} + +static void +server_accept_handler(EV_P_ struct ev_io *w, int revents) +{ + (void)revents; + + switch (try_accept(w->data, w)) { + case 0: + return; + + case 1: + break; + + default: + close(w->fd); + w->fd = -1; + } + + ev_io_stop(EV_A_ w); + lem_queue(w->data, 2); + w->data = NULL; +} + +static int +server_accept(lua_State *T) +{ + struct ev_io *w; + + luaL_checktype(T, 1, LUA_TUSERDATA); + w = lua_touserdata(T, 1); + if (w->fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (w->data != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + switch (try_accept(T, w)) { + case 0: + w->cb = server_accept_handler; + w->data = T; + ev_io_start(LEM_ w); + + /* yield server object */ + lua_settop(T, 1); + return lua_yield(T, 1); + + case 1: + break; + + + default: + close(w->fd); + w->fd = -1; + } + + return 2; +} + +static void +server_autospawn_handler(EV_P_ struct ev_io *w, int revents) +{ + lua_State *T = w->data; + struct sockaddr client_addr; + unsigned int client_addrlen; + int sock; + lua_State *S; + struct istream *is; + struct ostream *os; + + (void)revents; + + /* dequeue the incoming connection */ + client_addrlen = sizeof(struct sockaddr); + sock = accept(w->fd, &client_addr, &client_addrlen); + if (sock < 0) { + if (errno == EAGAIN || errno == ECONNABORTED) + return; + lua_pushnil(T); + lua_pushfstring(T, "error accepting connection: %s", + strerror(errno)); + goto error; + } + + /* make the socket non-blocking */ + if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) { + close(sock); + lua_pushnil(T); + lua_pushfstring(T, "error making socket non-blocking: %s", + strerror(errno)); + goto error; + } + + S = lem_newthread(); + + /* copy handler function to thread */ + lua_pushvalue(T, 2); + lua_xmove(T, S, 1); + + /* create streams */ + is = istream_new(T, sock, lua_upvalueindex(1)); + os = ostream_new(T, sock, lua_upvalueindex(2)); + is->twin = os; + os->twin = is; + + /* move streams to new thread */ + lua_xmove(T, S, 2); + + lem_queue(S, 2); + return; + +error: + ev_io_stop(EV_A_ w); + close(w->fd); + w->fd = -1; + + lem_queue(T, 2); + w->data = NULL; +} + +static int +server_autospawn(lua_State *T) +{ + struct ev_io *w; + + luaL_checktype(T, 1, LUA_TUSERDATA); + luaL_checktype(T, 2, LUA_TFUNCTION); + + w = lua_touserdata(T, 1); + if (w->fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (w->data != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + w->cb = server_autospawn_handler; + w->data = T; + ev_io_start(LEM_ w); + + lem_debug("yielding"); + + /* yield server object, function and metatable*/ + lua_settop(T, 2); + lua_pushvalue(T, lua_upvalueindex(1)); + return lua_yield(T, 3); +} diff --git a/lem/streams/stream.c b/lem/streams/stream.c new file mode 100644 index 0000000..50ffc50 --- /dev/null +++ b/lem/streams/stream.c @@ -0,0 +1,802 @@ +/* + * This file is part of LEM, a Lua Event Machine. + * Copyright 2011-2012 Emil Renner Berthing + * + * LEM is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * LEM is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with LEM. If not, see <http://www.gnu.org/licenses/>. + */ + +struct ostream; + +struct istream { + ev_io w; + struct ostream *twin; + struct lem_parser *p; + struct lem_inputbuf buf; +}; + +struct ostream { + ev_io w; + struct istream *twin; + const char *data; + size_t len; +}; + +static struct istream * +istream_new(lua_State *T, int fd, int mt) +{ + struct istream *s; + + /* create userdata and set the metatable */ + s = lua_newuserdata(T, sizeof(struct istream)); + lua_pushvalue(T, mt); + lua_setmetatable(T, -2); + + /* initialize userdata */ + ev_io_init(&s->w, NULL, fd, EV_READ); + s->w.data = NULL; + s->twin = NULL; + s->buf.start = s->buf.end = 0; + + return s; +} + +static int +stream_closed(lua_State *T) +{ + struct ev_io *w; + + luaL_checktype(T, 1, LUA_TUSERDATA); + w = lua_touserdata(T, 1); + lua_pushboolean(T, w->fd < 0); + return 1; +} + +static int +stream_busy(lua_State *T) +{ + struct ev_io *w; + + luaL_checktype(T, 1, LUA_TUSERDATA); + w = lua_touserdata(T, 1); + lua_pushboolean(T, w->data != NULL); + return 1; +} + +static int +stream_interrupt(lua_State *T) +{ + struct ev_io *w; + lua_State *S; + + luaL_checktype(T, 1, LUA_TUSERDATA); + w = lua_touserdata(T, 1); + S = w->data; + if (S == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "not busy"); + return 2; + } + + lem_debug("interrupting io action"); + ev_io_stop(LEM_ w); + w->data = NULL; + lua_settop(S, 0); + lua_pushnil(S); + lua_pushliteral(S, "interrupted"); + lem_queue(S, 2); + + lua_pushboolean(T, 1); + return 1; +} + +static int +stream_close_check(lua_State *T, struct ev_io *w) +{ + if (w->fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "already closed"); + return 2; + } + + if (w->data != NULL) { + lua_State *S = w->data; + + lem_debug("interrupting io action"); + ev_io_stop(LEM_ w); + w->data = NULL; + lua_settop(S, 0); + lua_pushnil(S); + lua_pushliteral(S, "interrupted"); + lem_queue(S, 2); + } + + return 0; +} + +static int +istream_close(lua_State *T) +{ + struct istream *s; + + luaL_checktype(T, 1, LUA_TUSERDATA); + s = lua_touserdata(T, 1); + + lem_debug("collecting %d", s->w.fd); + if (stream_close_check(T, &s->w)) + return 2; + + if (s->twin) + s->twin->twin = NULL; + else if (close(s->w.fd)) { + s->w.fd = -1; + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + return 2; + } + + s->w.fd = -1; + lua_pushboolean(T, 1); + return 1; +} + +static int +ostream_close(lua_State *T) +{ + struct ostream *s; + + luaL_checktype(T, 1, LUA_TUSERDATA); + s = lua_touserdata(T, 1); + + lem_debug("collecting %d", s->w.fd); + if (stream_close_check(T, &s->w)) + return 2; + + if (s->twin) + s->twin->twin = NULL; + else if (close(s->w.fd)) { + s->w.fd = -1; + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + return 2; + } + + s->w.fd = -1; + lua_pushboolean(T, 1); + return 1; +} + +/* + * istream:readp() method + */ + +static void +stream_readp_handler(EV_P_ ev_io *w, int revents) +{ + struct istream *s = (struct istream *)w; + lua_State *T = s->w.data; + ssize_t bytes; + int ret; + enum lem_preason reason; + const char *msg; + + (void)revents; + + while ((bytes = read(s->w.fd, s->buf.buf + s->buf.end, + LEM_INPUTBUF_SIZE - s->buf.end)) > 0) { + lem_debug("read %ld bytes from %d", bytes, s->w.fd); + + s->buf.end += bytes; + + ret = s->p->process(T, &s->buf); + if (ret > 0) { + ev_io_stop(EV_A_ &s->w); + s->w.data = NULL; + lem_queue(T, ret); + return; + } + } + lem_debug("read %ld bytes from %d", bytes, s->w.fd); + + if (bytes < 0 && errno == EAGAIN) + return; + + ev_io_stop(EV_A_ &s->w); + s->w.data = NULL; + + if (s->twin) + s->twin->twin = NULL; + else + (void)close(s->w.fd); + s->w.fd = -1; + + if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) { + reason = LEM_PCLOSED; + msg = "closed"; + } else { + reason = LEM_PERROR; + msg = strerror(errno); + } + + if (s->p->destroy && (ret = s->p->destroy(T, &s->buf, reason)) > 0) { + lem_queue(T, ret); + return; + } + + lua_settop(T, 0); + lua_pushnil(T); + lua_pushstring(T, msg); + lem_queue(T, 2); +} + +static int +stream_readp(lua_State *T) +{ + struct istream *s; + struct lem_parser *p; + int ret; + ssize_t bytes; + enum lem_preason reason; + const char *msg; + + luaL_checktype(T, 1, LUA_TUSERDATA); + ret = lua_type(T, 2); + if (ret != LUA_TUSERDATA && ret != LUA_TLIGHTUSERDATA) + return luaL_argerror(T, 2, "expected userdata"); + + s = lua_touserdata(T, 1); + if (s->w.fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (s->w.data != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + p = lua_touserdata(T, 2); + if (p->init) + p->init(T, &s->buf); + +again: + ret = p->process(T, &s->buf); + if (ret > 0) + return ret; + + bytes = read(s->w.fd, s->buf.buf + s->buf.end, LEM_INPUTBUF_SIZE - s->buf.end); + lem_debug("read %ld bytes from %d", bytes, s->w.fd); + if (bytes > 0) { + s->buf.end += bytes; + goto again; + } + + if (bytes < 0 && errno == EAGAIN) { + s->p = p; + s->w.data = T; + s->w.cb = stream_readp_handler; + ev_io_start(LEM_ &s->w); + return lua_yield(T, lua_gettop(T)); + } + + if (s->twin) + s->twin->twin = NULL; + else + (void)close(s->w.fd); + s->w.fd = -1; + + if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) { + reason = LEM_PCLOSED; + msg = "closed"; + } else { + reason = LEM_PERROR; + msg = strerror(errno); + } + + if (p->destroy && (ret = p->destroy(T, &s->buf, reason)) > 0) + return ret; + + lua_settop(T, 0); + lua_pushnil(T); + lua_pushstring(T, msg); + return 2; +} + +static struct ostream * +ostream_new(lua_State *T, int fd, int mt) +{ + struct ostream *s; + + /* create userdata and set the metatable */ + s = lua_newuserdata(T, sizeof(struct ostream)); + lua_pushvalue(T, mt); + lua_setmetatable(T, -2); + + /* initialize userdata */ + ev_io_init(&s->w, NULL, fd, EV_WRITE); + s->w.data = NULL; + s->twin = NULL; + + return s; +} + +static void +stream_write_handler(EV_P_ struct ev_io *w, int revents) +{ + struct ostream *s = (struct ostream *)w; + lua_State *T = s->w.data; + ssize_t bytes; + + (void)revents; + +again: + bytes = write(s->w.fd, s->data, s->len); + if (bytes > 0) { + s->len -= bytes; + if (s->len > 0) { + s->data += bytes; + goto again; + } + + ev_io_stop(EV_A_ &s->w); + s->w.data = NULL; + + lua_pushboolean(T, 1); + lem_queue(T, 1); + return; + } + + if (bytes < 0 && errno == EAGAIN) + return; + + ev_io_stop(EV_A_ &s->w); + s->w.data = NULL; + + lua_pushnil(T); + if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) + lua_pushliteral(T, "closed"); + else + lua_pushstring(T, strerror(errno)); + + if (s->twin) + s->twin->twin = NULL; + else + (void)close(s->w.fd); + s->w.fd = -1; + lem_queue(T, 2); +} + +static int +stream_write(lua_State *T) +{ + struct ostream *s; + ssize_t bytes; + + luaL_checktype(T, 1, LUA_TUSERDATA); + luaL_checktype(T, 2, LUA_TSTRING); + + s = lua_touserdata(T, 1); + if (s->w.fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (s->w.data != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + s->data = lua_tolstring(T, 2, &s->len); + if (s->len == 0) { + lua_pushboolean(T, 1); + return 1; + } + +again: + bytes = write(s->w.fd, s->data, s->len); + if (bytes > 0) { + s->len -= bytes; + if (s->len > 0) { + s->data += bytes; + goto again; + } + + lua_pushboolean(T, 1); + return 1; + } + + if (bytes < 0 && errno == EAGAIN) { + lua_settop(T, 1); + s->w.data = T; + s->w.cb = stream_write_handler; + ev_io_start(LEM_ &s->w); + return lua_yield(T, 1); + } + + lua_pushnil(T); + if (bytes == 0 || errno == ECONNRESET || errno == EPIPE) + lua_pushliteral(T, "closed"); + else + lua_pushstring(T, strerror(errno)); + + if (s->twin) + s->twin->twin = NULL; + else + (void)close(s->w.fd); + s->w.fd = -1; + return 2; +} + + +#ifndef TCP_CORK +#define TCP_CORK TCP_NOPUSH +#endif + +static int +stream_setcork(lua_State *T, int state) +{ + struct ostream *s; + + luaL_checktype(T, 1, LUA_TUSERDATA); + s = lua_touserdata(T, 1); + if (s->w.fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (s->w.data != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (setsockopt(s->w.fd, IPPROTO_TCP, TCP_CORK, &state, sizeof(int))) { + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + return 2; + } + + lua_pushboolean(T, 1); + return 1; +} + +static int +stream_cork(lua_State *T) +{ + return stream_setcork(T, 1); +} + +static int +stream_uncork(lua_State *T) +{ + return stream_setcork(T, 0); +} + +struct sendfile { + struct lem_sendfile *file; + off_t offset; +}; + +static int +try_sendfile(lua_State *T, struct ostream *s, struct sendfile *sf) +{ +#ifdef __FreeBSD__ + int ret; + + do { + size_t count; + off_t written = 0; + + count = sf->file->size - sf->offset; + + if (count == 0) { + lua_settop(T, 0); + lua_pushboolean(T, 1); + return 1; + } + + ret = sendfile(sf->file->fd, s->w.fd, + sf->offset, count, + NULL, &written, 0); + lem_debug("wrote = %ld bytes", written); + sf->offset += written; + } while (ret >= 0); +#else +#ifdef __APPLE__ + int ret; + + do { + off_t count = sf->file->size - sf->offset; + + if (count == 0) { + lua_settop(T, 0); + lua_pushboolean(T, 1); + return 1; + } + + ret = sendfile(sf->file->fd, s->w.fd, + sf->offset, &count, NULL, 0); + lem_debug("wrote = %lld bytes", count); + sf->offset += count; + } while (ret >= 0); +#else + ssize_t ret; + + do { + size_t count = sf->file->size - sf->offset; + + if (count == 0) { + lua_settop(T, 0); + lua_pushboolean(T, 1); + return 1; + } + + ret = sendfile(s->w.fd, sf->file->fd, + &sf->offset, + count); + lem_debug("wrote = %ld bytes", ret); + } while (ret >= 0); +#endif +#endif + + if (errno == EAGAIN) + return 0; + + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + return 2; +} + +static void +sendfile_handler(EV_P_ struct ev_io *w, int revents) +{ + struct ostream *s = (struct ostream *)w; + lua_State *T = s->w.data; + struct sendfile *sf = lua_touserdata(T, 3); + int ret; + + (void)revents; + + ret = try_sendfile(T, s, sf); + if (ret == 0) + return; + + ev_io_stop(EV_A_ &s->w); + if (ret == 2) { + if (s->twin) + s->twin->twin = NULL; + else + (void)close(s->w.fd); + s->w.fd = -1; + } + + lem_queue(T, ret); + s->w.data = NULL; +} + +static int +stream_sendfile(lua_State *T) +{ + struct ostream *s; + struct lem_sendfile *f; + struct sendfile *sf; + off_t offset; + int ret; + + luaL_checktype(T, 1, LUA_TUSERDATA); + luaL_checktype(T, 2, LUA_TUSERDATA); + offset = (off_t)luaL_optnumber(T, 3, 0); + + s = lua_touserdata(T, 1); + if (s->w.fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (s->w.data != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + f = lua_touserdata(T, 2); + if (f->fd < 0) { + lua_pushnil(T); + lua_pushliteral(T, "file closed"); + return 2; + } + + if (offset > f->size) + return luaL_error(T, "offset too big"); + + lua_settop(T, 2); + sf = lua_newuserdata(T, sizeof(struct sendfile)); + sf->file = f; + sf->offset = offset; + + ret = try_sendfile(T, s, sf); + if (ret > 0) { + if (ret == 2) { + if (s->twin) + s->twin->twin = NULL; + else + (void)close(s->w.fd); + s->w.fd = -1; + } + return ret; + } + + lem_debug("yielding"); + s->w.data = T; + s->w.cb = sendfile_handler; + ev_io_start(LEM_ &s->w); + return lua_yield(T, 3); +} + +static int +mode_to_flags(const char *mode) +{ + int omode; + int oflags; + + switch (*mode++) { + case 'r': + omode = O_RDONLY; + oflags = 0; + break; + case 'w': + omode = O_WRONLY; + oflags = O_CREAT | O_TRUNC; + break; + case 'a': + omode = O_WRONLY; + oflags = O_CREAT | O_APPEND; + break; + default: + return -1; + } + +next: + switch (*mode++) { + case '\0': + break; + case '+': + omode = O_RDWR; + goto next; + case 'b': + /* this does nothing on *nix, but + * don't treat it as an error */ + goto next; + case 'x': + oflags |= O_EXCL; + goto next; + default: + return -1; + } + + return omode | oflags; +} + +static int +stream_open(lua_State *T) +{ + const char *path = luaL_checkstring(T, 1); + int flags = mode_to_flags(luaL_optstring(T, 2, "r")); + int fd; + struct istream *is; + struct ostream *os; + + if (flags < 0) + return luaL_error(T, "invalid mode string"); + + fd = open(path, flags | O_NONBLOCK); + if (fd < 0) { + lua_pushnil(T); + switch (errno) { + case ENOENT: + lua_pushliteral(T, "not found"); + break; + case EACCES: + lua_pushliteral(T, "permission denied"); + break; + default: + lua_pushstring(T, strerror(errno)); + } + return 2; + } + + if ((flags & O_WRONLY) == 0) + is = istream_new(T, fd, lua_upvalueindex(1)); + else + is = NULL; + + if (flags & (O_RDWR | O_WRONLY)) + os = ostream_new(T, fd, lua_upvalueindex(2)); + else + os = NULL; + + if (is && os) { + is->twin = os; + os->twin = is; + return 2; + } + + return 1; +} + +static int +stream_popen(lua_State *T) +{ + const char *cmd = luaL_checkstring(T, 1); + const char *mode = luaL_optstring(T, 2, "r"); + int fd[2]; + + if (mode[0] != 'r' && mode[0] != 'w') + return luaL_error(T, "invalid mode string"); + + if (pipe(fd)) + goto error; + + switch (fork()) { + case -1: /* error */ + (void)close(fd[0]); + (void)close(fd[1]); + goto error; + case 0: /* child */ + if (mode[0] == 'r') { + (void)close(fd[0]); + (void)dup2(fd[1], 1); + } else { + (void)close(fd[1]); + (void)dup2(fd[0], 0); + } + + (void)execl("/bin/sh", "/bin/sh", "-c", cmd, NULL); + exit(EXIT_FAILURE); + } + + if (mode[0] == 'r') { + if (close(fd[1])) { + (void)close(fd[0]); + goto error; + } + } else { + if (close(fd[0])) { + (void)close(fd[1]); + goto error; + } + fd[0] = fd[1]; + } + + /* make the pipe non-blocking */ + if (fcntl(fd[0], F_SETFL, O_NONBLOCK) < 0) { + (void)close(fd[0]); + goto error; + } + + if (mode[0] == 'r') + (void)istream_new(T, fd[0], lua_upvalueindex(1)); + else + (void)ostream_new(T, fd[0], lua_upvalueindex(2)); + return 1; +error: + lua_pushnil(T); + lua_pushstring(T, strerror(errno)); + return 2; +} diff --git a/lem/streams/tcp.c b/lem/streams/tcp.c new file mode 100644 index 0000000..fd13760 --- /dev/null +++ b/lem/streams/tcp.c @@ -0,0 +1,251 @@ +/* + * This file is part of LEM, a Lua Event Machine. + * Copyright 2011-2012 Emil Renner Berthing + * + * LEM is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * LEM is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with LEM. If not, see <http://www.gnu.org/licenses/>. + */ + +static void +connect_handler(EV_P_ struct ev_io *w, int revents) +{ + (void)revents; + + lem_debug("connection established"); + ev_io_stop(EV_A_ w); + lem_queue(w->data, 2); + w->data = NULL; +} + +static int +tcp_connect(lua_State *T) +{ + const char *addr = luaL_checkstring(T, 1); + uint16_t port = (uint16_t)luaL_checknumber(T, 2); + struct addrinfo hints = { + .ai_flags = 0, + .ai_family = AF_UNSPEC, + .ai_socktype = SOCK_STREAM, + .ai_protocol = IPPROTO_TCP, + .ai_addrlen = 0, + .ai_addr = NULL, + .ai_canonname = NULL, + .ai_next = NULL + }; + struct addrinfo *ainfo; + int sock; + int ret; + struct istream *is; + struct ostream *os; + + /* lookup name */ + ret = getaddrinfo(addr, NULL, &hints, &ainfo); + if (ret != 0) { + lua_pushnil(T); + lua_pushfstring(T, "error looking up \"%s\": %s", + addr, gai_strerror(ret)); + return 2; + } + + /* create the TCP socket */ + switch (ainfo->ai_family) { + case AF_INET: + ((struct sockaddr_in *)ainfo->ai_addr)->sin_port = htons(port); + sock = socket(PF_INET, ainfo->ai_socktype, ainfo->ai_protocol); + break; + + case AF_INET6: + ((struct sockaddr_in6 *)ainfo->ai_addr)->sin6_port = htons(port); + sock = socket(PF_INET6, ainfo->ai_socktype, ainfo->ai_protocol); + break; + + default: + freeaddrinfo(ainfo); + lua_pushnil(T); + lua_pushfstring(T, "getaddrinfo() returned neither " + "IPv4 or IPv6 address for \"%s\"", + addr); + return 2; + } + + lem_debug("sock = %d", sock); + + if (sock < 0) { + freeaddrinfo(ainfo); + lua_pushnil(T); + lua_pushfstring(T, "error creating TCP socket: %s", + strerror(errno)); + return 2; + } + + /* make the socket non-blocking */ + if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) { + close(sock); + freeaddrinfo(ainfo); + lua_pushnil(T); + lua_pushfstring(T, "error making socket non-blocking: %s", + strerror(errno)); + return 2; + } + + lua_settop(T, 1); + is = istream_new(T, sock, lua_upvalueindex(1)); + os = ostream_new(T, sock, lua_upvalueindex(2)); + is->twin = os; + os->twin = is; + + /* connect */ + ret = connect(sock, ainfo->ai_addr, ainfo->ai_addrlen); + freeaddrinfo(ainfo); + if (ret == 0) { + lem_debug("connection established"); + return 2; + } + + if (errno == EINPROGRESS) { + lem_debug("EINPROGRESS"); + os->w.data = T; + os->w.cb = connect_handler; + ev_io_start(LEM_ &os->w); + return lua_yield(T, 3); + } + + close(sock); + lua_pushnil(T); + lua_pushfstring(T, "error connecting to %s:%d: %s", + addr, (int)port, strerror(errno)); + return 2; +} + +static int +common_listen(lua_State *T, struct sockaddr *address, socklen_t alen, + int sock, int backlog) +{ + struct ev_io *w; + int optval = 1; + + /* set SO_REUSEADDR option */ + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + &optval, sizeof(int))) { + close(sock); + lua_pushnil(T); + lua_pushfstring(T, "error setting SO_REUSEADDR on socket: %s", + strerror(errno)); + return 2; + } + + /* make the socket non-blocking */ + if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) { + close(sock); + lua_pushnil(T); + lua_pushfstring(T, "error making socket non-blocking: %s", + strerror(errno)); + return 2; + } + + /* bind */ + if (bind(sock, address, alen)) { + close(sock); + lua_pushnil(T); + lua_pushfstring(T, "error binding socket: %s", + strerror(errno)); + return 2; + } + + /* listen to the socket */ + if (listen(sock, backlog) < 0) { + lua_pushnil(T); + lua_pushfstring(T, "error listening to the socket: %s", + strerror(errno)); + return 2; + } + + /* create userdata and set the metatable */ + w = lua_newuserdata(T, sizeof(struct ev_io)); + lua_pushvalue(T, lua_upvalueindex(1)); + lua_setmetatable(T, -2); + + /* initialize userdata */ + ev_io_init(w, NULL, sock, EV_READ); + w->data = NULL; + + return 1; +} + +static int +tcp4_listen(lua_State *T) +{ + const char *addr = luaL_checkstring(T, 1); + uint16_t port = (uint16_t)luaL_checknumber(T, 2); + int backlog = (int)luaL_optnumber(T, 3, MAXPENDING); + int sock; + struct sockaddr_in address; + + /* initialise the socketadr_in structure */ + memset(&address, 0, sizeof(struct sockaddr_in)); + address.sin_family = AF_INET; + if (addr[0] == '*' && addr[1] == '\0') + address.sin_addr.s_addr = INADDR_ANY; + else if (!inet_pton(AF_INET, addr, &address.sin_addr)) { + lua_pushnil(T); + lua_pushfstring(T, "cannot bind to '%s'", addr); + return 2; + } + address.sin_port = htons(port); + + /* create the TCP socket */ + sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if (sock < 0) { + lua_pushnil(T); + lua_pushfstring(T, "error creating TCP socket: %s", + strerror(errno)); + return 2; + } + + return common_listen(T, (struct sockaddr *)&address, + sizeof(struct sockaddr_in), sock, backlog); +} + +static int +tcp6_listen(lua_State *T) +{ + const char *addr = luaL_checkstring(T, 1); + uint16_t port = (uint16_t)luaL_checknumber(T, 2); + int backlog = (int)luaL_optnumber(T, 3, MAXPENDING); + int sock; + struct sockaddr_in6 address; + + /* initialise the socketadr_in structure */ + memset(&address, 0, sizeof(struct sockaddr_in6)); + address.sin6_family = AF_INET6; + if (addr[0] == '*' && addr[1] == '\0') + address.sin6_addr = in6addr_any; + else if (!inet_pton(AF_INET6, addr, &address.sin6_addr)) { + lua_pushnil(T); + lua_pushfstring(T, "cannot bind to '%s'", addr); + return 2; + } + address.sin6_port = htons(port); + + /* create the TCP socket */ + sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (sock < 0) { + lua_pushnil(T); + lua_pushfstring(T, "error creating TCP socket: %s", + strerror(errno)); + return 2; + } + + return common_listen(T, (struct sockaddr *)&address, + sizeof(struct sockaddr_in6), sock, backlog); +} @@ -1,52 +0,0 @@ -/* setup for luaconf.h */ -#define LUA_CORE -#define LUA_LIB -#define ltable_c -#define lvm_c -#include "luaconf.h" - -/* do not export internal symbols */ -#undef LUAI_FUNC -#undef LUAI_DDEC -#undef LUAI_DDEF -#define LUAI_FUNC static -#define LUAI_DDEC static -#define LUAI_DDEF static - -/* core */ -#include "lua/lapi.c" -#include "lua/lcode.c" -#include "lua/lctype.c" -#include "lua/ldebug.c" -#include "lua/ldo.c" -#include "lua/ldump.c" -#include "lua/lfunc.c" -#include "lua/lgc.c" -#include "lua/llex.c" -#include "lua/lmem.c" -#include "lua/lobject.c" -#include "lua/lopcodes.c" -#include "lua/lparser.c" -#include "lua/lstate.c" -#include "lua/lstring.c" -#include "lua/ltable.c" -#include "lua/ltm.c" -#include "lua/lundump.c" -#include "lua/lvm.c" -#include "lua/lzio.c" - -/* auxiliary library */ -#include "lua/lauxlib.c" - -/* standard library */ -#include "lua/lbaselib.c" -#include "lua/lbitlib.c" -#include "lua/lcorolib.c" -#include "lua/ldblib.c" -#include "lua/liolib.c" -#include "lua/lmathlib.c" -#include "lua/loadlib.c" -#include "lua/loslib.c" -#include "lua/lstrlib.c" -#include "lua/ltablib.c" -#include "lua/linit.c" diff --git a/lua/luaconf.h.in b/lua/luaconf.h.in index b0a75e8..6f4be5b 100644 --- a/lua/luaconf.h.in +++ b/lua/luaconf.h.in @@ -100,8 +100,8 @@ #else /* }{ */ -#define LUA_LDIR "@path@/" -#define LUA_CDIR "@cpath@/" +#define LUA_LDIR "@lmoddir@/" +#define LUA_CDIR "@cmoddir@/" #define LUA_PATH_DEFAULT \ LUA_LDIR"?.lua;" LUA_LDIR"?/init.lua;" \ LUA_CDIR"?.lua;" LUA_CDIR"?/init.lua;" "./?.lua" diff --git a/test/ctest.lua b/test/ctest.lua new file mode 100755 index 0000000..e0bb6c0 --- /dev/null +++ b/test/ctest.lua @@ -0,0 +1,48 @@ +#!bin/lem +-- +-- This file is part of LEM, a Lua Event Machine. +-- Copyright 2011-2012 Emil Renner Berthing +-- +-- LEM is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- LEM is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with LEM. If not, see <http://www.gnu.org/licenses/>. +-- + +print('Entered ' .. arg[0]) + +local utils = require 'lem.utils' +local streams = require 'lem.streams' + +local sleeper = utils.sleeper() +local iconn, oconn = assert(streams.tcp_connect('127.0.0.1', arg[1] or 8080)) + +for i = 1, 10 do + assert(oconn:write('ping\n')) + + local line, err = iconn:read('*l') + if not line then + if err == 'closed' then + print("Server closed connection") + return + end + + error(err) + end + + print("Server answered: '" .. line .. "'") +end + +oconn:write('quit\n') + +print('Exiting ' .. arg[0]) + +-- vim: syntax=lua ts=2 sw=2 noet: diff --git a/test/htest.lua b/test/htest.lua new file mode 100755 index 0000000..4272118 --- /dev/null +++ b/test/htest.lua @@ -0,0 +1,148 @@ +#!bin/lem +-- +-- This file is part of LEM, a Lua Event Machine. +-- Copyright 2011-2012 Emil Renner Berthing +-- +-- LEM is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- LEM is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with LEM. If not, see <http://www.gnu.org/licenses/>. +-- + +local utils = require 'lem.utils' +local hathaway = require 'lem.hathaway' +hathaway.import() + +GET('/', function(req, res) + res.status = 302 + res.headers['Location'] = '/dump' +end) + +GET('/hello', function(req, res) + res.headers['Content-Type'] = 'text/plain' + res:add('Hello, World!\n') +end) + +GET('/self', function(req, res) + res.headers['Content-Type'] = 'text/plain' + res.file = arg[0] +end) + +GET('/dump', function(req, res) + local accept = req.headers['Accept'] + if accept and accept:match('application/xhtml%+xml') then + res.headers['Content-Type'] = 'application/xhtml+xml' + else + res.headers['Content-Type'] = 'text/html' + end + res:add([[ +<?xml version="1.0" encoding="UTF-8"?> +<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"> +<head> + <title>Hathaway HTTP dump</title> + <style type="text/css"> + th { text-align:left; } + </style> +</head> +<body> + +<h2>Request</h2> +<table> + <tr><th>Method:</th><td>%s</td></tr> + <tr><th>Uri:</th><td>%s</td></tr> + <tr><th>Version:</th><td>%s</td></tr> +</table> + +<h2>Headers</h2> +<table> +]], req.method or '', req.uri or '', req.version) + + for k, v in pairs(req.headers) do + res:add(' <tr><th>%s</th><td>%s</td></tr>\n', k, v) + end + + res:add([[ +</table> + +<h2>Body</h2> +<form action="/form" method="POST" accept-charset="UTF-8"> + <p> + <textarea name="text" cols="80" rows="25"></textarea><br /> + <input type="submit" value="Submit" /> + </p> +</form> + +<form action="/quit" method="post"> + <p> + <input type="hidden" name="quit" value="secret" /> + <input type="submit" value="Quit" /> + </p> +</form> + +</body> +</html> +]]) +end) + +local function urldecode(str) + return str:gsub('+', ' '):gsub('%%(%x%x)', function (str) + return string.char(tonumber(str, 16)) + end) +end + +local function parseform(str) + local t = {} + for k, v in str:gmatch('([^&]+)=([^&]*)') do + t[urldecode(k)] = urldecode(v) + end + return t +end + +POST('/form', function(req, res) + res.headers['Content-Type'] = 'text/plain' + local body =req:body() + res:add("You sent:\n%s\n", body) + res:add('{\n') + for k, v in pairs(parseform(body)) do + res:add(" ['%s'] = '%s'\n", k, v) + end + res:add('}\n') +end) + +GET('/close', function(req, res) + res.headers['Content-Type'] = 'text/plain' + res.headers['Connection'] = 'close' + res:add('This connection should close\n') +end) + +POST('/quit', function(req, res) + local body = req:body() + + req.headers['Content-Type'] = 'text/plain' + + if body == 'quit=secret' then + res:add("Bye o/\n") + hathaway.server:close() + else + res:add("You didn't supply the right value...\n") + end +end) + +GETM('^/hello/([^/]+)$', function(req, res, name) + res.headers['Content-Type'] = 'text/plain' + res:add('Hello, %s!\n', name) +end) + +hathaway.debug = print +Hathaway('*', arg[1] or 8080) +utils.exit(0) -- otherwise open connections will keep us running + +-- vim: syntax=lua ts=2 sw=2 noet: diff --git a/test/httptest.lua b/test/httptest.lua new file mode 100755 index 0000000..7bc3b93 --- /dev/null +++ b/test/httptest.lua @@ -0,0 +1,58 @@ +#!bin/lem +-- +-- This file is part of LEM, a Lua Event Machine. +-- Copyright 2011-2012 Emil Renner Berthing +-- +-- LEM is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- LEM is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with LEM. If not, see <http://www.gnu.org/licenses/>. +-- + +local utils = require 'lem.utils' +local streams = require 'lem.streams' +local http = require 'lem.http' + +local format = string.format +local concat = table.concat + +local done = false + +utils.spawn(function() + --local istream, ostream = assert(streams.tcp_connect('www.google.dk', 80)) + local istream, ostream = assert(streams.tcp_connect('127.0.0.1', 8080)) + + print('\nConnected.') + + for i = 1, 2 do + --assert(ostream:write('GET / HTTP/1.1\r\nHost: www.google.dk\r\nConnection: close\r\n\r\n')) + assert(ostream:write('GET / HTTP/1.1\r\nHost: www.google.dk\r\n\r\n')) + + local res = assert(istream:read('HTTPResponse')) + + print(format('\nHTTP/%s %d %s', res.version, res.status, res.text)) + for k, v in pairs(res.headers) do + print(format('%s: %s', k, v)) + end + + print(format('\n#body = %d', #assert(res:body()))) + end + + done = true +end) + +local write, yield = io.write, utils.yield +repeat + write('.') + yield() +until done + +-- vim: set ts=2 sw=2 noet: diff --git a/test/multiplexing.lua b/test/multiplexing.lua new file mode 100755 index 0000000..8265e4c --- /dev/null +++ b/test/multiplexing.lua @@ -0,0 +1,79 @@ +#!bin/lem +-- +-- This file is part of LEM, a Lua Event Machine. +-- Copyright 2011-2012 Emil Renner Berthing +-- +-- LEM is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- LEM is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with LEM. If not, see <http://www.gnu.org/licenses/>. +-- + +local utils = require 'lem.utils' +local streams = require 'lem.streams' +local queue = require 'lem.streams.queue' + +local exit = false +local ticker = utils.sleeper() +local stdout = queue.wrap(streams.stdout) + +do + local format = string.format + function queue.QOStream:printf(...) + return self:write(format(...)) + end +end + +-- this function just reads lines from a +-- stream and prints them to stdout +local function poll(stream, name) + repeat + local line, err = stream:read('*l') + if not line then + stdout:printf('%s: %s\n', name, err) + break + end + + stdout:printf('%s: %s\n', name, line) + until line == 'quit' + + exit = true + ticker:wakeup() +end + +-- type 'mkfifo pipe' to create a named pipe for this script +-- and do 'cat > pipe' (in another terminal) to write to it +local pipe = assert(streams.open(arg[1] or 'pipe', 'r')) + +-- spawn coroutines to read from stdin and the pipe +utils.spawn(poll, streams.stdin, 'stdin') +utils.spawn(poll, pipe, 'pipe') + +do + --local out = streams.stderr + local out = stdout + local sound + + repeat + if sound == 'tick\n' then + sound = 'tock\n' + else + sound = 'tick\n' + end + out:write(sound) + ticker:sleep(1.0) + until exit +end + +streams.stdin:close() +pipe:close() + +-- vim: syntax=lua ts=2 sw=2 noet: diff --git a/test/sleep.lua b/test/sleep.lua new file mode 100755 index 0000000..41f7fe5 --- /dev/null +++ b/test/sleep.lua @@ -0,0 +1,46 @@ +#!bin/lem +-- +-- This file is part of LEM, a Lua Event Machine. +-- Copyright 2011-2012 Emil Renner Berthing +-- +-- LEM is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- LEM is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with LEM. If not, see <http://www.gnu.org/licenses/>. +-- + +local utils = require 'lem.utils' + +local sum, coros, n = 0, 0, 0 +local done = utils.sleeper() + +local function test(t) + local sleeper = utils.sleeper() + local diff = utils.now() + sleeper:sleep(t) + diff = utils.now() - diff + + print(string.format('%fs, %fms off', diff, 1000*(diff - t))) + sum = sum + math.abs(diff - t) + n = n + 1 + if n == coros then done:wakeup() end +end + +for t = 0, 3, 0.1 do + coros = coros + 1 + utils.spawn(test, t) +end + +done:sleep() + +print(string.format('%fms off on average', 1000*sum / n)) + +-- vim: syntax=lua ts=2 sw=2 noet: diff --git a/test/stest.lua b/test/stest.lua new file mode 100755 index 0000000..f54a415 --- /dev/null +++ b/test/stest.lua @@ -0,0 +1,64 @@ +#!bin/lem +-- +-- This file is part of LEM, a Lua Event Machine. +-- Copyright 2011-2012 Emil Renner Berthing +-- +-- LEM is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- LEM is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with LEM. If not, see <http://www.gnu.org/licenses/>. +-- + +print('Entered ' .. arg[0]) + +local utils = require 'lem.utils' +local streams = require 'lem.streams' + +local server = assert(streams.tcp4_listen('*', arg[1] or 8080)) + +--timer(10, function() exit(0) end) +utils.timer(10, function() + print 'Closing server' + server:close() +end) + +local ok, err = server:autospawn(function(i, o) + print 'Accepted a connection' + local sleeper = utils.sleeper() + + while true do + local line, err = i:read('*l') + if not line then + if err == 'closed' then + print("Client closed connection") + return + end + + error(err) + end + + print("Client sent: '" .. line .. "'") + if line ~= 'ping' then break end + + sleeper:sleep(0.4) + assert(o:write('pong\n')) + end + + print "Ok, I'm out" + assert(i:close()) + assert(o:close()) +end) + +if not ok and err ~= 'interrupted' then error(err) end + +print('Exiting ' .. arg[0]) + +-- vim: syntax=lua ts=2 sw=2 noet: @@ -1,4 +1,4 @@ -#!./lem +#!bin/lem -- -- This file is part of LEM, a Lua Event Machine. -- Copyright 2011-2012 Emil Renner Berthing @@ -17,9 +17,6 @@ -- along with LEM. If not, see <http://www.gnu.org/licenses/>. -- -package.path = package.path .. ';../?.lua' -package.cpath = package.cpath .. ';../?.so' - local utils = require 'lem.utils' local function sleep(n) diff --git a/test2.lua b/test/test2.lua index 06ca7c9..b64eedf 100755 --- a/test2.lua +++ b/test/test2.lua @@ -1,4 +1,4 @@ -#!./lem +#!bin/lem -- -- This file is part of LEM, a Lua Event Machine. -- Copyright 2011-2012 Emil Renner Berthing @@ -17,9 +17,6 @@ -- along with LEM. If not, see <http://www.gnu.org/licenses/>. -- -package.path = package.path .. ';../?.lua' -package.cpath = package.cpath .. ';../?.so' - print 'Entered test.lua' local utils = require 'lem.utils' |