Pipelines Config Documentation

This is the documentation for running Taiyo-utils’ Pipelines package, which lives in src/taiyo_utils/pipelines. An interface is a set of publicly accessible functions on an object that other parts of the program use to interact with that object. The pipelines package consists of the following interfaces:

  1. __init__.py
  2. toHarvester.py
  3. toPreprocessor.py
  4. toModelHandler.py
  5. toHPTservice.py
  6. toPublisher.py
  7. toConfidenceBounds.py

__init__.py

__init__.py is executed first whenever the pipelines package is imported, so it holds any code that should run each time the package is loaded. Here it imports the interface classes and declares them as the package's public API:

from .toHarvester import PipelineHarvester
from .toPreprocessor import PipelineTransformer
from .toModelHandler import PipelineModel
from .toHPTservice import HPTModelInterface

# Public names exported by `from taiyo_utils.pipelines import *`
__all__ = [
    "PipelineHarvester",
    "PipelineTransformer",
    "HPTModelInterface",
    "PipelineModel",
]
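Every entry in __all__ needs its own comma: Python joins adjacent string literals at compile time, so an entry missing its comma silently merges with the next one, and neither name is exported. A minimal demonstration:

```python
# Adjacent string literals are concatenated at compile time,
# so a missing comma turns two entries into one.
broken = [
    "PipelineTransformer",
    "HPTModelInterface"
    "PipelineModel"
]

# With a comma after every entry, each name stays separate.
fixed = [
    "PipelineTransformer",
    "HPTModelInterface",
    "PipelineModel",
]

print(broken)      # ['PipelineTransformer', 'HPTModelInterfacePipelineModel']
print(len(fixed))  # 3
```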

PipelineHarvester

class taiyo_utils.pipelines.PipelineHarvester(config)

This interface fetches data from multiple sources, driven by a single config.

Parameters:

  • config (dict): The config dictionary to be passed. Each list entry in it is treated as one job for the PipelineHarvester and takes the following keys:
      - name (str): Mostly metadata, but also used as the key for the object retrieved by the getData() method described in the AlphaVantage abstraction.
      - source (str): The data source from which the data is fetched.
      - params: All the parameters supported by the target harvester, as key-value pairs (key: parameter_name, value: parameter_value).
Methods:

  • aggregation(self, function=None, freq=None, method='linear', *args, **kwargs)
  • fetch_data(self)
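As a rough illustration, a harvester config with two jobs might look like the dictionary below. The container shape (a list of job dicts), the job names and the params keys are assumptions made for the example, not the package's documented schema; check_jobs is a hypothetical helper that verifies each job carries the name, source and params keys and that names are unique, since names are used as result keys.

```python
# Hypothetical harvester config: a list of job dicts, one per data pull.
# Job names and params keys are illustrative assumptions.
harvester_config = [
    {
        "name": "ANSS_daily",
        "source": "AlphaVantage",
        "params": {"symbol": "ANSS", "interval": "daily"},
    },
    {
        "name": "SPX_daily",
        "source": "AlphaVantage",
        "params": {"symbol": "SPX", "interval": "daily"},
    },
]

REQUIRED_KEYS = {"name", "source", "params"}


def check_jobs(jobs):
    """Return the job names; raise if a job lacks a key or a name repeats."""
    names = []
    for job in jobs:
        missing = REQUIRED_KEYS - set(job)
        if missing:
            raise ValueError(f"job {job!r} is missing {sorted(missing)}")
        if job["name"] in names:
            raise ValueError(f"duplicate job name {job['name']!r}")
        names.append(job["name"])
    return names


print(check_jobs(harvester_config))  # ['ANSS_daily', 'SPX_daily']
```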

Aggregation Code Book

Alias       Description
B           business day frequency
C           custom business day frequency
D           calendar day frequency
W           weekly frequency
M           month end frequency
SM          semi-month end frequency (15th and end of month)
BM          business month end frequency
CBM         custom business month end frequency
MS          month start frequency
SMS         semi-month start frequency (1st and 15th)
BMS         business month start frequency
CBMS        custom business month start frequency
Q           quarter end frequency
BQ          business quarter end frequency
QS          quarter start frequency
BQS         business quarter start frequency
A, Y        year end frequency
BA, BY      business year end frequency
AS, YS      year start frequency
BAS, BYS    business year start frequency
BH          business hour frequency
H           hourly frequency
T, min      minutely frequency
S           secondly frequency
L, ms       millisecond frequency
U, us       microsecond frequency
N           nanosecond frequency
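These aliases match pandas' offset aliases, which suggests that the freq argument of aggregation() is handed to pandas resampling; that is an assumption, as is the reading of function (a per-bin aggregator) and method (an interpolation method for empty bins). Under those assumptions, a stand-in for the aggregation step could look like the sketch below, where aggregate is a hypothetical helper rather than the package's implementation. pandas 2.2 and later deprecate several of the older aliases in the table (for example M in favour of ME, and H in favour of h), so the sketch sticks to W and D.

```python
import pandas as pd


def aggregate(df, freq, function=None, method="linear"):
    """Hypothetical stand-in for PipelineHarvester.aggregation().

    Resamples a Timestamp-indexed frame to `freq` (a pandas offset alias),
    applies `function` per bin (mean by default), and fills empty bins by
    interpolation with `method`.
    """
    resampled = df.resample(freq)
    out = resampled.agg(function) if function is not None else resampled.mean()
    return out.interpolate(method=method)


# Downsampling: ten daily closes starting Friday 2021-01-01.
daily = pd.DataFrame(
    {"close": [float(i) for i in range(10)]},
    index=pd.date_range("2021-01-01", periods=10, freq="D", name="Timestamp"),
)
weekly = aggregate(daily, "W")  # weeks end on Sunday

# Upsampling: a missing day is filled by linear interpolation.
sparse = pd.DataFrame(
    {"close": [1.0, 3.0]},
    index=pd.to_datetime(["2021-01-01", "2021-01-03"]),
)
filled = aggregate(sparse, "D")

print(weekly["close"].tolist())  # [1.0, 6.0]
print(filled["close"].tolist())  # [1.0, 2.0, 3.0]
```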

Function: harvester_selector()
This function selects the type of harvester service.
Args:
  • source (str): The data source from which the data is fetched.
  • params (dict): The parameters supported by the target harvester.
Returns: An instance of the harvester class.
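A selector like this is often a lookup from the source string to a harvester class. The sketch below assumes that design; the HARVESTERS registry and both harvester classes are hypothetical placeholders, not the package's real harvesters.

```python
class AlphaVantageHarvester:
    """Placeholder harvester; a real one would call the data provider."""

    def __init__(self, **params):
        self.params = params


class CSVHarvester:
    """Placeholder harvester for local files."""

    def __init__(self, **params):
        self.params = params


# Hypothetical mapping from the config's `source` string to a class.
HARVESTERS = {
    "AlphaVantage": AlphaVantageHarvester,
    "CSV": CSVHarvester,
}


def harvester_selector(source, params):
    """Return an instance of the harvester registered under `source`."""
    try:
        harvester_cls = HARVESTERS[source]
    except KeyError:
        raise ValueError(
            f"unknown source {source!r}; expected one of {sorted(HARVESTERS)}"
        ) from None
    return harvester_cls(**params)


h = harvester_selector("AlphaVantage", {"symbol": "ANSS"})
print(type(h).__name__, h.params)  # AlphaVantageHarvester {'symbol': 'ANSS'}
```

Keeping the mapping in one dictionary means adding a new data source only requires registering its class there.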

Function: fetch_data()
This function runs the fetching operation for every job listed in the config, collects the data, and returns it as a dictionary keyed by each job's name field.

Function: aggregation()
This function returns the data fetched by fetch_data(), aggregated. A custom aggregation can also be used with this function.
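The fetch-then-key-by-name behaviour can be sketched as follows. StubHarvester and its get_data() method are hypothetical stand-ins for a real harvester (the real method names may differ); the point is only the loop that runs each job and stores its result under the job's name.

```python
class StubHarvester:
    """Hypothetical harvester that returns canned rows instead of calling an API."""

    def __init__(self, source, **params):
        self.source = source
        self.params = params

    def get_data(self):
        return [{"source": self.source, **self.params}]


def fetch_data(jobs, selector=StubHarvester):
    """Run every job and return {job name: fetched data}."""
    results = {}
    for job in jobs:
        harvester = selector(job["source"], **job["params"])
        results[job["name"]] = harvester.get_data()
    return results


jobs = [
    {"name": "prices", "source": "AlphaVantage", "params": {"symbol": "ANSS"}},
    {"name": "macro", "source": "CSV", "params": {"path": "macro.csv"}},
]
data = fetch_data(jobs)
print(sorted(data))  # ['macro', 'prices']
```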

Sample code:

Interface = PipelineHarvester(config["Pipelines"])
jobs_data = Interface.fetch_data()
aggregated_data = Interface.aggregation()

Here jobs_data holds the fetched data as a dictionary, and aggregated_data holds the data aggregated on the Timestamp column. A custom aggregation can then be applied to the dataframe in aggregated_data.

Example output: the output is a dataframe.

toPreprocessor.py

Class: PipelineTransformer()
This layer transforms the fetched data for all the models. Each model to be trained gets its own job, identified by a specific field (its name).

__init__(data, config)

Parameters:
  • data (dataframe): Data fetched by the fetch-data layer.
  • config (dict): The config dictionary passed in.

Pass the config dictionary in this format:
  • name (str): Name of the model whose data is to be preprocessed.
  • source (str): The transformer used to transform the data.
  • params: Parameters for the transformer, as key-value pairs (key: parameter_name, value: parameter_value).

Below is a sample transformer definition; the same structure can be written as a dictionary or a .yml file.

Preprocessor Layer:
  - name: Naive Forecaster
    source: IdentityTransformer
    params:
      split_date: '2019-07-01'

Function: _preprocessor_selector()
This function selects the type of transformer service.
Args:
  • data (dataframe): Data for the transformer job.
  • package (str): The transformer used.
  • params (dict): The parameters supported by the transformer.
Returns: An instance of the preprocessor class.
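The sample job passes split_date to IdentityTransformer. Assuming split_date marks where the training period ends and the test period begins (the document does not spell this out), an identity transform amounts to leaving the values untouched and splitting by date, roughly as in this sketch. identity_split is hypothetical; its output keys echo the y_train/y_test/index_* keys that the model interfaces in this document read from their data.

```python
import pandas as pd


def identity_split(df, target, split_date):
    """Hypothetical identity transform: no scaling, just a date-based split.

    Rows strictly before split_date go to training; the rest go to testing.
    """
    cutoff = pd.Timestamp(split_date)
    train = df[df.index < cutoff]
    test = df[df.index >= cutoff]
    return {
        "y": df[target],
        "y_train": train[target],
        "y_test": test[target],
        "index_dataset": df.index,
        "index_train": train.index,
        "index_test": test.index,
    }


prices = pd.DataFrame(
    {"close": [10.0, 11.0, 12.0, 13.0]},
    index=pd.to_datetime(["2019-06-28", "2019-06-29", "2019-07-01", "2019-07-02"]),
)
out = identity_split(prices, "close", "2019-07-01")
print(out["y_train"].tolist(), out["y_test"].tolist())  # [10.0, 11.0] [12.0, 13.0]
```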

Function: fit_transform()
This function returns all the transformed data as a dictionary, with each entry keyed by its job name.
Args:
  • mode (str, default='train'): Whether the model is to be trained.

Sample code:

Interface = PipelineTransformer(X, "close (ANSS)(t+1)", config["Pipelines"])
Interface.fit_transform()
response["Transform"] = Interface

toModelHandler.py

Class: PipelineModel()
This layer trains the models on the data fetched for all the models. Each model to be trained gets its own job, identified by a specific field (its name). The mode flag can be set to Train or Inference.
Note: Inference mode requires the model to already exist, and the config file must point to it correctly.

__init__(preprocessor_interface, config, mode)

Parameters:
  • preprocessor_interface (object): The interface after preprocessing.
  • config (dict): The config dictionary passed in.
  • mode (str): The model is trained only when the mode is set to training.

Pass the config dictionary in this format:
  • name (str): Name of the model used.
  • source (str): Name of the package (folder name) that contains the model.
  • model (str): Name of the model (unique class name) under which the model is defined.
  • params: Parameters are written under this key, for example:
      - architecture: Architecture of the model.

Below is a sample trainer definition; the same structure can be written as a dictionary or a .yml file.

Model Training Layer:
  - name: IndexerMLP
    source: neural_network
    model: IndexerMLP
    params:
      split_date: '2019-07-01'
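Because the same definition can live in a dictionary or a .yml file, loading the YAML form should give the same structure as the dict form. The sketch below checks that with PyYAML's yaml.safe_load; whether the package itself uses PyYAML is an assumption. It also shows why split_date is quoted: unquoted, YAML parses 2019-07-01 as a date object rather than a string.

```python
import yaml  # PyYAML

# The sample trainer definition, as YAML text.
YAML_TEXT = """
Model Training Layer:
  - name: IndexerMLP
    source: neural_network
    model: IndexerMLP
    params:
      split_date: '2019-07-01'
"""

# The equivalent definition written directly as a Python dictionary.
AS_DICT = {
    "Model Training Layer": [
        {
            "name": "IndexerMLP",
            "source": "neural_network",
            "model": "IndexerMLP",
            "params": {"split_date": "2019-07-01"},
        }
    ]
}

loaded = yaml.safe_load(YAML_TEXT)
print(loaded == AS_DICT)  # True

# Without quotes the value is resolved to datetime.date, not str.
unquoted = yaml.safe_load("split_date: 2019-07-01")["split_date"]
print(type(unquoted).__name__)  # date
```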

Function: _model_interface_selector()
This function selects the interface of the model to be trained.
Args:
  • data (dataframe): Data on which the model is trained.
  • name (str): Name of the model.
  • mode (str): The model is trained only when the mode is set to training.
  • package (str): Name of the package (folder name) that contains the model.
  • model (str): Name of the model (unique class name) under which the model is defined.
  • params: Parameters for the training job.
Returns: An instance of the model interface.
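Given a package (folder) name and a class name as strings, one common way to resolve the model class is importlib.import_module followed by getattr. Whether _model_interface_selector does exactly this is an assumption; resolve_model_class is a hypothetical helper, demonstrated on a standard-library class so that it runs anywhere.

```python
import importlib


def resolve_model_class(package, model):
    """Import `package` and return the attribute named `model` from it.

    In the pipelines config these would be a package such as
    "neural_network" and a class such as "IndexerMLP"; the full module
    path used inside taiyo_utils is not documented here.
    """
    module = importlib.import_module(package)
    try:
        return getattr(module, model)
    except AttributeError:
        raise ImportError(f"{package!r} has no class {model!r}") from None


# Runnable stand-in: resolve collections.OrderedDict from two strings.
cls = resolve_model_class("collections", "OrderedDict")
instance = cls(a=1)
print(cls.__name__, list(instance))  # OrderedDict ['a']
```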

Function: _aggregator()
This function aggregates the predictions after all the models have been trained and returns the response dictionary with the aggregated predictions.

Function: run()
This function runs the pipeline model interface.

INTERFACE FOR MODELS:

Class: Name of the model's interface class
Args: All the parameters required to train the model.

Function: _select_base_model()
This function instantiates the model's unique class with the job's parameters and stores it as self.basemodel.

Function: _train_model()
This function trains the model.

Function: _inference_model()
This function returns the actual values and the predicted values after training.

Function: run_interface()
This function runs the model interface.
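To show the call order of this contract without any model library, here is a runnable sketch using a hypothetical MeanForecaster that predicts the training mean. MeanForecaster, its fit_model()/predict() methods and the data keys are assumptions chosen to mirror the shape of the example interface that follows, not a real taiyo_utils model.

```python
from statistics import mean


class MeanForecaster:
    """Hypothetical base model: forecasts the mean of the training target."""

    def __init__(self, **params):
        self.params = params
        self.level = None

    def fit_model(self, y_train):
        self.level = mean(y_train)

    def predict(self, n):
        return [self.level] * n


class MeanModelInterface:
    def __init__(self, model, data, name, mode, params):
        self.model = model
        self.data = data
        self.name = name
        self.mode = mode
        self.params = params

    def _select_base_model(self):
        # Instantiate the model class with the job's parameters.
        self.basemodel = self.model(**self.params)

    def _train_model(self):
        self._select_base_model()
        self.basemodel.fit_model(self.data["y_train"])
        return self._inference_model()

    def _inference_model(self):
        # Return (actual, predictions), mirroring the example interfaces.
        n_test = len(self.data["y_test"])
        actual = {"y_test": self.data["y_test"]}
        predictions = {"y_test_pred": self.basemodel.predict(n_test)}
        return actual, predictions

    def run_interface(self):
        return self._train_model()


data = {"y_train": [1.0, 2.0, 3.0], "y_test": [4.0, 5.0]}
iface = MeanModelInterface(MeanForecaster, data, "Mean", "train", {})
actual, predictions = iface.run_interface()
print(predictions)  # {'y_test_pred': [2.0, 2.0]}
```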

EXAMPLE: This is the sample code for the linear models’ interface. All the other model interfaces are written in a similar manner.

class ARModelInterface:
    def __init__(self, model, data, name, mode, params):
        self.name = name
        self.data = data
        self.mode = mode
        self.params = params
        self.model = model

    def _select_base_model(self):
        # Instantiate the model class with the job's parameters.
        self.basemodel = self.model(**self.params)

    def _train_model(self):
        self._select_base_model()
        # getattr with a None default avoids an AttributeError on models
        # that do not define fit_model.
        if getattr(self.basemodel, 'fit_model', None) is not None:
            self.basemodel.fit_model(
                self.data['y_train'],
                self.data['y_test'],
                **self.params['trainer']
                )
        return self._inference_model()

    def _inference_model(self):
        predictions = {}
        actual = {}
        # Each history/prediction entry is a sequence; keep its first element.
        predictions['y_pred'] = [x[0] for x in self.basemodel.history]
        predictions['y_test_pred'] = [x[0] for x in self.basemodel.predictions]
        predictions['y_train_pred'] = [x[0] for x in self.basemodel.history_d]
        predictions['index_dataset'] = self.data['index_dataset']
        predictions['index_train'] = self.data['index_train']
        predictions['index_test'] = self.data['index_test']

        actual['y'] = self.data['y']
        actual['y_test'] = self.data['y_test']
        actual['y_train'] = self.data['y_train']
        actual['index_dataset'] = self.data['index_dataset']
        actual['index_train'] = self.data['index_train']
        actual['index_test'] = self.data['index_test']

        return actual, predictions

    def run_interface(self):
        return self._train_model()

Sample code:

Interface = PipelineModel(response["Transform"].data, config["Pipelines"], mode="train")
response["Models"] = Interface.run()

toHPTservice.py

Class: HPTModelInterface()
This is the interface for hyperparameter tuning using Bayesian optimization (the TPE, Tree-structured Parzen Estimator, algorithm) with the hyperopt library.

__init__(self, preprocessor_interface, config, mode)

Parameters:
  • preprocessor_interface (object): The interface after preprocessing.
  • config (dict): The config dictionary passed in.
  • mode (str): The model is tuned only when the mode is set to tune.

The config is passed in this format:
  • name (str): Name of the model used.
  • source (str): Name of the package (folder name) that contains the model.
  • model (str): Name of the model (unique class name) under which the model is defined.
  • optimize (bool): If true, the model is tuned.
  • loss (str): The loss function to be minimized.
  • params: Parameters are written under this key.

Below is a sample trainer definition; the same structure can be written as a dictionary or a .yml file.

Model Training Layer:
  - name: XGBoost
    source: ensembles
    optimize: True
    loss: MAE
    model: IndexerXGBoost
    params:
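The loss field names the metric the tuner minimizes. One simple way to turn that string into a callable is a lookup table; the sketch below assumes names such as MAE and MSE and implements them in plain Python. The LOSSES registry and get_loss helper are hypothetical, and the package may resolve loss names differently.

```python
def _check_lengths(actual, predicted):
    if len(actual) != len(predicted) or not actual:
        raise ValueError("actual and predicted must be non-empty and equal length")


def mean_absolute_error(actual, predicted):
    """Average of |actual - predicted|."""
    _check_lengths(actual, predicted)
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


def mean_squared_error(actual, predicted):
    """Average of (actual - predicted) ** 2."""
    _check_lengths(actual, predicted)
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)


# Hypothetical mapping from the config's `loss` string to a function.
LOSSES = {"MAE": mean_absolute_error, "MSE": mean_squared_error}


def get_loss(name):
    """Look up a loss function by name, case-insensitively."""
    try:
        return LOSSES[name.upper()]
    except KeyError:
        raise ValueError(
            f"unsupported loss {name!r}; choose from {sorted(LOSSES)}"
        ) from None


loss_fn = get_loss("MAE")
print(loss_fn([1.0, 2.0, 3.0], [1.5, 2.0, 2.0]))  # 0.5
```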

Function: hpt_and_model_interface_selector()
This function selects the interface of the model to be tuned.
Args:
  • data (dataframe): Data on which the model is trained and then tuned.
  • name (str): Name of the model.
  • optimize (bool): If true, the model is tuned.
  • loss (str): The loss function to be minimized.
  • package (str): Name of the package (folder name) that contains the model.
  • model (str): Name of the model (unique class name) under which the model is defined.
  • params: Parameters for the tuning job.

Returns: An instance of the selected interface (HPTModelInterface for tuning).

Function: Aggregate_best_parameters()
This function aggregates the best parameters for every model and returns the best_params dictionary, keyed by model name.

Function: run()
This function runs this interface.

Class: HPTInterface()
The interface for hyperparameter tuning.

__init__(self, model, name, params, optimize, data, dfs_actual, dfs_pred, config, loss, package)

Parameters:
  • params (dict): Parameters for the neural network model.
  • validate (bool): Performs validation on the dataset.
  • optimize (bool): If true, the model is tuned; otherwise no value is returned.
  • config (dict): The config dictionary passed in.
  • name (str): Name of the model to be tuned.
  • model (str): Class name of the model.
  • data (dataframe): Data to be sent for tuning.
  • weights_path (path): Location where the model is saved.
  • max_evals (int): Maximum number of evaluations to run.
  • dfs_actual (dict): All the actual values of the dataset.
  • dfs_pred (dict): All the predicted values of the dataset.
  • package (str): The source of the model.
  • loss (str): The loss function to be minimized.

Function: model_interface_selector()
This function selects the model to be trained.
Args:
  • space (dict): The parameters used to train the model.
Returns: An instance of the selected interface.

Function: GenerateModel()
The objective function: it trains the model, predicts the values, and calculates the error.
Args:
  • space (dict): All the parameters to be passed to the model.
Returns: The error value to be minimized.

Function: defineSpace()
Defines the search space from which the TPE algorithm chooses parameters.
Returns: space (dict): The search space.

Function: minimizeValue()
Minimizes the loss function.
Returns: The best parameters of the model.

Function: run_interface()
Runs the interface.
Returns: best_params (dict): The best parameters, if present.

HPT INTERFACE FOR MODELS:

Class: Name of the model's interface class
Args: All the parameters required to train the model.

Function: _select_base_model()
This function instantiates the model's unique class and stores it as self.basemodel.

Function: _train_model()
This function trains the model.

Function: _inference_model()
This function returns the actual values and the predicted values after training.

EXAMPLE: This is the sample code for the neural networks' interface. All the other model interfaces are written in a similar manner.

class NNInterface:
    def __init__(self, model, data, params, space, validate, weights_path):
        self.model = model
        self.data = data
        self.params = params
        self.space = space
        self.validate = validate
        self.weights_path = weights_path

    def _select_base_model(self):
        """
        Function to select the base model.

        Uses self.space (dict), which holds all the parameters to be passed
        to the model.
        """
        # Compare on the class name directly instead of parsing str(self.model).
        model_class = getattr(self.model, "__name__", "")
        if model_class == "IndexerRNN":
            self.params["architecture"]["input_shape"] = (
                self.data["X_train"].shape[1],
                self.data["X_train"].shape[2])
        if self.validate:
            self.params["trainer"]["validation_data"] = (
                    self.data["X_val"], self.data["y_val"]
                )
        self.basemodel = self.model(
            **self.space,
            input_shape=self.params["architecture"]["input_shape"])

    def _train_model(self):
        """
        Function to train the model.

        Uses self.params (dict), which holds the optimizer and trainer settings.

        Returns:
            The output of the inference model, i.e. the actual values and the
            predicted values.
        """
        self._select_base_model()
        # getattr with a None default skips hooks the model does not define.
        if getattr(self.basemodel, "build_model", None) is not None:
            self.basemodel.build_model()
        if getattr(self.basemodel, "compile_model", None) is not None:
            if "optimizer" in self.params:
                self.basemodel.compile_model(**self.params["optimizer"])
        if (getattr(self.basemodel, "fit_model", None) is not None
                and self.params.get("trainer") is not None):
            self.basemodel.fit_model(
                self.data["X_train"], self.data["y_train"],
                **self.params["trainer"]
                )
            self.basemodel.save_model(weights_path=self.weights_path)
        return self._inference_model()

    def _inference_model(self):
        predictions = {}
        actual = {}
        # Rebuild the model and load the weights saved by _train_model().
        self._select_base_model()
        # self.basemodel.build_model()
        self.basemodel.load_model(weights_path=self.weights_path)
        for key, values in self.data.items():
            # Every non-empty X_* split gets a matching y_*_pred entry.
            if key[0] == "X" and values is not None:
                pred_key = "y" + key[1:] + "_pred"
                predictions[pred_key] = self.basemodel.predict(values)
            else:
                actual[key] = values

        predictions["index_dataset"] = self.data["index_dataset"]
        predictions["index_train"] = self.data["index_train"]
        predictions["index_test"] = self.data["index_test"]

        actual["index_dataset"] = self.data["index_dataset"]
        actual["index_train"] = self.data["index_train"]
        actual["index_test"] = self.data["index_test"]

        if self.validate:
            predictions["index_val"] = self.data["index_val"]
            actual["index_val"] = self.data["index_val"]

        return actual, predictions


Any proposal or change to the pipelines interface requires a proposal document, and the maintainers will decide whether it should be pursued. Please read the contributing.md document before making a proposal.

Still to be documented: toConfidenceBounds.py