From 6b7297545b71fda63aff0a74e680660e8c22222b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Asbj=C3=B8rn=20Sloth=20T=C3=B8nnesen?=
Date: Fri, 6 Sep 2019 13:43:35 +0000
Subject: initial commit
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Asbjørn Sloth Tønnesen
---
 sync.py | 150 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 150 insertions(+)
 create mode 100755 sync.py

diff --git a/sync.py b/sync.py
new file mode 100755
index 0000000..340b6be
--- /dev/null
+++ b/sync.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python3
+
+import psycopg2
+from psycopg2 import sql
+psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
+
+import requests
+from requests.adapters import HTTPAdapter
+from requests.packages.urllib3.util.retry import Retry
+
+import json
+
+import time
+
+import logging
+from config import get_config
+schema_name = get_config()['db_schema']
+
+#logging.basicConfig(level=logging.DEBUG)
+
+def open_db():
+    global conn
+    db_conn_str = get_config()['db_conn_str']
+    conn = psycopg2.connect(db_conn_str)
+
+column_cache = {}
+def get_columns(table_name):
+    global column_cache
+    if table_name in column_cache:
+        return column_cache[table_name]
+    sql_str = """
+        SELECT column_name FROM information_schema.columns
+        WHERE table_schema = %s AND table_name = %s
+    """
+    cur = conn.cursor()
+    cur.execute(sql_str, (schema_name, table_name))
+    rows = cur.fetchall()
+    ret = [row[0] for row in rows]
+    column_cache[table_name] = ret
+    return ret
+
+def test_table(table_name):
+    cols = get_columns(table_name)
+    try:
+        assert('id' in cols)
+        assert(cols[0] == 'id')
+        assert('updated' in cols)
+        assert('data' in cols)
+    except AssertionError:
+        print(f'Invalid or missing table schema for {table_name:s}')
+        raise
+
+def gen_sql(table_name, cols):
+    updates = [sql.Identifier(col) + sql.SQL(' = EXCLUDED.') + sql.Identifier(col) for col in cols[1:]]
+    comp = sql.SQL("""
+        INSERT INTO {}.{}
+        ({})
+        VALUES ({})
+        ON CONFLICT ({})
+        DO UPDATE SET {};
+    """).format(sql.Identifier(schema_name),
+                sql.Identifier(table_name),
+                sql.SQL(', ').join(map(sql.Identifier, cols)),
+                sql.SQL(', ').join(map(sql.Placeholder, cols)),
+                sql.Identifier(cols[0]),
+                sql.SQL(', ').join(updates))
+    return comp
+
+def update_object(kind, obj):
+    cols = get_columns(kind)
+    comp = gen_sql(kind, cols)
+    t = {}
+    for key in cols:
+        if key in obj:
+            t[key] = obj[key]
+        else:
+            t[key] = None
+    t['data'] = json.dumps(obj)
+    cur = conn.cursor()
+    cur.execute(comp, t)
+
+def last_updated(kind):
+    comp = sql.SQL('SELECT EXTRACT(EPOCH FROM MAX(updated)) - 1 FROM {}.{};'
+                   ).format(sql.Identifier(schema_name), sql.Identifier(kind))
+    cur = conn.cursor()
+    cur.execute(comp)
+    last = cur.fetchone()[0]
+    if last is None: return None
+    return int(last)
+
+def fetch_objects(s, kind, params):
+    endpoint = f'https://peeringdb.com/api/{kind:s}'
+    r = s.get(endpoint, params=params )
+    objs = json.loads(r.text)['data']
+    for obj in objs:
+        update_object(kind, obj)
+    return len(objs)
+
+def initial_sync(s, kind):
+    initial_step = 100
+    step = initial_step
+    lows = 0
+    low = 0
+    while lows < 5:
+        high = low + step + 1
+        n = fetch_objects(s, kind, { 'id__gt': low, 'id__lt': high, 'depth': 0 } )
+        low += step
+        if n > 0:
+            step = initial_step
+            lows = 0
+        else:
+            step *= 2
+            lows += 1
+    fetch_objects(s, kind, { 'id__gt': low, 'depth': 0 } )
+
+def sync_table(s, kind):
+    test_table(kind)
+    endpoint = f'https://peeringdb.com/api/{kind:s}'
+    last = last_updated(kind)
+    if last is None:
+        last = int(time.time()) - 3600
+        initial_sync(s, kind)
+    fetch_objects(s, kind, { 'since': last, 'depth': 0 } )
+
+def main():
+    open_db()
+
+    s = requests.Session()
+    retries = Retry(total=5, backoff_factor=1, status_forcelist=[ 502, 503, 504 ])
+    s.mount('https://', HTTPAdapter(max_retries=retries))
+    s.auth = get_config()['auth']
+
+    req_agent = s.headers.get('User-Agent')
+    s.headers.update({'User-Agent': f'peeringdb-simplesync/0.1 {req_agent:s}'})
+    r = s.get('https://peeringdb.com/apidocs/')
+    s.headers.update({'Accept': 'application/json'})
+
+    ignored = [ 'as_set' ]
+
+    apidoc = json.loads(r.text)
+    for key in apidoc:
+        if key[0] == '_' or key in ignored: continue
+        try:
+            sync_table(s, key)
+        except AssertionError:
+            print(f'skipping {key:s}...')
+
+    conn.commit()
+
+main()
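
Notes:

sync.py imports get_config() from a config module that is not part of this
commit. A minimal sketch of what such a config.py could look like, inferred
from the three keys the script actually reads (db_conn_str, db_schema, auth);
every value below is a placeholder, not a real credential:

    # config.py -- hypothetical companion module, not included in this commit.
    # sync.py only requires get_config() to return a mapping with these keys.
    def get_config():
        return {
            # libpq connection string, passed verbatim to psycopg2.connect()
            'db_conn_str': 'host=localhost dbname=peeringdb user=sync',
            # schema holding one table per PeeringDB object type
            'db_schema': 'peeringdb',
            # (username, password) tuple, assigned to the requests session
            'auth': ('example-user', 'example-password'),
        }

With such a module on the import path the script runs as a one-shot
./sync.py; conn.commit() is only called once at the end of main(), so an
aborted run leaves the previous database state untouched.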
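test_table() only accepts tables whose first column is id and which also have
updated and data columns; any columns in between are up to the operator,
since update_object() fills every column whose name matches a key of the API
object and stores the complete object in data. A sketch of a minimal
compatible table for the PeeringDB net object type (the name and asn columns
are illustrative, as is the connection string):

    # Hypothetical one-off setup script; schema and column choices are examples.
    import psycopg2

    ddl = """
    CREATE SCHEMA IF NOT EXISTS peeringdb;
    CREATE TABLE IF NOT EXISTS peeringdb.net (
        id      integer PRIMARY KEY,  -- must be the first column
        name    text,                 -- filled from the API object when present
        asn     integer,              -- likewise illustrative
        updated timestamptz,          -- drives the incremental 'since' fetch
        data    jsonb                 -- complete API object, via json.dumps()
    );
    """

    conn = psycopg2.connect('host=localhost dbname=peeringdb user=sync')
    with conn, conn.cursor() as cur:  # commits on clean exit
        cur.execute(ddl)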
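For reference, the upsert that gen_sql() composes for such a table renders
roughly as shown in the comment below (identifier quoting and %(name)s
placeholders come from psycopg2.sql; whitespace condensed). This assumes
sync.py's gen_sql and an open connection are in scope:

    # as_string() needs a live connection to resolve identifier quoting.
    print(gen_sql('net', ['id', 'name', 'asn', 'updated', 'data']).as_string(conn))

    # INSERT INTO "peeringdb"."net"
    # ("id", "name", "asn", "updated", "data")
    # VALUES (%(id)s, %(name)s, %(asn)s, %(updated)s, %(data)s)
    # ON CONFLICT ("id")
    # DO UPDATE SET "name" = EXCLUDED."name", "asn" = EXCLUDED."asn",
    #               "updated" = EXCLUDED."updated", "data" = EXCLUDED."data";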