From bc525acebe8dbe4ec89e73703906b46fa6206d89 Mon Sep 17 00:00:00 2001 From: Emil Renner Berthing Date: Fri, 6 Jan 2012 17:54:40 +0100 Subject: update for LEM 0.3 --- Makefile | 87 ++--- PKGBUILD | 8 +- lem/postgres.c | 884 ++++++++++++++++++++++++++++++++++++++++++++++++ lem/postgres.so | 1 - lem/postgres/queued.lua | 94 ++++- postgres.c | 884 ------------------------------------------------ qtest.lua | 3 +- queued.lua | 96 ------ 8 files changed, 1029 insertions(+), 1028 deletions(-) create mode 100644 lem/postgres.c delete mode 120000 lem/postgres.so mode change 120000 => 100644 lem/postgres/queued.lua delete mode 100644 postgres.c delete mode 100644 queued.lua diff --git a/Makefile b/Makefile index 978ac73..58305c6 100644 --- a/Makefile +++ b/Makefile @@ -1,57 +1,62 @@ -CC = gcc -CFLAGS ?= -O2 -pipe -Wall -Wextra -Wno-variadic-macros -Wno-strict-aliasing -PKGCONFIG = pkg-config +CC = gcc -std=gnu99 +CFLAGS ?= -O2 -pipe -Wall -Wextra +PKG_CONFIG = pkg-config +PG_CONFIG = pg_config STRIP = strip INSTALL = install +UNAME = uname + +OS = $(shell $(UNAME)) +CFLAGS += $(shell $(PKG_CONFIG) --cflags lem) +lmoddir = $(shell $(PKG_CONFIG) --variable=INSTALL_LMOD lem) +cmoddir = $(shell $(PKG_CONFIG) --variable=INSTALL_CMOD lem) + +ifeq ($(OS),Darwin) +SHARED = -dynamiclib -Wl,-undefined,dynamic_lookup +STRIP += -x +else +SHARED = -shared +endif -CFLAGS += $(shell $(PKGCONFIG) --cflags lem) -LUA_PATH = $(shell $(PKGCONFIG) --variable=path lem) -LUA_CPATH = $(shell $(PKGCONFIG) --variable=cpath lem) - -programs = postgres.so -scripts = queued.lua +llibs = lem/postgres/queued.lua +clibs = lem/postgres.so -ifdef NDEBUG -CFLAGS += -DNDEBUG +ifdef V +E=@\# +Q= +else +E=@echo +Q=@ endif -.PHONY: all strip install clean -.PRECIOUS: %.o +.PHONY: all debug strip install clean -all: $(programs) +all: CFLAGS += -DNDEBUG +all: $(clibs) -%.o: %.c - @echo ' CC $@' - @$(CC) $(CFLAGS) -fPIC -nostartfiles -c $< -o $@ +debug: $(clibs) -postgres.so: CFLAGS += -I$(shell pg_config --includedir) -postgres.so: postgres.o - @echo ' LD $@' - @$(CC) -shared -L$(shell pg_config --libdir) -lpq $(LDFLAGS) $^ -o $@ +lem/postgres.so: CFLAGS += -I$(shell $(PG_CONFIG) --includedir) +lem/postgres.so: LDFLAGS += -L$(shell $(PG_CONFIG) --libdir) +lem/postgres.so: LIBS += -lpq +lem/postgres.so: lem/postgres.c + $E ' CCLD $@' + $Q$(CC) $(CFLAGS) -fPIC -nostartfiles $(SHARED) $^ -o $@ $(LDFLAGS) $(LIBS) %-strip: % - @echo ' STRIP $<' - @$(STRIP) $< - -strip: $(programs:%=%-strip) - -path-install: - @echo " INSTALL -d $(LUA_PATH)/lem/postgres" - @$(INSTALL) -d $(DESTDIR)$(LUA_PATH)/lem/postgres - -%.lua-install: %.lua path-install - @echo " INSTALL $<" - @$(INSTALL) -m644 $< $(DESTDIR)$(LUA_PATH)/lem/postgres/$< + $E ' STRIP $<' + $Q$(STRIP) $< -cpath-install: - @echo " INSTALL -d $(LUA_CPATH)/lem" - @$(INSTALL) -d $(DESTDIR)$(LUA_CPATH)/lem +strip: $(clibs:%=%-strip) -%.so-install: %.so cpath-install - @echo " INSTALL $<" - @$(INSTALL) $< $(DESTDIR)$(LUA_CPATH)/lem/$< +$(DESTDIR)$(lmoddir)/% $(DESTDIR)$(cmoddir)/%: % + $E ' INSTALL $@' + $Q$(INSTALL) -d $(dir $@) + $Q$(INSTALL) -m 644 $< $@ -install: $(programs:%=%-install) $(scripts:%=%-install) +install: \ + $(llibs:%=$(DESTDIR)$(lmoddir)/%) \ + $(clibs:%=$(DESTDIR)$(cmoddir)/%) clean: - rm -f $(programs) *.o *.c~ *.h~ + rm -f $(clibs) diff --git a/PKGBUILD b/PKGBUILD index 43a0238..7823f9d 100644 --- a/PKGBUILD +++ b/PKGBUILD @@ -1,11 +1,11 @@ # Maintainer: Emil Renner Berthing pkgname=lem-postgres -pkgver=0.1 +pkgver=0.3 pkgrel=1 -pkgdesc="PostgreSQL library for the Lua Event Machine" +pkgdesc='PostgreSQL library for the Lua Event Machine' arch=('i686' 'x86_64' 'armv5tel' 'armv7l') -url="https://github.com/esmil/lem-postgres" +url='https://github.com/esmil/lem-postgres' license=('GPL') depends=('lem' 'postgresql-libs') source=() @@ -13,7 +13,7 @@ source=() build() { cd "$startdir" - make NDEBUG=1 + make } package() { diff --git a/lem/postgres.c b/lem/postgres.c new file mode 100644 index 0000000..2d28cf5 --- /dev/null +++ b/lem/postgres.c @@ -0,0 +1,884 @@ +/* + * This file is part of lem-postgres. + * Copyright 2011 Emil Renner Berthing + * + * lem-postgres is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * lem-postgres is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with lem-postgres. If not, see . + */ + +#include +#include +#include +#include + +struct connection { + struct ev_io w; + PGconn *conn; + lua_State *T; +}; + +static void +push_error(lua_State *T, PGconn *conn) +{ + const char *msg = PQerrorMessage(conn); + const char *p; + + for (p = msg; *p != '\n' && *p != '\0'; p++); + + lua_pushnil(T); + if (p > msg) + lua_pushlstring(T, msg, p - msg); + else + lua_pushliteral(T, "unknown error"); +} + +static int +connection_gc(lua_State *T) +{ + struct connection *c = lua_touserdata(T, 1); + + if (c->conn == NULL) + return 0; + + ev_io_stop(LEM_ &c->w); + PQfinish(c->conn); + return 0; +} + +static int +connection_close(lua_State *T) +{ + struct connection *c; + + luaL_checktype(T, 1, LUA_TUSERDATA); + c = lua_touserdata(T, 1); + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "already closed"); + return 2; + } + + if (c->T != NULL) { + lua_pushnil(c->T); + lua_pushliteral(c->T, "interrupted"); + lem_queue(c->T, 2); + c->T = NULL; + } + + ev_io_stop(LEM_ &c->w); + PQfinish(c->conn); + c->conn = NULL; + + lua_pushboolean(T, 1); + return 1; +} + +static void +connect_handler(EV_P_ struct ev_io *w, int revents) +{ + struct connection *c = (struct connection *)w; + + (void)revents; + + ev_io_stop(EV_A_ &c->w); + switch (PQconnectPoll(c->conn)) { + case PGRES_POLLING_READING: + lem_debug("PGRES_POLLING_READING, socket = %d", PQsocket(c->conn)); + ev_io_set(&c->w, PQsocket(c->conn), EV_READ); + break; + + case PGRES_POLLING_WRITING: + lem_debug("PGRES_POLLING_WRITING, socket = %d", PQsocket(c->conn)); + ev_io_set(&c->w, PQsocket(c->conn), EV_WRITE); + break; + + case PGRES_POLLING_FAILED: + lem_debug("PGRES_POLLING_FAILED"); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + PQfinish(c->conn); + c->conn = NULL; + lem_queue(c->T, 2); + c->T = NULL; + return; + + case PGRES_POLLING_OK: + lem_debug("PGRES_POLLING_OK"); + lem_queue(c->T, 1); + c->T = NULL; + return; + +#ifndef NDEBUG + case PGRES_POLLING_ACTIVE: + assert(0); +#endif + } + + ev_io_start(EV_A_ &c->w); +} + +static int +connection_connect(lua_State *T) +{ + const char *conninfo = luaL_checkstring(T, 1); + PGconn *conn; + ConnStatusType status; + struct connection *c; + + conn = PQconnectStart(conninfo); + if (conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "out of memory"); + return 2; + } + + status = PQstatus(conn); + if (status == CONNECTION_BAD) { + lem_debug("CONNECTION_BAD"); + goto error; + } + + c = lua_newuserdata(T, sizeof(struct connection)); + lua_pushvalue(T, lua_upvalueindex(1)); + lua_setmetatable(T, -2); + + c->conn = conn; + + switch (PQconnectPoll(conn)) { + case PGRES_POLLING_READING: + lem_debug("PGRES_POLLING_READING"); + ev_io_init(&c->w, connect_handler, PQsocket(conn), EV_READ); + break; + + case PGRES_POLLING_WRITING: + lem_debug("PGRES_POLLING_WRITING"); + ev_io_init(&c->w, connect_handler, PQsocket(conn), EV_WRITE); + break; + + case PGRES_POLLING_FAILED: + lem_debug("PGRES_POLLING_FAILED"); + goto error; + + case PGRES_POLLING_OK: + lem_debug("PGRES_POLLING_OK"); + c->T = NULL; + return 1; + +#ifndef NDEBUG + case PGRES_POLLING_ACTIVE: + assert(0); +#endif + } + + c->T = T; + ev_io_start(LEM_ &c->w); + + lua_replace(T, 1); + lua_settop(T, 1); + return lua_yield(T, 1); +error: + push_error(T, conn); + PQfinish(conn); + return 2; +} + +static int +connection_reset(lua_State *T) +{ + struct connection *c; + + luaL_checktype(T, 1, LUA_TUSERDATA); + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (PQresetStart(c->conn) != 1) + goto error; + + c->w.cb = connect_handler; + switch (PQconnectPoll(c->conn)) { + case PGRES_POLLING_READING: + lem_debug("PGRES_POLLING_READING"); + ev_io_set(&c->w, PQsocket(c->conn), EV_READ); + break; + + case PGRES_POLLING_WRITING: + lem_debug("PGRES_POLLING_WRITING"); + ev_io_set(&c->w, PQsocket(c->conn), EV_WRITE); + break; + + case PGRES_POLLING_FAILED: + lem_debug("PGRES_POLLING_FAILED"); + goto error; + + case PGRES_POLLING_OK: + lem_debug("PGRES_POLLING_OK"); + return 1; + +#ifndef NDEBUG + case PGRES_POLLING_ACTIVE: + assert(0); +#endif + } + + c->T = T; + ev_io_start(LEM_ &c->w); + + lua_settop(T, 1); + return lua_yield(T, 1); +error: + push_error(T, c->conn); + return 2; +} + +static void +push_tuples(lua_State *T, PGresult *res) +{ + int rows = PQntuples(res); + int columns = PQnfields(res); + int i; + + lua_createtable(T, rows, 0); + for (i = 0; i < rows; i++) { + int j; + + lua_createtable(T, columns, 0); + for (j = 0; j < columns; j++) { + if (PQgetisnull(res, i, j)) + lua_pushnil(T); + else + lua_pushlstring(T, PQgetvalue(res, i, j), + PQgetlength(res, i, j)); + lua_rawseti(T, -2, j+1); + } + lua_rawseti(T, -2, i+1); + } + + /* insert column names as "row 0" */ + lua_createtable(T, columns, 0); + for (i = 0; i < columns; i++) { + lua_pushstring(T, PQfname(res, i)); + lua_rawseti(T, -2, i+1); + } + lua_rawseti(T, -2, 0); +} + +static void +error_handler(EV_P_ struct ev_io *w, int revents) +{ + struct connection *c = (struct connection *)w; + + (void)revents; + + if (PQconsumeInput(c->conn) == 0) { + ev_io_stop(EV_A_ &c->w); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + lem_queue(c->T, 2); + c->T = NULL; + return; + } + + while (!PQisBusy(c->conn)) { + PGresult *res = PQgetResult(c->conn); + + if (res == NULL) { + ev_io_stop(EV_A_ &c->w); + lem_queue(c->T, 2); + c->T = NULL; + return; + } + + PQclear(res); + } +} + +static void +exec_handler(EV_P_ struct ev_io *w, int revents) +{ + struct connection *c = (struct connection *)w; + PGresult *res; + + (void)revents; + + if (PQconsumeInput(c->conn) != 1) { + ev_io_stop(EV_A_ &c->w); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + lem_queue(c->T, 2); + c->T = NULL; + return; + } + + while (!PQisBusy(c->conn)) { + res = PQgetResult(c->conn); + + if (res == NULL) { + ev_io_stop(EV_A_ &c->w); + lem_debug("returning %d values", lua_gettop(c->T) - 1); + lem_queue(c->T, lua_gettop(c->T) - 1); + c->T = NULL; + return; + } + + switch (PQresultStatus(res)) { + case PGRES_EMPTY_QUERY: + lem_debug("PGRES_EMPTY_QUERY"); + lua_settop(c->T, 0); + lua_pushnil(c->T); + lua_pushliteral(c->T, "empty query"); + goto error; + + case PGRES_COMMAND_OK: + lem_debug("PGRES_COMMAND_OK"); + lua_pushboolean(c->T, 1); + break; + + case PGRES_TUPLES_OK: + lem_debug("PGRES_TUPLES_OK"); + push_tuples(c->T, res); + break; + + case PGRES_COPY_IN: + lem_debug("PGRES_COPY_IN"); + (void)PQsetnonblocking(c->conn, 1); + case PGRES_COPY_OUT: + lem_debug("PGRES_COPY_OUT"); + PQclear(res); + lua_pushboolean(c->T, 1); + lem_queue(c->T, lua_gettop(c->T) - 1); + c->T = NULL; + return; + + case PGRES_BAD_RESPONSE: + lem_debug("PGRES_BAD_RESPONSE"); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + goto error; + + case PGRES_NONFATAL_ERROR: + lem_debug("PGRES_NONFATAL_ERROR"); + break; + + case PGRES_FATAL_ERROR: + lem_debug("PGRES_FATAL_ERROR"); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + goto error; + + default: + lem_debug("unknown result status"); + lua_settop(c->T, 0); + lua_pushnil(c->T); + lua_pushliteral(c->T, "unknown result status"); + goto error; + } + + PQclear(res); + } + + lem_debug("busy"); + return; +error: + PQclear(res); + c->w.cb = error_handler; + while (!PQisBusy(c->conn)) { + res = PQgetResult(c->conn); + + if (res == NULL) { + ev_io_stop(EV_A_ &c->w); + lem_queue(c->T, 2); + c->T = NULL; + return; + } + + PQclear(res); + } +} + +static int +connection_exec(lua_State *T) +{ + struct connection *c; + const char *command; + int n; + + luaL_checktype(T, 1, LUA_TUSERDATA); + command = luaL_checkstring(T, 2); + + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + n = lua_gettop(T) - 2; + if (n > 0) { + const char **values = lem_xmalloc(n * sizeof(char *)); + int *lengths = lem_xmalloc(n * sizeof(int)); + int i; + + for (i = 0; i < n; i++) { + size_t len; + + values[i] = lua_tolstring(T, i+3, &len); + lengths[i] = len; + } + + n = PQsendQueryParams(c->conn, command, n, + NULL, values, lengths, NULL, 0); + free(values); + free(lengths); + } else + n = PQsendQuery(c->conn, command); + + if (n != 1) { + lem_debug("PQsendQuery failed"); + push_error(T, c->conn); + return 2; + } + + c->T = T; + c->w.cb = exec_handler; + c->w.events = EV_READ; + ev_io_start(LEM_ &c->w); + + lua_settop(T, 1); + return lua_yield(T, 1); +} + +static int +connection_prepare(lua_State *T) +{ + struct connection *c; + const char *name; + const char *query; + + luaL_checktype(T, 1, LUA_TUSERDATA); + name = luaL_checkstring(T, 2); + query = luaL_checkstring(T, 3); + + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (PQsendPrepare(c->conn, name, query, 0, NULL) != 1) { + push_error(T, c->conn); + return 2; + } + + c->T = T; + c->w.cb = exec_handler; + c->w.events = EV_READ; + ev_io_start(LEM_ &c->w); + + lua_settop(T, 1); + return lua_yield(T, 1); +} + +static int +connection_run(lua_State *T) +{ + struct connection *c; + const char *name; + const char **values; + int *lengths; + int n; + int i; + + luaL_checktype(T, 1, LUA_TUSERDATA); + name = luaL_checkstring(T, 2); + + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + n = lua_gettop(T) - 2; + values = lem_xmalloc(n * sizeof(char *)); + lengths = lem_xmalloc(n * sizeof(int)); + + for (i = 0; i < n; i++) { + size_t len; + + values[i] = lua_tolstring(T, i+3, &len); + lengths[i] = len; + } + + n = PQsendQueryPrepared(c->conn, name, n, + values, lengths, NULL, 0); + free(values); + free(lengths); + if (n != 1) { + push_error(T, c->conn); + return 2; + } + + c->T = T; + c->w.cb = exec_handler; + c->w.events = EV_READ; + ev_io_start(LEM_ &c->w); + + lua_settop(T, 1); + return lua_yield(T, 1); +} + +static void +put_handler(EV_P_ struct ev_io *w, int revents) +{ + struct connection *c = (struct connection *)w; + size_t len; + const char *data; + + (void)revents; + + data = lua_tolstring(c->T, 2, &len); + switch (PQputCopyData(c->conn, data, (int)len)) { + case 1: /* data sent */ + lem_debug("data sent"); + ev_io_stop(EV_A_ &c->w); + + lua_settop(c->T, 0); + lua_pushboolean(c->T, 1); + lem_queue(c->T, 1); + c->T = NULL; + break; + + case 0: /* would block */ + lem_debug("would block"); + break; + + default: /* should be -1 for error */ + lua_settop(c->T, 0); + push_error(c->T, c->conn); + lem_queue(c->T, 2); + c->T = NULL; + break; + } +} + +static int +connection_put(lua_State *T) +{ + struct connection *c; + size_t len; + const char *data; + + luaL_checktype(T, 1, LUA_TUSERDATA); + data = luaL_checklstring(T, 2, &len); + + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + switch (PQputCopyData(c->conn, data, (int)len)) { + case 1: /* data sent */ + lem_debug("data sent"); + lua_pushboolean(T, 1); + return 1; + + case 0: /* would block */ + lem_debug("would block"); + break; + + default: /* should be -1 for error */ + push_error(T, c->conn); + return 2; + } + + c->T = T; + c->w.cb = put_handler; + c->w.events = EV_WRITE; + ev_io_start(LEM_ &c->w); + + lua_settop(T, 2); + return lua_yield(T, 2); +} + +static void +done_handler(EV_P_ struct ev_io *w, int revents) +{ + struct connection *c = (struct connection *)w; + const char *error = lua_tostring(c->T, 2); + + (void)revents; + + switch (PQputCopyEnd(c->conn, error)) { + case 1: /* data sent */ + lem_debug("data sent"); + ev_io_stop(EV_A_ &c->w); + lua_settop(c->T, 1); + (void)PQsetnonblocking(c->conn, 0); + c->w.cb = exec_handler; + c->w.events = EV_READ; + ev_io_start(EV_A_ &c->w); + break; + + case 0: /* would block */ + lem_debug("would block"); + break; + + default: /* should be -1 for error */ + ev_io_stop(EV_A_ &c->w); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + lem_queue(c->T, 2); + c->T = NULL; + break; + } +} + +static int +connection_done(lua_State *T) +{ + struct connection *c; + const char *error; + + luaL_checktype(T, 1, LUA_TUSERDATA); + error = luaL_optstring(T, 2, NULL); + + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + switch (PQputCopyEnd(c->conn, error)) { + case 1: /* data sent */ + lem_debug("data sent"); + (void)PQsetnonblocking(c->conn, 0); + c->T = T; + c->w.cb = exec_handler; + c->w.events = EV_READ; + ev_io_start(LEM_ &c->w); + lua_settop(c->T, 1); + return lua_yield(c->T, 1); + + case 0: /* would block */ + lem_debug("would block"); + break; + + default: /* should be -1 for error */ + push_error(T, c->conn); + return 2; + } + + c->T = T; + c->w.cb = done_handler; + c->w.events = EV_WRITE; + ev_io_start(LEM_ &c->w); + + if (error == NULL) { + lua_settop(T, 1); + lua_pushnil(T); + } else + lua_settop(T, 2); + return lua_yield(T, 2); +} + +static void +get_handler(EV_P_ struct ev_io *w, int revents) +{ + struct connection *c = (struct connection *)w; + char *buf; + int ret; + + ret = PQgetCopyData(c->conn, &buf, 1); + if (ret > 0) { + lem_debug("got data"); + ev_io_stop(EV_A_ &c->w); + + lua_pushlstring(c->T, buf, ret); + PQfreemem(buf); + lem_queue(c->T, 1); + c->T = NULL; + return; + } + + switch (ret) { + case 0: /* would block */ + lem_debug("would block"); + break; + + case -1: /* no more data */ + lem_debug("no more data"); + c->w.cb = exec_handler; + exec_handler(EV_A_ &c->w, revents); + break; + + default: /* should be -2 for error */ + ev_io_stop(EV_A_ &c->w); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + lem_queue(c->T, 2); + c->T = NULL; + break; + } +} + +static int +connection_get(lua_State *T) +{ + struct connection *c; + char *buf; + int ret; + + luaL_checktype(T, 1, LUA_TUSERDATA); + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + ret = PQgetCopyData(c->conn, &buf, 1); + if (ret > 0) { + lem_debug("got data"); + lua_pushlstring(T, buf, ret); + PQfreemem(buf); + return 1; + } + + switch (ret) { + case 0: /* would block */ + lem_debug("would block"); + c->T = T; + c->w.cb = get_handler; + c->w.events = EV_READ; + ev_io_start(LEM_ &c->w); + + lua_settop(T, 1); + return lua_yield(T, 1); + + case -1: /* no more data */ + lem_debug("no more data"); + break; + + default: /* should be -2 for error */ + push_error(T, c->conn); + return 2; + } + + c->T = T; + c->w.cb = exec_handler; + c->w.events = EV_READ; + ev_io_start(LEM_ &c->w); + + /* TODO: it is necessary but kinda ugly to call + * the exec_handler directly from here. + * find a better solution... */ + lua_settop(T, 1); + exec_handler(LEM_ &c->w, 0); + return lua_yield(T, 1); +} + +int +luaopen_lem_postgres(lua_State *L) +{ + lua_newtable(L); + + /* create Connection metatable mt */ + lua_newtable(L); + lua_pushvalue(L, -1); + lua_setfield(L, -2, "__index"); + /* mt.__gc = */ + lua_pushcfunction(L, connection_gc); + lua_setfield(L, -2, "__gc"); + /* mt.close = */ + lua_pushcfunction(L, connection_close); + lua_setfield(L, -2, "close"); + /* mt.connect = */ + lua_pushvalue(L, -1); /* upvalue 1: Connection */ + lua_pushcclosure(L, connection_connect, 1); + lua_setfield(L, -3, "connect"); + /* mt.reset = */ + lua_pushcfunction(L, connection_reset); + lua_setfield(L, -2, "reset"); + /* mt.exec = */ + lua_pushcfunction(L, connection_exec); + lua_setfield(L, -2, "exec"); + /* mt.prepare = */ + lua_pushcfunction(L, connection_prepare); + lua_setfield(L, -2, "prepare"); + /* mt.run = */ + lua_pushcfunction(L, connection_run); + lua_setfield(L, -2, "run"); + /* mt.put = */ + lua_pushcfunction(L, connection_put); + lua_setfield(L, -2, "put"); + /* mt.done = */ + lua_pushcfunction(L, connection_done); + lua_setfield(L, -2, "done"); + /* mt.get = */ + lua_pushcfunction(L, connection_get); + lua_setfield(L, -2, "get"); + /* set Connection */ + lua_setfield(L, -2, "Connection"); + + return 1; +} diff --git a/lem/postgres.so b/lem/postgres.so deleted file mode 120000 index d0ce981..0000000 --- a/lem/postgres.so +++ /dev/null @@ -1 +0,0 @@ -../postgres.so \ No newline at end of file diff --git a/lem/postgres/queued.lua b/lem/postgres/queued.lua deleted file mode 120000 index 94dfe13..0000000 --- a/lem/postgres/queued.lua +++ /dev/null @@ -1 +0,0 @@ -../../queued.lua \ No newline at end of file diff --git a/lem/postgres/queued.lua b/lem/postgres/queued.lua new file mode 100644 index 0000000..2f597f4 --- /dev/null +++ b/lem/postgres/queued.lua @@ -0,0 +1,93 @@ +-- +-- This file is part of lem-postgres. +-- Copyright 2011 Emil Renner Berthing +-- +-- lem-postgres is free software: you can redistribute it and/or +-- modify it under the terms of the GNU General Public License as +-- published by the Free Software Foundation, either version 3 of +-- the License, or (at your option) any later version. +-- +-- lem-postgres is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with lem-postgres. If not, see . +-- + +local utils = require 'lem.utils' +local postgres = require 'lem.postgres' + +local remove = table.remove +local thisthread, suspend, resume = utils.thisthread, utils.suspend, utils.resume + +local QConnection = {} +QConnection.__index = QConnection + +function QConnection:close() + local ok, err = self.conn:close() + for i = 1, self.n - 1 do + resume(self[i]) + self[i] = nil + end + return ok, err +end +QConnection.__gc = QConnection.close + +local function lock(self) + local n = self.n + if n == 0 then + self.n = 1 + else + self[n] = thisthread() + self.n = n + 1 + suspend() + end + return self.conn +end +QConnection.lock = lock + +local function unlock(self, ...) + local n = self.n - 1 + self.n = n + if n > 0 then + resume(remove(self, 1)) + end + return ... +end +QConnection.unlock = unlock + +local function wrap(method) + return function(self, ...) + return unlock(self, method(lock(self), ...)) + end +end + +local Connection = postgres.Connection +QConnection.exec = wrap(Connection.exec) +QConnection.prepare = wrap(Connection.prepare) +QConnection.run = wrap(Connection.run) + +local qconnect +do + local setmetatable = setmetatable + local connect = postgres.connect + + function qconnect(...) + local conn, err = connect(...) + if not conn then return nil, err end + + return setmetatable({ + n = 0, + conn = conn, + }, QConnection) + end +end + +return { + QConnection = QConnection, + connect = qconnect, +} + +-- vim: ts=2 sw=2 noet: diff --git a/postgres.c b/postgres.c deleted file mode 100644 index 16f2de8..0000000 --- a/postgres.c +++ /dev/null @@ -1,884 +0,0 @@ -/* - * This file is part of lem-postgres. - * Copyright 2011 Emil Renner Berthing - * - * lem-postgres is free software: you can redistribute it and/or - * modify it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of - * the License, or (at your option) any later version. - * - * lem-postgres is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with lem-postgres. If not, see . - */ - -#include -#include -#include -#include - -struct connection { - struct ev_io w; - PGconn *conn; - lua_State *T; -}; - -static void -push_error(lua_State *T, PGconn *conn) -{ - const char *msg = PQerrorMessage(conn); - const char *p; - - for (p = msg; *p != '\n' && *p != '\0'; p++); - - lua_pushnil(T); - if (p > msg) - lua_pushlstring(T, msg, p - msg); - else - lua_pushliteral(T, "unknown error"); -} - -static int -connection_gc(lua_State *T) -{ - struct connection *c = lua_touserdata(T, 1); - - if (c->conn == NULL) - return 0; - - ev_io_stop(EV_G_ &c->w); - PQfinish(c->conn); - return 0; -} - -static int -connection_close(lua_State *T) -{ - struct connection *c; - - luaL_checktype(T, 1, LUA_TUSERDATA); - c = lua_touserdata(T, 1); - if (c->conn == NULL) { - lua_pushnil(T); - lua_pushliteral(T, "already closed"); - return 2; - } - - if (c->T != NULL) { - lua_pushnil(c->T); - lua_pushliteral(c->T, "interrupted"); - lem_queue(c->T, 2); - c->T = NULL; - } - - ev_io_stop(EV_G_ &c->w); - PQfinish(c->conn); - c->conn = NULL; - - lua_pushboolean(T, 1); - return 1; -} - -static void -connect_handler(EV_P_ struct ev_io *w, int revents) -{ - struct connection *c = (struct connection *)w; - - (void)revents; - - ev_io_stop(EV_A_ &c->w); - switch (PQconnectPoll(c->conn)) { - case PGRES_POLLING_READING: - lem_debug("PGRES_POLLING_READING, socket = %d", PQsocket(c->conn)); - ev_io_set(&c->w, PQsocket(c->conn), EV_READ); - break; - - case PGRES_POLLING_WRITING: - lem_debug("PGRES_POLLING_WRITING, socket = %d", PQsocket(c->conn)); - ev_io_set(&c->w, PQsocket(c->conn), EV_WRITE); - break; - - case PGRES_POLLING_FAILED: - lem_debug("PGRES_POLLING_FAILED"); - lua_settop(c->T, 0); - push_error(c->T, c->conn); - PQfinish(c->conn); - c->conn = NULL; - lem_queue(c->T, 2); - c->T = NULL; - return; - - case PGRES_POLLING_OK: - lem_debug("PGRES_POLLING_OK"); - lem_queue(c->T, 1); - c->T = NULL; - return; - -#ifndef NDEBUG - case PGRES_POLLING_ACTIVE: - assert(0); -#endif - } - - ev_io_start(EV_A_ &c->w); -} - -static int -connection_connect(lua_State *T) -{ - const char *conninfo = luaL_checkstring(T, 1); - PGconn *conn; - ConnStatusType status; - struct connection *c; - - conn = PQconnectStart(conninfo); - if (conn == NULL) { - lua_pushnil(T); - lua_pushliteral(T, "out of memory"); - return 2; - } - - status = PQstatus(conn); - if (status == CONNECTION_BAD) { - lem_debug("CONNECTION_BAD"); - goto error; - } - - c = lua_newuserdata(T, sizeof(struct connection)); - lua_pushvalue(T, lua_upvalueindex(1)); - lua_setmetatable(T, -2); - - c->conn = conn; - - switch (PQconnectPoll(conn)) { - case PGRES_POLLING_READING: - lem_debug("PGRES_POLLING_READING"); - ev_io_init(&c->w, connect_handler, PQsocket(conn), EV_READ); - break; - - case PGRES_POLLING_WRITING: - lem_debug("PGRES_POLLING_WRITING"); - ev_io_init(&c->w, connect_handler, PQsocket(conn), EV_WRITE); - break; - - case PGRES_POLLING_FAILED: - lem_debug("PGRES_POLLING_FAILED"); - goto error; - - case PGRES_POLLING_OK: - lem_debug("PGRES_POLLING_OK"); - c->T = NULL; - return 1; - -#ifndef NDEBUG - case PGRES_POLLING_ACTIVE: - assert(0); -#endif - } - - c->T = T; - ev_io_start(EV_G_ &c->w); - - lua_replace(T, 1); - lua_settop(T, 1); - return lua_yield(T, 1); -error: - push_error(T, conn); - PQfinish(conn); - return 2; -} - -static int -connection_reset(lua_State *T) -{ - struct connection *c; - - luaL_checktype(T, 1, LUA_TUSERDATA); - c = lua_touserdata(T, 1); - if (c->T != NULL) { - lua_pushnil(T); - lua_pushliteral(T, "busy"); - return 2; - } - - if (c->conn == NULL) { - lua_pushnil(T); - lua_pushliteral(T, "closed"); - return 2; - } - - if (PQresetStart(c->conn) != 1) - goto error; - - c->w.cb = connect_handler; - switch (PQconnectPoll(c->conn)) { - case PGRES_POLLING_READING: - lem_debug("PGRES_POLLING_READING"); - ev_io_set(&c->w, PQsocket(c->conn), EV_READ); - break; - - case PGRES_POLLING_WRITING: - lem_debug("PGRES_POLLING_WRITING"); - ev_io_set(&c->w, PQsocket(c->conn), EV_WRITE); - break; - - case PGRES_POLLING_FAILED: - lem_debug("PGRES_POLLING_FAILED"); - goto error; - - case PGRES_POLLING_OK: - lem_debug("PGRES_POLLING_OK"); - return 1; - -#ifndef NDEBUG - case PGRES_POLLING_ACTIVE: - assert(0); -#endif - } - - c->T = T; - ev_io_start(EV_G_ &c->w); - - lua_settop(T, 1); - return lua_yield(T, 1); -error: - push_error(T, c->conn); - return 2; -} - -static void -push_tuples(lua_State *T, PGresult *res) -{ - int rows = PQntuples(res); - int columns = PQnfields(res); - int i; - - lua_createtable(T, rows, 0); - for (i = 0; i < rows; i++) { - int j; - - lua_createtable(T, columns, 0); - for (j = 0; j < columns; j++) { - if (PQgetisnull(res, i, j)) - lua_pushnil(T); - else - lua_pushlstring(T, PQgetvalue(res, i, j), - PQgetlength(res, i, j)); - lua_rawseti(T, -2, j+1); - } - lua_rawseti(T, -2, i+1); - } - - /* insert column names as "row 0" */ - lua_createtable(T, columns, 0); - for (i = 0; i < columns; i++) { - lua_pushstring(T, PQfname(res, i)); - lua_rawseti(T, -2, i+1); - } - lua_rawseti(T, -2, 0); -} - -static void -error_handler(EV_P_ struct ev_io *w, int revents) -{ - struct connection *c = (struct connection *)w; - - (void)revents; - - if (PQconsumeInput(c->conn) == 0) { - ev_io_stop(EV_A_ &c->w); - lua_settop(c->T, 0); - push_error(c->T, c->conn); - lem_queue(c->T, 2); - c->T = NULL; - return; - } - - while (!PQisBusy(c->conn)) { - PGresult *res = PQgetResult(c->conn); - - if (res == NULL) { - ev_io_stop(EV_A_ &c->w); - lem_queue(c->T, 2); - c->T = NULL; - return; - } - - PQclear(res); - } -} - -static void -exec_handler(EV_P_ struct ev_io *w, int revents) -{ - struct connection *c = (struct connection *)w; - PGresult *res; - - (void)revents; - - if (PQconsumeInput(c->conn) != 1) { - ev_io_stop(EV_A_ &c->w); - lua_settop(c->T, 0); - push_error(c->T, c->conn); - lem_queue(c->T, 2); - c->T = NULL; - return; - } - - while (!PQisBusy(c->conn)) { - res = PQgetResult(c->conn); - - if (res == NULL) { - ev_io_stop(EV_A_ &c->w); - lem_debug("returning %d values", lua_gettop(c->T) - 1); - lem_queue(c->T, lua_gettop(c->T) - 1); - c->T = NULL; - return; - } - - switch (PQresultStatus(res)) { - case PGRES_EMPTY_QUERY: - lem_debug("PGRES_EMPTY_QUERY"); - lua_settop(c->T, 0); - lua_pushnil(c->T); - lua_pushliteral(c->T, "empty query"); - goto error; - - case PGRES_COMMAND_OK: - lem_debug("PGRES_COMMAND_OK"); - lua_pushboolean(c->T, 1); - break; - - case PGRES_TUPLES_OK: - lem_debug("PGRES_TUPLES_OK"); - push_tuples(c->T, res); - break; - - case PGRES_COPY_IN: - lem_debug("PGRES_COPY_IN"); - (void)PQsetnonblocking(c->conn, 1); - case PGRES_COPY_OUT: - lem_debug("PGRES_COPY_OUT"); - PQclear(res); - lua_pushboolean(c->T, 1); - lem_queue(c->T, lua_gettop(c->T) - 1); - c->T = NULL; - return; - - case PGRES_BAD_RESPONSE: - lem_debug("PGRES_BAD_RESPONSE"); - lua_settop(c->T, 0); - push_error(c->T, c->conn); - goto error; - - case PGRES_NONFATAL_ERROR: - lem_debug("PGRES_NONFATAL_ERROR"); - break; - - case PGRES_FATAL_ERROR: - lem_debug("PGRES_FATAL_ERROR"); - lua_settop(c->T, 0); - push_error(c->T, c->conn); - goto error; - - default: - lem_debug("unknown result status"); - lua_settop(c->T, 0); - lua_pushnil(c->T); - lua_pushliteral(c->T, "unknown result status"); - goto error; - } - - PQclear(res); - } - - lem_debug("busy"); - return; -error: - PQclear(res); - c->w.cb = error_handler; - while (!PQisBusy(c->conn)) { - res = PQgetResult(c->conn); - - if (res == NULL) { - ev_io_stop(EV_A_ &c->w); - lem_queue(c->T, 2); - c->T = NULL; - return; - } - - PQclear(res); - } -} - -static int -connection_exec(lua_State *T) -{ - struct connection *c; - const char *command; - int n; - - luaL_checktype(T, 1, LUA_TUSERDATA); - command = luaL_checkstring(T, 2); - - c = lua_touserdata(T, 1); - if (c->T != NULL) { - lua_pushnil(T); - lua_pushliteral(T, "busy"); - return 2; - } - - if (c->conn == NULL) { - lua_pushnil(T); - lua_pushliteral(T, "closed"); - return 2; - } - - n = lua_gettop(T) - 2; - if (n > 0) { - const char **values = lem_xmalloc(n * sizeof(char *)); - int *lengths = lem_xmalloc(n * sizeof(int)); - int i; - - for (i = 0; i < n; i++) { - size_t len; - - values[i] = lua_tolstring(T, i+3, &len); - lengths[i] = len; - } - - n = PQsendQueryParams(c->conn, command, n, - NULL, values, lengths, NULL, 0); - free(values); - free(lengths); - } else - n = PQsendQuery(c->conn, command); - - if (n != 1) { - lem_debug("PQsendQuery failed"); - push_error(T, c->conn); - return 2; - } - - c->T = T; - c->w.cb = exec_handler; - c->w.events = EV_READ; - ev_io_start(EV_G_ &c->w); - - lua_settop(T, 1); - return lua_yield(T, 1); -} - -static int -connection_prepare(lua_State *T) -{ - struct connection *c; - const char *name; - const char *query; - - luaL_checktype(T, 1, LUA_TUSERDATA); - name = luaL_checkstring(T, 2); - query = luaL_checkstring(T, 3); - - c = lua_touserdata(T, 1); - if (c->T != NULL) { - lua_pushnil(T); - lua_pushliteral(T, "busy"); - return 2; - } - - if (c->conn == NULL) { - lua_pushnil(T); - lua_pushliteral(T, "closed"); - return 2; - } - - if (PQsendPrepare(c->conn, name, query, 0, NULL) != 1) { - push_error(T, c->conn); - return 2; - } - - c->T = T; - c->w.cb = exec_handler; - c->w.events = EV_READ; - ev_io_start(EV_G_ &c->w); - - lua_settop(T, 1); - return lua_yield(T, 1); -} - -static int -connection_run(lua_State *T) -{ - struct connection *c; - const char *name; - const char **values; - int *lengths; - int n; - int i; - - luaL_checktype(T, 1, LUA_TUSERDATA); - name = luaL_checkstring(T, 2); - - c = lua_touserdata(T, 1); - if (c->T != NULL) { - lua_pushnil(T); - lua_pushliteral(T, "busy"); - return 2; - } - - if (c->conn == NULL) { - lua_pushnil(T); - lua_pushliteral(T, "closed"); - return 2; - } - - n = lua_gettop(T) - 2; - values = lem_xmalloc(n * sizeof(char *)); - lengths = lem_xmalloc(n * sizeof(int)); - - for (i = 0; i < n; i++) { - size_t len; - - values[i] = lua_tolstring(T, i+3, &len); - lengths[i] = len; - } - - n = PQsendQueryPrepared(c->conn, name, n, - values, lengths, NULL, 0); - free(values); - free(lengths); - if (n != 1) { - push_error(T, c->conn); - return 2; - } - - c->T = T; - c->w.cb = exec_handler; - c->w.events = EV_READ; - ev_io_start(EV_G_ &c->w); - - lua_settop(T, 1); - return lua_yield(T, 1); -} - -static void -put_handler(EV_P_ struct ev_io *w, int revents) -{ - struct connection *c = (struct connection *)w; - size_t len; - const char *data; - - (void)revents; - - data = lua_tolstring(c->T, 2, &len); - switch (PQputCopyData(c->conn, data, (int)len)) { - case 1: /* data sent */ - lem_debug("data sent"); - ev_io_stop(EV_A_ &c->w); - - lua_settop(c->T, 0); - lua_pushboolean(c->T, 1); - lem_queue(c->T, 1); - c->T = NULL; - break; - - case 0: /* would block */ - lem_debug("would block"); - break; - - default: /* should be -1 for error */ - lua_settop(c->T, 0); - push_error(c->T, c->conn); - lem_queue(c->T, 2); - c->T = NULL; - break; - } -} - -static int -connection_put(lua_State *T) -{ - struct connection *c; - size_t len; - const char *data; - - luaL_checktype(T, 1, LUA_TUSERDATA); - data = luaL_checklstring(T, 2, &len); - - c = lua_touserdata(T, 1); - if (c->T != NULL) { - lua_pushnil(T); - lua_pushliteral(T, "busy"); - return 2; - } - - if (c->conn == NULL) { - lua_pushnil(T); - lua_pushliteral(T, "closed"); - return 2; - } - - switch (PQputCopyData(c->conn, data, (int)len)) { - case 1: /* data sent */ - lem_debug("data sent"); - lua_pushboolean(T, 1); - return 1; - - case 0: /* would block */ - lem_debug("would block"); - break; - - default: /* should be -1 for error */ - push_error(T, c->conn); - return 2; - } - - c->T = T; - c->w.cb = put_handler; - c->w.events = EV_WRITE; - ev_io_start(EV_G_ &c->w); - - lua_settop(T, 2); - return lua_yield(T, 2); -} - -static void -done_handler(EV_P_ struct ev_io *w, int revents) -{ - struct connection *c = (struct connection *)w; - const char *error = lua_tostring(c->T, 2); - - (void)revents; - - switch (PQputCopyEnd(c->conn, error)) { - case 1: /* data sent */ - lem_debug("data sent"); - ev_io_stop(EV_A_ &c->w); - lua_settop(c->T, 1); - (void)PQsetnonblocking(c->conn, 0); - c->w.cb = exec_handler; - c->w.events = EV_READ; - ev_io_start(EV_A_ &c->w); - break; - - case 0: /* would block */ - lem_debug("would block"); - break; - - default: /* should be -1 for error */ - ev_io_stop(EV_A_ &c->w); - lua_settop(c->T, 0); - push_error(c->T, c->conn); - lem_queue(c->T, 2); - c->T = NULL; - break; - } -} - -static int -connection_done(lua_State *T) -{ - struct connection *c; - const char *error; - - luaL_checktype(T, 1, LUA_TUSERDATA); - error = luaL_optstring(T, 2, NULL); - - c = lua_touserdata(T, 1); - if (c->T != NULL) { - lua_pushnil(T); - lua_pushliteral(T, "busy"); - return 2; - } - - if (c->conn == NULL) { - lua_pushnil(T); - lua_pushliteral(T, "closed"); - return 2; - } - - switch (PQputCopyEnd(c->conn, error)) { - case 1: /* data sent */ - lem_debug("data sent"); - (void)PQsetnonblocking(c->conn, 0); - c->T = T; - c->w.cb = exec_handler; - c->w.events = EV_READ; - ev_io_start(EV_G_ &c->w); - lua_settop(c->T, 1); - return lua_yield(c->T, 1); - - case 0: /* would block */ - lem_debug("would block"); - break; - - default: /* should be -1 for error */ - push_error(T, c->conn); - return 2; - } - - c->T = T; - c->w.cb = done_handler; - c->w.events = EV_WRITE; - ev_io_start(EV_G_ &c->w); - - if (error == NULL) { - lua_settop(T, 1); - lua_pushnil(T); - } else - lua_settop(T, 2); - return lua_yield(T, 2); -} - -static void -get_handler(EV_P_ struct ev_io *w, int revents) -{ - struct connection *c = (struct connection *)w; - char *buf; - int ret; - - ret = PQgetCopyData(c->conn, &buf, 1); - if (ret > 0) { - lem_debug("got data"); - ev_io_stop(EV_A_ &c->w); - - lua_pushlstring(c->T, buf, ret); - PQfreemem(buf); - lem_queue(c->T, 1); - c->T = NULL; - return; - } - - switch (ret) { - case 0: /* would block */ - lem_debug("would block"); - break; - - case -1: /* no more data */ - lem_debug("no more data"); - c->w.cb = exec_handler; - exec_handler(EV_A_ &c->w, revents); - break; - - default: /* should be -2 for error */ - ev_io_stop(EV_A_ &c->w); - lua_settop(c->T, 0); - push_error(c->T, c->conn); - lem_queue(c->T, 2); - c->T = NULL; - break; - } -} - -static int -connection_get(lua_State *T) -{ - struct connection *c; - char *buf; - int ret; - - luaL_checktype(T, 1, LUA_TUSERDATA); - c = lua_touserdata(T, 1); - if (c->T != NULL) { - lua_pushnil(T); - lua_pushliteral(T, "busy"); - return 2; - } - - if (c->conn == NULL) { - lua_pushnil(T); - lua_pushliteral(T, "closed"); - return 2; - } - - ret = PQgetCopyData(c->conn, &buf, 1); - if (ret > 0) { - lem_debug("got data"); - lua_pushlstring(T, buf, ret); - PQfreemem(buf); - return 1; - } - - switch (ret) { - case 0: /* would block */ - lem_debug("would block"); - c->T = T; - c->w.cb = get_handler; - c->w.events = EV_READ; - ev_io_start(EV_G_ &c->w); - - lua_settop(T, 1); - return lua_yield(T, 1); - - case -1: /* no more data */ - lem_debug("no more data"); - break; - - default: /* should be -2 for error */ - push_error(T, c->conn); - return 2; - } - - c->T = T; - c->w.cb = exec_handler; - c->w.events = EV_READ; - ev_io_start(EV_G_ &c->w); - - /* TODO: it is necessary but kinda ugly to call - * the exec_handler directly from here. - * find a better solution... */ - lua_settop(T, 1); - exec_handler(EV_G_ &c->w, 0); - return lua_yield(T, 1); -} - -int -luaopen_lem_postgres(lua_State *L) -{ - lua_newtable(L); - - /* create Connection metatable mt */ - lua_newtable(L); - lua_pushvalue(L, -1); - lua_setfield(L, -2, "__index"); - /* mt.__gc = */ - lua_pushcfunction(L, connection_gc); - lua_setfield(L, -2, "__gc"); - /* mt.close = */ - lua_pushcfunction(L, connection_close); - lua_setfield(L, -2, "close"); - /* mt.connect = */ - lua_pushvalue(L, -1); /* upvalue 1: Connection */ - lua_pushcclosure(L, connection_connect, 1); - lua_setfield(L, -3, "connect"); - /* mt.reset = */ - lua_pushcfunction(L, connection_reset); - lua_setfield(L, -2, "reset"); - /* mt.exec = */ - lua_pushcfunction(L, connection_exec); - lua_setfield(L, -2, "exec"); - /* mt.prepare = */ - lua_pushcfunction(L, connection_prepare); - lua_setfield(L, -2, "prepare"); - /* mt.run = */ - lua_pushcfunction(L, connection_run); - lua_setfield(L, -2, "run"); - /* mt.put = */ - lua_pushcfunction(L, connection_put); - lua_setfield(L, -2, "put"); - /* mt.done = */ - lua_pushcfunction(L, connection_done); - lua_setfield(L, -2, "done"); - /* mt.get = */ - lua_pushcfunction(L, connection_get); - lua_setfield(L, -2, "get"); - /* set Connection */ - lua_setfield(L, -2, "Connection"); - - return 1; -} diff --git a/qtest.lua b/qtest.lua index c1c0f18..bc3dfe2 100755 --- a/qtest.lua +++ b/qtest.lua @@ -105,7 +105,8 @@ do db:unlock() end -utils.timer(1.0, function() +utils.spawn(function() + utils.sleeper():sleep(1.0) assert(db:exec('DROP TABLE mytable')) end) diff --git a/queued.lua b/queued.lua deleted file mode 100644 index d244e61..0000000 --- a/queued.lua +++ /dev/null @@ -1,96 +0,0 @@ --- --- This file is part of lem-postgres. --- Copyright 2011 Emil Renner Berthing --- --- lem-postgres is free software: you can redistribute it and/or --- modify it under the terms of the GNU General Public License as --- published by the Free Software Foundation, either version 3 of --- the License, or (at your option) any later version. --- --- lem-postgres is distributed in the hope that it will be useful, --- but WITHOUT ANY WARRANTY; without even the implied warranty of --- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the --- GNU General Public License for more details. --- --- You should have received a copy of the GNU General Public License --- along with lem-postgres. If not, see . --- - -local utils = require 'lem.utils' -local postgres = require 'lem.postgres' - -local QConnection = {} -QConnection.__index = QConnection - -function QConnection:close() - local ok, err = self.conn:close() - for i = 1, self.n - 1 do - self[i]:wakeup() - self[i] = nil - end - return ok, err -end -QConnection.__gc = QConnection.close - -do - local remove = table.remove - local newsleeper = utils.sleeper - - local function lock(self) - local n = self.n - if n == 0 then - self.n = 1 - else - local sleeper = newsleeper() - self[n] = sleeper - self.n = n + 1 - sleeper:sleep() - end - return self.conn - end - QConnection.lock = lock - - local function unlock(self, ...) - local n = self.n - 1 - self.n = n - if n > 0 then - remove(self, 1):wakeup() - end - return ... - end - QConnection.unlock = unlock - - local function wrap(method) - return function(self, ...) - return unlock(self, method(lock(self), ...)) - end - end - - local Connection = postgres.Connection - QConnection.exec = wrap(Connection.exec) - QConnection.prepare = wrap(Connection.prepare) - QConnection.run = wrap(Connection.run) -end - -local qconnect -do - local setmetatable = setmetatable - local connect = postgres.connect - - function qconnect(...) - local conn, err = connect(...) - if not conn then return nil, err end - - return setmetatable({ - n = 0, - conn = conn, - }, QConnection) - end -end - -return { - QConnection = QConnection, - connect = qconnect, -} - --- vim: ts=2 sw=2 noet: -- cgit v1.2.1