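"""TOSG_Extraction_NC.py

Extracts a task-oriented subgraph (TOSG) for node classification (NC) from a
SPARQL endpoint. It builds one or more pattern queries (d1h1, d1h2, d2h1,
d2h2) around a target relation, counts the result rows, then fetches the
triples in concurrent paginated batches and writes them to a TSV file. The
query dialect (`select ?s as ?subject`, `count(*) as ?c`) is Virtuoso-style,
matching the default endpoint configuration.
"""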
import argparse
import datetime
import sys
import threading
from threading import Thread

import requests

lock = threading.Lock()

def ExecQueryAndWriteOutput(endpoint_url, query, offset, batch_size, batches_count, f):
    """Fetch one paginated batch of `query` and append its TSV rows to `f`."""
    start_t = datetime.datetime.now()
    # Substitute the pagination placeholders embedded in the query template.
    r_query = query.replace("?offset", str(offset)).replace("?limit", str(batch_size))
    body = {'query': r_query}
    headers = {'Content-Type': 'application/x-www-form-urlencoded',
               'Accept-Encoding': 'gzip',
               'Accept': 'text/tab-separated-values; charset=UTF-8'}
    r = requests.post(endpoint_url, data=body, headers=headers)
    end_t = datetime.datetime.now()
    # The output file handle is shared across threads; serialize the writes.
    with lock:
        # Drop the TSV header line so the file contains triples only.
        f.write(r.text.replace('"subject"\t"predicate"\t"object"\n', ""))
        print("Query idx: ", (offset // batch_size), "/", batches_count,
              " records count=", len(r.text.split('\n')),
              " time=", end_t - start_t)
def ExecQuery(endpoint_url, query):
    """Run `query` and return the first value row of the TSV response."""
    body = {'query': query}
    headers = {'Content-Type': 'application/x-www-form-urlencoded',
               'Accept-Encoding': 'gzip',
               'Accept': 'text/tab-separated-values; charset=UTF-8'}
    r = requests.post(endpoint_url, data=body, headers=headers)
    # Line 0 is the TSV header; line 1 holds the single result value.
    return r.text.split('\n')[1]

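# TOSG query builders. Pattern names read d{directions}h{hops}: d1 keeps only
# outgoing edges of the nodes carrying the target relation, d2 adds incoming
# edges, and h is the number of hops. Builders for the larger patterns also
# return the smaller patterns' queries, so together the batches cover the
# full pattern. Each template keeps ?limit/?offset placeholders for paging.
# d1h1: 1-hop outgoing triples (?s ?p ?o) of the target nodes.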
def get_d1h1_query(graph_uri, target_rel_uri):
    query="""select distinct ?s as ?subject ?p as ?predicate ?o as ?object  
           where
           {
                select ?s ?p ?o
                from <"""+graph_uri+""">
                where 
                {
                ?s <"""+target_rel_uri+"""> ?label.
                ?s ?p ?o.
                filter(!isBlank(?o)).
                }
                limit ?limit 
                offset ?offset
           } 
        """
    return query
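# d2h1: d1h1 plus the 1-hop incoming triples (?o2 ?p2 ?s) of the target nodes.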
def get_d2h1_query(graph_uri, target_rel_uri):
    query="""select distinct ?s as ?subject ?p as ?predicate ?o as ?object
            where
            {
                 select ?o2 as ?s  ?p2 as ?p  ?s as ?o
                 from <"""+graph_uri+""">
                 where
                 {
                  ?s <"""+target_rel_uri+"""> ?label.
                  ?o2 ?p2 ?s.
                  filter(!isBlank(?o2)).
                 }
                limit ?limit 
                offset ?offset
            } 
            """
    return [get_d1h1_query(graph_uri, target_rel_uri), query]
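# d1h2: d1h1 plus the 2-hop outgoing triples (?o2 ?p3 ?o3).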
def get_d1h2_query(graph_uri, target_rel_uri):
    query="""select distinct ?s as ?subject ?p as ?predicate ?o as ?object
            where
            {
                 select ?o2 as ?s  ?p3 as ?p  ?o3 as ?o
                 from <"""+graph_uri+""">
                 where
                 {
                  ?s <"""+target_rel_uri+"""> ?label.
                  ?s ?p2 ?o2.
                  ?o2 ?p3 ?o3.
                  filter(!isBlank(?o2)).
                  filter(!isBlank(?o3)).
                 }
                limit ?limit 
                offset ?offset
            }  
            """
    return [get_d1h1_query(graph_uri, target_rel_uri), query]
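# d2h2: d1h1 and d2h1 plus the 2-hop triples in both directions.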
def get_d2h2_query(graph_uri, target_rel_uri):
    query1="""select distinct ?s as ?subject ?p as ?predicate ?o as ?object
            where
            {     
                 select ?o2 as ?s  ?p3 as ?p  ?o3 as ?o
                 from <"""+graph_uri+""">
                 where
                 {
                  ?s <"""+target_rel_uri+"""> ?label.
                  ?s ?p2 ?o2.
                  ?o2 ?p3 ?o3.
                  filter(!isBlank(?o2)).
                  filter(!isBlank(?o3)).
                 }
                limit ?limit 
                offset ?offset
            }
            """
    query2 = """select distinct ?s as ?subject ?p as ?predicate ?o as ?object
                where
                {     
                    select ?o3 as ?s  ?p3 as ?p  ?o2 as ?o
                      from <""" + graph_uri + """>
                     where
                     {
                      ?s <""" + target_rel_uri + """> ?label.
                      ?o2 ?p ?s.
                      ?o3 ?p3 ?o2.
                      filter(!isBlank(?o2)).
                      filter(!isBlank(?o3)).
                     }
                    limit ?limit 
                    offset ?offset
                }
                """
    # get_d2h1_query returns [d1h1, d2h1]; extend it with the two 2-hop queries.
    return get_d2h1_query(graph_uri, target_rel_uri) + [query1, query2]

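# Driver: parse the arguments, build the queries for the requested TOSG
# pattern, count the rows per query, then extract the triples in parallel
# batches.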
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--start_offset', dest='start_offset', type=int, help='row offset to start extraction from', default=0)
    parser.add_argument('--sparql_endpoint', type=str, help='SPARQL endpoint URL', default='http://206.12.98.118:8890/sparql')
    parser.add_argument('--graph_uri', type=str, help='KG graph URI', default='http://dblp.org')
    parser.add_argument('--target_rel_uri', type=str, help='target relation URI', default='https://dblp.org/rdf/schema#publishedIn')
    parser.add_argument('--TOSG', type=str, help='TOSG pattern (d1h1, d1h2, d2h1, or d2h2)', default='d2h1')
    parser.add_argument('--batch_size', type=int, help='rows fetched per request', default=1000000)
    parser.add_argument('--out_file', dest='out_file', type=str, help='output file to write triples to', default='dblp_pv.tsv')
    parser.add_argument('--threads_count', dest='threads_count', type=int, help='number of concurrent request threads', default=64)
    args = parser.parse_args()
    print('args=', args)
    start_offset = args.start_offset
    graph_uri = args.graph_uri
    sparql_endpoint = args.sparql_endpoint
    target_rel_uri = args.target_rel_uri
    TOSG = args.TOSG
    batch_size = args.batch_size
    # Suffix the output file name with the pattern, e.g. dblp_pv_d2h1.tsv.
    out_file = args.out_file.rsplit('.', 1)[0] + "_" + TOSG + ".tsv"
    threads_count = args.threads_count
    queries = []
    if TOSG == 'd1h1':
        queries = [get_d1h1_query(graph_uri, target_rel_uri)]
    elif TOSG == 'd1h2':
        queries = get_d1h2_query(graph_uri, target_rel_uri)
    elif TOSG == 'd2h1':
        queries = get_d2h1_query(graph_uri, target_rel_uri)
    elif TOSG == 'd2h2':
        queries = get_d2h2_query(graph_uri, target_rel_uri)
    else:
        sys.exit("Unknown TOSG pattern: " + TOSG)
    ###################################
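    # Phase 1: count the rows each query returns so the batches can be sized.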
    q_start_t = datetime.datetime.now()
    rows_count_lst = []
    for query in queries:
        # Rewrite the paginated SELECT into an unpaginated COUNT (Virtuoso-style projection).
        rows_count_query = query.replace(
            "select distinct ?s as ?subject ?p as ?predicate ?o as ?object",
            "select count(*) as ?c")
        rows_count_query = rows_count_query.replace("limit ?limit", "")
        rows_count_query = rows_count_query.replace("offset ?offset", "")
        rows_count = ExecQuery(sparql_endpoint, rows_count_query)
        rows_count_lst.append(int(rows_count))
    q_end_t = datetime.datetime.now()
    print("rows_count=", sum(rows_count_lst), "Query Time=", q_end_t - q_start_t)
    #######################
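    # Phase 2: fetch the batches of each query with a pool of worker threads.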
    q_start_t = datetime.datetime.now()
    with open(out_file, 'w') as f:
        for q_idx, query in enumerate(queries):
            batches_count = rows_count_lst[q_idx] // batch_size + 1
            print("query idx=", q_idx, "batches_count=", batches_count)
            th_idx = 1
            th_lst = []
            for offset in range(start_offset, rows_count_lst[q_idx], batch_size):
                try:
                    t = Thread(target=ExecQueryAndWriteOutput,
                               args=(sparql_endpoint, query, offset, batch_size, batches_count, f))
                    th_lst.append(t)
                    t.start()
                    th_idx = th_idx + 1
                    # Once threads_count threads are in flight, wait for the wave to drain.
                    if th_idx == threads_count:
                        th_idx = 0
                        for th in th_lst:
                            th.join()
                        th_lst = []
                        print(threads_count, " threads finished at ", datetime.datetime.now() - q_start_t)
                except Exception as e:
                    print("Exception", e)
            # Join the remaining threads of the last, possibly partial, wave.
            for th in th_lst:
                th.join()
            print(threads_count, " threads finished at ", datetime.datetime.now() - q_start_t)
    q_end_t = datetime.datetime.now()
    print("total time ", q_end_t - q_start_t)
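# Example invocation (a sketch using the argument defaults above; substitute
# your own endpoint, graph, and target relation):
#   python TOSG_Extraction_NC.py \
#       --sparql_endpoint http://206.12.98.118:8890/sparql \
#       --graph_uri http://dblp.org \
#       --target_rel_uri https://dblp.org/rdf/schema#publishedIn \
#       --TOSG d2h1 --batch_size 1000000 --threads_count 64 \
#       --out_file dblp_pv.tsv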