import io import stardog import pandas as pd def get_prefixes(): return """ PREFIX kglids: PREFIX data: PREFIX schema: PREFIX rdf: """ def get_all_profiled_tables(): return get_prefixes() + """ SELECT ?table_name WHERE { ?table_id rdf:type kglids:Table . ?table_id schema:name ?table_name . } """ def get_top_k_tables(pairs: list): top_k = {} for p in pairs: if p[0] not in top_k: top_k[p[0]] = p[1] else: updated_score = top_k.get(p[0]) + p[1] top_k[p[0]] = updated_score top_k = list(dict(sorted(top_k.items(), key=lambda item: item[1], reverse=True)).keys()) top_k = [list(ele) for ele in top_k] return top_k def get_similar_relation_tables_query(query_table: str, thresh: float): return get_prefixes() + """ SELECT ?table_name1 ?table_name2 ?certainty WHERE { ?table_id schema:name "%s" ; schema:name ?table_name1 . ?column_id kglids:isPartOf ?table_id . <> data:withCertainty ?certainty . FILTER (?certainty >= %s) . ?column_id2 kglids:isPartOf ?table_id2 . ?table_id2 schema:name ?table_name2 . } """ % (query_table, thresh) def get_related_columns_between_2_tables_query(table_name1, table_name2, relationship: str, thresh: float): return get_prefixes() + \ 'SELECT DISTINCT ?table_id1 ?column_name1 ?table_id2 ?column_name2 \nWHERE\n{\n' \ ' ?table_id1 schema:name "%s" .\n' \ ' ?table_id2 schema:name "%s" .\n' \ ' ?column_id1 kglids:isPartOf ?table_id1 .\n' \ ' ?column_id2 kglids:isPartOf ?table_id2 .\n' \ ' <> data:withCertainty ?c .\n' \ ' FILTER(?c >= %s) .\n' \ ' ?column_id1 schema:name ?column_name1 .\n' \ ' ?column_id2 schema:name ?column_name2 .\n' \ '}' % (table_name1, table_name2, relationship, thresh) def attribute_precision_j_query(query_table, table, thresh: float): return get_prefixes() + """ SELECT DISTINCT ?target_table ?target_attribute ?joinable_tables_name ?candidate_attribute ?certainty WHERE { { SELECT DISTINCT ?joinable_tables_name WHERE { { ?table_id1 schema:name "%s" . #si ?column1 kglids:isPartOf ?table_id1 . ?column1 data:hasContentSimilarity ?column2 . ?column2 kglids:isPartOf ?table_id2 . ?table_id2 rdf:type kglids:Table . ?table_id2 schema:name ?joinable_tables_name . } UNION # 1 hop { ?table_id3 schema:name ?joinable_tables_name . ?column3 kglids:isPartOf ?table_id3 . ?column3 data:hasContentSimilarity ?column4 . ?column4 kglids:isPartOf ?table_id4 . ?table_id4 rdf:type kglids:Table . ?table_id4 schema:name ?joinable_tables_name . } UNION # 2 hop { ?table_id5 schema:name ?joinable_tables_name . ?column5 kglids:isPartOf ?table_id5 . ?column5 data:hasContentSimilarity ?column6 . ?column6 kglids:isPartOf ?table_id6 . ?table_id6 rdf:type kglids:Table . ?table_id6 schema:name ?joinable_tables_name . } UNION # 3 hop { ?table_id7 schema:name ?joinable_tables_name . ?column7 kglids:isPartOf ?table_id7 . ?column7 data:hasContentSimilarity ?column8 . ?column8 kglids:isPartOf ?table_id8 . ?table_id8 rdf:type kglids:Table . ?table_id8 schema:name ?joinable_tables_name . } UNION # 4 hop { ?table_id9 schema:name ?joinable_tables_name . ?column9 kglids:isPartOf ?table_id9 . ?column9 data:hasContentSimilarity ?column10 . ?column10 kglids:isPartOf ?table_id10 . ?table_id10 rdf:type kglids:Table . ?table_id10 schema:name ?joinable_tables_name . } UNION # 5 hop { ?table_id11 schema:name ?joinable_tables_name . ?column11 kglids:isPartOf ?table_id11 . ?column11 data:hasContentSimilarity ?column12 . ?column12 kglids:isPartOf ?table_id12 . ?table_id12 rdf:type kglids:Table . ?table_id12 schema:name ?joinable_tables_name . } } } ?table_id_x schema:name ?joinable_tables_name . ?table_id_t schema:name "%s" . #Target ?table_id_t schema:name ?target_table . ?column_x kglids:isPartOf ?table_id_x . ?column_t kglids:isPartOf ?table_id_t . ?column_t schema:name ?target_attribute . ?column_x schema:name ?candidate_attribute . <> data:withCertainty ?certainty. FILTER(?certainty >= %s) . }""" % (table, query_table, thresh) # --------------------QUERY EXEC------------------------- def execute_query(conn: stardog.Connection, query: str, return_type: str = 'json', timeout: int = 0): if return_type == 'csv': result = conn.select(query, content_type='text/csv', timeout=timeout) return pd.read_csv(io.BytesIO(bytes(result))) elif return_type == 'json': result = conn.select(query) return result['results']['bindings'] elif return_type == 'ask': result = conn.select(query) return result['boolean'] elif return_type == 'update': result = conn.update(query) return result else: error = return_type + ' not supported!' raise ValueError(error) attr_pairs = {} def get_related_columns_between_2_tables_attribute_precision(sparql, table1, table2, thresh): if (table1, table2) in attr_pairs: return attr_pairs.get((table1, table2)) else: result = [] res = execute_query(sparql, get_related_columns_between_2_tables_query(table1, table2, 'hasLabelSimilarity', thresh)) for r in res: c1 = r["column_name1"]["value"] c2 = r["column_name2"]["value"] result.append((table1, c1, table2, c2)) attr_pairs[(table1, table2)] = result return attr_pairs.get((table1, table2)) attr_pairs_j = {} def get_related_columns_between_2_tables_j_attribute_precision(SPARQL, query_table: str, table: str, thresh): if (query_table, table) in attr_pairs_j: return attr_pairs_j.get((query_table, table)) else: result = [] res = execute_query(SPARQL, attribute_precision_j_query(query_table, table, thresh)) for r in res: target_t = r["target_table"]["value"] target_attr = r["target_attribute"]["value"] candidate_t = r["joinable_tables_name"]["value"] candidate_attr = r["candidate_attribute"]["value"] result.append((target_t, target_attr, candidate_t, candidate_attr)) attr_pairs_j[(query_table, table)] = result return attr_pairs_j.get((query_table, table)) top_k_per_target_table = {} def get_top_k_related_tables(sparql, query_table, k, thresh): if query_table in top_k_per_target_table: top_k = top_k_per_target_table.get(query_table) return top_k[:k] else: result = [] res = execute_query(sparql, get_similar_relation_tables_query(query_table, thresh)) for r in res: table1 = r["table_name1"]["value"] table2 = r["table_name2"]["value"] certainty = float(r["certainty"]["value"]) result.append([(table1, table2), certainty]) top_k_per_target_table[query_table] = get_top_k_tables(result) return top_k_per_target_table.get(query_table)[:k]