/*
* This file is part of lem-postgres.
* Copyright 2011 Emil Renner Berthing
* Copyright 2013 Asbjørn Sloth Tønnesen
*
* lem-postgres is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* lem-postgres is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with lem-postgres. If not, see .
*/
#include
#include
#include
#include
struct db {
struct ev_io w;
PGconn *conn;
};
static int
err_closed(lua_State *T)
{
lua_pushnil(T);
lua_pushliteral(T, "closed");
return 2;
}
static int
err_busy(lua_State *T)
{
lua_pushnil(T);
lua_pushliteral(T, "busy");
return 2;
}
static int
err_connection(lua_State *T, PGconn *conn)
{
const char *msg = PQerrorMessage(conn);
const char *p;
for (p = msg; *p != '\n' && *p != '\0'; p++);
lua_pushnil(T);
if (p > msg)
lua_pushlstring(T, msg, p - msg);
else
lua_pushliteral(T, "unknown error");
return 2;
}
static int
db_gc(lua_State *T)
{
struct db *d = lua_touserdata(T, 1);
if (d->conn != NULL)
PQfinish(d->conn);
return 0;
}
static int
db_close(lua_State *T)
{
struct db *d;
lua_State *S;
luaL_checktype(T, 1, LUA_TUSERDATA);
d = lua_touserdata(T, 1);
if (d->conn == NULL)
return err_closed(T);
S = d->w.data;
if (S != NULL) {
ev_io_stop(LEM_ &d->w);
lua_pushnil(S);
lua_pushliteral(S, "interrupted");
lem_queue(S, 2);
d->w.data = NULL;
}
PQfinish(d->conn);
d->conn = NULL;
lua_pushboolean(T, 1);
return 1;
}
static void
db_notice_receiver(void *arg, const PGresult *res)
{
(void)arg;
(void)res;
lem_debug("%s", PQresultErrorMessage(res));
}
static void
postgres_connect_cb(EV_P_ struct ev_io *w, int revents)
{
struct db *d = (struct db *)w;
lua_State *T = d->w.data;
(void)revents;
ev_io_stop(EV_A_ &d->w);
switch (PQconnectPoll(d->conn)) {
case PGRES_POLLING_READING:
lem_debug("PGRES_POLLING_READING, socket = %d", PQsocket(d->conn));
ev_io_set(&d->w, PQsocket(d->conn), EV_READ);
break;
case PGRES_POLLING_WRITING:
lem_debug("PGRES_POLLING_WRITING, socket = %d", PQsocket(d->conn));
ev_io_set(&d->w, PQsocket(d->conn), EV_WRITE);
break;
case PGRES_POLLING_FAILED:
lem_debug("PGRES_POLLING_FAILED");
lua_settop(T, 0);
lem_queue(T, err_connection(T, d->conn));
d->w.data = NULL;
PQfinish(d->conn);
d->conn = NULL;
return;
case PGRES_POLLING_OK:
lem_debug("PGRES_POLLING_OK");
lem_queue(T, 1);
d->w.data = NULL;
return;
#ifndef NDEBUG
case PGRES_POLLING_ACTIVE:
assert(0);
#endif
}
ev_io_start(EV_A_ &d->w);
}
static int
postgres_connect(lua_State *T)
{
const char *conninfo = luaL_checkstring(T, 1);
PGconn *conn;
struct db *d;
conn = PQconnectStart(conninfo);
if (conn == NULL) {
lua_pushnil(T);
lua_pushliteral(T, "out of memory");
return 2;
}
if (PQstatus(conn) == CONNECTION_BAD) {
lem_debug("CONNECTION_BAD");
goto error;
}
lua_settop(T, 0);
d = lua_newuserdata(T, sizeof(struct db));
lua_pushvalue(T, lua_upvalueindex(1));
lua_setmetatable(T, -2);
d->conn = conn;
PQsetNoticeReceiver(conn, db_notice_receiver, NULL);
switch (PQconnectPoll(conn)) {
case PGRES_POLLING_READING:
lem_debug("PGRES_POLLING_READING");
ev_io_init(&d->w, postgres_connect_cb, PQsocket(conn), EV_READ);
break;
case PGRES_POLLING_WRITING:
lem_debug("PGRES_POLLING_WRITING");
ev_io_init(&d->w, postgres_connect_cb, PQsocket(conn), EV_WRITE);
break;
case PGRES_POLLING_FAILED:
lem_debug("PGRES_POLLING_FAILED");
d->conn = NULL;
goto error;
case PGRES_POLLING_OK:
lem_debug("PGRES_POLLING_OK");
ev_io_init(&d->w, NULL, PQsocket(conn), 0);
return 1;
#ifndef NDEBUG
case PGRES_POLLING_ACTIVE:
assert(0);
#endif
}
d->w.data = T;
ev_io_start(LEM_ &d->w);
return lua_yield(T, 1);
error:
err_connection(T, conn);
PQfinish(conn);
return 2;
}
static void
db_reset_cb(EV_P_ struct ev_io *w, int revents)
{
struct db *d = (struct db *)w;
lua_State *T = d->w.data;
(void)revents;
ev_io_stop(EV_A_ &d->w);
switch (PQresetPoll(d->conn)) {
case PGRES_POLLING_READING:
lem_debug("PGRES_POLLING_READING, socket = %d", PQsocket(d->conn));
ev_io_set(&d->w, PQsocket(d->conn), EV_READ);
break;
case PGRES_POLLING_WRITING:
lem_debug("PGRES_POLLING_WRITING, socket = %d", PQsocket(d->conn));
ev_io_set(&d->w, PQsocket(d->conn), EV_WRITE);
break;
case PGRES_POLLING_FAILED:
lem_debug("PGRES_POLLING_FAILED");
lua_settop(T, 0);
lem_queue(T, err_connection(T, d->conn));
d->w.data = NULL;
PQfinish(d->conn);
d->conn = NULL;
return;
case PGRES_POLLING_OK:
lem_debug("PGRES_POLLING_OK");
lem_queue(T, 1);
d->w.data = NULL;
return;
#ifndef NDEBUG
case PGRES_POLLING_ACTIVE:
assert(0);
#endif
}
ev_io_start(EV_A_ &d->w);
}
static int
db_reset(lua_State *T)
{
struct db *d;
luaL_checktype(T, 1, LUA_TUSERDATA);
d = lua_touserdata(T, 1);
if (d->conn == NULL)
return err_closed(T);
if (d->w.data != NULL)
return err_busy(T);
if (PQresetStart(d->conn) != 1)
return err_connection(T, d->conn);
lua_settop(T, 1);
switch (PQresetPoll(d->conn)) {
case PGRES_POLLING_READING:
lem_debug("PGRES_POLLING_READING");
ev_io_set(&d->w, PQsocket(d->conn), EV_READ);
break;
case PGRES_POLLING_WRITING:
lem_debug("PGRES_POLLING_WRITING");
ev_io_set(&d->w, PQsocket(d->conn), EV_WRITE);
break;
case PGRES_POLLING_FAILED:
lem_debug("PGRES_POLLING_FAILED");
return err_connection(T, d->conn);
case PGRES_POLLING_OK:
lem_debug("PGRES_POLLING_OK");
return 1;
#ifndef NDEBUG
case PGRES_POLLING_ACTIVE:
assert(0);
#endif
}
d->w.data = T;
d->w.cb = db_reset_cb;
ev_io_start(LEM_ &d->w);
return lua_yield(T, 1);
}
static void
push_tuples(lua_State *T, PGresult *res)
{
int rows = PQntuples(res);
int columns = PQnfields(res);
int i;
lua_createtable(T, rows, 0);
for (i = 0; i < rows; i++) {
int j;
lua_createtable(T, columns, 0);
for (j = 0; j < columns; j++) {
if (PQgetisnull(res, i, j))
lua_pushnil(T);
else
lua_pushlstring(T, PQgetvalue(res, i, j),
PQgetlength(res, i, j));
lua_rawseti(T, -2, j+1);
}
lua_rawseti(T, -2, i+1);
}
/* insert column names as "row 0" */
lua_createtable(T, columns, 0);
for (i = 0; i < columns; i++) {
lua_pushstring(T, PQfname(res, i));
lua_rawseti(T, -2, i+1);
}
lua_rawseti(T, -2, 0);
}
static void
db_error_cb(EV_P_ struct ev_io *w, int revents)
{
struct db *d = (struct db *)w;
lua_State *T = d->w.data;
(void)revents;
if (PQconsumeInput(d->conn) == 0) {
ev_io_stop(EV_A_ &d->w);
lua_settop(T, 0);
lem_queue(T, err_connection(T, d->conn));
d->w.data = NULL;
return;
}
while (!PQisBusy(d->conn)) {
PGresult *res = PQgetResult(d->conn);
if (res == NULL) {
ev_io_stop(EV_A_ &d->w);
lem_queue(T, 2);
d->w.data = NULL;
return;
}
PQclear(res);
}
}
static void
db_exec_cb(EV_P_ struct ev_io *w, int revents)
{
struct db *d = (struct db *)w;
lua_State *T = d->w.data;
PGresult *res;
(void)revents;
if (PQconsumeInput(d->conn) != 1) {
ev_io_stop(EV_A_ &d->w);
lua_settop(T, 0);
lem_queue(T, err_connection(T, d->conn));
d->w.data = NULL;
return;
}
while (!PQisBusy(d->conn)) {
res = PQgetResult(d->conn);
if (res == NULL) {
ev_io_stop(EV_A_ &d->w);
lem_debug("returning %d values", lua_gettop(T) - 1);
lem_queue(T, lua_gettop(T) - 1);
d->w.data = NULL;
return;
}
switch (PQresultStatus(res)) {
case PGRES_EMPTY_QUERY:
lem_debug("PGRES_EMPTY_QUERY");
lua_settop(T, 0);
lua_pushnil(T);
lua_pushliteral(T, "empty query");
goto error;
case PGRES_COMMAND_OK:
lem_debug("PGRES_COMMAND_OK");
lua_pushboolean(T, 1);
break;
case PGRES_TUPLES_OK:
lem_debug("PGRES_TUPLES_OK");
push_tuples(T, res);
break;
case PGRES_COPY_IN:
lem_debug("PGRES_COPY_IN");
(void)PQsetnonblocking(d->conn, 1);
case PGRES_COPY_OUT:
lem_debug("PGRES_COPY_OUT");
PQclear(res);
lua_pushboolean(T, 1);
lem_queue(T, lua_gettop(T) - 1);
d->w.data = NULL;
return;
case PGRES_BAD_RESPONSE:
lem_debug("PGRES_BAD_RESPONSE");
lua_settop(T, 0);
err_connection(T, d->conn);
goto error;
case PGRES_NONFATAL_ERROR:
lem_debug("PGRES_NONFATAL_ERROR");
break;
case PGRES_FATAL_ERROR:
lem_debug("PGRES_FATAL_ERROR");
lua_settop(T, 0);
err_connection(T, d->conn);
goto error;
default:
lem_debug("unknown result status");
lua_settop(T, 0);
lua_pushnil(T);
lua_pushliteral(T, "unknown result status");
goto error;
}
PQclear(res);
}
lem_debug("busy");
return;
error:
PQclear(res);
d->w.cb = db_error_cb;
while (!PQisBusy(d->conn)) {
res = PQgetResult(d->conn);
if (res == NULL) {
ev_io_stop(EV_A_ &d->w);
lem_queue(T, 2);
d->w.data = NULL;
return;
}
PQclear(res);
}
}
static int
db_exec(lua_State *T)
{
struct db *d;
const char *command;
int n;
luaL_checktype(T, 1, LUA_TUSERDATA);
command = luaL_checkstring(T, 2);
d = lua_touserdata(T, 1);
if (d->conn == NULL)
return err_closed(T);
if (d->w.data != NULL)
return err_busy(T);
n = lua_gettop(T) - 2;
if (n > 0) {
const char **values = lem_xmalloc(n * sizeof(char *));
int *lengths = lem_xmalloc(n * sizeof(int));
int i;
for (i = 0; i < n; i++) {
size_t len;
const char *val;
if (lua_isnil(T, i+3)) {
val = NULL;
/* len is ignored by libpq */
} else {
val = lua_tolstring(T, i+3, &len);
if (val == NULL) {
free(values);
free(lengths);
return luaL_argerror(T, i+3, "expected nil or string");
}
}
values[i] = val;
lengths[i] = len;
}
n = PQsendQueryParams(d->conn, command, n,
NULL, values, lengths, NULL, 0);
free(values);
free(lengths);
} else
n = PQsendQuery(d->conn, command);
if (n != 1) {
lem_debug("PQsendQuery failed");
return err_connection(T, d->conn);
}
d->w.data = T;
d->w.cb = db_exec_cb;
ev_io_set(&d->w, PQsocket(d->conn), EV_READ);
ev_io_start(LEM_ &d->w);
lua_settop(T, 1);
return lua_yield(T, 1);
}
static int
db_prepare(lua_State *T)
{
struct db *d;
const char *name;
const char *query;
luaL_checktype(T, 1, LUA_TUSERDATA);
name = luaL_checkstring(T, 2);
query = luaL_checkstring(T, 3);
d = lua_touserdata(T, 1);
if (d->conn == NULL)
return err_closed(T);
if (d->w.data != NULL)
return err_busy(T);
if (PQsendPrepare(d->conn, name, query, 0, NULL) != 1)
return err_connection(T, d->conn);
d->w.data = T;
d->w.cb = db_exec_cb;
ev_io_set(&d->w, PQsocket(d->conn), EV_READ);
ev_io_start(LEM_ &d->w);
lua_settop(T, 1);
return lua_yield(T, 1);
}
static int
db_run(lua_State *T)
{
struct db *d;
const char *name;
int n;
luaL_checktype(T, 1, LUA_TUSERDATA);
name = luaL_checkstring(T, 2);
d = lua_touserdata(T, 1);
if (d->conn == NULL)
return err_closed(T);
if (d->w.data != NULL)
return err_busy(T);
n = lua_gettop(T) - 2;
if (n > 0) {
const char **values = lem_xmalloc(n * sizeof(char *));
int *lengths = lem_xmalloc(n * sizeof(int));
int i;
for (i = 0; i < n; i++) {
size_t len;
const char *val;
if (lua_isnil(T, i+3)) {
val = NULL;
/* len is ignored by libpq */
} else {
val = lua_tolstring(T, i+3, &len);
if (val == NULL) {
free(values);
free(lengths);
return luaL_argerror(T, i+3, "expected nil or string");
}
}
values[i] = val;
lengths[i] = len;
}
n = PQsendQueryPrepared(d->conn, name, n,
values, lengths, NULL, 0);
free(values);
free(lengths);
} else
n = PQsendQueryPrepared(d->conn, name, 0, NULL, NULL, NULL, 0);
if (n != 1)
return err_connection(T, d->conn);
d->w.data = T;
d->w.cb = db_exec_cb;
ev_io_set(&d->w, PQsocket(d->conn), EV_READ);
ev_io_start(LEM_ &d->w);
lua_settop(T, 1);
return lua_yield(T, 1);
}
static void
db_put_cb(EV_P_ struct ev_io *w, int revents)
{
struct db *d = (struct db *)w;
lua_State *T = d->w.data;
size_t len;
const char *data;
int ret;
(void)revents;
data = lua_tolstring(T, 2, &len);
switch (PQputCopyData(d->conn, data, (int)len)) {
case 1: /* data sent */
lem_debug("data sent");
lua_settop(T, 0);
lua_pushboolean(T, 1);
ret = 1;
break;
case 0: /* would block */
lem_debug("would block");
return;
default: /* should be -1 for error */
lem_debug("error");
lua_settop(T, 0);
ret = err_connection(T, d->conn);
break;
}
ev_io_stop(EV_A_ &d->w);
lem_queue(T, ret);
d->w.data = NULL;
}
static int
db_put(lua_State *T)
{
struct db *d;
size_t len;
const char *data;
luaL_checktype(T, 1, LUA_TUSERDATA);
data = luaL_checklstring(T, 2, &len);
d = lua_touserdata(T, 1);
if (d->conn == NULL)
return err_closed(T);
if (d->w.data != NULL)
return err_busy(T);
switch (PQputCopyData(d->conn, data, (int)len)) {
case 1: /* data sent */
lem_debug("data sent");
lua_pushboolean(T, 1);
return 1;
case 0: /* would block */
lem_debug("would block");
break;
default: /* should be -1 for error */
return err_connection(T, d->conn);
}
d->w.data = T;
d->w.cb = db_put_cb;
ev_io_set(&d->w, PQsocket(d->conn), EV_WRITE);
ev_io_start(LEM_ &d->w);
lua_settop(T, 2);
return lua_yield(T, 2);
}
static void
db_done_cb(EV_P_ struct ev_io *w, int revents)
{
struct db *d = (struct db *)w;
lua_State *T = d->w.data;
const char *error = lua_tostring(T, 2);
(void)revents;
switch (PQputCopyEnd(d->conn, error)) {
case 1: /* data sent */
lem_debug("data sent");
ev_io_stop(EV_A_ &d->w);
lua_settop(T, 1);
(void)PQsetnonblocking(d->conn, 0);
d->w.cb = db_exec_cb;
ev_io_set(&d->w, PQsocket(d->conn), EV_READ);
ev_io_start(EV_A_ &d->w);
break;
case 0: /* would block */
lem_debug("would block");
break;
default: /* should be -1 for error */
ev_io_stop(EV_A_ &d->w);
lua_settop(T, 0);
lem_queue(T, err_connection(T, d->conn));
d->w.data = NULL;
break;
}
}
static int
db_done(lua_State *T)
{
struct db *d;
const char *error;
luaL_checktype(T, 1, LUA_TUSERDATA);
error = luaL_optstring(T, 2, NULL);
d = lua_touserdata(T, 1);
if (d->conn == NULL)
return err_closed(T);
if (d->w.data != NULL)
return err_busy(T);
switch (PQputCopyEnd(d->conn, error)) {
case 1: /* data sent */
lem_debug("data sent");
(void)PQsetnonblocking(d->conn, 0);
d->w.data = T;
d->w.cb = db_exec_cb;
ev_io_set(&d->w, PQsocket(d->conn), EV_READ);
ev_io_start(LEM_ &d->w);
lua_settop(T, 1);
return lua_yield(T, 1);
case 0: /* would block */
lem_debug("would block");
break;
default: /* should be -1 for error */
return err_connection(T, d->conn);
}
d->w.data = T;
d->w.cb = db_done_cb;
ev_io_set(&d->w, PQsocket(d->conn), EV_WRITE);
ev_io_start(LEM_ &d->w);
if (error == NULL) {
lua_settop(T, 1);
lua_pushnil(T);
} else
lua_settop(T, 2);
return lua_yield(T, 2);
}
static void
db_get_cb(EV_P_ struct ev_io *w, int revents)
{
struct db *d = (struct db *)w;
lua_State *T = d->w.data;
char *buf;
int ret;
ret = PQgetCopyData(d->conn, &buf, 1);
if (ret > 0) {
lem_debug("got data");
ev_io_stop(EV_A_ &d->w);
lua_pushlstring(T, buf, ret);
PQfreemem(buf);
lem_queue(T, 1);
d->w.data = NULL;
return;
}
switch (ret) {
case 0: /* would block */
lem_debug("would block");
break;
case -1: /* no more data */
lem_debug("no more data");
d->w.cb = db_exec_cb;
db_exec_cb(EV_A_ &d->w, revents);
break;
default: /* should be -2 for error */
ev_io_stop(EV_A_ &d->w);
lua_settop(T, 0);
lem_queue(T, err_connection(T, d->conn));
d->w.data = NULL;
break;
}
}
static int
db_get(lua_State *T)
{
struct db *d;
char *buf;
int ret;
luaL_checktype(T, 1, LUA_TUSERDATA);
d = lua_touserdata(T, 1);
if (d->conn == NULL)
return err_closed(T);
if (d->w.data != NULL)
return err_busy(T);
ret = PQgetCopyData(d->conn, &buf, 1);
if (ret > 0) {
lem_debug("got data");
lua_pushlstring(T, buf, ret);
PQfreemem(buf);
return 1;
}
switch (ret) {
case 0: /* would block */
lem_debug("would block");
d->w.data = T;
d->w.cb = db_get_cb;
ev_io_set(&d->w, PQsocket(d->conn), EV_READ);
ev_io_start(LEM_ &d->w);
lua_settop(T, 1);
return lua_yield(T, 1);
case -1: /* no more data */
lem_debug("no more data");
break;
default: /* should be -2 for error */
return err_connection(T, d->conn);
}
d->w.data = T;
d->w.cb = db_exec_cb;
ev_io_set(&d->w, PQsocket(d->conn), EV_READ);
ev_io_start(LEM_ &d->w);
/* TODO: it is necessary but kinda ugly to call
* the db_exec_cb directly from here.
* find a better solution... */
lua_settop(T, 1);
db_exec_cb(LEM_ &d->w, 0);
return lua_yield(T, 1);
}
int
luaopen_lem_postgres(lua_State *L)
{
lua_newtable(L);
/* create Connection metatable mt */
lua_newtable(L);
lua_pushvalue(L, -1);
lua_setfield(L, -2, "__index");
/* mt.__gc = */
lua_pushcfunction(L, db_gc);
lua_setfield(L, -2, "__gc");
/* mt.close = */
lua_pushcfunction(L, db_close);
lua_setfield(L, -2, "close");
/* mt.reset = */
lua_pushcfunction(L, db_reset);
lua_setfield(L, -2, "reset");
/* mt.exec = */
lua_pushcfunction(L, db_exec);
lua_setfield(L, -2, "exec");
/* mt.prepare = */
lua_pushcfunction(L, db_prepare);
lua_setfield(L, -2, "prepare");
/* mt.run = */
lua_pushcfunction(L, db_run);
lua_setfield(L, -2, "run");
/* mt.put = */
lua_pushcfunction(L, db_put);
lua_setfield(L, -2, "put");
/* mt.done = */
lua_pushcfunction(L, db_done);
lua_setfield(L, -2, "done");
/* mt.get = */
lua_pushcfunction(L, db_get);
lua_setfield(L, -2, "get");
/* connect = */
lua_pushvalue(L, -1); /* upvalue 1: Connection */
lua_pushcclosure(L, postgres_connect, 1);
lua_setfield(L, -3, "connect");
/* set Connection */
lua_setfield(L, -2, "Connection");
return 1;
}