summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lem/postgres.c171
-rw-r--r--lem/postgres/queued.lua26
-rwxr-xr-xtest.lua48
3 files changed, 218 insertions, 27 deletions
diff --git a/lem/postgres.c b/lem/postgres.c
index 7d1ec37..141a084 100644
--- a/lem/postgres.c
+++ b/lem/postgres.c
@@ -25,6 +25,7 @@
struct db {
struct ev_io w;
PGconn *conn;
+ lua_State *notifier;
};
static int
@@ -59,19 +60,72 @@ err_connection(lua_State *T, PGconn *conn)
return 2;
}
-static int
-db_gc(lua_State *T)
+static void db_do_notification(lua_State *N, PGnotify *n)
{
- struct db *d = lua_touserdata(T, 1);
+ lua_State *S = lem_newthread();
- if (d->conn != NULL)
- PQfinish(d->conn);
+ /* copy function from notifier */
+ lua_pushnil(N);
+ lua_copy(N, 1, 2);
+ lua_xmove(N, S, 1);
- return 0;
+ /* push arguments */
+ lua_pushstring(S, n->relname);
+ lua_pushstring(S, n->extra);
+ lua_pushinteger(S, n->be_pid);
+
+ lem_queue(S, 3);
}
-static int
-db_close(lua_State *T)
+static void
+db_process_notifications(struct db *d)
+{
+ lua_State *N = d->notifier;
+ PGnotify *n;
+
+ if (N == NULL) return;
+ while ((n = PQnotifies(d->conn)) != NULL) {
+ lem_debug("received notification '%s' with payload \"%s\" from PID %d\n",
+ n->relname, n->extra, n->be_pid);
+ db_do_notification(N, n);
+ PQfreemem(n);
+ }
+}
+
+static void
+db_idle_cb(EV_P_ struct ev_io *w, int revents)
+{
+ struct db *d = (struct db *)w;
+ lem_debug("awoken during idle\n");
+ PQconsumeInput(d->conn);
+ db_process_notifications(d);
+}
+
+static void
+db_idle_or_stop(struct db *d)
+{
+ if (d->notifier == NULL) {
+ ev_io_stop(LEM_ &d->w);
+ } else {
+ d->w.cb = db_idle_cb;
+ ev_io_start(LEM_ &d->w);
+ }
+}
+
+static void
+db_unset_notifier(struct db *d)
+{
+ lua_State *N = d->notifier;
+ if (N != NULL) {
+ lem_forgetthread(N);
+ d->notifier = NULL;
+ }
+ if (d->w.cb == db_idle_cb)
+ ev_io_stop(LEM_ &d->w);
+}
+
+static struct db *
+db_cleanup(lua_State *T)
{
struct db *d;
lua_State *S;
@@ -79,20 +133,47 @@ db_close(lua_State *T)
luaL_checktype(T, 1, LUA_TUSERDATA);
d = lua_touserdata(T, 1);
if (d->conn == NULL)
- return err_closed(T);
+ return d;
S = d->w.data;
+ ev_io_stop(LEM_ &d->w);
if (S != NULL) {
- ev_io_stop(LEM_ &d->w);
lua_pushnil(S);
lua_pushliteral(S, "interrupted");
lem_queue(S, 2);
d->w.data = NULL;
}
-
PQfinish(d->conn);
+
+ return d;
+}
+
+static int
+db_gc(lua_State *T)
+{
+ struct db *d = db_cleanup(T);
+ d->conn = NULL;
+
+ // leaking d->notifier since it might be
+ // unsafe to call lem_forgetthread here,
+ // in case that db_gc is called during
+ // LEM shutdown.
+
+ return 0;
+}
+
+static int
+db_close(lua_State *T)
+{
+ struct db *d = db_cleanup(T);
+
+ if (d->conn == NULL)
+ return err_closed(T);
+
d->conn = NULL;
+ db_unset_notifier(d);
+
lua_pushboolean(T, 1);
return 1;
}
@@ -113,7 +194,7 @@ postgres_connect_cb(EV_P_ struct ev_io *w, int revents)
(void)revents;
- ev_io_stop(EV_A_ &d->w);
+ db_idle_or_stop(d);
switch (PQconnectPoll(d->conn)) {
case PGRES_POLLING_READING:
lem_debug("PGRES_POLLING_READING, socket = %d", PQsocket(d->conn));
@@ -146,6 +227,7 @@ postgres_connect_cb(EV_P_ struct ev_io *w, int revents)
#endif
}
+ d->w.cb = postgres_connect_cb;
ev_io_start(EV_A_ &d->w);
}
@@ -170,6 +252,7 @@ postgres_connect(lua_State *T)
lua_settop(T, 0);
d = lua_newuserdata(T, sizeof(struct db));
+ d->notifier = NULL;
lua_pushvalue(T, lua_upvalueindex(1));
lua_setmetatable(T, -2);
@@ -220,7 +303,7 @@ db_reset_cb(EV_P_ struct ev_io *w, int revents)
(void)revents;
- ev_io_stop(EV_A_ &d->w);
+ db_idle_or_stop(d);
switch (PQresetPoll(d->conn)) {
case PGRES_POLLING_READING:
lem_debug("PGRES_POLLING_READING, socket = %d", PQsocket(d->conn));
@@ -253,6 +336,7 @@ db_reset_cb(EV_P_ struct ev_io *w, int revents)
#endif
}
+ d->w.cb = db_reset_cb;
ev_io_start(EV_A_ &d->w);
}
@@ -381,11 +465,13 @@ db_exec_cb(EV_P_ struct ev_io *w, int revents)
return;
}
+ db_process_notifications(d);
+
while (!PQisBusy(d->conn)) {
res = PQgetResult(d->conn);
if (res == NULL) {
- ev_io_stop(EV_A_ &d->w);
+ db_idle_or_stop(d);
lem_debug("returning %d values", lua_gettop(T) - 1);
lem_queue(T, lua_gettop(T) - 1);
d->w.data = NULL;
@@ -457,7 +543,7 @@ error:
res = PQgetResult(d->conn);
if (res == NULL) {
- ev_io_stop(EV_A_ &d->w);
+ db_idle_or_stop(d);
lem_queue(T, 2);
d->w.data = NULL;
return;
@@ -639,7 +725,7 @@ db_put_cb(EV_P_ struct ev_io *w, int revents)
break;
}
- ev_io_stop(EV_A_ &d->w);
+ db_idle_or_stop(d);
lem_queue(T, ret);
d->w.data = NULL;
}
@@ -695,7 +781,7 @@ db_done_cb(EV_P_ struct ev_io *w, int revents)
switch (PQputCopyEnd(d->conn, error)) {
case 1: /* data sent */
lem_debug("data sent");
- ev_io_stop(EV_A_ &d->w);
+ db_idle_or_stop(d);
lua_settop(T, 1);
(void)PQsetnonblocking(d->conn, 0);
d->w.cb = db_exec_cb;
@@ -774,7 +860,7 @@ db_get_cb(EV_P_ struct ev_io *w, int revents)
ret = PQgetCopyData(d->conn, &buf, 1);
if (ret > 0) {
lem_debug("got data");
- ev_io_stop(EV_A_ &d->w);
+ db_idle_or_stop(d);
lua_pushlstring(T, buf, ret);
PQfreemem(buf);
@@ -857,6 +943,52 @@ db_get(lua_State *T)
return lua_yield(T, 1);
}
+static int
+db_set_notifier(lua_State *T)
+{
+ struct db *d;
+ lua_State *N;
+ int type;
+
+ luaL_checktype(T, 1, LUA_TUSERDATA);
+ d = lua_touserdata(T, 1);
+
+ if (d->conn == NULL)
+ return err_closed(T);
+
+ if (lua_gettop(T) < 2) {
+ db_unset_notifier(d);
+ return;
+ }
+
+ type = lua_type(T, 2);
+
+ if (type == LUA_TNIL) {
+ db_unset_notifier(d);
+ return;
+ }
+
+ if (type != LUA_TFUNCTION)
+ return luaL_argerror(T, 2, "expected nil or a function");
+
+ lua_settop(T, 2);
+
+ N = d->notifier;
+ if (N == NULL)
+ N = d->notifier = lem_newthread();
+ else
+ lua_settop(N, 0);
+ lua_xmove(T, N, 1);
+
+ /* process outstanding notifications */
+ db_process_notifications(d);
+
+ /* make sure we are listening for new ones */
+ db_idle_or_stop(d);
+
+ return 0;
+}
+
int
luaopen_lem_postgres(lua_State *L)
{
@@ -893,6 +1025,9 @@ luaopen_lem_postgres(lua_State *L)
/* mt.get = <db_get> */
lua_pushcfunction(L, db_get);
lua_setfield(L, -2, "get");
+ /* mt.set_notifier = <db_set_notifier> */
+ lua_pushcfunction(L, db_set_notifier);
+ lua_setfield(L, -2, "set_notifier");
/* connect = <postgres_connect> */
lua_pushvalue(L, -1); /* upvalue 1: Connection */
diff --git a/lem/postgres/queued.lua b/lem/postgres/queued.lua
index 2f597f4..f405dc0 100644
--- a/lem/postgres/queued.lua
+++ b/lem/postgres/queued.lua
@@ -1,6 +1,7 @@
--
-- This file is part of lem-postgres.
-- Copyright 2011 Emil Renner Berthing
+-- Copyright 2020 Asbjørn Sloth Tønnesen
--
-- lem-postgres is free software: you can redistribute it and/or
-- modify it under the terms of the GNU General Public License as
@@ -22,18 +23,32 @@ local postgres = require 'lem.postgres'
local remove = table.remove
local thisthread, suspend, resume = utils.thisthread, utils.suspend, utils.resume
+local Connection = postgres.Connection
+
local QConnection = {}
QConnection.__index = QConnection
+function QConnection:set_notifier(cb)
+ Connection.set_notifier(self.conn, cb)
+end
+
+local function cancel_all(t)
+ for i = 1, t.n - 1 do
+ resume(t[i])
+ t[i] = nil
+ end
+end
+
function QConnection:close()
local ok, err = self.conn:close()
- for i = 1, self.n - 1 do
- resume(self[i])
- self[i] = nil
- end
+ cancel_all(self)
return ok, err
end
-QConnection.__gc = QConnection.close
+
+function QConnection:__gc()
+ cancel_all(self)
+ self.conn = nil
+end
local function lock(self)
local n = self.n
@@ -64,7 +79,6 @@ local function wrap(method)
end
end
-local Connection = postgres.Connection
QConnection.exec = wrap(Connection.exec)
QConnection.prepare = wrap(Connection.prepare)
QConnection.run = wrap(Connection.run)
diff --git a/test.lua b/test.lua
index 03658d8..80f774f 100755
--- a/test.lua
+++ b/test.lua
@@ -60,15 +60,17 @@ end
local utils = require 'lem.utils'
local postgres = require 'lem.postgres'
-local db = assert(postgres.connect([[
+local dbconnstr = [[
host=localhost
user=myuser
password=mypasswd
dbname=mydb
-]]))
+]]
+
+local db = assert(postgres.connect(dbconnstr))
assert(db:exec(
-'CREATE TABLE mytable (id serial PRIMARY KEY, name TEXT, foo integer)'))
+'CREATE TEMPORARY TABLE mytable (id serial PRIMARY KEY, name TEXT, foo integer)'))
assert(db:exec("COPY mytable (name, foo) FROM STDIN (delimiter ',')"))
assert(db:put('alpha,1\n'))
@@ -107,6 +109,46 @@ end
assert(db:exec('DROP TABLE mytable'))
+assert(db:exec('LISTEN mytest;'))
+
+assert(db:exec('NOTIFY mytest;'))
+assert(db:exec("NOTIFY mytest, 'foo';"))
+
+db:set_notifier(function(ev, payload, pid)
+ print('notification', ev, payload, pid)
+end)
+
+assert(db:exec('SELECT pg_notify($1, $2);', 'mytest', 'bar'))
+assert(db:exec('SELECT pg_notify($1, $2), pg_notify($1, $3);', 'mytest', 'foo', 'bar'))
+
+db:set_notifier(function(ev, payload, pid)
+ print('\nnotification', ev, payload, pid)
+end)
+
+utils.spawn(function()
+ local sleeper = utils.newsleeper()
+ for i=1,19 do
+ sleeper:sleep(0.20)
+ io.write('.')
+ end
+ print('')
+end)
+
+
+utils.spawn(function()
+ local sleeper = utils.newsleeper()
+ local db2 = assert(postgres.connect(dbconnstr))
+ for i=1,3 do
+ sleeper:sleep(1)
+ assert(db2:exec("NOTIFY mytest, 'foo from secondary connection';"))
+ end
+end)
+
+local sleeper = utils.newsleeper()
+print('\ngoing to sleep')
+sleeper:sleep(4)
+print('awake again\n')
+
print("Exiting " .. arg[0])
-- vim: syntax=lua ts=2 sw=2 noet: