Diffstat (limited to 'sync.py')
-rwxr-xr-x  sync.py  150
1 file changed, 150 insertions(+), 0 deletions(-)
diff --git a/sync.py b/sync.py
new file mode 100755
index 0000000..340b6be
--- /dev/null
+++ b/sync.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python3
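+
+# Mirror PeeringDB API objects into a local PostgreSQL schema: a windowed
+# crawl over object ids on first run, incremental 'since' updates afterwards.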
+
+import json
+import logging
+import time
+
+import psycopg2
+from psycopg2 import sql
+
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+from config import get_config
+
+# Ensure text columns decode to unicode (already the default on Python 3).
+psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
+
+schema_name = get_config()['db_schema']
+
+#logging.basicConfig(level=logging.DEBUG)
+
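+# One module-level connection; main() commits a single transaction at the
+# end of a run, so a failed run leaves the mirror untouched.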
+def open_db():
+    global conn
+    db_conn_str = get_config()['db_conn_str']
+    conn = psycopg2.connect(db_conn_str)
+
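+# Column names per table, looked up once from information_schema and cached
+# for the rest of the run.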
+column_cache = {}
+def get_columns(table_name):
+    if table_name in column_cache:
+        return column_cache[table_name]
+    sql_str = """
+        SELECT column_name FROM information_schema.columns
+        WHERE table_schema = %s AND table_name = %s
+    """
+    cur = conn.cursor()
+    cur.execute(sql_str, (schema_name, table_name))
+    rows = cur.fetchall()
+    ret = [row[0] for row in rows]
+    column_cache[table_name] = ret
+    return ret
+
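+# Every mirrored table must lead with an 'id' column and also carry
+# 'updated' and 'data' columns; main() skips tables that fail this check.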
+def test_table(table_name):
+    cols = get_columns(table_name)
+    try:
+        assert 'id' in cols
+        assert cols[0] == 'id'
+        assert 'updated' in cols
+        assert 'data' in cols
+    except AssertionError:
+        print(f'Invalid or missing table schema for {table_name:s}')
+        raise
+
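+# Compose an INSERT ... ON CONFLICT (id) DO UPDATE upsert with psycopg2.sql
+# so that schema, table and column names are all safely quoted.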
+def gen_sql(table_name, cols):
+    updates = [sql.Identifier(col) + sql.SQL(' = EXCLUDED.') + sql.Identifier(col)
+               for col in cols[1:]]
+    comp = sql.SQL("""
+        INSERT INTO {}.{}
+        ({})
+        VALUES ({})
+        ON CONFLICT ({})
+        DO UPDATE SET {};
+    """).format(sql.Identifier(schema_name),
+                sql.Identifier(table_name),
+                sql.SQL(', ').join(map(sql.Identifier, cols)),
+                sql.SQL(', ').join(map(sql.Placeholder, cols)),
+                sql.Identifier(cols[0]),
+                sql.SQL(', ').join(updates))
+    return comp
+
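+# Upsert a single API object: matching keys fill their columns (missing ones
+# become NULL) and the whole object is kept as JSON in the 'data' column.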
+def update_object(kind, obj):
+    cols = get_columns(kind)
+    comp = gen_sql(kind, cols)
+    t = {key: obj.get(key) for key in cols}
+    t['data'] = json.dumps(obj)
+    cur = conn.cursor()
+    cur.execute(comp, t)
+
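+# Newest 'updated' timestamp in the table as epoch seconds, minus one second
+# of overlap so rows updated within the same second are not missed.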
+def last_updated(kind):
+    comp = sql.SQL('SELECT EXTRACT(EPOCH FROM MAX(updated)) - 1 FROM {}.{};'
+                   ).format(sql.Identifier(schema_name), sql.Identifier(kind))
+    cur = conn.cursor()
+    cur.execute(comp)
+    last = cur.fetchone()[0]
+    if last is None:
+        return None
+    return int(last)
+
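+# Fetch one batch of objects of the given kind and upsert each; returns the
+# number of objects the API sent back.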
+def fetch_objects(s, kind, params):
+    endpoint = f'https://peeringdb.com/api/{kind:s}'
+    r = s.get(endpoint, params=params)
+    r.raise_for_status()
+    objs = r.json()['data']
+    for obj in objs:
+        update_object(kind, obj)
+    return len(objs)
+
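+# First-time crawl: walk the id space in windows, doubling the window each
+# time it comes back empty, and after five consecutive empty windows do one
+# final unbounded fetch to catch any higher ids.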
+def initial_sync(s, kind):
+    initial_step = 100
+    step = initial_step
+    lows = 0
+    low = 0
+    while lows < 5:
+        high = low + step + 1
+        n = fetch_objects(s, kind, {'id__gt': low, 'id__lt': high, 'depth': 0})
+        low += step
+        if n > 0:
+            step = initial_step
+            lows = 0
+        else:
+            step *= 2
+            lows += 1
+    fetch_objects(s, kind, {'id__gt': low, 'depth': 0})
+
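+# Sync one kind: crawl everything if the local table is empty, then fetch
+# whatever changed since the newest local row (or the last hour on a first
+# run, covering objects updated during the crawl).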
+def sync_table(s, kind):
+    test_table(kind)
+    last = last_updated(kind)
+    if last is None:
+        last = int(time.time()) - 3600
+        initial_sync(s, kind)
+    fetch_objects(s, kind, {'since': last, 'depth': 0})
+
+def main():
+    open_db()
+
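+    # Retry transient gateway errors; urllib3 backs off exponentially
+    # between attempts (backoff_factor=1).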
+    s = requests.Session()
+    retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
+    s.mount('https://', HTTPAdapter(max_retries=retries))
+    s.auth = get_config()['auth']
+
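+    # Prefix the default requests User-Agent with this tool's name, then pull
+    # the apidocs listing to discover which object kinds the API exposes.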
+    req_agent = s.headers.get('User-Agent')
+    s.headers.update({'User-Agent': f'peeringdb-simplesync/0.1 {req_agent:s}'})
+    r = s.get('https://peeringdb.com/apidocs/')
+    s.headers.update({'Accept': 'application/json'})
+
+    ignored = ['as_set']
+
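+    # Sync every advertised kind, skipping meta keys ('_...'), the ignored
+    # list, and any kind whose table fails test_table().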
+    apidoc = r.json()
+    for key in apidoc:
+        if key.startswith('_') or key in ignored:
+            continue
+        try:
+            sync_table(s, key)
+        except AssertionError:
+            print(f'skipping {key:s}...')
+
+    conn.commit()
+
+if __name__ == '__main__':
+    main()