import requests
import json
import time
import os
# Base URL of the database HTTP endpoint (for example 'http://localhost:8123'),
# taken from the DB_HOST environment variable. It is None when the variable is
# not set, because os.getenv is called without a fallback value.
HOST = os.getenv('DB_HOST')

# Settings merged into the parameters of every request sent by this module.
# The file is opened in a context manager so the handle is closed as soon as
# the JSON has been parsed instead of being left to the garbage collector.
with open('fixed_config.json', encoding='utf-8') as _config_file:
    DEFAULT_CONFIG = json.load(_config_file)
def exec_query(sql: str, db: str, settings=None):
    """Run *sql* against database *db* over HTTP and return its statistics.

    One trailing semicolon is removed from *sql* and ``" FORMAT JSON"`` is
    appended before the statement is POSTed to ``HOST``.

    Args:
        sql: The statement to execute.
        db: Database name, sent as the ``database`` request parameter.
        settings: Optional mapping of extra request parameters. It is copied,
            never modified. Entries in ``DEFAULT_CONFIG`` are applied last and
            therefore take precedence over both *settings* and ``database``.

    Returns:
        On success, the ``statistics`` object of the JSON response with an
        added ``'fail': 0`` entry. On a non-200 status or any exception raised
        while sending the request or decoding the body, a dict of the form
        ``{'elapsed': <seconds since the request started>, 'fail': 1}``.

    Raises:
        KeyError: If a 200 response decodes to JSON without a ``statistics``
            key (this lookup happens outside the ``try`` block).
    """
    if sql.endswith(';'):
        sql = sql[:-1]
    sql += " FORMAT JSON"

    # Build a fresh dict on every call. The previous signature used a mutable
    # default (``settings={}``) and wrote into it, so the shared default and
    # any dict supplied by the caller were silently modified.
    params = dict(settings) if settings is not None else {}
    params['database'] = db
    params.update(DEFAULT_CONFIG)

    start = time.time()
    try:
        r = requests.post(HOST, data=sql, params=params)
        if r.status_code != 200:
            # Print the response body so the failure reason is visible.
            print(r.text)
            return {'elapsed': time.time() - start, "fail": 1}
        r.encoding = 'utf-8'
        resp = r.json()
    except Exception as e:
        # Best effort: a failed query is reported to the caller through the
        # 'fail' flag rather than by propagating the exception.
        print(e)
        return {'elapsed': time.time() - start, "fail": 1}

    resp['statistics']['fail'] = 0
    return resp['statistics']
def get_query_plan(sql: str, db: str, analyze=False):
    """Fetch the JSON query plan for *sql* from the server at ``HOST``.

    Args:
        sql: The statement to explain; one trailing semicolon is dropped.
        db: Database name, sent as the ``database`` request parameter.
        analyze: When true the statement is prefixed with
            ``explain analyze json=1``, otherwise with ``explain json=1``.

    Returns:
        The object produced by ``json.loads`` on the cleaned response text.

    The request always carries ``enable_optimizer=1``; entries from
    ``DEFAULT_CONFIG`` are merged afterwards and take precedence. The HTTP
    status code is not inspected, so a non-JSON body surfaces as a JSON
    decoding error.
    """
    statement = sql[:-1] if sql.endswith(';') else sql
    if analyze:
        statement = "explain analyze json=1 " + statement
    else:
        statement = "explain json=1 " + statement

    query_params = dict(database=db, enable_optimizer=1)
    query_params.update(DEFAULT_CONFIG)

    response = requests.post(HOST, data=statement, params=query_params)
    response.encoding = 'utf-8'

    # Drop every literal backslash-n sequence, then turn any remaining
    # backslash into a space, before handing the text to the JSON parser.
    # NOTE(review): presumably the server returns the plan with escaped
    # newlines/backslashes that would not parse as JSON - confirm.
    plan_text = response.text.replace('\\n', '').replace('\\', ' ')
    return json.loads(plan_text)
if __name__ == "__main__":
    # Manual smoke run: request the analyzed plan of a trivial query against
    # the "tpcds" database and print the parsed result.
    plan = get_query_plan("select 1", "tpcds", analyze=True)
    print(plan)