#!/usr/bin/env python3
"""Mirror PeeringDB API objects into PostgreSQL tables.

For every object type (``net``, ``ix``, ...) the script expects a table of
the same name in the configured schema.  ``test_table`` requires ``id`` as
the first column plus ``updated`` and ``data`` columns; the generated upsert
additionally references a ``deleted`` column.  Each API object is stored
as its raw JSON in ``data``, and every other column is filled from the
object's key of the same name.
"""

import psycopg2
from psycopg2 import sql
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

import json
import time
import re
import yaml
from urllib.parse import urljoin

import logging
from config import get_config

# NOTE(review): 'db_schemad' looks like a typo for 'db_schema'.  Both keys are
# honoured (the historical one first) so existing configurations keep working.
schema_name = get_config().get(
    'db_schemad', get_config().get('db_schema', 'peeringdb'))
domain_name = get_config().get('domain_name', 'www.peeringdb.com')
user_agent = 'peeringdb-simplesync/0.2'

#logging.basicConfig(level=logging.DEBUG)


def open_db():
    """Open the PostgreSQL connection and store it in the global ``conn``."""
    global conn
    db_conn_str = get_config()['db_conn_str']
    conn = psycopg2.connect(db_conn_str)


# table name -> list of column names, filled lazily by get_columns()
column_cache = {}


def get_columns(table_name):
    """Return the column names of ``table_name`` in ``schema_name``.

    Results are cached per table.  Columns are returned in table definition
    order, because callers rely on the first column being ``id``.  A table
    that does not exist yields an empty list.
    """
    if table_name in column_cache:
        return column_cache[table_name]

    sql_str = """
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = %s
        AND table_name = %s
        ORDER BY ordinal_position
    """
    with conn.cursor() as cur:
        cur.execute(sql_str, (schema_name, table_name))
        ret = [row[0] for row in cur.fetchall()]

    column_cache[table_name] = ret
    return ret


def test_table(table_name):
    """Check that ``table_name`` has the columns the sync relies on.

    Raises:
        AssertionError: if the table is missing, its first column is not
            ``id``, or it lacks an ``updated`` or ``data`` column.
    """
    cols = get_columns(table_name)
    valid = (
        bool(cols)
        and cols[0] == 'id'
        and 'updated' in cols
        and 'data' in cols
    )
    if not valid:
        print(f'Invalid or missing table schema for {table_name:s}')
        # Raised explicitly (not via ``assert``) so the check survives ``-O``.
        raise AssertionError(
            f'invalid or missing table schema for {table_name}')


def gen_sql(table_name, cols):
    """Build the upsert statement for ``table_name``.

    The statement inserts one row with a named placeholder per column and,
    on a conflict on the first column, overwrites every other column.  The
    stored ``updated`` value is kept when the incoming row has ``deleted``
    set.

    Returns:
        A ``psycopg2.sql.Composed`` object for ``cursor.execute``.
    """
    updates = []
    for col in cols[1:]:
        if col == 'updated':
            stmt = sql.SQL('updated = CASE WHEN EXCLUDED.deleted IS NOT NULL THEN t.updated ELSE EXCLUDED.updated END')
        else:
            stmt = sql.SQL('{} = EXCLUDED.{}').format(
                sql.Identifier(col),
                sql.Identifier(col))
        updates.append(stmt)

    comp = sql.SQL("""
        INSERT INTO {}.{} AS t ({})
        VALUES ({})
        ON CONFLICT ({})
        DO UPDATE SET {};
    """).format(
        sql.Identifier(schema_name),
        sql.Identifier(table_name),
        sql.SQL(', ').join(map(sql.Identifier, cols)),
        sql.SQL(', ').join(map(sql.Placeholder, cols)),
        sql.Identifier(cols[0]),
        sql.SQL(', ').join(updates))
    return comp


def update_object(kind, obj):
    """Upsert one API object ``obj`` into table ``kind``.

    Columns without a matching key in ``obj`` are written as NULL; ``data``
    always receives the full object serialised as JSON.  For objects whose
    status is ``'deleted'`` the ``deleted`` parameter is set to the object's
    ``updated`` value.
    """
    cols = get_columns(kind)
    comp = gen_sql(kind, cols)

    t = {key: obj.get(key) for key in cols}
    t['data'] = json.dumps(obj)
    # .get(): tables without a 'status' column must not fail here
    if t.get('status') == 'deleted':
        t['deleted'] = t['updated']

    try:
        with conn.cursor() as cur:
            cur.execute(comp, t)
    except Exception:
        # show the offending object before propagating the error
        print(kind, t['id'], t['data'])
        raise


def last_updated(kind):
    """Return the newest ``updated`` value in table ``kind`` minus one second.

    Returns:
        Seconds since the epoch as an ``int``, or ``None`` when the query
        yields NULL (e.g. the table is empty).
    """
    comp = sql.SQL('SELECT EXTRACT(EPOCH FROM MAX(updated)) - 1 FROM {}.{};'
                   ).format(sql.Identifier(schema_name), sql.Identifier(kind))
    with conn.cursor() as cur:
        cur.execute(comp)
        last = cur.fetchone()[0]
    if last is None:
        return None
    return int(last)


def fetch_objects(s, kind, extra_params):
    """Fetch objects of type ``kind`` from the API and upsert each of them.

    Args:
        s: the ``requests`` session to use.
        kind: object type; used both as API path segment and table name.
        extra_params: query parameters merged over the defaults (and able
            to override them, e.g. ``since``).

    Returns:
        The number of objects received.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    endpoint = f'https://{domain_name:s}/api/{kind:s}'
    params = {
        'depth': 0,
        'status__in': 'ok,pending,deleted',
        'since': 1,
        **extra_params,
    }

    r = s.get(endpoint, params=params)
    # fail with a clear HTTP error instead of a KeyError on the error body
    r.raise_for_status()
    objs = r.json()['data']
    for obj in objs:
        update_object(kind, obj)

    return len(objs)


def initial_sync(s, kind):
    """Load all objects of type ``kind`` by walking the id space in windows.

    Starts with windows of 100 ids.  Every empty window doubles the window
    size; a non-empty one resets it.  After five consecutive empty windows
    one final open-ended request fetches everything above the last id
    covered.
    """
    initial_step = 100
    step = initial_step
    lows = 0    # number of consecutive empty windows
    low = 0     # highest id covered so far

    while lows < 5:
        # both bounds are exclusive: this window covers low+1 .. low+step
        high = low + step + 1
        n = fetch_objects(s, kind, {'id__gt': low, 'id__lt': high})
        low += step
        if n > 0:
            step = initial_step
            lows = 0
        else:
            step *= 2
            lows += 1

    fetch_objects(s, kind, {'id__gt': low})


def sync_table(s, kind):
    """Bring table ``kind`` up to date with the API.

    When the table has no ``updated`` value yet, a full ``initial_sync`` is
    run first and the incremental fetch then starts one hour before the
    current time; otherwise it starts at ``last_updated(kind)``.

    Raises:
        AssertionError: if the table fails ``test_table``.
    """
    test_table(kind)

    last = last_updated(kind)
    if last is None:
        last = int(time.time()) - 3600
        initial_sync(s, kind)

    fetch_objects(s, kind, {'since': last})


def find_spec(s, url):
    """Return the absolute URL of the API specification linked from ``url``.

    The page is searched for a tag that carries a ``spec-url`` attribute.

    Raises:
        requests.HTTPError: if the page cannot be fetched.
        AssertionError: if no ``spec-url`` attribute is found.
    """
    s.headers.update({'Accept': 'text/html'})
    r = s.get(url)
    r.raise_for_status()

    # look for something like <some-tag spec-url="/path/to/api-schema.yaml">
    p = re.compile(r'<[^>]+\bspec-url=([\'"])([^\'">]+)\1[^>]*>')
    m = p.search(r.text)
    if m is None:
        # Raised explicitly (not via ``assert``) so the check survives ``-O``.
        raise AssertionError(f'no spec-url attribute found in {url}')

    return urljoin(url, m[2])


def find_object_types_via_apidocs(s):
    """Return the object types listed in the API documentation.

    Downloads the specification referenced by the ``/apidocs/`` page and
    collects every path of the form ``/api/<name>``, except ignored types.
    """
    ret = []
    spec_url = find_spec(s, f'https://{domain_name:s}/apidocs/')
    s.headers.update({'Accept': 'application/x-yaml'})
    r = s.get(spec_url)
    r.raise_for_status()

    ignored_types = ['as_set']

    apidoc = yaml.safe_load(r.text)
    p = re.compile(r'^/api/([a-z_]+)$')
    for path in apidoc['paths']:
        m = p.match(path)
        if not m:
            continue
        key = m[1]
        if key in ignored_types:
            continue
        ret.append(key)
    return ret


def handle_auth(s):
    """Apply the configured ``auth`` setting to session ``s``.

    A string is sent as an API key in the ``Authorization`` header; any
    other value is assigned to ``s.auth`` unchanged.
    """
    auth = get_config().get('auth')
    if isinstance(auth, str):
        # API-Key
        s.headers.update({'Authorization': f'Api-Key {auth:s}'})
    else:
        # eg. HTTPBasicAuth('username', 'password')
        s.auth = auth


def main():
    """Sync every configured (or discovered) object type, then commit."""
    open_db()
    try:
        with requests.Session() as s:
            retries = Retry(total=16,
                            backoff_factor=1.5,
                            status_forcelist=[429, 502, 503, 504])

            s.mount('https://', HTTPAdapter(max_retries=retries))

            handle_auth(s)

            req_agent = s.headers.get('User-Agent')
            s.headers.update({'User-Agent': f'{user_agent:s} {req_agent:s}'})

            keys = get_config().get('object_types')
            if not keys:
                keys = find_object_types_via_apidocs(s)
                print('Consider setting \'object_types\':', keys)

            # subsequent requests are going to be JSON
            s.headers.update({'Accept': 'application/json'})

            for key in keys:
                try:
                    sync_table(s, key)
                except AssertionError:
                    print(f'skipping {key:s}...')

        conn.commit()
    finally:
        # without a commit, closing discards the uncommitted changes
        conn.close()


# NOTE(review): called unconditionally, so the sync also runs when this module
# is imported; left unguarded to keep the existing import-time behaviour.
main()