Source code for mlbox.preprocessing.reader


# coding: utf-8
# Author: Axel ARONIO DE ROMBLAY <axelderomblay@gmail.com>
# License: BSD 3 clause

import numpy as np
import pandas as pd
import pickle
from sklearn.preprocessing import LabelEncoder
import os
import warnings
import time

os.system("ipcluster start --profile=home &")
import ipyparallel as ipp


def convert_list(serie):

    """Converts lists in a pandas serie into a dataframe
    where which element of a list is a column

    Parameters
    ----------
    serie : pandas Serie
        The serie you want to cast into a dataframe

    Returns
    -------
    pandas DataFrame
        The converted dataframe
    """

    import numpy
    import pandas

    if(serie.apply(lambda x: type(x)==list).sum()>0):

        serie = serie.apply(lambda x: [x] if type(x)!=list else x)
        cut = int(numpy.percentile(serie.apply(len),90))   #a tester

        serie = serie.apply(lambda x: x[:cut])

        return pandas.DataFrame(serie.tolist(),index=serie.index,columns=[serie.name+"_item"+str(i+1) for i in range(cut)])

    else:

        return serie



def convert_float_and_dates(serie, date_strategy):

    """Converts into float if possible and converts dates

    Parameters
    ----------
    serie : pandas Serie
        The serie you want to convert

    date_strategy : str, defaut = "complete"
        The strategy to encode dates :
        
        - complete : creates timestamp from 01/01/2017, month, day and day_of_week
        - to_timestamp : creates timestamp from 01/01/2017

    Returns
    -------
    pandas DataFrame
        The converted dataframe
    """

    import pandas

    ### dtype is already a date ###
    if (serie.dtype == 'datetime64[ns]'):

        df = pandas.DataFrame([], index=serie.index)
        df[serie.name + "_TIMESTAMP"] = (pandas.DatetimeIndex(serie) - pandas.datetime(2017, 1, 1)).total_seconds()

        if (date_strategy == "complete"):
            df[serie.name + "_MONTH"] = pandas.DatetimeIndex(serie).month.astype(
                float)  # be careful with nan ! object or float ??
            df[serie.name + "_DAY"] = pandas.DatetimeIndex(serie).day.astype(
                float)  # be careful with nan ! object or float ??
            df[serie.name + "_DAYOFWEEK"] = pandas.DatetimeIndex(serie).dayofweek.astype(
                float)  # be careful with nan ! object or float ??

        return df

    else:

        ### convert float ###
        try:
            serie = serie.apply(float)

        except:
            pass

        ### cleaning/converting dates ###

        if (serie.dtype != 'object'):
            return serie

        else:
            # trying to cast into date
            df = pandas.DataFrame([], index=serie.index)

            try:

                df[serie.name + "_TIMESTAMP"] = (
                pandas.DatetimeIndex(pandas.to_datetime(serie)) - pandas.datetime(2017, 1, 1)).total_seconds()

                if (date_strategy == "complete"):
                    df[serie.name + "_MONTH"] = pandas.DatetimeIndex(pandas.to_datetime(serie)).month.astype(
                        float)  # be careful with nan ! object or float ??
                    df[serie.name + "_DAY"] = pandas.DatetimeIndex(pandas.to_datetime(serie)).day.astype(
                        float)  # be careful with nan ! object or float ??
                    df[serie.name + "_DAYOFWEEK"] = pandas.DatetimeIndex(pandas.to_datetime(serie)).dayofweek.astype(
                        float)  # be careful with nan ! object or float ??

                return df

            except:

                return serie



[docs]class Reader(): """Reads and cleans data Parameters ---------- sep : str, defaut = None Delimiter to use when reading a csv file. header : int or None, defaut = 0. If header=0, the first line is considered as a header. Otherwise, there is no header. Useful for csv and xls files. to_hdf5 : bool, defaut = True If True, dumps each file to hdf5 format to_path : str, defaut = "save" Name of the folder where files and encoders are saved verbose : bool, defaut = True Verbose mode """ def __init__(self, sep = None, header = 0, to_hdf5 = False, to_path = "save", verbose = True): self.sep = sep self.header = header self.to_hdf5 = to_hdf5 self.to_path = to_path self.verbose = verbose self.__client = ipp.Client(profile='home') self.__dview = self.__client.direct_view()
[docs] def clean(self, path, date_strategy = "complete", drop_duplicate = False): """Reads and cleans data (accepted formats : csv, xls, json and h5): - del Unnamed columns - casts lists into variables - try to cast variables into float - cleans dates - drop duplicates (if drop_duplicate=True) Parameters ---------- path : str The path to the dataset. date_strategy : str, defaut = "complete" The strategy to encode dates : - complete : creates timestamp from 01/01/2017, month, day and day_of_week - to_timestamp : creates timestamp from 01/01/2017 drop_duplicate : bool, defaut = False If True, drop duplicates when reading each file. Returns ------- pandas dataframe Cleaned dataset. """ ########################################################################### ################################# reading ################################# ########################################################################### start_time = time.time() if (path == None): raise ValueError("You must specify the path to load the data") else: type_doc = path.split(".")[-1] if (type_doc == 'csv'): if (self.sep == None): raise ValueError("You must specify the separator for a csv file") else: if (self.verbose): print("") print("reading csv : " + path.split("/")[-1] + " ...") df = pd.read_csv(path, sep=self.sep, header=self.header, engine='c', error_bad_lines=False) elif (type_doc == 'xls'): if (self.verbose): print("") print("reading xls : " + path.split("/")[-1] + " ...") df = pd.read_excel(path, header=self.header) elif (type_doc == 'h5'): if (self.verbose): print("") print("reading hdf5 : " + path.split("/")[-1] + " ...") df = pd.read_hdf(path) elif (type_doc == 'json'): if (self.verbose): print("") print("reading json : " + path.split("/")[-1] + " ...") df = pd.read_json(path) else: raise ValueError("The document extension cannot be handled") ### deleting unknow column ### try: del df["Unnamed: 0"] except: pass ############################################################ ################## cleaning lists, floats and dates ####### ########################################################### if (self.verbose): print("cleaning data...") df = pd.concat(self.__dview.map_sync(convert_list, [df[col] for col in df.columns]), axis=1) df = pd.concat(self.__dview.map_sync(convert_float_and_dates, [df[col] for col in df.columns], [date_strategy for col in df.columns]), axis=1) ### dropping duplicates ### if (drop_duplicate): if (self.verbose): print("dropping duplicates") df = df.drop_duplicates() else: pass if (self.verbose): print("CPU time: %s seconds" % (time.time() - start_time)) return df
[docs] def train_test_split(self, Lpath, target_name): """Creates train and test datasets Given a list of several paths and a target name, automatically creates and cleans train and test datasets. IMPORTANT: a dataset is considered as a test set if it does not contain the target value. Otherwise it is considered as part of a train set. Also determines the task and encodes the target (classification problem only). Finally dumps the datasets to hdf5, and eventually the target encoder. Parameters ---------- Lpath : list, defaut = None List of str paths to load the data target_name : str, defaut = None The name of the target. Works for both classification (multiclass or not) and regression. Returns ------- dict Dictionnary containing : - 'train' : pandas dataframe for train dataset - 'test' : pandas dataframe for test dataset - 'target' : pandas serie for the target """ col = [] col_train = [] col_test = [] df_train = dict() df_test = dict() y_train = dict() if (type(Lpath) != list): raise ValueError("You must specify a list of paths to load all the data") elif (self.to_path is None): raise ValueError("You must specify a path to save your data and make sure your files are not already saved") else: ###################################################### ################## reading the files ################## ###################################################### for path in Lpath: ### reading each file ### df = self.clean(path, date_strategy="complete", drop_duplicate=False) ### checking if the target exists to split into test and train if (target_name in df.columns): df_train[path] = df[df[target_name].isnull() == False].drop(target_name, axis=1) df_test[path] = df[df[target_name].isnull() == True].drop(target_name, axis=1) y_train[path] = df[target_name][df[target_name].isnull() == False] else: df_test[path] = df del df # exceptions if (sum([df_train[path].shape[0] for path in df_train.keys()]) == 0): raise ValueError("You have no train dataset. Please check that the target name is correct") if ((sum([df_test[path].shape[0] for path in df_test.keys()]) == 0) & (self.verbose)): print("") print("You have no test dataset !") ### finding the common subset of features for i, df in enumerate(df_train.values()): if (i == 0): col_train = df.columns else: col_train = list(set(col_train) & set(df.columns)) for i, df in enumerate(df_test.values()): if (i == 0): col_test = df.columns else: col_test = list(set(col_test) & set(df.columns)) col = sorted(list(set(col_train) & set(col_test))) # subset of common features if (self.verbose): print("") print("number of common features : " + str(len(col))) ###################################################### ### creating train, test and target dataframes ### ##################################################### print("") print("gathering and crunching for train and test datasets") df_train = pd.concat([df[col] for df in df_train.values()]) # optimiser !! df_test = pd.concat([df[col] for df in df_test.values()]) # optimiser !! y_train = pd.concat([y for y in y_train.values()]) # optimiser !! ### checking shape of the target ### if (type(y_train) == pd.core.frame.DataFrame): raise ValueError("Your target contains more than two columns ! Please check that only one column is named " + target_name) else: pass ### handling indexes ### if (self.verbose): print("reindexing for train and test datasets") if (df_train.index.nunique() < df_train.shape[0]): df_train.index = range(df_train.shape[0]) if (df_test.index.nunique() < df_test.shape[0]): df_test.index = range(df_test.shape[0]) if (y_train.index.nunique() < y_train.shape[0]): y_train.index = range(y_train.shape[0]) ### dropping duplicates ### if (self.verbose): print("dropping training duplicates") df_train[target_name] = y_train.values # temp adding target to check (x,y) duplicates... df_train = df_train.drop_duplicates() del df_train[target_name] y_train = y_train.loc[df_train.index] # need to reindex ? ### deleting constant variables ### if (self.verbose): print("dropping constant variables on training set") for var in col: if (df_train[var].nunique(dropna=False) == 1): del df_train[var] del df_test[var] ### missing values ### sparse_features = (df_train.isnull().sum()*100./df_train.shape[0]).sort_values(ascending=False) sparse = True if(sparse_features.max()==0.0): sparse = False ### print information ### if (self.verbose): print("") print("number of categorical features : " + str(len(df_train.dtypes[df_train.dtypes == 'object'].index))) print("number of numerical features : " + str(len(df_train.dtypes[df_train.dtypes != 'object'].index))) # bool is considerated as a numerical variable print("number of training samples : " + str(df_train.shape[0])) print("number of test samples : " + str(df_test.shape[0])) if(sparse): print("") print("Top sparse features (% missing values on train set):") print(np.round(sparse_features[sparse_features>0.0][:5],1)) else: print("") print("you have no missing values on train set...") ###################################################### ################## encoding target ################## ###################################################### task = "regression" if (y_train.nunique() <= 2): task = "classification" else: if (y_train.dtype == object): task = "classification" else: # no needs to convert into float pass if (self.verbose): print("") print("task : " + task) if (task == "classification"): if (self.verbose): print(y_train.value_counts()) print("encoding target") enc = LabelEncoder() y_train = pd.Series(enc.fit_transform(y_train.values), index=y_train.index, name=target_name, dtype='int') else: pass ###################################################### ################## dumping ############################ ###################################################### ### creating a folder to save the files and target encoder ### try: os.mkdir(self.to_path) except OSError: pass if (self.to_hdf5): start_time = time.time() if (self.verbose): print("") print("dumping files into directory : " + self.to_path) df_train[target_name] = y_train.values # temp adding target to dump train file... df_train.to_hdf(self.to_path + '/df_train.h5', 'train') del df_train[target_name] if (self.verbose): print("train dumped") df_test.to_hdf(self.to_path + '/df_test.h5', 'test') if (self.verbose): print("test dumped") print("CPU time: %s seconds" % (time.time() - start_time)) else: pass if (task == "classification"): fhand = open(self.to_path + '/target_encoder.obj', 'wb') pickle.dump(enc, fhand) fhand.close() else: pass return {"train": df_train, "test": df_test, 'target': y_train}