# kglids/kg_governor/pipeline_abstraction/src/run.py
import ast
import json
import os
import time
from typing import Dict

import pandas as pd

import src.Calls as Calls
import src.util as util
from src.datatypes import GraphInformation, Library
from src.json_to_rdf import build_pipeline_rdf_page, build_default_rdf_page, build_library_rdf_page
from src.pipeline_abstraction import NodeVisitor

libraries: Dict[str, Library] = dict()
default_graph = []


def main():
    overall_start = time.time()
    
    OUTPUT_PATH = '../../../storage/knowledge_graph/pipelines_and_libraries/'
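    # Assumed on-disk layout, inferred from the scandir calls below:
    #   ../data/kaggle/<dataset>/notebooks/<pipeline>/{pipeline_info.json, kernel-metadata.json, *.py}
    #   ../kaggle_small/<dataset>/<table>.csv  (sampled copies of the dataset tables)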

    dataset: os.DirEntry
    # loop through datasets & pipelines
    for dataset in os.scandir('../data/kaggle'):
        pipeline: os.DirEntry
        if dataset.is_dir():
            working_file = {}
            table: os.DirEntry
            if not os.path.isdir(f'../kaggle_small/{dataset.name}'):
                continue
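            # Read only the first row of each table (nrows=1): enough to capture the
            # column headers for the visitor without loading full CSVs into memory.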
            for table in os.scandir(f'../kaggle_small/{dataset.name}'):
                if table.name != '.DS_Store':
                    try:
                        working_file[table.name] = pd.read_csv(table.path, nrows=1)
                    except Exception as e:
                        print(f'Failed to read {table.name}: {e}')
            if not os.path.isdir(f'{dataset.path}/notebooks/'):
                continue
            for pipeline in os.scandir(f'{dataset.path}/notebooks'):
                if pipeline.is_dir():
                    try:
                        with open(f'{pipeline.path}/pipeline_info.json', 'r') as f:
                            pipeline_info = json.load(f)
                        with open(f'{pipeline.path}/kernel-metadata.json', 'r') as f:
                            metadata = json.load(f)
                        pipeline_info['tags'] = metadata['keywords']
                        file: os.DirEntry
                        for file in os.scandir(pipeline.path):
                            if file.name.endswith('.py'):
                                pipeline_analysis(working_file=working_file,
                                                  dataset=dataset,
                                                  file_path=file.path,
                                                  pipeline_info=pipeline_info,
                                                  output_path=OUTPUT_PATH,
                                                  output_filename=metadata['id'].replace('/', '.'))
                    except FileNotFoundError:
                        continue

    libs = [library.str() for library in libraries.values()]
    # with open('kaggle/library.json', 'w') as f:
    #     json.dump(libs, f)
    #
    # with open('kaggle/default.json', 'w') as f:
    #     json.dump(default_graph, f)

    with open(os.path.join(OUTPUT_PATH, 'library.ttl'), 'w') as f:
        f.write(build_library_rdf_page(libs))

    with open(os.path.join(OUTPUT_PATH, 'default.ttl'), 'w') as f:
        f.write(build_default_rdf_page(default_graph))


    # literals = {}
    # rdf_file = ''
    #
    # for node, value in literals.items():
    #     rdf_file = rdf_file.replace(node, value)
    #
    # with open(f'../kaggle_rdf/default.ttl', 'w') as output_file:
    #     output_file.write(rdf_file)

    overall_end = time.time()
    print(f'Total processing time: {overall_end - overall_start:.2f} seconds')
    print(f'read_csv calls: {Calls.read_csv_call.count}')


def pipeline_analysis(working_file, dataset: os.DirEntry, file_path, pipeline_info, output_path, output_filename):
    starting_time = time.time()
    SOURCE = 'kaggle'
    DATASET_NAME = dataset.name
    PYTHON_FILE_NAME = output_filename

    # Read pipeline file
    with open(file_path, 'r') as src_file:
        src = src_file.read()

    try:
        tree = ast.parse(src)
    except SyntaxError as e:
        with open('./errors.csv', 'a') as output_file:
            output_file.write(f'{PYTHON_FILE_NAME},{e.msg}\n')
        return

    # Initialize graph information linked list and Node Visitor
    graph = GraphInformation(PYTHON_FILE_NAME, SOURCE, DATASET_NAME, libraries)
    node_visitor = NodeVisitor(graph_information=graph)
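    # Hand the sampled tables to the visitor; presumably used to resolve file and
    # column references encountered while walking the AST.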
    node_visitor.working_file = working_file

    # Pipeline analysis
    node_visitor.visit(tree)
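    # After the visit, graph.head points to a linked list of statement nodes and
    # graph.files holds the file elements referenced by the pipeline.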
    ending_time = time.time()
    print("Processing time: ", ending_time - starting_time, 'seconds')

    # Prepare node and file records for RDF serialization
    file_elements = [el.str() for el in graph.files.values()]
    nodes = []
    # First pass: generate a URI for each statement node from its position in the pipeline
    head = graph.head
    line = 1
    while head is not None:
        head.generate_uri(SOURCE, DATASET_NAME, PYTHON_FILE_NAME, line)
        line += 1
        head = head.next
    # Second pass: serialize each node once all URIs have been assigned
    head = graph.head
    while head is not None:
        nodes.append(head.str())
        head = head.next

    # with open(f'kaggle/{output_filename}.json', 'w') as f:
    #     json.dump(nodes, f)
    #
    # with open(f'kaggle/{output_filename}-files.json', 'w') as f:
    #     json.dump(file_elements, f)
    if not os.path.isdir(os.path.join(output_path, output_filename)):
        os.mkdir(os.path.join(output_path, output_filename))

    with open(os.path.join(output_path, output_filename + '.ttl'), 'w') as f:
        f.write(build_pipeline_rdf_page(nodes, file_elements))

    pipeline_info['uri'] = util.create_pipeline_uri(SOURCE, DATASET_NAME, output_filename)
    pipeline_info['dataset'] = util.create_dataset_uri(SOURCE, DATASET_NAME)
    default_graph.append(pipeline_info)
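    # pipeline_info entries accumulate in the module-level default_graph;
    # main() serializes them to default.ttl once every pipeline is processed.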


if __name__ == "__main__":
    main()
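
# Usage sketch (assumptions: launched from the pipeline_abstraction directory so the
# `src.*` imports and the relative ../data/kaggle and ../kaggle_small paths resolve,
# and the OUTPUT_PATH storage directory already exists):
#
#   python -m src.run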