The Taiyo Pipelines Interface uses the following blocks for running ML pipelines. The source can be found in src/taiyo_utils/pipelines.

This document describes the interfaces below:

Pipeline Harvester

This interface fetches data from multiple sources. Pass it a config dictionary in the following format:

  • name: a metadata entry, used as the reference key for the object returned by the getData() method described in the AlphaVantage abstraction
  • source: selects the data source from which the data is to be fetched
  • params: takes all the parameters supported by the target harvester

Each list entry of this dictionary is treated as a job by the Pipeline Harvester.

Fetch Data Layer:
    - name: Ansys OHLC
      source: AlphaVantage
      params:
        retry: 6
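
Since the harvester takes a plain config dictionary, the same layer can also be written directly in Python. A minimal sketch with two jobs (the second job's name is only an illustrative assumption; the rest mirrors the YAML above):

fetch_layer = [
    {
        "name": "Ansys OHLC",        # key under which the fetched DataFrame is returned
        "source": "AlphaVantage",    # harvester used to fetch the data
        "params": {"retry": 6},      # parameters supported by that harvester
    },
    {
        # Hypothetical second job, added only to show that each list entry is a job
        "name": "Hormel Food Corp OHLC",
        "source": "AlphaVantage",
        "params": {"retry": 6},
    },
]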

fetch_data function

This function runs the fetching operation for the jobs listed above, collects the data, and returns it as a dictionary keyed by the name field.

aggregation function

This function aggregates the data fetched by the previous function. A custom aggregation can also be applied on top of its result.

sample code:

Interface = PipelineHarvester(config["Pipelines"])
jobs_data = Interface.fetch_data()
aggregated_data = Interface.aggregation()

Here jobs_data contains the fetched data as a dictionary, and aggregated_data contains the data aggregated on the Timestamp column. A custom aggregation can be applied to the DataFrame held in aggregated_data; a sketch of such a custom aggregation is shown after the output format below.

output format:

# for fetch_data function
{
    "Ansys OHLC": ..(DataFrame with OHLC data),
    ...
}
# for aggregation function
Timestamp   Open  High  Low  Close
2019-01-01  ..    ..    ..   ..
...

The output of the aggregation function is a DataFrame in this shape.
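
As an example of a custom aggregation, the DataFrame returned by the aggregation function can be resampled with pandas. A minimal sketch, assuming the Timestamp column can be parsed as datetimes and the columns follow the format above:

import pandas as pd

df = aggregated_data.copy()
df["Timestamp"] = pd.to_datetime(df["Timestamp"])
# Illustrative custom aggregation: roll the daily OHLC rows up to monthly bars
monthly = (
    df.set_index("Timestamp")
      .resample("M")
      .agg({"Open": "first", "High": "max", "Low": "min", "Close": "last"})
)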

Technical features can also be added to the feature list. These include Moving Average, Bollinger Bands, RSI, and so on. Below is a sample definition.

Feature Generation Layer:
    - function: sma
      params:
        n: 5
        target: *tgt
  • function: the technical indicator to compute. It can take values such as sma, ema, rsi, and so on.
  • params: all the parameters required by the chosen function.
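
For intuition, the sma entry above with n: 5 corresponds to a 5-period rolling mean of the target column. A minimal pandas sketch of the equivalent computation (the column name used here is only an assumption for what *tgt may resolve to):

# "close (ANSS)" is an assumed example of the target column behind *tgt
target = "close (ANSS)"
aggregated_data["sma_5"] = aggregated_data[target].rolling(window=5).mean()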

Pipeline Transformer

This layer transforms the fetched data for all the models. For each model to train, we write a job and identify it with the following fields:

  • name: the job or model name for which the data is to be preprocessed
  • source: the transformer to be used to transform the data
  • params: parameters for the transformer job

Below is a sample Transformer definition. It can be provided as a dictionary or a yml file.

Preprocessor Layer:
    - name: LSTM
      source: SequenceTransformer
      params:
        target: *dt_tgt_t_1
        split_date: *split_date
        val_size: 0
        seq_len: 10
        state_file_path: Staging/Hormel_Food_Corp/LSTM_close_daily.ts

The definition is then passed into the abstraction along with the target data.

NOTE: Follow the exact format shown for state_file_path. The asset name should be written the way it is defined for AlphaVantage above, with underscores instead of spaces and with the trailing "." removed from abbreviations such as Corp. or Co.
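
A small helper illustrating this naming rule (not part of the library, just a sketch of the convention):

def to_asset_dir(name: str) -> str:
    # "Hormel Food Corp." -> "Hormel_Food_Corp": underscores instead of spaces,
    # trailing "." dropped from abbreviations such as Corp. or Co.
    return "_".join(part.rstrip(".") for part in name.split())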

sample code:

Interface = PipelineTransformer(X, "close (ANSS)(t+1)", config["Pipelines"])
Interface.fit_transform()
response["Transform"] = Interface

Here fit_transform returns all the transformed data as a dictionary, with each entry keyed by the job name.

output format:

{
    "LSTM": {
        "X_train": ..
        "X_test": ..
        "y_train": ..
        "y_test": ...
    },
    ...
}
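
The transformed splits can then be read back per job, for example (a usage sketch; the data attribute is the same one referenced in the Pipeline Model example below):

lstm_data = response["Transform"].data["LSTM"]
X_train, y_train = lstm_data["X_train"], lstm_data["y_train"]
X_test, y_test = lstm_data["X_test"], lstm_data["y_test"]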

Pipeline Model

This layer trains the models on the data prepared for them. For each model to train, we write a job and identify it with the fields below. The mode flag can be set to "train" or "inference".

Note: inference mode requires the model to already exist, and the config file must point to it correctly (see the inference sketch after the training sample code below).

  • name: the job or model name for which the data was preprocessed. This must match the name of the preprocessor job, since this field maps the two together
  • source: the model architecture to be used for training
  • model: the model to be used from the selected architecture
  • params: parameters for the training job

Below is a sample Trainer definition. It can be provided as a dictionary or a yml file.

Model Training Layer:
    - name: LSTM
      source: neural_network
      model: IndexerLSTM
      params:
        architecture:
          neurons:
          - 64
          - 64
          dropouts:
          - 0.2
          r_dropouts:
          - 0
          - 0.2
          activations:
          - tanh
          - tanh
          weights_path: Staging/SBA_Communications/LSTM_close_monthly.h5
        optimizer: 
          loss: mse
          optimizer: RMSprop 
          #   name: RMSprop
          #   lr: 0.001
          #   rho: 0.9
          metrics:
            - mean_absolute_error
        trainer:
          epochs: 50
          n_splits: 7
          batch_size: 400
          verbose: true
          validate: false

The definition is then passed into the abstraction along with the target data.

sample code:

Interface = PipelineModel(response["Transform"].data, config["Pipelines"], mode="train")
response["Models"] = Interface.run()

output format:

# run function output
{
    "LSTM": {
        "pred":{
            "y_train_pred":{..}
            "y_test_pred":{..},
            "y_val_pred": {..},
        },
        "actual":{..}
    },
    ....
}
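
For example, the predictions of a given job can be read as follows (a usage sketch based on the output format above):

lstm_result = response["Models"]["LSTM"]
y_test_pred = lstm_result["pred"]["y_test_pred"]
y_actual = lstm_result["actual"]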

Post Processing

The output from the model training layer is passed to a post-processing layer, which carries out the operations needed before presenting the final forecast. This includes resampling the data and the predictions.

Below is a sample PostProcessing Layer definition.

PostProcessing Layer:
    target: MA
    detrended-models:
    - LSTM
    - GRU
    - BDLSTM
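
As an illustration of the resampling step, a prediction series keyed by Timestamp can be brought back to a coarser frequency with pandas (a minimal sketch only; the actual post-processing implementation may differ):

import pandas as pd

# Illustrative resampling of a prediction mapping {timestamp: value}
pred = pd.Series(y_test_pred)
pred.index = pd.to_datetime(pred.index)
monthly_pred = pred.resample("M").last()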

After all these phases, the forecast is stored in a variable named forecast. It is then passed further for visualization and performance evaluation.

Coming Soon:

  • Ensemble Layer
  • More Data Sources
  • More Model Architectures

Any proposal or change to the pipelines interface requires a proposal document; the maintainers will decide whether it should be pursued.

Please read the contributing.md document before making a proposal.