#!/usr/bin/env python3
"""Mirror PeeringDB API objects into a local PostgreSQL schema."""
import json
import logging
import re
import time
from urllib.parse import urljoin
import psycopg2
from psycopg2 import sql
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import yaml
from config import get_config
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
schema_name = get_config().get('db_schema', 'peeringdb')
domain_name = get_config().get('domain_name', 'www.peeringdb.com')
user_agent = 'peeringdb-simplesync/0.2'
# logging.basicConfig(level=logging.DEBUG)
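# get_config() is expected to return a dict.  A minimal sketch of what it
# might contain (illustrative placeholder values, not working credentials):
#
#   {
#       'db_conn_str': 'host=localhost dbname=peeringdb user=peeringdb',
#       'db_schema': 'peeringdb',
#       'domain_name': 'www.peeringdb.com',
#       'auth': 'your-api-key',            # see handle_auth() below
#       'object_types': ['org', 'fac', 'net'],
#   }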
def open_db():
    global conn
    db_conn_str = get_config()['db_conn_str']
    conn = psycopg2.connect(db_conn_str)
column_cache = {}
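# Column names are read once per table from information_schema and cached for
# the lifetime of the process, so the per-object upserts stay cheap.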
def get_columns(table_name):
    global column_cache
    if table_name in column_cache:
        return column_cache[table_name]
    # Keep declaration order: cols[0] is later used as the ON CONFLICT target.
    sql_str = """
        SELECT column_name FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
    """
    cur = conn.cursor()
    cur.execute(sql_str, (schema_name, table_name))
    rows = cur.fetchall()
    ret = [row[0] for row in rows]
    column_cache[table_name] = ret
    return ret
def test_table(table_name):
    cols = get_columns(table_name)
    try:
        assert 'id' in cols
        assert cols[0] == 'id'
        assert 'updated' in cols
        assert 'data' in cols
    except AssertionError:
        print(f'Invalid or missing table schema for {table_name:s}')
        raise
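# The target tables are not created here; they are expected to already exist.
# A rough sketch of the shape implied by test_table() and gen_sql(), using a
# hypothetical 'net' table (the column types shown are assumptions):
#
#   CREATE TABLE peeringdb.net (
#       id       integer PRIMARY KEY,
#       ...                             -- one column per mirrored API field
#       status   text,
#       updated  timestamptz,
#       deleted  timestamptz,
#       data     jsonb
#   );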
def gen_sql(table_name, cols):
    updates = []
    for col in cols[1:]:
        if col == 'updated':
            # For rows arriving as deleted, keep the row's existing 'updated'
            # timestamp; otherwise take the incoming one.
            stmt = sql.SQL('updated = CASE WHEN EXCLUDED.deleted IS NOT NULL THEN t.updated ELSE EXCLUDED.updated END')
        else:
            stmt = sql.SQL('{} = EXCLUDED.{}').format(sql.Identifier(col), sql.Identifier(col))
        updates.append(stmt)
    comp = sql.SQL("""
        INSERT INTO {}.{} AS t
        ({})
        VALUES ({})
        ON CONFLICT ({})
        DO UPDATE SET {};
        """).format(sql.Identifier(schema_name),
                    sql.Identifier(table_name),
                    sql.SQL(', ').join(map(sql.Identifier, cols)),
                    sql.SQL(', ').join(map(sql.Placeholder, cols)),
                    sql.Identifier(cols[0]),
                    sql.SQL(', ').join(updates))
    return comp
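# For a hypothetical table with columns (id, name, status, updated, deleted,
# data) the generated statement looks roughly like:
#
#   INSERT INTO "peeringdb"."net" AS t
#   ("id", "name", "status", "updated", "deleted", "data")
#   VALUES (%(id)s, %(name)s, %(status)s, %(updated)s, %(deleted)s, %(data)s)
#   ON CONFLICT ("id")
#   DO UPDATE SET "name" = EXCLUDED."name", "status" = EXCLUDED."status",
#       updated = CASE WHEN EXCLUDED.deleted IS NOT NULL THEN t.updated ELSE EXCLUDED.updated END,
#       "deleted" = EXCLUDED."deleted", "data" = EXCLUDED."data";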
def update_object(kind, obj):
    cols = get_columns(kind)
    comp = gen_sql(kind, cols)
    # Build one parameter per column; anything the API object lacks becomes NULL.
    t = {}
    for key in cols:
        if key in obj:
            t[key] = obj[key]
        else:
            t[key] = None
    # Keep the full API object as JSON alongside the flattened columns.
    t['data'] = json.dumps(obj)
    if obj.get('status') == 'deleted':
        t['deleted'] = t['updated']
    cur = conn.cursor()
    try:
        cur.execute(comp, t)
    except Exception:
        print(kind, t['id'], t['data'])
        raise
def last_updated(kind):
    # Back off by one second so the next 'since' query cannot miss objects
    # updated within the same second as the newest local row.
    comp = sql.SQL('SELECT EXTRACT(EPOCH FROM MAX(updated)) - 1 FROM {}.{};'
                   ).format(sql.Identifier(schema_name), sql.Identifier(kind))
    cur = conn.cursor()
    cur.execute(comp)
    last = cur.fetchone()[0]
    if last is None: return None
    return int(last)
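# One GET against the REST API: depth=0 requests objects without expanded
# nested sets, status__in also pulls pending and deleted objects, and callers
# narrow the result via extra_params ('since', 'id__gt', 'id__lt', ...).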
def fetch_objects(s, kind, extra_params):
    endpoint = f'https://{domain_name:s}/api/{kind:s}'
    params = {
        'depth': 0,
        'status__in': 'ok,pending,deleted',
        'since': 1,
        **extra_params}
    r = s.get(endpoint, params=params)
    r.raise_for_status()
    objs = json.loads(r.text)['data']
    for obj in objs:
        update_object(kind, obj)
    return len(objs)
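# Initial population walks the id space in windows of `step` ids.  An empty
# window widens the step and counts as a strike; five consecutive empty windows
# are taken as the end of the populated id range, after which a final unbounded
# id__gt query picks up any remaining objects.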
def initial_sync(s, kind):
    initial_step = 100
    step = initial_step
    lows = 0
    low = 0
    while lows < 5:
        high = low + step + 1
        n = fetch_objects(s, kind, {'id__gt': low, 'id__lt': high})
        low += step
        if n > 0:
            step = initial_step
            lows = 0
        else:
            step *= 2
            lows += 1
    fetch_objects(s, kind, {'id__gt': low})
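# Per-table entry point: verify the table shape, then either bootstrap an empty
# table or fetch only objects changed since the newest row already held.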
def sync_table(s, kind):
    test_table(kind)
    last = last_updated(kind)
    if last is None:
        # Empty table: do a full id-range walk, then catch anything that
        # changed while the walk was running.
        last = int(time.time()) - 3600
        initial_sync(s, kind)
    fetch_objects(s, kind, {'since': last})
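# The /apidocs/ page embeds a ReDoc element whose spec-url attribute points at
# the OpenAPI schema; scrape it instead of hard-coding the schema URL.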
def find_spec(s, url):
    s.headers.update({'Accept': 'text/html'})
    r = s.get(url)
    # look for something like <redoc spec-url='/s/2.20.2/api-schema.yaml'>
    p = re.compile(r'<redoc[^>]+\bspec-url=([\'"])([^\'">]+)\1[^>]*>')
    m = p.search(r.text)
    assert m
    return urljoin(url, m[2])
def find_object_types_via_apidocs(s):
    ret = []
    spec_url = find_spec(s, f'https://{domain_name:s}/apidocs/')
    s.headers.update({'Accept': 'application/x-yaml'})
    r = s.get(spec_url)
    ignored_types = ['as_set']
    apidoc = yaml.safe_load(r.text)
    # Every path of the form /api/<name> is treated as a syncable object type.
    p = re.compile(r'^/api/([a-z_]+)$')
    for path in apidoc['paths']:
        m = p.match(path)
        if not m: continue
        key = m[1]
        if key in ignored_types: continue
        ret.append(key)
    return ret
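# The 'auth' config entry may be either a PeeringDB API-key string or a
# requests auth object.  Illustrative values only:
#
#   'auth': '0123abcd...'
#   'auth': requests.auth.HTTPBasicAuth('username', 'password')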
def handle_auth(s):
    auth = get_config().get('auth')
    if isinstance(auth, str):
        # API key
        s.headers.update({'Authorization': f'Api-Key {auth:s}'})
    else:
        # e.g. HTTPBasicAuth('username', 'password')
        s.auth = auth
def main():
    open_db()
    s = requests.Session()
    # Retry transient HTTP failures (rate limiting, gateway errors) with
    # exponential backoff instead of giving up on the first error.
    retries = Retry(total=16,
                    backoff_factor=1.5,
                    status_forcelist=[429, 502, 503, 504])
    s.mount('https://', HTTPAdapter(max_retries=retries))
    handle_auth(s)
    req_agent = s.headers.get('User-Agent')
    s.headers.update({'User-Agent': f'{user_agent:s} {req_agent:s}'})
    keys = get_config().get('object_types')
    if not keys:
        keys = find_object_types_via_apidocs(s)
        print("Consider setting 'object_types':", keys)
    # subsequent requests are going to be JSON
    s.headers.update({'Accept': 'application/json'})
    for key in keys:
        try:
            sync_table(s, key)
        except AssertionError:
            print(f'skipping {key:s}...')
        conn.commit()
if __name__ == '__main__':
    main()