# aqetuner / connector.py
import requests
import json
import time
import os

# Base URL of the database HTTP endpoint, read from the DB_HOST environment
# variable (None when the variable is unset).
# Example value: 'http://localhost:8123'
HOST = os.getenv('DB_HOST')

# Request parameters merged into every call made by this module. Loaded once
# at import time from fixed_config.json in the current working directory; the
# file handle is closed as soon as parsing finishes.
with open('fixed_config.json') as _config_file:
    DEFAULT_CONFIG = json.load(_config_file)

def exec_query(sql: str, db: str, settings=None):
    """Run ``sql`` against database ``db`` over HTTP and return its statistics.

    A single trailing ``;`` is removed and `` FORMAT JSON`` is appended before
    the statement is POSTed to ``HOST``.

    Args:
        sql: Statement to execute.
        db: Database name, sent as the ``database`` request parameter.
        settings: Optional mapping of extra request parameters. It is copied,
            so the caller's dict is never modified. ``database`` and every
            key of ``DEFAULT_CONFIG`` override same-named entries given here.

    Returns:
        On success, the ``statistics`` object of the JSON response with an
        added ``fail`` key set to 0. On a non-200 status, or on any exception
        raised while sending the request or decoding the reply, the dict
        ``{'elapsed': <seconds>, 'fail': 1}`` where ``elapsed`` is wall-clock
        time measured locally around the request.

    Raises:
        KeyError: If a 200 response decodes to a JSON object that has no
            ``statistics`` entry.
    """
    if sql.endswith(';'):
        sql = sql[:-1]
    sql += " FORMAT JSON"

    # Work on a private copy: neither a shared default dict nor the caller's
    # own dict may accumulate the database name and fixed config across calls.
    params = dict(settings) if settings else {}
    params['database'] = db
    params.update(DEFAULT_CONFIG)

    start = time.time()
    try:
        r = requests.post(HOST, data=sql, params=params)
        if r.status_code != 200:
            print(r.text)
            return {'elapsed': time.time() - start, "fail": 1}
        r.encoding = 'utf-8'
        resp = r.json()
    except Exception as e:
        # Deliberate best-effort boundary: any transport or decoding problem
        # is reported as a failed run instead of aborting the caller.
        print(e)
        return {'elapsed': time.time() - start, "fail": 1}

    stats = resp['statistics']
    stats['fail'] = 0
    return stats

def get_query_plan(sql: str, db: str, analyze=False):
    """Fetch the JSON query plan for ``sql`` from the server at ``HOST``.

    Args:
        sql: Statement to explain; a single trailing ``;`` is dropped.
        db: Database name, sent as the ``database`` request parameter.
        analyze: When true, prefix the statement with
            ``explain analyze json=1``; otherwise with ``explain json=1``.

    Returns:
        The object produced by ``json.loads`` on the cleaned response text.

    Raises:
        json.JSONDecodeError: If the cleaned response text is not valid JSON.
            The HTTP status code is not inspected.
    """
    statement = sql[:-1] if sql.endswith(';') else sql
    if analyze:
        statement = "explain analyze json=1 " + statement
    else:
        statement = "explain json=1 " + statement

    # DEFAULT_CONFIG is applied last, so it wins over the two keys set here.
    params = {'database': db, 'enable_optimizer': 1}
    params.update(DEFAULT_CONFIG)

    response = requests.post(HOST, data=statement, params=params)
    response.encoding = 'utf-8'

    # Drop literal backslash-n sequences first, then turn every remaining
    # backslash into a space, before handing the text to the JSON parser.
    cleaned = response.text.replace('\\n', '').replace('\\', ' ')
    return json.loads(cleaned)

if __name__ == "__main__":
    # Manual smoke test: print the analyzed plan of a trivial query run
    # against the "tpcds" database.
    plan = get_query_plan("select 1", "tpcds", analyze=True)
    print(plan)