diff options
Diffstat (limited to 'lem/postgres.c')
-rw-r--r-- | lem/postgres.c | 884 |
1 files changed, 884 insertions, 0 deletions
diff --git a/lem/postgres.c b/lem/postgres.c new file mode 100644 index 0000000..2d28cf5 --- /dev/null +++ b/lem/postgres.c @@ -0,0 +1,884 @@ +/* + * This file is part of lem-postgres. + * Copyright 2011 Emil Renner Berthing + * + * lem-postgres is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * lem-postgres is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with lem-postgres. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <stdlib.h> +#include <assert.h> +#include <lem.h> +#include <libpq-fe.h> + +struct connection { + struct ev_io w; + PGconn *conn; + lua_State *T; +}; + +static void +push_error(lua_State *T, PGconn *conn) +{ + const char *msg = PQerrorMessage(conn); + const char *p; + + for (p = msg; *p != '\n' && *p != '\0'; p++); + + lua_pushnil(T); + if (p > msg) + lua_pushlstring(T, msg, p - msg); + else + lua_pushliteral(T, "unknown error"); +} + +static int +connection_gc(lua_State *T) +{ + struct connection *c = lua_touserdata(T, 1); + + if (c->conn == NULL) + return 0; + + ev_io_stop(LEM_ &c->w); + PQfinish(c->conn); + return 0; +} + +static int +connection_close(lua_State *T) +{ + struct connection *c; + + luaL_checktype(T, 1, LUA_TUSERDATA); + c = lua_touserdata(T, 1); + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "already closed"); + return 2; + } + + if (c->T != NULL) { + lua_pushnil(c->T); + lua_pushliteral(c->T, "interrupted"); + lem_queue(c->T, 2); + c->T = NULL; + } + + ev_io_stop(LEM_ &c->w); + PQfinish(c->conn); + c->conn = NULL; + + lua_pushboolean(T, 1); + return 1; +} + +static void +connect_handler(EV_P_ struct ev_io *w, int revents) +{ + struct connection *c = (struct connection *)w; + + (void)revents; + + ev_io_stop(EV_A_ &c->w); + switch (PQconnectPoll(c->conn)) { + case PGRES_POLLING_READING: + lem_debug("PGRES_POLLING_READING, socket = %d", PQsocket(c->conn)); + ev_io_set(&c->w, PQsocket(c->conn), EV_READ); + break; + + case PGRES_POLLING_WRITING: + lem_debug("PGRES_POLLING_WRITING, socket = %d", PQsocket(c->conn)); + ev_io_set(&c->w, PQsocket(c->conn), EV_WRITE); + break; + + case PGRES_POLLING_FAILED: + lem_debug("PGRES_POLLING_FAILED"); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + PQfinish(c->conn); + c->conn = NULL; + lem_queue(c->T, 2); + c->T = NULL; + return; + + case PGRES_POLLING_OK: + lem_debug("PGRES_POLLING_OK"); + lem_queue(c->T, 1); + c->T = NULL; + return; + +#ifndef NDEBUG + case PGRES_POLLING_ACTIVE: + assert(0); +#endif + } + + ev_io_start(EV_A_ &c->w); +} + +static int +connection_connect(lua_State *T) +{ + const char *conninfo = luaL_checkstring(T, 1); + PGconn *conn; + ConnStatusType status; + struct connection *c; + + conn = PQconnectStart(conninfo); + if (conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "out of memory"); + return 2; + } + + status = PQstatus(conn); + if (status == CONNECTION_BAD) { + lem_debug("CONNECTION_BAD"); + goto error; + } + + c = lua_newuserdata(T, sizeof(struct connection)); + lua_pushvalue(T, lua_upvalueindex(1)); + lua_setmetatable(T, -2); + + c->conn = conn; + + switch (PQconnectPoll(conn)) { + case PGRES_POLLING_READING: + lem_debug("PGRES_POLLING_READING"); + ev_io_init(&c->w, connect_handler, PQsocket(conn), EV_READ); + break; + + case PGRES_POLLING_WRITING: + lem_debug("PGRES_POLLING_WRITING"); + ev_io_init(&c->w, connect_handler, PQsocket(conn), EV_WRITE); + break; + + case PGRES_POLLING_FAILED: + lem_debug("PGRES_POLLING_FAILED"); + goto error; + + case PGRES_POLLING_OK: + lem_debug("PGRES_POLLING_OK"); + c->T = NULL; + return 1; + +#ifndef NDEBUG + case PGRES_POLLING_ACTIVE: + assert(0); +#endif + } + + c->T = T; + ev_io_start(LEM_ &c->w); + + lua_replace(T, 1); + lua_settop(T, 1); + return lua_yield(T, 1); +error: + push_error(T, conn); + PQfinish(conn); + return 2; +} + +static int +connection_reset(lua_State *T) +{ + struct connection *c; + + luaL_checktype(T, 1, LUA_TUSERDATA); + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (PQresetStart(c->conn) != 1) + goto error; + + c->w.cb = connect_handler; + switch (PQconnectPoll(c->conn)) { + case PGRES_POLLING_READING: + lem_debug("PGRES_POLLING_READING"); + ev_io_set(&c->w, PQsocket(c->conn), EV_READ); + break; + + case PGRES_POLLING_WRITING: + lem_debug("PGRES_POLLING_WRITING"); + ev_io_set(&c->w, PQsocket(c->conn), EV_WRITE); + break; + + case PGRES_POLLING_FAILED: + lem_debug("PGRES_POLLING_FAILED"); + goto error; + + case PGRES_POLLING_OK: + lem_debug("PGRES_POLLING_OK"); + return 1; + +#ifndef NDEBUG + case PGRES_POLLING_ACTIVE: + assert(0); +#endif + } + + c->T = T; + ev_io_start(LEM_ &c->w); + + lua_settop(T, 1); + return lua_yield(T, 1); +error: + push_error(T, c->conn); + return 2; +} + +static void +push_tuples(lua_State *T, PGresult *res) +{ + int rows = PQntuples(res); + int columns = PQnfields(res); + int i; + + lua_createtable(T, rows, 0); + for (i = 0; i < rows; i++) { + int j; + + lua_createtable(T, columns, 0); + for (j = 0; j < columns; j++) { + if (PQgetisnull(res, i, j)) + lua_pushnil(T); + else + lua_pushlstring(T, PQgetvalue(res, i, j), + PQgetlength(res, i, j)); + lua_rawseti(T, -2, j+1); + } + lua_rawseti(T, -2, i+1); + } + + /* insert column names as "row 0" */ + lua_createtable(T, columns, 0); + for (i = 0; i < columns; i++) { + lua_pushstring(T, PQfname(res, i)); + lua_rawseti(T, -2, i+1); + } + lua_rawseti(T, -2, 0); +} + +static void +error_handler(EV_P_ struct ev_io *w, int revents) +{ + struct connection *c = (struct connection *)w; + + (void)revents; + + if (PQconsumeInput(c->conn) == 0) { + ev_io_stop(EV_A_ &c->w); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + lem_queue(c->T, 2); + c->T = NULL; + return; + } + + while (!PQisBusy(c->conn)) { + PGresult *res = PQgetResult(c->conn); + + if (res == NULL) { + ev_io_stop(EV_A_ &c->w); + lem_queue(c->T, 2); + c->T = NULL; + return; + } + + PQclear(res); + } +} + +static void +exec_handler(EV_P_ struct ev_io *w, int revents) +{ + struct connection *c = (struct connection *)w; + PGresult *res; + + (void)revents; + + if (PQconsumeInput(c->conn) != 1) { + ev_io_stop(EV_A_ &c->w); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + lem_queue(c->T, 2); + c->T = NULL; + return; + } + + while (!PQisBusy(c->conn)) { + res = PQgetResult(c->conn); + + if (res == NULL) { + ev_io_stop(EV_A_ &c->w); + lem_debug("returning %d values", lua_gettop(c->T) - 1); + lem_queue(c->T, lua_gettop(c->T) - 1); + c->T = NULL; + return; + } + + switch (PQresultStatus(res)) { + case PGRES_EMPTY_QUERY: + lem_debug("PGRES_EMPTY_QUERY"); + lua_settop(c->T, 0); + lua_pushnil(c->T); + lua_pushliteral(c->T, "empty query"); + goto error; + + case PGRES_COMMAND_OK: + lem_debug("PGRES_COMMAND_OK"); + lua_pushboolean(c->T, 1); + break; + + case PGRES_TUPLES_OK: + lem_debug("PGRES_TUPLES_OK"); + push_tuples(c->T, res); + break; + + case PGRES_COPY_IN: + lem_debug("PGRES_COPY_IN"); + (void)PQsetnonblocking(c->conn, 1); + case PGRES_COPY_OUT: + lem_debug("PGRES_COPY_OUT"); + PQclear(res); + lua_pushboolean(c->T, 1); + lem_queue(c->T, lua_gettop(c->T) - 1); + c->T = NULL; + return; + + case PGRES_BAD_RESPONSE: + lem_debug("PGRES_BAD_RESPONSE"); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + goto error; + + case PGRES_NONFATAL_ERROR: + lem_debug("PGRES_NONFATAL_ERROR"); + break; + + case PGRES_FATAL_ERROR: + lem_debug("PGRES_FATAL_ERROR"); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + goto error; + + default: + lem_debug("unknown result status"); + lua_settop(c->T, 0); + lua_pushnil(c->T); + lua_pushliteral(c->T, "unknown result status"); + goto error; + } + + PQclear(res); + } + + lem_debug("busy"); + return; +error: + PQclear(res); + c->w.cb = error_handler; + while (!PQisBusy(c->conn)) { + res = PQgetResult(c->conn); + + if (res == NULL) { + ev_io_stop(EV_A_ &c->w); + lem_queue(c->T, 2); + c->T = NULL; + return; + } + + PQclear(res); + } +} + +static int +connection_exec(lua_State *T) +{ + struct connection *c; + const char *command; + int n; + + luaL_checktype(T, 1, LUA_TUSERDATA); + command = luaL_checkstring(T, 2); + + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + n = lua_gettop(T) - 2; + if (n > 0) { + const char **values = lem_xmalloc(n * sizeof(char *)); + int *lengths = lem_xmalloc(n * sizeof(int)); + int i; + + for (i = 0; i < n; i++) { + size_t len; + + values[i] = lua_tolstring(T, i+3, &len); + lengths[i] = len; + } + + n = PQsendQueryParams(c->conn, command, n, + NULL, values, lengths, NULL, 0); + free(values); + free(lengths); + } else + n = PQsendQuery(c->conn, command); + + if (n != 1) { + lem_debug("PQsendQuery failed"); + push_error(T, c->conn); + return 2; + } + + c->T = T; + c->w.cb = exec_handler; + c->w.events = EV_READ; + ev_io_start(LEM_ &c->w); + + lua_settop(T, 1); + return lua_yield(T, 1); +} + +static int +connection_prepare(lua_State *T) +{ + struct connection *c; + const char *name; + const char *query; + + luaL_checktype(T, 1, LUA_TUSERDATA); + name = luaL_checkstring(T, 2); + query = luaL_checkstring(T, 3); + + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + if (PQsendPrepare(c->conn, name, query, 0, NULL) != 1) { + push_error(T, c->conn); + return 2; + } + + c->T = T; + c->w.cb = exec_handler; + c->w.events = EV_READ; + ev_io_start(LEM_ &c->w); + + lua_settop(T, 1); + return lua_yield(T, 1); +} + +static int +connection_run(lua_State *T) +{ + struct connection *c; + const char *name; + const char **values; + int *lengths; + int n; + int i; + + luaL_checktype(T, 1, LUA_TUSERDATA); + name = luaL_checkstring(T, 2); + + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + n = lua_gettop(T) - 2; + values = lem_xmalloc(n * sizeof(char *)); + lengths = lem_xmalloc(n * sizeof(int)); + + for (i = 0; i < n; i++) { + size_t len; + + values[i] = lua_tolstring(T, i+3, &len); + lengths[i] = len; + } + + n = PQsendQueryPrepared(c->conn, name, n, + values, lengths, NULL, 0); + free(values); + free(lengths); + if (n != 1) { + push_error(T, c->conn); + return 2; + } + + c->T = T; + c->w.cb = exec_handler; + c->w.events = EV_READ; + ev_io_start(LEM_ &c->w); + + lua_settop(T, 1); + return lua_yield(T, 1); +} + +static void +put_handler(EV_P_ struct ev_io *w, int revents) +{ + struct connection *c = (struct connection *)w; + size_t len; + const char *data; + + (void)revents; + + data = lua_tolstring(c->T, 2, &len); + switch (PQputCopyData(c->conn, data, (int)len)) { + case 1: /* data sent */ + lem_debug("data sent"); + ev_io_stop(EV_A_ &c->w); + + lua_settop(c->T, 0); + lua_pushboolean(c->T, 1); + lem_queue(c->T, 1); + c->T = NULL; + break; + + case 0: /* would block */ + lem_debug("would block"); + break; + + default: /* should be -1 for error */ + lua_settop(c->T, 0); + push_error(c->T, c->conn); + lem_queue(c->T, 2); + c->T = NULL; + break; + } +} + +static int +connection_put(lua_State *T) +{ + struct connection *c; + size_t len; + const char *data; + + luaL_checktype(T, 1, LUA_TUSERDATA); + data = luaL_checklstring(T, 2, &len); + + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + switch (PQputCopyData(c->conn, data, (int)len)) { + case 1: /* data sent */ + lem_debug("data sent"); + lua_pushboolean(T, 1); + return 1; + + case 0: /* would block */ + lem_debug("would block"); + break; + + default: /* should be -1 for error */ + push_error(T, c->conn); + return 2; + } + + c->T = T; + c->w.cb = put_handler; + c->w.events = EV_WRITE; + ev_io_start(LEM_ &c->w); + + lua_settop(T, 2); + return lua_yield(T, 2); +} + +static void +done_handler(EV_P_ struct ev_io *w, int revents) +{ + struct connection *c = (struct connection *)w; + const char *error = lua_tostring(c->T, 2); + + (void)revents; + + switch (PQputCopyEnd(c->conn, error)) { + case 1: /* data sent */ + lem_debug("data sent"); + ev_io_stop(EV_A_ &c->w); + lua_settop(c->T, 1); + (void)PQsetnonblocking(c->conn, 0); + c->w.cb = exec_handler; + c->w.events = EV_READ; + ev_io_start(EV_A_ &c->w); + break; + + case 0: /* would block */ + lem_debug("would block"); + break; + + default: /* should be -1 for error */ + ev_io_stop(EV_A_ &c->w); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + lem_queue(c->T, 2); + c->T = NULL; + break; + } +} + +static int +connection_done(lua_State *T) +{ + struct connection *c; + const char *error; + + luaL_checktype(T, 1, LUA_TUSERDATA); + error = luaL_optstring(T, 2, NULL); + + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + switch (PQputCopyEnd(c->conn, error)) { + case 1: /* data sent */ + lem_debug("data sent"); + (void)PQsetnonblocking(c->conn, 0); + c->T = T; + c->w.cb = exec_handler; + c->w.events = EV_READ; + ev_io_start(LEM_ &c->w); + lua_settop(c->T, 1); + return lua_yield(c->T, 1); + + case 0: /* would block */ + lem_debug("would block"); + break; + + default: /* should be -1 for error */ + push_error(T, c->conn); + return 2; + } + + c->T = T; + c->w.cb = done_handler; + c->w.events = EV_WRITE; + ev_io_start(LEM_ &c->w); + + if (error == NULL) { + lua_settop(T, 1); + lua_pushnil(T); + } else + lua_settop(T, 2); + return lua_yield(T, 2); +} + +static void +get_handler(EV_P_ struct ev_io *w, int revents) +{ + struct connection *c = (struct connection *)w; + char *buf; + int ret; + + ret = PQgetCopyData(c->conn, &buf, 1); + if (ret > 0) { + lem_debug("got data"); + ev_io_stop(EV_A_ &c->w); + + lua_pushlstring(c->T, buf, ret); + PQfreemem(buf); + lem_queue(c->T, 1); + c->T = NULL; + return; + } + + switch (ret) { + case 0: /* would block */ + lem_debug("would block"); + break; + + case -1: /* no more data */ + lem_debug("no more data"); + c->w.cb = exec_handler; + exec_handler(EV_A_ &c->w, revents); + break; + + default: /* should be -2 for error */ + ev_io_stop(EV_A_ &c->w); + lua_settop(c->T, 0); + push_error(c->T, c->conn); + lem_queue(c->T, 2); + c->T = NULL; + break; + } +} + +static int +connection_get(lua_State *T) +{ + struct connection *c; + char *buf; + int ret; + + luaL_checktype(T, 1, LUA_TUSERDATA); + c = lua_touserdata(T, 1); + if (c->T != NULL) { + lua_pushnil(T); + lua_pushliteral(T, "busy"); + return 2; + } + + if (c->conn == NULL) { + lua_pushnil(T); + lua_pushliteral(T, "closed"); + return 2; + } + + ret = PQgetCopyData(c->conn, &buf, 1); + if (ret > 0) { + lem_debug("got data"); + lua_pushlstring(T, buf, ret); + PQfreemem(buf); + return 1; + } + + switch (ret) { + case 0: /* would block */ + lem_debug("would block"); + c->T = T; + c->w.cb = get_handler; + c->w.events = EV_READ; + ev_io_start(LEM_ &c->w); + + lua_settop(T, 1); + return lua_yield(T, 1); + + case -1: /* no more data */ + lem_debug("no more data"); + break; + + default: /* should be -2 for error */ + push_error(T, c->conn); + return 2; + } + + c->T = T; + c->w.cb = exec_handler; + c->w.events = EV_READ; + ev_io_start(LEM_ &c->w); + + /* TODO: it is necessary but kinda ugly to call + * the exec_handler directly from here. + * find a better solution... */ + lua_settop(T, 1); + exec_handler(LEM_ &c->w, 0); + return lua_yield(T, 1); +} + +int +luaopen_lem_postgres(lua_State *L) +{ + lua_newtable(L); + + /* create Connection metatable mt */ + lua_newtable(L); + lua_pushvalue(L, -1); + lua_setfield(L, -2, "__index"); + /* mt.__gc = <connection_gc> */ + lua_pushcfunction(L, connection_gc); + lua_setfield(L, -2, "__gc"); + /* mt.close = <connection_close> */ + lua_pushcfunction(L, connection_close); + lua_setfield(L, -2, "close"); + /* mt.connect = <connection_connect> */ + lua_pushvalue(L, -1); /* upvalue 1: Connection */ + lua_pushcclosure(L, connection_connect, 1); + lua_setfield(L, -3, "connect"); + /* mt.reset = <connection_reset> */ + lua_pushcfunction(L, connection_reset); + lua_setfield(L, -2, "reset"); + /* mt.exec = <connection_exec> */ + lua_pushcfunction(L, connection_exec); + lua_setfield(L, -2, "exec"); + /* mt.prepare = <connection_prepare> */ + lua_pushcfunction(L, connection_prepare); + lua_setfield(L, -2, "prepare"); + /* mt.run = <connection_run> */ + lua_pushcfunction(L, connection_run); + lua_setfield(L, -2, "run"); + /* mt.put = <connection_put> */ + lua_pushcfunction(L, connection_put); + lua_setfield(L, -2, "put"); + /* mt.done = <connection_done> */ + lua_pushcfunction(L, connection_done); + lua_setfield(L, -2, "done"); + /* mt.get = <connection_get> */ + lua_pushcfunction(L, connection_get); + lua_setfield(L, -2, "get"); + /* set Connection */ + lua_setfield(L, -2, "Connection"); + + return 1; +} |