# kglids/api/api.py
from api.template import *
from api.helpers.helper import *
from collections import Counter
from tqdm import tqdm


class KGLiDS:
    """High-level client for a KGLiDS knowledge graph served by Stardog.

    Exposes dataset/table discovery, joinable/unionable table
    recommendation, and pipeline analytics on top of the SPARQL helper
    functions star-imported from ``api.template`` / ``api.helpers``.
    """

    def __init__(self, endpoint: str = 'localhost', port=5820, db: str = 'kglids'):
        """Connect to Stardog and begin a session transaction.

        Args:
            endpoint: Stardog host name.
            port: Stardog port number.
            db: Name of the database holding the KGLiDS graph.
        """
        self.conn = connect_to_stardog(endpoint, port, db)
        # NOTE(review): the transaction opened here is never committed or
        # rolled back inside this class — presumably the helper layer
        # manages its lifetime; confirm.
        self.conn.begin()

    def get_datasets_info(self, show_query: bool = False):
        """Return all datasets in the graph, sorted by dataset name."""
        return get_datasets_info(self.conn, show_query).sort_values('Dataset', ignore_index=True, ascending=True)

    def get_tables_info(self, dataset: str = '', show_query: bool = False):
        """Return table metadata, optionally restricted to one dataset."""
        if dataset:
            print("Showing table(s) for '{}' dataset: ".format(dataset))
        else:
            print('Showing all available table(s): ')
        return get_tables_info(self.conn, dataset, show_query).sort_values('Dataset', ignore_index=True, ascending=True)

    def _recommend_k_related_tables(self, table: pd.Series, k: int, relation: str, kind: str,
                                    show_query: bool):
        """Validate arguments, fetch related tables and normalize their scores.

        Shared implementation behind ``recommend_k_joinable_tables`` and
        ``recommend_k_unionable_tables``.

        Args:
            table: A row as returned by ``get_tables_info`` (needs
                'Dataset' and 'Table' entries).
            k: Number of recommendations to fetch.
            relation: Graph predicate used for similarity.
            kind: Human-readable label ('joinable' / 'unionable') for the
                summary message.
            show_query: If True, print the underlying SPARQL query.

        Raises:
            ValueError: If ``table`` is not a pd.Series or ``k`` is not an int.
        """
        if not isinstance(table, pd.Series):
            raise ValueError("table needs to be a type 'pd.Series'")
        if not isinstance(k, int):
            raise ValueError("k needs to be a type 'int'")
        dataset = table["Dataset"]
        table_name = table["Table"]
        recommendations = recommend_tables(self.conn, dataset, table_name, k, relation, show_query)
        # Normalize scores to [0, 1] relative to the best match, 2 decimals.
        scores = [float(score) for score in recommendations['Score'].tolist()]
        best = max(scores, default=0.0)  # hoisted: original recomputed max() per element
        if best:  # guard: empty result / all-zero scores would divide by zero
            recommendations['Score'] = [round(score / best, 2) for score in scores]
        print('Showing the top-{} {} table recommendations:'.format(len(recommendations), kind))
        return recommendations

    def recommend_k_joinable_tables(self, table: pd.Series, k: int = 5, show_query: bool = False):
        """Recommend up to k tables joinable with ``table`` (PK/FK similarity)."""
        return self._recommend_k_related_tables(table, k, 'data:hasPrimaryKeyForeignKeySimilarity',
                                                'joinable', show_query)

    def recommend_k_unionable_tables(self, table: pd.Series, k: int = 5, show_query: bool = False):
        """Recommend up to k tables unionable with ``table`` (semantic similarity)."""
        return self._recommend_k_related_tables(table, k, 'data:hasSemanticSimilarity',
                                                'unionable', show_query)

    def get_table_info(self, table, show_query: bool = False):
        """Return detailed info for a table row.

        Accepts either a ``get_tables_info`` row ('Table' key) or a
        recommendation row ('Recommended_table' key).
        """
        dataset = table["Dataset"]
        # Recommendation results label the table column differently.
        if 'Recommended_table' in table.keys():
            table_name = table["Recommended_table"]
        else:
            table_name = table["Table"]
        return get_table_info(self.conn, dataset, table_name, show_query)

    def show_graph_info(self, show_query: bool = False):
        """Print a heading and return summary statistics about the graph."""
        print('Information captured: ')
        return show_graph_info(self.conn, show_query)

    def search_tables_on(self, conditions: list, show_query: bool = False):
        """Search for tables whose column labels match ``conditions``.

        ``conditions`` is a list of lists of strings: strings inside an
        inner list are OR-ed (regex alternation over column labels, case
        insensitive) and the inner lists are AND-ed together.
        """

        def parsed_conditions(user_conditions):
            """Translate the nested condition list into SPARQL triple
            statements and a combined FILTER expression."""
            error_message = 'conditions need to be in encapsulated in list.\n' \
                            'lists in the list are associated by an \"and\" condition.\n' \
                            'String in each tuple will be joined by an \"or\" condition.\n' \
                            ' For instance [[a,b],[c]]'
            if not isinstance(user_conditions, list):
                raise TypeError(error_message)
            for group in user_conditions:
                if not isinstance(group, list):
                    raise TypeError(error_message)
                for term in group:
                    if not isinstance(term, str):
                        raise TypeError(error_message)

            statements = []
            filters = []
            for i, group in enumerate(user_conditions, start=1):
                statements.append('?column{0} rdf:type kglids:Column.'
                                  '\n?column{0} kglids:isPartOf ?table.'
                                  '\n?column{0} rdfs:label ?label{0}.'.format(i))
                # Case-insensitive match against any term in this group.
                filters.append('regex(?label{}, "{}", "i")'.format(i, '|'.join(group)))
            return '\n'.join(statements), ' && '.join(filters)

        data = search_tables_on(self.conn, parsed_conditions(conditions), show_query)
        print('Showing recommendations as per the following conditions:\nCondition = ', conditions)
        df = pd.DataFrame(list(data), columns=['Dataset', 'Table', 'Number_of_columns',
                                               'Number_of_rows', 'Path_to_table'])
        # NOTE: sorted while counts are still strings (as in the original
        # query result), then cast to int for the caller.
        df = df.sort_values('Number_of_rows', ignore_index=True, ascending=False)
        df['Number_of_rows'] = df['Number_of_rows'].apply(lambda x: int(x))
        df['Number_of_columns'] = df['Number_of_columns'].apply(lambda x: int(x))
        return df

    def get_path_between_tables(self, source_table: pd.Series, target_table: pd.Series, hops: int,
                                relation: str = 'data:hasPrimaryKeyForeignKeySimilarity', show_query: bool = False):
        """Return a path of up to ``hops`` ``relation`` edges between two tables."""
        return get_path_between_tables(self.conn, source_table, target_table, hops, relation, show_query)

    def query(self, rdf_query: str):
        """Run a raw SPARQL query against the graph and return its result."""
        return query_kglids(self.conn, rdf_query)

    def get_top_scoring_ml_model(self, dataset: str = '', show_query=False):
        """Return the best-scoring ML model, optionally for one dataset."""
        return get_top_scoring_ml_model(self.conn, dataset, show_query)

    def get_pipelines_info(self, author: str = '', show_query=False):
        """Return pipelines (optionally filtered by author), most-voted first."""
        return get_pipelines_info(self.conn, author, show_query).sort_values('Number_of_votes', ignore_index=True,
                                                                             ascending=False)

    def get_most_recent_pipeline(self, dataset: str = '', show_query=False):
        """Return the most recently created pipeline, optionally per dataset."""
        return get_most_recent_pipeline(self.conn, dataset, show_query)

    def get_top_k_scoring_pipelines_for_dataset(self, dataset: str = '', k: int = None, show_query=False):
        """Return the k best-scoring pipelines for a dataset (all if k is None)."""
        return get_top_k_scoring_pipelines_for_dataset(self.conn, dataset, k, show_query)

    def get_most_popular_parameters(self, library: str, parameters='all'):
        """Not implemented yet."""
        pass

    def search_classifier(self, dataset: str = '', show_query=False):
        """Return classifiers used on a dataset."""
        return search_classifier(self.conn, dataset, show_query)

    def get_hyperparameters(self, classifier: pd.Series, show_query=False):
        """Return hyperparameters for a classifier row.

        ``classifier`` must carry 'Pipeline' and 'Classifier' entries,
        e.g. a row returned by ``search_classifier``.
        """
        pipeline_name = classifier['Pipeline']
        classifier_name = classifier['Classifier']
        return get_hyperparameters(self.conn, pipeline_name, classifier_name, show_query)

    def get_top_k_library_used(self, dataset: str = '', k: int = 5, show_query=False):
        """Return the k most-used libraries, optionally for one dataset."""
        return get_library_usage(self.conn, dataset, k, show_query)

    def get_top_used_libraries(self, k: int = 5, task: str = 'classification', show_query: bool = False):
        """Return library/module usage rows for the top-k libraries of a task.

        Args:
            k: Number of distinct libraries to keep (clamped to the number found).
            task: One of 'classification', 'regression', 'visualization',
                'clustering'.
            show_query: If True, print the underlying SPARQL query.

        Raises:
            ValueError: If ``task`` is not one of the supported tasks.
        """
        supported_tasks = ['classification', 'regression', 'visualization', 'clustering']
        if task not in supported_tasks:
            raise ValueError(' invalid task, try using one of the following tasks: \n'
                             'classification, regression, visualization or clustering!')
        library_info = get_top_used_libraries(self.conn, task, show_query)
        if len(library_info) == 0:
            print('No library found for {}'.format(task))
            return
        library_info['Module'] = library_info['Module'].apply(lambda module: module.replace('/', '.'))
        # Rank libraries by how often they occur across pipelines.
        library_count = Counter(library_info['Library'])
        if k > len(library_count):
            k = len(library_count)
            if k == 1:
                # BUG FIX: the original passed (k, task) to a one-placeholder
                # format string, so it printed k instead of the task name.
                print('Single library was found for {}: '.format(task))
            else:
                print('Maximum {} libraries were found for {}: '.format(k, task))
        else:
            if k == 1:
                print('Showing the top used library for {}: '.format(task))
            else:
                print('Showing the top {} libraries for {}: '.format(k, task))
        top_libraries = sorted(library_count, key=library_count.get, reverse=True)[:k]
        print(', '.join(top_libraries))
        # Keep only the rows belonging to the top libraries. Vectorized with
        # isin(): the original dropped rows one at a time, which is quadratic.
        library_info = library_info[library_info['Library'].isin(top_libraries)]
        return library_info.sort_values(by=['Library']).reset_index(drop=True)

    def get_pipelines_calling_libraries(self, components: list, show_query: bool = False):
        """Return pipelines that call all of the given library components."""
        return get_pipelines_calling_libraries(self.conn, components, show_query)

    def get_pipelines_for_deep_learning(self, show_query: bool = False):
        """Return pipelines that use deep-learning libraries."""
        return get_pipelines_for_deep_learning(self.conn, show_query)

    def recommend_transformations(self, show_query: bool = False):
        """Return recommended data transformations from the graph."""
        return recommend_transformations(self.conn, show_query)

    def get_pipelines_by_tags(self, tag: str = '', show_query: bool = False):
        """Return pipelines annotated with the given tag."""
        return get_pipelines_by_tags(self.conn, tag, show_query)

    def show_pipeline_usage_by_task(self, show_query: bool = False):
        """Plot a pie chart of pipeline counts per task tag."""

        def count_pipelines(tag):
            # Total pipeline count over every result row for this tag.
            return sum(get_pipelines_by_tags(self.conn, tag=tag,
                                             show_query=show_query)['Number_of_pipelines'].tolist())

        usage = {task: count_pipelines(task)
                 for task in ('classification', 'clustering', 'visualization', 'cleaning', 'regression')}
        # 'deep learning' aggregates two tags used interchangeably in the data.
        usage['deep learning'] = count_pipelines('deep learning') + count_pipelines('neural networks')

        tasks = list(usage.keys())
        data = list(usage.values())
        colors = ("red", "orange", "yellow", "limegreen", "seagreen", "dodgerblue")

        wedge_props = {'linewidth': 0, 'edgecolor': "black"}
        label_data = [str(count) + " pipelines" for count in data]
        fig, ax = plt.subplots(figsize=(10, 8))
        wedges, texts, autotexts = ax.pie(data,
                                          autopct=lambda pct: "{:.1f}%".format(pct),
                                          colors=colors,
                                          labels=label_data,
                                          textprops=dict(color="black"),
                                          wedgeprops=wedge_props)
        ax.legend(wedges, tasks,
                  loc="center left",
                  bbox_to_anchor=(1.2, 0, 0.5, 1), fontsize=15)

        plt.setp(autotexts, size=10, weight='bold')
        plt.title("Pipeline usage by tasks", fontsize=15)
        plt.show()

    def show_top_k_models_by_task(self, task: str, k: int = 5, show_query: bool = False):
        """Plot the top-k classifiers or regressors for the given task.

        Raises:
            ValueError: If ``task`` is neither 'classification' nor 'regression'.
        """
        if task == 'classification':
            plot_top_k_classifiers(self.conn, k, show_query)
        elif task == 'regression':
            plot_top_k_regressors(self.conn, k, show_query)
        else:
            raise ValueError(' invalid task, try using one of the following tasks: \n'
                                 'classification or regression')