aboutsummaryrefslogblamecommitdiffstats
path: root/sync.py
blob: 340b6be87448861e47f44820f7e805ecd6e12d97 (plain) (tree)





















































































































































                                                                                                       
#!/usr/bin/env python3

import psycopg2
from psycopg2 import sql
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

import json

import time

import logging
from config import get_config
schema_name = get_config()['db_schema']

#logging.basicConfig(level=logging.DEBUG)

def open_db():
    """Open a connection to the configured PostgreSQL database.

    Stores the connection in the module-global ``conn`` used by all
    other helpers in this script.
    """
    global conn
    conn = psycopg2.connect(get_config()['db_conn_str'])

# Memoized column listings, keyed by table name.
column_cache = {}
def get_columns(table_name):
    """Return the ordered column names of *table_name*, caching results.

    Queries information_schema once per table; later calls are served
    from ``column_cache``.
    """
    global column_cache
    cached = column_cache.get(table_name)
    if cached is not None:
        return cached
    sql_str = """
        SELECT column_name FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        """
    cursor = conn.cursor()
    cursor.execute(sql_str, (schema_name, table_name))
    names = [record[0] for record in cursor.fetchall()]
    column_cache[table_name] = names
    return names

def test_table(table_name):
    """Verify that the table for *table_name* has the expected layout.

    The sync code requires an ``id`` column first, plus ``updated`` and
    ``data`` columns.

    Raises:
        AssertionError: if the table is missing or its schema is wrong
            (callers catch this to skip the object type).
    """
    cols = get_columns(table_name)
    # NOTE: plain `assert` statements are stripped under `python -O`,
    # which would silently disable this check — raise explicitly instead.
    ok = (
        bool(cols)
        and cols[0] == 'id'
        and 'updated' in cols
        and 'data' in cols
    )
    if not ok:
        print(f'Invalid or missing table schema for {table_name:s}')
        raise AssertionError(f'invalid schema for table {table_name}')

def gen_sql(table_name, cols):
    """Compose a parameterized upsert (INSERT ... ON CONFLICT) statement.

    *cols* is the full column list for *table_name*; the first column is
    assumed to be the conflict key and every other column is overwritten
    from EXCLUDED on conflict. All identifiers are safely quoted via
    psycopg2.sql, and values use named placeholders matching *cols*.
    """
    ident = sql.Identifier
    column_list = sql.SQL(', ').join(ident(c) for c in cols)
    value_list = sql.SQL(', ').join(sql.Placeholder(c) for c in cols)
    assignments = sql.SQL(', ').join(
        ident(c) + sql.SQL(' = EXCLUDED.') + ident(c) for c in cols[1:]
    )
    template = sql.SQL("""
            INSERT INTO {}.{}
                ({})
                VALUES ({})
                ON CONFLICT ({})
                DO UPDATE SET {};
            """)
    return template.format(ident(schema_name), ident(table_name),
                           column_list, value_list, ident(cols[0]),
                           assignments)

def update_object(kind, obj):
    """Upsert one API object *obj* into the table for *kind*.

    Each table column is filled from the matching key in *obj* (NULL when
    absent); the full object is additionally stored as JSON in ``data``.
    """
    cols = get_columns(kind)
    statement = gen_sql(kind, cols)
    params = {col: obj.get(col) for col in cols}
    params['data'] = json.dumps(obj)
    cursor = conn.cursor()
    cursor.execute(statement, params)

def last_updated(kind):
    """Return the most recent 'updated' time for *kind* as a Unix epoch.

    One second is subtracted for slack so the next incremental fetch
    overlaps rather than misses records. Returns None when the table
    is empty.
    """
    query = sql.SQL('SELECT EXTRACT(EPOCH FROM MAX(updated)) - 1 FROM {}.{};'
                ).format(sql.Identifier(schema_name), sql.Identifier(kind))
    cursor = conn.cursor()
    cursor.execute(query)
    (newest,) = cursor.fetchone()
    return None if newest is None else int(newest)

def fetch_objects(s, kind, params):
    """Fetch objects of *kind* from the PeeringDB API and upsert each one.

    *params* is passed through as the query string. Returns the number
    of objects received.
    """
    url = f'https://peeringdb.com/api/{kind:s}'
    response = s.get(url, params=params)
    records = json.loads(response.text)['data']
    for record in records:
        update_object(kind, record)
    return len(records)

def initial_sync(s, kind):
    """Populate an empty table by walking the id space in windows.

    Fetches ids in chunks of ~100; when a window comes back empty the
    window size doubles, and after five consecutive empty windows the
    scan stops, finishing with one open-ended fetch to pick up any
    remaining high-id objects.
    """
    base_step = 100
    step = base_step
    empty_streak = 0
    low = 0
    while empty_streak < 5:
        count = fetch_objects(
            s, kind, { 'id__gt': low, 'id__lt': low + step + 1, 'depth': 0 } )
        low += step
        if count:
            # Found data: reset the window size and the empty counter.
            step = base_step
            empty_streak = 0
        else:
            step *= 2
            empty_streak += 1
    fetch_objects(s, kind, { 'id__gt': low, 'depth': 0 } )

def sync_table(s, kind):
    """Bring the local table for *kind* up to date with the PeeringDB API.

    Validates the table schema first (AssertionError propagates to the
    caller). If the table is empty, performs a full initial backfill,
    then always fetches everything changed since the last known update.

    Note: the original body computed an unused ``endpoint`` local
    (URL construction lives in fetch_objects); it has been removed.
    """
    test_table(kind)
    last = last_updated(kind)
    if last is None:
        # Empty table: take "an hour ago" as the incremental baseline
        # before backfilling, so changes during the backfill are caught.
        last = int(time.time()) - 3600
        initial_sync(s, kind)
    fetch_objects(s, kind, { 'since': last, 'depth': 0 } )

def main():
    """Sync every PeeringDB object type into the local database."""
    open_db()

    # HTTP session with retries for transient gateway errors.
    s = requests.Session()
    retries = Retry(total=5, backoff_factor=1, status_forcelist=[ 502, 503, 504 ])
    s.mount('https://', HTTPAdapter(max_retries=retries))
    s.auth = get_config()['auth']

    # Identify this tool while preserving requests' default agent string.
    req_agent = s.headers.get('User-Agent')
    s.headers.update({'User-Agent': f'peeringdb-simplesync/0.1 {req_agent:s}'})
    # The apidocs endpoint enumerates the available object types.
    r = s.get('https://peeringdb.com/apidocs/')
    s.headers.update({'Accept': 'application/json'})

    # Object types we deliberately do not sync.
    ignored = [ 'as_set' ]

    apidoc = json.loads(r.text)
    for key in apidoc:
        if key[0] == '_' or key in ignored: continue
        try:
            sync_table(s, key)
        except AssertionError:
            # test_table() rejected the local schema; skip this type.
            print(f'skipping {key:s}...')

    # Single commit at the end: the whole sync is one transaction.
    conn.commit()

# Guard the entry point so importing this module (e.g. for reuse or
# testing) does not immediately start a sync run.
if __name__ == '__main__':
    main()