From fdfecee2044cd1ea80012cdf9f0b4cf96089deb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Asbj=C3=B8rn=20Sloth=20T=C3=B8nnesen?= Date: Wed, 3 Jun 2020 20:27:20 +0000 Subject: Add support for asynchronous notifications MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If a notification handler is set using db:set_notifier(cb) then the event loop will be kept alive until db:close() is called or the event loop. Prior to this a database connection would never keep the event loop running. https://www.postgresql.org/docs/current/libpq-notify.html Signed-off-by: Asbjørn Sloth Tønnesen --- lem/postgres.c | 171 +++++++++++++++++++++++++++++++++++++++++++----- lem/postgres/queued.lua | 26 ++++++-- test.lua | 48 +++++++++++++- 3 files changed, 218 insertions(+), 27 deletions(-) diff --git a/lem/postgres.c b/lem/postgres.c index 7d1ec37..141a084 100644 --- a/lem/postgres.c +++ b/lem/postgres.c @@ -25,6 +25,7 @@ struct db { struct ev_io w; PGconn *conn; + lua_State *notifier; }; static int @@ -59,19 +60,72 @@ err_connection(lua_State *T, PGconn *conn) return 2; } -static int -db_gc(lua_State *T) +static void db_do_notification(lua_State *N, PGnotify *n) { - struct db *d = lua_touserdata(T, 1); + lua_State *S = lem_newthread(); - if (d->conn != NULL) - PQfinish(d->conn); + /* copy function from notifier */ + lua_pushnil(N); + lua_copy(N, 1, 2); + lua_xmove(N, S, 1); - return 0; + /* push arguments */ + lua_pushstring(S, n->relname); + lua_pushstring(S, n->extra); + lua_pushinteger(S, n->be_pid); + + lem_queue(S, 3); } -static int -db_close(lua_State *T) +static void +db_process_notifications(struct db *d) +{ + lua_State *N = d->notifier; + PGnotify *n; + + if (N == NULL) return; + while ((n = PQnotifies(d->conn)) != NULL) { + lem_debug("received notification '%s' with payload \"%s\" from PID %d\n", + n->relname, n->extra, n->be_pid); + db_do_notification(N, n); + PQfreemem(n); + } +} + +static void +db_idle_cb(EV_P_ struct ev_io *w, int revents) +{ + struct db *d = (struct db *)w; + lem_debug("awoken during idle\n"); + PQconsumeInput(d->conn); + db_process_notifications(d); +} + +static void +db_idle_or_stop(struct db *d) +{ + if (d->notifier == NULL) { + ev_io_stop(LEM_ &d->w); + } else { + d->w.cb = db_idle_cb; + ev_io_start(LEM_ &d->w); + } +} + +static void +db_unset_notifier(struct db *d) +{ + lua_State *N = d->notifier; + if (N != NULL) { + lem_forgetthread(N); + d->notifier = NULL; + } + if (d->w.cb == db_idle_cb) + ev_io_stop(LEM_ &d->w); +} + +static struct db * +db_cleanup(lua_State *T) { struct db *d; lua_State *S; @@ -79,20 +133,47 @@ db_close(lua_State *T) luaL_checktype(T, 1, LUA_TUSERDATA); d = lua_touserdata(T, 1); if (d->conn == NULL) - return err_closed(T); + return d; S = d->w.data; + ev_io_stop(LEM_ &d->w); if (S != NULL) { - ev_io_stop(LEM_ &d->w); lua_pushnil(S); lua_pushliteral(S, "interrupted"); lem_queue(S, 2); d->w.data = NULL; } - PQfinish(d->conn); + + return d; +} + +static int +db_gc(lua_State *T) +{ + struct db *d = db_cleanup(T); + d->conn = NULL; + + // leaking d->notifier since it might be + // unsafe to call lem_forgetthread here, + // in case that db_gc is called during + // LEM shutdown. + + return 0; +} + +static int +db_close(lua_State *T) +{ + struct db *d = db_cleanup(T); + + if (d->conn == NULL) + return err_closed(T); + d->conn = NULL; + db_unset_notifier(d); + lua_pushboolean(T, 1); return 1; } @@ -113,7 +194,7 @@ postgres_connect_cb(EV_P_ struct ev_io *w, int revents) (void)revents; - ev_io_stop(EV_A_ &d->w); + db_idle_or_stop(d); switch (PQconnectPoll(d->conn)) { case PGRES_POLLING_READING: lem_debug("PGRES_POLLING_READING, socket = %d", PQsocket(d->conn)); @@ -146,6 +227,7 @@ postgres_connect_cb(EV_P_ struct ev_io *w, int revents) #endif } + d->w.cb = postgres_connect_cb; ev_io_start(EV_A_ &d->w); } @@ -170,6 +252,7 @@ postgres_connect(lua_State *T) lua_settop(T, 0); d = lua_newuserdata(T, sizeof(struct db)); + d->notifier = NULL; lua_pushvalue(T, lua_upvalueindex(1)); lua_setmetatable(T, -2); @@ -220,7 +303,7 @@ db_reset_cb(EV_P_ struct ev_io *w, int revents) (void)revents; - ev_io_stop(EV_A_ &d->w); + db_idle_or_stop(d); switch (PQresetPoll(d->conn)) { case PGRES_POLLING_READING: lem_debug("PGRES_POLLING_READING, socket = %d", PQsocket(d->conn)); @@ -253,6 +336,7 @@ db_reset_cb(EV_P_ struct ev_io *w, int revents) #endif } + d->w.cb = db_reset_cb; ev_io_start(EV_A_ &d->w); } @@ -381,11 +465,13 @@ db_exec_cb(EV_P_ struct ev_io *w, int revents) return; } + db_process_notifications(d); + while (!PQisBusy(d->conn)) { res = PQgetResult(d->conn); if (res == NULL) { - ev_io_stop(EV_A_ &d->w); + db_idle_or_stop(d); lem_debug("returning %d values", lua_gettop(T) - 1); lem_queue(T, lua_gettop(T) - 1); d->w.data = NULL; @@ -457,7 +543,7 @@ error: res = PQgetResult(d->conn); if (res == NULL) { - ev_io_stop(EV_A_ &d->w); + db_idle_or_stop(d); lem_queue(T, 2); d->w.data = NULL; return; @@ -639,7 +725,7 @@ db_put_cb(EV_P_ struct ev_io *w, int revents) break; } - ev_io_stop(EV_A_ &d->w); + db_idle_or_stop(d); lem_queue(T, ret); d->w.data = NULL; } @@ -695,7 +781,7 @@ db_done_cb(EV_P_ struct ev_io *w, int revents) switch (PQputCopyEnd(d->conn, error)) { case 1: /* data sent */ lem_debug("data sent"); - ev_io_stop(EV_A_ &d->w); + db_idle_or_stop(d); lua_settop(T, 1); (void)PQsetnonblocking(d->conn, 0); d->w.cb = db_exec_cb; @@ -774,7 +860,7 @@ db_get_cb(EV_P_ struct ev_io *w, int revents) ret = PQgetCopyData(d->conn, &buf, 1); if (ret > 0) { lem_debug("got data"); - ev_io_stop(EV_A_ &d->w); + db_idle_or_stop(d); lua_pushlstring(T, buf, ret); PQfreemem(buf); @@ -857,6 +943,52 @@ db_get(lua_State *T) return lua_yield(T, 1); } +static int +db_set_notifier(lua_State *T) +{ + struct db *d; + lua_State *N; + int type; + + luaL_checktype(T, 1, LUA_TUSERDATA); + d = lua_touserdata(T, 1); + + if (d->conn == NULL) + return err_closed(T); + + if (lua_gettop(T) < 2) { + db_unset_notifier(d); + return; + } + + type = lua_type(T, 2); + + if (type == LUA_TNIL) { + db_unset_notifier(d); + return; + } + + if (type != LUA_TFUNCTION) + return luaL_argerror(T, 2, "expected nil or a function"); + + lua_settop(T, 2); + + N = d->notifier; + if (N == NULL) + N = d->notifier = lem_newthread(); + else + lua_settop(N, 0); + lua_xmove(T, N, 1); + + /* process outstanding notifications */ + db_process_notifications(d); + + /* make sure we are listening for new ones */ + db_idle_or_stop(d); + + return 0; +} + int luaopen_lem_postgres(lua_State *L) { @@ -893,6 +1025,9 @@ luaopen_lem_postgres(lua_State *L) /* mt.get = */ lua_pushcfunction(L, db_get); lua_setfield(L, -2, "get"); + /* mt.set_notifier = */ + lua_pushcfunction(L, db_set_notifier); + lua_setfield(L, -2, "set_notifier"); /* connect = */ lua_pushvalue(L, -1); /* upvalue 1: Connection */ diff --git a/lem/postgres/queued.lua b/lem/postgres/queued.lua index 2f597f4..f405dc0 100644 --- a/lem/postgres/queued.lua +++ b/lem/postgres/queued.lua @@ -1,6 +1,7 @@ -- -- This file is part of lem-postgres. -- Copyright 2011 Emil Renner Berthing +-- Copyright 2020 Asbjørn Sloth Tønnesen -- -- lem-postgres is free software: you can redistribute it and/or -- modify it under the terms of the GNU General Public License as @@ -22,18 +23,32 @@ local postgres = require 'lem.postgres' local remove = table.remove local thisthread, suspend, resume = utils.thisthread, utils.suspend, utils.resume +local Connection = postgres.Connection + local QConnection = {} QConnection.__index = QConnection +function QConnection:set_notifier(cb) + Connection.set_notifier(self.conn, cb) +end + +local function cancel_all(t) + for i = 1, t.n - 1 do + resume(t[i]) + t[i] = nil + end +end + function QConnection:close() local ok, err = self.conn:close() - for i = 1, self.n - 1 do - resume(self[i]) - self[i] = nil - end + cancel_all(self) return ok, err end -QConnection.__gc = QConnection.close + +function QConnection:__gc() + cancel_all(self) + self.conn = nil +end local function lock(self) local n = self.n @@ -64,7 +79,6 @@ local function wrap(method) end end -local Connection = postgres.Connection QConnection.exec = wrap(Connection.exec) QConnection.prepare = wrap(Connection.prepare) QConnection.run = wrap(Connection.run) diff --git a/test.lua b/test.lua index 03658d8..80f774f 100755 --- a/test.lua +++ b/test.lua @@ -60,15 +60,17 @@ end local utils = require 'lem.utils' local postgres = require 'lem.postgres' -local db = assert(postgres.connect([[ +local dbconnstr = [[ host=localhost user=myuser password=mypasswd dbname=mydb -]])) +]] + +local db = assert(postgres.connect(dbconnstr)) assert(db:exec( -'CREATE TABLE mytable (id serial PRIMARY KEY, name TEXT, foo integer)')) +'CREATE TEMPORARY TABLE mytable (id serial PRIMARY KEY, name TEXT, foo integer)')) assert(db:exec("COPY mytable (name, foo) FROM STDIN (delimiter ',')")) assert(db:put('alpha,1\n')) @@ -107,6 +109,46 @@ end assert(db:exec('DROP TABLE mytable')) +assert(db:exec('LISTEN mytest;')) + +assert(db:exec('NOTIFY mytest;')) +assert(db:exec("NOTIFY mytest, 'foo';")) + +db:set_notifier(function(ev, payload, pid) + print('notification', ev, payload, pid) +end) + +assert(db:exec('SELECT pg_notify($1, $2);', 'mytest', 'bar')) +assert(db:exec('SELECT pg_notify($1, $2), pg_notify($1, $3);', 'mytest', 'foo', 'bar')) + +db:set_notifier(function(ev, payload, pid) + print('\nnotification', ev, payload, pid) +end) + +utils.spawn(function() + local sleeper = utils.newsleeper() + for i=1,19 do + sleeper:sleep(0.20) + io.write('.') + end + print('') +end) + + +utils.spawn(function() + local sleeper = utils.newsleeper() + local db2 = assert(postgres.connect(dbconnstr)) + for i=1,3 do + sleeper:sleep(1) + assert(db2:exec("NOTIFY mytest, 'foo from secondary connection';")) + end +end) + +local sleeper = utils.newsleeper() +print('\ngoing to sleep') +sleeper:sleep(4) +print('awake again\n') + print("Exiting " .. arg[0]) -- vim: syntax=lua ts=2 sw=2 noet: -- cgit v1.2.1