aboutsummaryrefslogblamecommitdiffstats
path: root/sync.py
blob: 8388d25e6b01805729f9a0adc79ea30cb0c07dcc (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11










                                                              
           



                                


                             

                                                         



































                                                                    







                                                                                                                       
                      
                                  





















                                                                   

                                   
                       




                                       









                                                                            
                                         
                                                    

                       

                                               

                            












                                       
                                                                      






                               
                                              







                                                    
                                              
 










                                                                          








                                                                





                                                                                  
                  


                                                                               






                                                             



                                                    






                                       







                                         
#!/usr/bin/env python3

import psycopg2
from psycopg2 import sql
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

import json
import time
import re
import yaml

from urllib.parse import urljoin

import logging
from config import get_config

schema_name = get_config().get('db_schemad', 'peeringdb')

#logging.basicConfig(level=logging.DEBUG)

def open_db():
    global conn
    db_conn_str = get_config()['db_conn_str']
    conn = psycopg2.connect(db_conn_str)

column_cache = {}
def get_columns(table_name):
    global column_cache
    if table_name in column_cache:
        return column_cache[table_name]
    sql_str = """
        SELECT column_name FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        """
    cur = conn.cursor()
    cur.execute(sql_str, (schema_name, table_name))
    rows = cur.fetchall()
    ret = [row[0] for row in rows]
    column_cache[table_name] = ret
    return ret

def test_table(table_name):
    cols = get_columns(table_name)
    try:
        assert('id' in cols)
        assert(cols[0] == 'id')
        assert('updated' in cols)
        assert('data' in cols)
    except AssertionError:
        print(f'Invalid or missing table schema for {table_name:s}')
        raise

def gen_sql(table_name, cols):
    updates = []
    for col in cols[1:]:
        if col == 'updated':
            stmt = sql.SQL('updated = CASE WHEN EXCLUDED.deleted IS NOT NULL THEN t.updated ELSE EXCLUDED.updated END')
        else:
            stmt = sql.SQL('{} = EXCLUDED.{}').format(sql.Identifier(col), sql.Identifier(col))
        updates.append(stmt)

    comp = sql.SQL("""
            INSERT INTO {}.{} AS t
                ({})
                VALUES ({})
                ON CONFLICT ({})
                DO UPDATE SET {};
            """).format(sql.Identifier(schema_name),
                    sql.Identifier(table_name),
                    sql.SQL(', ').join(map(sql.Identifier, cols)),
                    sql.SQL(', ').join(map(sql.Placeholder, cols)),
                    sql.Identifier(cols[0]),
                    sql.SQL(', ').join(updates))
    return comp

def update_object(kind, obj):
    cols = get_columns(kind)
    comp = gen_sql(kind, cols)
    t = {}
    for key in cols:
        if key in obj:
            t[key] = obj[key]
        else:
            t[key] = None
    t['data'] = json.dumps(obj)
    if t['status'] == 'deleted':
        t['deleted'] = t['updated']
    cur = conn.cursor()
    try:
        cur.execute(comp, t)
    except:
        print(kind, t['id'], t['data'])
        raise

def last_updated(kind):
    comp = sql.SQL('SELECT EXTRACT(EPOCH FROM MAX(updated)) - 1 FROM {}.{};'
                ).format(sql.Identifier(schema_name), sql.Identifier(kind))
    cur = conn.cursor()
    cur.execute(comp)
    last = cur.fetchone()[0]
    if last is None: return None
    return int(last)

def fetch_objects(s, kind, extra_params):
    endpoint = f'https://peeringdb.com/api/{kind:s}'
    params = {
            'depth': 0,
            'status__in': 'ok,pending,deleted',
            'since': 1,
            **extra_params }

    r = s.get(endpoint, params=params )
    objs = json.loads(r.text)['data']
    for obj in objs:
        update_object(kind, obj)
    return len(objs)

def initial_sync(s, kind):
    initial_step = 100
    step = initial_step
    lows = 0
    low = 0
    while lows < 5:
        high = low + step + 1
        n = fetch_objects(s, kind, { 'id__gt': low, 'id__lt': high } )
        low += step
        if n > 0:
            step = initial_step
            lows = 0
        else:
            step *= 2
            lows += 1
    fetch_objects(s, kind, { 'id__gt': low } )

def sync_table(s, kind):
    test_table(kind)
    endpoint = f'https://peeringdb.com/api/{kind:s}'
    last = last_updated(kind)
    if last is None:
        last = int(time.time()) - 3600
        initial_sync(s, kind)
    fetch_objects(s, kind, { 'since': last } )

def find_spec(s, url):
    s.headers.update({'Accept': 'text/html'})
    r = s.get(url)

    # look for something like <redoc spec-url='/s/2.20.2/api-schema.yaml'>
    p = re.compile(r'<redoc[^>]+\bspec-url=([\'"])([^\'">]+)\1[^>]*>')
    m = p.search(r.text)
    assert(m)

    return urljoin(url, m[2])

def handle_auth(s):
    auth = get_config().get('auth')
    if type(auth) == str:
        # API-Key
        s.headers.update({'Authorization': f'Api-Key {auth:s}'})
    else:
        # eg. HTTPBasicAuth('username', 'password')
        s.auth = auth

def main():
    open_db()

    s = requests.Session()
    retries = Retry(total=5, backoff_factor=1, status_forcelist=[ 502, 503, 504 ])
    s.mount('https://', HTTPAdapter(max_retries=retries))
    handle_auth(s)

    req_agent = s.headers.get('User-Agent')
    s.headers.update({'User-Agent': f'peeringdb-simplesync/0.1 {req_agent:s}'})

    spec_url = find_spec(s, 'https://peeringdb.com/apidocs/')

    s.headers.update({'Accept': 'application/x-yaml'})
    r = s.get(spec_url)

    # subsequent requests are going to be JSON
    s.headers.update({'Accept': 'application/json'})

    ignored = [ 'as_set' ]

    apidoc = yaml.safe_load(r.text)
    p = re.compile(r'^/api/([a-z_]+)$')
    for path in apidoc['paths']:
        m = p.match(path)
        if not m: continue
        key = m[1]
        if key in ignored: continue
        try:
            sync_table(s, key)
        except AssertionError:
            print(f'skipping {key:s}...')

    conn.commit()

main()