#!/usr/bin/env python3
"""Mirror PeeringDB API objects into per-kind Postgres tables.

Every synced table must have an ``id`` primary-key column first, an
``updated`` timestamp column and a ``data`` JSON column; rows are upserted
keyed on ``id`` with the raw API object stored in ``data``.
"""

import json
import logging
import time

import psycopg2
from psycopg2 import sql
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

from config import get_config

psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

schema_name = get_config()['db_schema']

# logging.basicConfig(level=logging.DEBUG)

# Module-level Postgres connection shared by all helpers; set by open_db().
conn = None

# table name -> ordered list of column names, so information_schema is
# queried at most once per table.
column_cache = {}


def open_db():
    """Open the module-level Postgres connection from the configured DSN."""
    global conn
    db_conn_str = get_config()['db_conn_str']
    conn = psycopg2.connect(db_conn_str)


def get_columns(table_name):
    """Return the column names of *table_name* in definition order.

    Results are memoised in ``column_cache``.  The explicit
    ``ORDER BY ordinal_position`` matters: information_schema gives no
    ordering guarantee, and callers (test_table, gen_sql) rely on
    ``cols[0]`` being the table's first column.
    """
    if table_name in column_cache:
        return column_cache[table_name]
    sql_str = """
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
    """
    with conn.cursor() as cur:
        cur.execute(sql_str, (schema_name, table_name))
        ret = [row[0] for row in cur.fetchall()]
    column_cache[table_name] = ret
    return ret


def test_table(table_name):
    """Check that *table_name* has the schema the sync relies on.

    Raises AssertionError (caught by main() to skip unknown endpoints)
    when the table is missing or lacks the id/updated/data columns.
    """
    cols = get_columns(table_name)
    try:
        assert 'id' in cols
        assert cols[0] == 'id'
        assert 'updated' in cols
        assert 'data' in cols
    except AssertionError:
        print(f'Invalid or missing table schema for {table_name:s}')
        raise


def gen_sql(table_name, cols):
    """Build a parameterised upsert statement for *table_name* over *cols*.

    ``cols[0]`` (the id column) is the conflict key; every other column is
    overwritten from the EXCLUDED row.  All identifiers are composed with
    psycopg2.sql so table/column names cannot inject SQL.
    """
    updates = [sql.Identifier(col) + sql.SQL(' = EXCLUDED.') + sql.Identifier(col)
               for col in cols[1:]]
    comp = sql.SQL("""
        INSERT INTO {}.{} ({}) VALUES ({})
        ON CONFLICT ({}) DO UPDATE SET {};
    """).format(sql.Identifier(schema_name),
                sql.Identifier(table_name),
                sql.SQL(', ').join(map(sql.Identifier, cols)),
                sql.SQL(', ').join(map(sql.Placeholder, cols)),
                sql.Identifier(cols[0]),
                sql.SQL(', ').join(updates))
    return comp


def update_object(kind, obj):
    """Upsert a single API object *obj* into the table named *kind*.

    Table columns absent from the object are set to NULL; the complete
    object is always stored as JSON in the ``data`` column.
    """
    cols = get_columns(kind)
    comp = gen_sql(kind, cols)
    t = {key: obj.get(key) for key in cols}
    t['data'] = json.dumps(obj)
    with conn.cursor() as cur:
        cur.execute(comp, t)


def last_updated(kind):
    """Return the epoch second of the newest row in *kind*, minus one.

    The one-second back-off guards against objects modified within the same
    second as the last synced row.  Returns None when the table is empty.
    """
    comp = sql.SQL('SELECT EXTRACT(EPOCH FROM MAX(updated)) - 1 FROM {}.{};'
                   ).format(sql.Identifier(schema_name), sql.Identifier(kind))
    with conn.cursor() as cur:
        cur.execute(comp)
        last = cur.fetchone()[0]
    if last is None:
        return None
    return int(last)


def fetch_objects(s, kind, params):
    """GET /api/<kind> with *params* and upsert every returned object.

    Returns the number of objects received.  Raises requests.HTTPError on a
    non-2xx response so failures surface clearly instead of as a JSON
    decode error on an HTML error page.
    """
    endpoint = f'https://peeringdb.com/api/{kind:s}'
    r = s.get(endpoint, params=params)
    r.raise_for_status()
    objs = json.loads(r.text)['data']
    for obj in objs:
        update_object(kind, obj)
    return len(objs)


def initial_sync(s, kind):
    """Fetch every existing object of *kind* by walking the id space.

    Pages through ids in windows of *step*; an empty window doubles the
    window size, and after five consecutive empty windows the id space is
    assumed exhausted.  A final unbounded fetch picks up any stragglers
    above the last window (ids may be sparse).
    """
    initial_step = 100
    step = initial_step
    lows = 0
    low = 0
    while lows < 5:
        high = low + step + 1
        n = fetch_objects(s, kind, {'id__gt': low, 'id__lt': high, 'depth': 0})
        low += step
        if n > 0:
            step = initial_step
            lows = 0
        else:
            step *= 2
            lows += 1
    fetch_objects(s, kind, {'id__gt': low, 'depth': 0})


def sync_table(s, kind):
    """Sync table *kind*: full id walk when empty, else incremental 'since'."""
    test_table(kind)
    last = last_updated(kind)
    if last is None:
        # Empty table: bulk-load everything, then catch changes made during
        # the (possibly long) initial walk via the 'since' fetch below.
        last = int(time.time()) - 3600
        initial_sync(s, kind)
    fetch_objects(s, kind, {'since': last, 'depth': 0})


def main():
    """Sync every documented PeeringDB endpoint into its matching table."""
    open_db()
    s = requests.Session()
    # Retry transient gateway errors with exponential back-off.
    retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
    s.mount('https://', HTTPAdapter(max_retries=retries))
    s.auth = get_config()['auth']
    req_agent = s.headers.get('User-Agent')
    s.headers.update({'User-Agent': f'peeringdb-simplesync/0.1 {req_agent:s}'})
    # The apidocs endpoint enumerates every available object kind.
    r = s.get('https://peeringdb.com/apidocs/')
    s.headers.update({'Accept': 'application/json'})
    ignored = ['as_set']
    apidoc = json.loads(r.text)
    for key in apidoc:
        if key[0] == '_' or key in ignored:
            continue
        try:
            sync_table(s, key)
        except AssertionError:
            # test_table already printed the reason; move on to the next kind.
            print(f'skipping {key:s}...')
    conn.commit()


if __name__ == '__main__':
    main()