#!/usr/bin/env python3
import psycopg2
from psycopg2 import sql
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import json
import time
import logging
from config import get_config
# Target PostgreSQL schema for all synced tables, read once at import time.
schema_name = get_config()['db_schema']
# Uncomment for verbose HTTP/DB debug output:
#logging.basicConfig(level=logging.DEBUG)
def open_db():
    """Open the module-level PostgreSQL connection using the configured DSN."""
    global conn
    conn = psycopg2.connect(get_config()['db_conn_str'])
# Per-process cache of table -> column-name list (schema is static per run).
column_cache = {}

def get_columns(table_name):
    """Return the ordered column names of `table_name` in `schema_name`.

    Results are cached in `column_cache` so each table is introspected at
    most once per process.  Returns a (possibly empty) list of strings.
    """
    global column_cache
    if table_name in column_cache:
        return column_cache[table_name]
    sql_str = """
SELECT column_name FROM information_schema.columns
WHERE table_schema = %s AND table_name = %s
"""
    # Context manager closes the cursor even if execute() raises
    # (the original leaked an open cursor per uncached table).
    with conn.cursor() as cur:
        cur.execute(sql_str, (schema_name, table_name))
        rows = cur.fetchall()
    ret = [row[0] for row in rows]
    column_cache[table_name] = ret
    return ret
def test_table(table_name):
    """Validate that `table_name` has the schema the sync relies on:
    an `id` column first, plus `updated` and `data` columns.

    Raises AssertionError on mismatch (callers catch it to skip the table).
    Uses explicit raises rather than `assert` statements so the checks
    still run under `python -O`, which strips asserts.
    """
    cols = get_columns(table_name)
    # cols[0] == 'id' also covers the "'id' in cols" requirement.
    ok = bool(cols) and cols[0] == 'id' and 'updated' in cols and 'data' in cols
    if not ok:
        print(f'Invalid or missing table schema for {table_name:s}')
        raise AssertionError(f'invalid schema for table {table_name}')
def gen_sql(table_name, cols):
    """Compose a parameterized upsert for `table_name`.

    Produces INSERT ... ON CONFLICT (<first column>) DO UPDATE SET ... with
    one named placeholder per column, suitable for execute() with a dict.
    """
    excluded = [
        sql.SQL('{} = EXCLUDED.{}').format(sql.Identifier(c), sql.Identifier(c))
        for c in cols[1:]
    ]
    stmt = sql.SQL("""
INSERT INTO {}.{}
({})
VALUES ({})
ON CONFLICT ({})
DO UPDATE SET {};
""").format(
        sql.Identifier(schema_name),
        sql.Identifier(table_name),
        sql.SQL(', ').join(sql.Identifier(c) for c in cols),
        sql.SQL(', ').join(sql.Placeholder(c) for c in cols),
        sql.Identifier(cols[0]),
        sql.SQL(', ').join(excluded),
    )
    return stmt
def update_object(kind, obj):
    """Upsert one API object `obj` into its table `kind`.

    Row values come from keys of `obj` matching the table's columns
    (missing keys become NULL); the complete object is also stored as
    JSON text in the `data` column.
    """
    cols = get_columns(kind)
    comp = gen_sql(kind, cols)
    # dict.get() yields None for absent keys, replacing the manual loop.
    t = {key: obj.get(key) for key in cols}
    t['data'] = json.dumps(obj)
    # Context manager closes the cursor even if execute() raises
    # (the original leaked a cursor per object).
    with conn.cursor() as cur:
        cur.execute(comp, t)
def last_updated(kind):
    """Return the newest `updated` epoch second in table `kind`, minus one
    second as an overlap guard, or None when the table is empty.
    """
    comp = sql.SQL('SELECT EXTRACT(EPOCH FROM MAX(updated)) - 1 FROM {}.{};'
                   ).format(sql.Identifier(schema_name), sql.Identifier(kind))
    # Context manager closes the cursor even on error (original leaked it).
    with conn.cursor() as cur:
        cur.execute(comp)
        last = cur.fetchone()[0]
    if last is None:
        return None
    return int(last)
def fetch_objects(s, kind, params):
    """Fetch objects of type `kind` matching `params` from the PeeringDB API
    and upsert each into the database.

    Returns the number of objects fetched (0 when the window is empty).
    Raises requests.HTTPError on a 4xx/5xx response.
    """
    endpoint = f'https://peeringdb.com/api/{kind:s}'
    r = s.get(endpoint, params=params)
    # Fail loudly on HTTP errors; the original tried to parse the error
    # body and died with a confusing KeyError/JSON error instead.
    r.raise_for_status()
    objs = r.json()['data']
    for obj in objs:
        update_object(kind, obj)
    return len(objs)
def initial_sync(s, kind):
    """Bulk-load every object of type `kind` by probing id windows.

    Fetches ids in windows of `step`; an empty window doubles the step,
    a non-empty one resets it.  After five consecutive empty windows a
    final unbounded fetch picks up any remaining high-id objects.
    """
    base_step = 100
    step = base_step
    empty_streak = 0
    low = 0
    while empty_streak < 5:
        window = {'id__gt': low, 'id__lt': low + step + 1, 'depth': 0}
        count = fetch_objects(s, kind, window)
        low += step
        if count:
            step = base_step
            empty_streak = 0
        else:
            step *= 2
            empty_streak += 1
    fetch_objects(s, kind, {'id__gt': low, 'depth': 0})
def sync_table(s, kind):
    """Synchronize one object table: full load if empty, then incremental.

    Raises AssertionError (from test_table) when the table schema is
    unsuitable; callers catch this to skip the table.
    """
    test_table(kind)
    # (removed an unused `endpoint` local that was computed and never read)
    last = last_updated(kind)
    if last is None:
        # Empty table: note the start time (minus an hour of overlap),
        # bulk-load everything, then catch changes made during the load.
        last = int(time.time()) - 3600
        initial_sync(s, kind)
    fetch_objects(s, kind, {'since': last, 'depth': 0})
def main():
    """Entry point: open the DB, build a retrying HTTP session, and sync
    every object type advertised by the PeeringDB apidocs index."""
    open_db()
    s = requests.Session()
    # Retry transient gateway errors with exponential backoff.
    retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
    s.mount('https://', HTTPAdapter(max_retries=retries))
    s.auth = get_config()['auth']
    req_agent = s.headers.get('User-Agent')
    s.headers.update({'User-Agent': f'peeringdb-simplesync/0.1 {req_agent:s}'})
    r = s.get('https://peeringdb.com/apidocs/')
    # Don't try to parse an HTTP error page as the API index.
    r.raise_for_status()
    s.headers.update({'Accept': 'application/json'})
    ignored = [ 'as_set' ]
    apidoc = json.loads(r.text)
    for key in apidoc:
        # Skip private keys and explicitly ignored endpoints.
        if key[0] == '_' or key in ignored: continue
        try:
            sync_table(s, key)
        except AssertionError:
            print(f'skipping {key:s}...')
        # Commit per table so a later failure doesn't discard earlier work.
        # NOTE(review): source indentation was ambiguous here — confirm the
        # commit belongs inside the loop rather than after it.
        conn.commit()
main()