The Taiyo Pipelines Interface uses the following blocks for running ML pipelines. The source can be found in src/taiyo_utils/pipelines.
This document describes the interfaces below.
Pipeline Harvester
This interface fetches data from multiple sources in a simple manner. Pass a config dictionary in this format:
- name: mostly a metadata entry, but used as a reference to the object retrieved by the getData() method described in the AlphaVantage abstraction
- source: selects the data source from which the data is fetched
- params: takes all the parameters supported by the target harvester
Each list entry of this dictionary is treated as a job by the Pipeline Harvester.
Fetch Data Layer:
- name: Ansys OHLC
  source: AlphaVantage
  params:
    retry: 6
fetch_data function
This function runs the fetch operation for the jobs listed above, collects the data, and returns it as a dictionary keyed by the name field.
aggregation function
This function aggregates the data fetched by the previous function. A custom aggregation can also be used with it.
sample code:
Interface = PipelineHarvester(config["Pipelines"])
jobs_data = Interface.fetch_data()
aggregated_data = Interface.aggregation()
Here jobs_data contains the data as a dictionary, and aggregated_data contains the data aggregated on the Timestamp column. A custom aggregation can be applied to the DataFrame held in aggregated_data.
output format:
# for fetch_data function
{
  "Ansys OHLC": .. (DataFrame with OHLC data),
  ...
}
# for aggregation function
Timestamp    Open   High   Low    Close
2019-01-01   ..     ..     ..     ..
...
The output of the aggregation function is a single DataFrame.
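As a concrete illustration, a custom aggregation can be applied directly to the aggregated DataFrame. The sketch below fabricates a small OHLC frame in place of the real aggregation() output (the numbers are made up) and collapses the daily rows into 3-day OHLC bars with pandas:

```python
import pandas as pd

# Hypothetical stand-in for the DataFrame returned by Interface.aggregation()
aggregated_data = pd.DataFrame({
    "Timestamp": pd.date_range("2019-01-01", periods=6, freq="D"),
    "Open":  [10.0, 10.5, 10.2, 10.8, 11.0, 10.9],
    "High":  [10.6, 10.9, 10.7, 11.1, 11.4, 11.2],
    "Low":   [9.8, 10.1, 10.0, 10.5, 10.7, 10.6],
    "Close": [10.4, 10.3, 10.6, 11.0, 10.8, 11.1],
})

# Example custom aggregation: collapse daily rows into 3-day OHLC bars
custom = (
    aggregated_data.set_index("Timestamp")
    .resample("3D")
    .agg({"Open": "first", "High": "max", "Low": "min", "Close": "last"})
)
print(custom)
```

Any pandas groupby/resample recipe works the same way, since aggregated_data is an ordinary DataFrame.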
Technical indicators can also be added to the feature list. These include Moving Average, Bollinger Bands, RSI, etc. Below is a sample definition.
Feature Generation Layer:
- function: sma
  params:
    n: 5
    target: *tgt
- function: one of the technical indicators; it can take values such as sma, ema, rsi, and so on.
- params: all the necessary items associated with the chosen function.
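The sma entry above presumably computes a simple moving average with a window of n rows over the target column. A minimal pandas sketch of that indicator (the function body is an assumption, not the library's implementation):

```python
import pandas as pd

def sma(series: pd.Series, n: int) -> pd.Series:
    """Simple moving average over a rolling window of n rows."""
    return series.rolling(window=n).mean()

close = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
feature = sma(close, n=5)
# The first n-1 entries are NaN until a full window is available
print(feature)
```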
Pipeline Transformer
This layer transforms the data fetched for all the models. For each model to train, we write a job and identify it with the following fields:
- name: describes the job or model name for which the data is to be preprocessed
- source: which transformer is used to transform the data
- params: parameters for the transformer job
Below is a sample Transformer definition; it can be written as a dictionary or a yml file.
Preprocessor Layer:
- name: LSTM
  source: SequenceTransformer
  params:
    target: *dt_tgt_t_1
    split_date: *split_date
    val_size: 0
    seq_len: 10
    state_file_path: Staging/Hormel_Food_Corp/LSTM_close_daily.ts
This definition is passed into the abstraction along with the target data.
NOTE: Use the exact format of state_file_path shown above. The asset name should be written the way it is defined for AlphaVantage, with underscores instead of spaces, and with the trailing "." removed from abbreviations such as Corp. or Co.
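The naming rule in the note can be sketched as a tiny helper (the function name asset_dir_name is illustrative, not part of the library):

```python
def asset_dir_name(asset_name: str) -> str:
    """Build the Staging directory component from an asset name:
    strip the trailing period from abbreviations like 'Corp.' or 'Co.'
    and replace spaces with underscores."""
    return asset_name.rstrip(".").replace(" ", "_")

print(asset_dir_name("Hormel Food Corp."))   # Hormel_Food_Corp
print(asset_dir_name("SBA Communications"))  # SBA_Communications
```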
sample code:
Interface = PipelineTransformer(X, "close (ANSS)(t+1)", config["Pipelines"])
Interface.fit_transform()
response["Transform"] = Interface
Here fit_transform returns all the transformed data as a dictionary, with each entry keyed by the job name.
output format:
{
  "LSTM": {
    "X_train": ..,
    "X_test": ..,
    "y_train": ..,
    "y_test": ..
  },
  ...
}
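SequenceTransformer with seq_len: 10 presumably slices the series into sliding windows of 10 steps, each predicting the value that follows. A minimal NumPy sketch of that windowing (make_sequences is a hypothetical stand-in, not the library's code):

```python
import numpy as np

def make_sequences(values, seq_len):
    """Slide a window of length seq_len over the series; each window
    is paired with the value that immediately follows it."""
    X, y = [], []
    for i in range(len(values) - seq_len):
        X.append(values[i:i + seq_len])
        y.append(values[i + seq_len])
    return np.array(X), np.array(y)

series = np.arange(15, dtype=float)
X, y = make_sequences(series, seq_len=10)
print(X.shape, y.shape)  # (5, 10) (5,)
```

The real transformer additionally splits the result on split_date into the train/test (and optional validation) sets shown in the output format above.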
Pipeline Model
This layer trains the models on the data prepared for them. For each model to train, we write a job and identify it with the following fields. The mode flag can be set to "train" or "inference".
note: inference mode requires the model to already exist, and the config file must point to it correctly.
- name: describes the job or model name for which the data was preprocessed; the name of the preprocessor job must match the model, as this field maps the two together
- source: which model architecture is used for training the model
- model: which model is used from the selected architecture
- params: parameters for the training job
Below is a sample Trainer definition; it can be written as a dictionary or a yml file.
Model Training Layer:
- name: LSTM
  source: neural_network
  model: IndexerLSTM
  params:
    architecture:
      neurons:
        - 64
        - 64
      dropouts:
        - 0.2
      r_dropouts:
        - 0
        - 0.2
      activations:
        - tanh
        - tanh
      weights_path: Staging/SBA_Communications/LSTM_close_monthly.h5
    optimizer:
      loss: mse
      optimizer: RMSprop
      # name: RMSprop
      # lr: 0.001
      # rho: 0.9
      metrics:
        - mean_absolute_error
    trainer:
      epochs: 50
      n_splits: 7
      batch_size: 400
      verbose: true
      validate: false
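One plausible reading of the architecture block is that the parallel lists (neurons, dropouts, r_dropouts, activations) describe one stacked recurrent layer per entry, with missing entries falling back to defaults. The sketch below only pairs the lists into per-layer specs; how IndexerLSTM actually builds the network from them is not shown in this document:

```python
# The architecture block from the YAML sample above, as a Python dict
architecture = {
    "neurons": [64, 64],
    "dropouts": [0.2],
    "r_dropouts": [0, 0.2],
    "activations": ["tanh", "tanh"],
}

def layer_specs(arch):
    """Pair the parallel lists into one spec dict per recurrent layer.
    Missing list entries default to 0.0 / 'tanh' -- an assumption."""
    def get(key, i, default):
        vals = arch.get(key, [])
        return vals[i] if i < len(vals) else default
    return [
        {
            "units": arch["neurons"][i],
            "dropout": get("dropouts", i, 0.0),
            "recurrent_dropout": get("r_dropouts", i, 0.0),
            "activation": get("activations", i, "tanh"),
        }
        for i in range(len(arch["neurons"]))
    ]

for spec in layer_specs(architecture):
    print(spec)
```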
This definition is passed into the abstraction along with the target data.
sample code:
Interface = PipelineModel(response["Transform"].data, config["Pipelines"], mode="train")
response["Models"] = Interface.run()
output format:
# run function output
{
  "LSTM": {
    "pred": {
      "y_train_pred": {..},
      "y_test_pred": {..},
      "y_val_pred": {..}
    },
    "actual": {..}
  },
  ...
}
Post Processing
The output from the model training layer is passed to a post-processing layer, which carries out the operations needed before presenting the final forecast. This includes resampling the data and the predictions.
Below is a sample PostProcessing definition.
PostProcessing Layer:
  target: MA
  detrended-models:
    - LSTM
    - GRU
    - BDLSTM
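Given target: MA and the detrended-models list, one plausible post-processing step is adding the moving-average trend back onto the predictions of models that were trained on detrended data. A sketch of that idea with made-up numbers (the variable names are illustrative, not the library's):

```python
import pandas as pd

# Assumption: models listed under detrended-models were trained on a
# series with its moving-average (MA) trend removed, so post-processing
# adds the trend back onto their raw predictions.
idx = pd.date_range("2019-01-01", periods=5, freq="D")
trend = pd.Series([10.0, 10.5, 11.0, 11.5, 12.0], index=idx)        # MA trend
detrended_pred = pd.Series([0.2, -0.1, 0.3, 0.0, -0.2], index=idx)  # e.g. LSTM output

forecast = detrended_pred + trend  # re-trend the prediction
print(forecast)
```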
After all these phases, the forecast is stored in the variable forecast, which is then passed on for visualization and performance evaluation.
Coming Soon:
- Ensemble Layer
- More Data Sources
- More Model Architectures
Any proposal for, or change to, the pipelines interface requires a proposal document; the maintainers will decide whether it should be pursued.
Please read the contributing.md document before making a proposal.