From b6efaaa4c1df8b96d1509d031ccde86cb25f2911 Mon Sep 17 00:00:00 2001 From: surfiniaburger Date: Sun, 17 Mar 2024 20:06:10 +0100 Subject: [PATCH 1/2] pvolume --- awesome-giza-actions/pool/README.md | 321 ++++++++++++++++++ .../pool/predict_cairo_action.py | 185 ++++++++++ .../pool/predict_onnx_action.py | 185 ++++++++++ awesome-giza-actions/pool/pyproject.toml | 27 ++ awesome-giza-actions/pool/train_action.py | 244 +++++++++++++ 5 files changed, 962 insertions(+) create mode 100644 awesome-giza-actions/pool/README.md create mode 100644 awesome-giza-actions/pool/predict_cairo_action.py create mode 100644 awesome-giza-actions/pool/predict_onnx_action.py create mode 100644 awesome-giza-actions/pool/pyproject.toml create mode 100644 awesome-giza-actions/pool/train_action.py diff --git a/awesome-giza-actions/pool/README.md b/awesome-giza-actions/pool/README.md new file mode 100644 index 0000000..17658e2 --- /dev/null +++ b/awesome-giza-actions/pool/README.md @@ -0,0 +1,321 @@ +## Time Series Forecasting with WaveNet for Pool Volume and Cryptocurrency Trading +### Problem Setting + +The project aims to develop a WaveNet model for time series forecasting, specifically for predicting pool volumes using time-series data and cryptocurrency trading volume in the financial market. The WaveNet architecture is chosen for its ability to capture long-range dependencies in sequential data effectively. The ZKML (Zero-Knowledge Machine Learning) framework is utilized to ensure the verifiability of the model predictions, which is crucial for applications in financial markets. Interested readers can refer to the WaveNet paper by van den Oord et al. (2016) for a detailed understanding of the architecture. + +### Project Installation + +To reproduce the project, follow these steps: + + Clone the repository: + + +``` +git clone https://github.com/gizatechxyz/Giza-Hub.git + + Navigate to the project directory: +``` +``` +cd awesome-giza-actions/pool +``` + Install dependencies: + +``` +poetry shell +poetry install + +``` + +## Overview of Model Development + +### The model development process involves several key steps: + +#### Defining Model Hyperparameters: +This task defines the hyperparameters such as input shape, number of filters, number of residual blocks, kernel size, number of classes, batch size, and window size required for the WaveNet model. + +``` +@task(name="Define Model Hyperparameters") +def define_model_hyperparameters(): + selected_columns = ['trading_volume_usd', 'blockchain_arbitrum', 'blockchain_avalanche_c', 'blockchain_base', 'blockchain_ethereum', 'blockchain_gnosis', 'blockchain_optimism', 'blockchain_polygon', 'token_pair_wstETH-wUSDM', 'token_pair_xSNXa-YFI', 'token_pair_yCURVE-YFI', 'day_of_week', 'month', 'year'] + window_size = 30 + input_shape = (window_size, len(selected_columns)) + num_filters = 32 + num_blocks = 5 + kernel_size = 8 + num_classes = 1 # Assuming regression task, change to number of classes for classification task + batch_size = 32 + + return input_shape, num_filters, num_blocks, kernel_size, num_classes, batch_size, window_size +``` + +#### Loading Data: +The dataset, containing daily trade volume from a cryptocurrency exchange, is loaded using the DatasetsLoader module from Giza dataset. Additionally, pool volume data is loaded from Polar into a DataFrame. 
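The columns consumed downstream are `day`, `pool_id`, `blockchain`, `token_pair`, and `trading_volume_usd`.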
+ +``` +@task(name="Load Data") +def load_data(): + loader = DatasetsLoader() + # Load data from Polar into a DataFrame + df_polar = loader.load('balancer-daily-trade-volume') + return df_polar +``` + +#### Preprocessing Data: +Data preprocessing involves extracting relevant features, performing one-hot encoding for categorical variables, standardizing numerical features, and extracting temporal features such as day of the week, month, and year for both datasets. + +``` +@task(name="Preprocess Data") +def preprocess_data(df_polar): + # Extracting data from the Polar DataFrame + data = { + 'day': df_polar['day'], + 'pool_id': df_polar['pool_id'], + 'blockchain': df_polar['blockchain'], + 'token_pair': df_polar['token_pair'], + 'trading_volume_usd': df_polar['trading_volume_usd'] + } + + # Creating a new Pandas DataFrame + df_pandas = pd.DataFrame(data) + + # Perform one-hot encoding for categorical variables + df_encoded = pd.get_dummies(df_pandas, columns=['blockchain', 'token_pair']) + + # Initialize StandardScaler + standard_scaler = StandardScaler() + + # Perform Standardization on numerical features + df_encoded[['trading_volume_usd']] = standard_scaler.fit_transform(df_encoded[['trading_volume_usd']]) + + # Convert 'day' column to datetime format + df_encoded['day'] = pd.to_datetime(df_encoded['day']) + + # Extract relevant features: day of the week, month, and year + df_encoded['day_of_week'] = df_encoded['day'].dt.dayofweek + df_encoded['month'] = df_encoded['day'].dt.month + df_encoded['year'] = df_encoded['day'].dt.year + + return df_encoded +``` + +#### Creating Sequences: +Time series data is transformed into input-output sequences suitable for training the WaveNet model. + +``` +@task(name="Create Sequences") +def create_sequences(df_encoded, window_size): + # Calculate the total number of data points + total_data_points = df_encoded.shape[0] + + # Calculate the total number of sequences + total_sequences = total_data_points - window_size + 1 + + # Select only necessary columns from the DataFrame + selected_columns = ['trading_volume_usd', 'blockchain_arbitrum', 'blockchain_avalanche_c', 'blockchain_base', 'blockchain_ethereum', 'blockchain_gnosis', 'blockchain_optimism', 'blockchain_polygon', 'token_pair_wstETH-wUSDM', 'token_pair_xSNXa-YFI', 'token_pair_yCURVE-YFI', 'day_of_week', 'month', 'year'] + df_selected = df_encoded[selected_columns] + + # Slide a window of this length across your time-series data + sequences_input = [] + sequences_target = [] + + for i in range(total_sequences): + # Extract the historical data points as the input sequence + input_sequence = df_selected.iloc[i : i + window_size].values + sequences_input.append(input_sequence) + # Extract the next data point as the target for prediction + target = df_selected.iloc[i + window_size - 1, 2] + sequences_target.append(target) + + # Convert lists to numpy arrays + sequences_input = np.array(sequences_input) + sequences_target = np.array(sequences_target) + + # Reshape the target sequences to match the shape of the input sequences + sequences_target = sequences_target.reshape(-1, 1) + + sequences_input = sequences_input.astype(np.float32) + sequences_target = sequences_target.astype(np.float32) + + return sequences_input, sequences_target +``` + +#### Preparing Datasets: +The datasets are split into training, validation, and testing sets for model training and evaluation. 
+ +``` +@task(name="Prepare Datasets") +def prepare_datasets(df_encoded): + print("Prepare dataset...") + + # Splitting into training and testing sets (80% train, 20% test) + X_train_val, X_test, y_train_val, y_test = train_test_split(df_encoded.drop(columns=['trading_volume_usd']), df_encoded['trading_volume_usd'], test_size=0.2, random_state=42) + + # Splitting the training set into training and validation sets (80% train, 20% validation) + X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.2, random_state=42) + + print("✅ Datasets prepared successfully") + + return X_train, y_train, X_test, y_test +``` + +#### Building the Model: +The WaveNet model architecture is constructed using TensorFlow's Keras API, comprising convolutional layers with dilated causal convolutions and skip connections. + +``` +@task(name="Train model") +def build_wavenet(input_shape, num_filters, num_blocks, kernel_size, num_classes): + def residual_block(x, filters, kernel_size, dilation_rate): + # Dilated causal convolution + conv = Conv1D(filters=filters, kernel_size=kernel_size, dilation_rate=dilation_rate, padding='causal')(x) + conv = Activation('relu')(conv) + + # 1x1 convolution to adjust the number of filters + conv = Conv1D(filters=filters, kernel_size=1)(conv) + + # Skip connection + x = Add()([x, conv]) + return x + + inputs = Input(shape=input_shape) + + # Initial convolution block + x = Conv1D(filters=num_filters, kernel_size=1, padding='causal')(inputs) + + # Dilated causal convolutions + for i in range(num_blocks): + dilation_rate = 2 ** i + x = residual_block(x, num_filters, kernel_size, dilation_rate) + + # Output layer + outputs = Conv1D(filters=num_classes, kernel_size=1, activation='softmax')(x) + + model = tf.keras.Model(inputs, outputs) + + + print("✅ Model trained successfully") + return model +``` + +#### Test the model +This task defines a function to evaluate the performance of the trained model on the test dataset. It computes the loss and mean absolute error (MAE) metrics and prints the results. + +``` +@task(name="Test Model") +def evaluate_model(model, X_test, y_test): + """ + Evaluates the trained model on the test dataset. + + Args: + model: The trained model to be evaluated. + X_test: Input features of the test dataset. + y_test: Target labels of the test dataset. + """ + # Evaluate the model + test_loss, test_mae = model.evaluate(X_test, y_test) + print("Test Loss:", test_loss) + print("Test MAE:", test_mae) +``` + +#### Converting to ONNX: +The trained TensorFlow model is converted to the ONNX format for verifiability using the tf2onnx library. + +``` +@task(name="Convert To ONNX") +def convert_to_onnx(model, onnx_file_path): + """ + Convert a TensorFlow model to ONNX format and save it. + + Args: + model (tf.keras.Model): The TensorFlow model to convert. + onnx_file_path (str): The path to save the ONNX model file. + """ + # Convert TensorFlow model to ONNX + onnx_model, _ = tf2onnx.convert.from_keras(model) + + # Save the ONNX model + with open(onnx_file_path, "wb") as f: + f.write(onnx_model.SerializeToString()) + + print(f"Model has been converted to ONNX and saved as {onnx_file_path}") +``` + +#### Prediction with ONNX +This task defines a function to make predictions using the ONNX format of the trained WaveNet model. The input data is fed to the model, and the predictions are returned after reshaping the output. 
+ +``` +@task(name="Prediction with ONNX") +def prediction(X_val): + model = GizaModel(model_path="./wavenet.onnx") + + # Ensure the input data matches the expected shape (1, 30, 14) + if X_val.shape != (1, 30, 14): + print("Invalid input shape. Expected shape: (1, 30, 14)") + return None + + result = model.predict(input_feed={"input_1": X_val}, verifiable=False) + + return result.reshape(-1, 1) # Reshape the output to (30, 1) +``` + +#### Predictions with Cairo +This task defines a function to make predictions using the CAIRO format of the trained WaveNet model. The input data is fed to the model, and the predictions along with the request ID are returned. + +``` +@task(name="Prediction with CAIRO") +def prediction(X_val, model_id, version_id): + model = GizaModel(id=model_id, version=version_id) + + # Ensure the input data matches the expected shape (1, 30, 14) + if X_val.shape != (1, 30, 14): + print("Invalid input shape. Expected shape: (1, 30, 14)") + return None + + (result, request_id) = model.predict(input_feed={"input_1": X_val}, verifiable=True, output_dtype='Tensor' + ) + + return result, request_id +``` + +#### Download and verify the proof job +This command downloads the proof job associated with a specific model deployment from the Giza platform +``` +giza deployments download-proof --model-id --version-id --deployment-id --proof-id --output-path +``` + +While this command verifies the downloaded proof file to ensure the integrity and authenticity of the model predictions. +``` +giza verify --proof PATH_OF_THE_PROOF +``` + +### Model Performance + +The model's performance is crucial in assessing its effectiveness in predicting cryptocurrency trading volumes. Two primary metrics used for evaluation are Mean Squared Error (MSE) and Mean Absolute Error (MAE). + +Here are the performance metrics obtained from testing the model: + + Test Loss: 0.9989119172096252 + Test MAE: 0.9989119172096252 + +These values provide insights into how well the model generalizes to unseen data. A lower test loss and MAE indicate better performance, as they signify that the model's predictions are closer to the actual values. + +Possible improvements to enhance model performance may include adjusting hyperparameters such as learning rate, batch size, or introducing additional layers to capture more complex patterns in the data. Additionally, exploring alternative architectures or incorporating external data sources could also lead to better predictions. Regular monitoring and iterative refinement of the model are essential for achieving optimal performance in forecasting cryptocurrency trading volumes. + +### Giza Integration + +In addition to using Giza Dataset, Giza CLI & Actions are used to make the WaveNet model verifiable. This involves defining tasks and actions within the Giza framework to ensure the reproducibility and verifiability of the model predictions. 
+ + + +## Tech Stack + + Giza Actions SDK + Giza cli + Giza Virtual Environment + Giza Dataset + WaveNet + Jupyter Notebook + Tensorflow + Poetry + Cairo + EZKL + ONNX diff --git a/awesome-giza-actions/pool/predict_cairo_action.py b/awesome-giza-actions/pool/predict_cairo_action.py new file mode 100644 index 0000000..c89b216 --- /dev/null +++ b/awesome-giza-actions/pool/predict_cairo_action.py @@ -0,0 +1,185 @@ +from giza_actions.action import Action, action +from giza_actions.task import task +from giza_datasets import DatasetsLoader +import numpy as np +import pandas as pd +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import StandardScaler +from giza_actions.model import GizaModel + + +@task(name="Define Model Hyperparameters") +def define_model_hyperparameters(): + selected_columns = ['trading_volume_usd', 'blockchain_arbitrum', 'blockchain_avalanche_c', 'blockchain_base', 'blockchain_ethereum', 'blockchain_gnosis', 'blockchain_optimism', 'blockchain_polygon', 'token_pair_wstETH-wUSDM', 'token_pair_xSNXa-YFI', 'token_pair_yCURVE-YFI', 'day_of_week', 'month', 'year'] + window_size = 30 + input_shape = (window_size, len(selected_columns)) + num_filters = 32 + num_blocks = 5 + kernel_size = 8 + num_classes = 1 # Assuming regression task, change to number of classes for classification task + batch_size = 32 + + return input_shape, num_filters, num_blocks, kernel_size, num_classes, batch_size, window_size + +@task(name="Load Data") +def load_data(): + loader = DatasetsLoader() + # Load data from Polar into a DataFrame + df_polar = loader.load('balancer-daily-trade-volume') + return df_polar + +@task(name="Preprocess Data") +def preprocess_data(df_polar): + # Extracting data from the Polar DataFrame + data = { + 'day': df_polar['day'], + 'pool_id': df_polar['pool_id'], + 'blockchain': df_polar['blockchain'], + 'token_pair': df_polar['token_pair'], + 'trading_volume_usd': df_polar['trading_volume_usd'] + } + + # Creating a new Pandas DataFrame + df_pandas = pd.DataFrame(data) + + # Perform one-hot encoding for categorical variables + df_encoded = pd.get_dummies(df_pandas, columns=['blockchain', 'token_pair']) + + # Initialize StandardScaler + standard_scaler = StandardScaler() + + # Perform Standardization on numerical features + df_encoded[['trading_volume_usd']] = standard_scaler.fit_transform(df_encoded[['trading_volume_usd']]) + + # Convert 'day' column to datetime format + df_encoded['day'] = pd.to_datetime(df_encoded['day']) + + # Extract relevant features: day of the week, month, and year + df_encoded['day_of_week'] = df_encoded['day'].dt.dayofweek + df_encoded['month'] = df_encoded['day'].dt.month + df_encoded['year'] = df_encoded['day'].dt.year + + return df_encoded + +@task(name="Create Sequences") +def create_sequences(df_encoded, window_size): + # Calculate the total number of data points + total_data_points = df_encoded.shape[0] + + # Calculate the total number of sequences + total_sequences = total_data_points - window_size + 1 + + # Select only necessary columns from the DataFrame + selected_columns = ['trading_volume_usd', 'blockchain_arbitrum', 'blockchain_avalanche_c', 'blockchain_base', 'blockchain_ethereum', 'blockchain_gnosis', 'blockchain_optimism', 'blockchain_polygon', 'token_pair_wstETH-wUSDM', 'token_pair_xSNXa-YFI', 'token_pair_yCURVE-YFI', 'day_of_week', 'month', 'year'] + df_selected = df_encoded[selected_columns] + + # Slide a window of this length across your time-series data + sequences_input = [] + sequences_target = [] + + 
for i in range(total_sequences): + # Extract the historical data points as the input sequence + input_sequence = df_selected.iloc[i : i + window_size].values + sequences_input.append(input_sequence) + # Extract the next data point as the target for prediction + target = df_selected.iloc[i + window_size - 1, 2] + sequences_target.append(target) + + # Convert lists to numpy arrays + sequences_input = np.array(sequences_input) + sequences_target = np.array(sequences_target) + + # Reshape the target sequences to match the shape of the input sequences + sequences_target = sequences_target.reshape(-1, 1) + + sequences_input = sequences_input.astype(np.float32) + sequences_target = sequences_target.astype(np.float32) + + return sequences_input, sequences_target + + + +@task(name="Prepare Datasets") +def prepare_datasets(df_encoded): + print("Prepare dataset...") + + # Splitting into training and testing sets (80% train, 20% test) + X_train_val, X_test, y_train_val, y_test = train_test_split(df_encoded.drop(columns=['trading_volume_usd']), df_encoded['trading_volume_usd'], test_size=0.2, random_state=42) + + # Splitting the training set into training and validation sets (80% train, 20% validation) + X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.2, random_state=42) + + print("✅ Datasets prepared successfully") + + return X_train, y_train, X_test, y_test, X_val, y_val + + + +@task(name="Prediction with CAIRO") +def prediction(X_val, model_id, version_id): + model = GizaModel(id=model_id, version=version_id) + + # Ensure the input data matches the expected shape (1, 30, 14) + if X_val.shape != (1, 30, 14): + print("Invalid input shape. Expected shape: (1, 30, 14)") + return None + + (result, request_id) = model.predict(input_feed={"input_1": X_val}, verifiable=True, output_dtype='Tensor' + ) + + return result, request_id + + + +@action(name="Execution: Prediction with CAIRO", log_prints=True) +def execution(): + input_shape, num_filters, num_blocks, kernel_size, num_classes, batch_size, window_size = define_model_hyperparameters() + df_polar = load_data() + df_encoded = preprocess_data(df_polar) + sequences_input, sequences_target = create_sequences(df_encoded, window_size) + + + # Prepare datasets + X_train, y_train, X_test, y_test, X_val, y_val = prepare_datasets(df_encoded) + + + # Subsample the data + num_samples_to_select = 30 # Adjust as needed + if num_samples_to_select > X_val.shape[0]: + print("Number of samples to select exceeds the size of the dataset.") + return None + random_indices = np.random.choice(X_val.shape[0], num_samples_to_select, replace=False) + X_val_subset = X_val.iloc[random_indices] + y_val_subset = y_val.iloc[random_indices] + + selected_columns = ['blockchain_arbitrum', 'blockchain_avalanche_c', 'blockchain_base', 'blockchain_ethereum', 'blockchain_gnosis', 'blockchain_optimism', 'blockchain_polygon', 'token_pair_wstETH-wUSDM', 'token_pair_xSNXa-YFI', 'token_pair_yCURVE-YFI', 'day_of_week', 'month', 'year'] + X_val_selected = X_val_subset[selected_columns] + + # Convert the DataFrame to a NumPy array and ensure it is of type float32 + X_val_array = X_val_selected.values.astype(np.float32) + + print("Shape of X_val_array before reshaping:", X_val_array.shape) + # Assuming X_val_array has shape (30, 13) + # Create an array of zeros with shape (30, 1) + zeros_column = np.zeros((30, 1)) + + # Concatenate the zeros column to X_val_array along the second axis + X_val_array_modified = np.concatenate((X_val_array, zeros_column), axis=1) 
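    # Note: the appended zeros column pads the 13 selected feature columns to the 14 the
    # model expects; 'trading_volume_usd' is the prediction target, so it is not part of X_val.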
+ + # Convert the input data to float32 + X_val_array_modified = X_val_array_modified.astype(np.float32) + + + # Perform prediction with CAIRO + (result, request_id) = prediction(X_val_array_modified.reshape(1, 30, 14), model_id=377, version_id=1) + if result is not None: + print(f"Predicted Pool Volumes: {result}") + print("Request id: ", request_id) + print("✅ Pool Volumes predicted successfully") + + return result + + +if __name__ == "__main__": + action_deploy = Action(entrypoint=execution, name="pool-volume-prediction-with-cairo-action") + action_deploy.serve(name="pool-volume-prediction-with-cairo-deployment") diff --git a/awesome-giza-actions/pool/predict_onnx_action.py b/awesome-giza-actions/pool/predict_onnx_action.py new file mode 100644 index 0000000..0fd0452 --- /dev/null +++ b/awesome-giza-actions/pool/predict_onnx_action.py @@ -0,0 +1,185 @@ +from giza_actions.action import Action, action +from giza_actions.task import task +from giza_datasets import DatasetsLoader +import numpy as np +import pandas as pd +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import StandardScaler +from giza_actions.model import GizaModel +from sklearn.metrics import r2_score + +@task(name="Define Model Hyperparameters") +def define_model_hyperparameters(): + selected_columns = ['trading_volume_usd', 'blockchain_arbitrum', 'blockchain_avalanche_c', 'blockchain_base', 'blockchain_ethereum', 'blockchain_gnosis', 'blockchain_optimism', 'blockchain_polygon', 'token_pair_wstETH-wUSDM', 'token_pair_xSNXa-YFI', 'token_pair_yCURVE-YFI', 'day_of_week', 'month', 'year'] + window_size = 30 + input_shape = (window_size, len(selected_columns)) + num_filters = 32 + num_blocks = 5 + kernel_size = 8 + num_classes = 1 # Assuming regression task, change to number of classes for classification task + batch_size = 32 + + return input_shape, num_filters, num_blocks, kernel_size, num_classes, batch_size, window_size + +@task(name="Load Data") +def load_data(): + loader = DatasetsLoader() + # Load data from Polar into a DataFrame + df_polar = loader.load('balancer-daily-trade-volume') + return df_polar + +@task(name="Preprocess Data") +def preprocess_data(df_polar): + # Extracting data from the Polar DataFrame + data = { + 'day': df_polar['day'], + 'pool_id': df_polar['pool_id'], + 'blockchain': df_polar['blockchain'], + 'token_pair': df_polar['token_pair'], + 'trading_volume_usd': df_polar['trading_volume_usd'] + } + + # Creating a new Pandas DataFrame + df_pandas = pd.DataFrame(data) + + # Perform one-hot encoding for categorical variables + df_encoded = pd.get_dummies(df_pandas, columns=['blockchain', 'token_pair']) + + # Initialize StandardScaler + standard_scaler = StandardScaler() + + # Perform Standardization on numerical features + df_encoded[['trading_volume_usd']] = standard_scaler.fit_transform(df_encoded[['trading_volume_usd']]) + + # Convert 'day' column to datetime format + df_encoded['day'] = pd.to_datetime(df_encoded['day']) + + # Extract relevant features: day of the week, month, and year + df_encoded['day_of_week'] = df_encoded['day'].dt.dayofweek + df_encoded['month'] = df_encoded['day'].dt.month + df_encoded['year'] = df_encoded['day'].dt.year + + return df_encoded + +@task(name="Create Sequences") +def create_sequences(df_encoded, window_size): + # Calculate the total number of data points + total_data_points = df_encoded.shape[0] + + # Calculate the total number of sequences + total_sequences = total_data_points - window_size + 1 + + # Select only necessary columns 
from the DataFrame + selected_columns = ['trading_volume_usd', 'blockchain_arbitrum', 'blockchain_avalanche_c', 'blockchain_base', 'blockchain_ethereum', 'blockchain_gnosis', 'blockchain_optimism', 'blockchain_polygon', 'token_pair_wstETH-wUSDM', 'token_pair_xSNXa-YFI', 'token_pair_yCURVE-YFI', 'day_of_week', 'month', 'year'] + df_selected = df_encoded[selected_columns] + + # Slide a window of this length across your time-series data + sequences_input = [] + sequences_target = [] + + for i in range(total_sequences): + # Extract the historical data points as the input sequence + input_sequence = df_selected.iloc[i : i + window_size].values + sequences_input.append(input_sequence) + # Extract the next data point as the target for prediction + target = df_selected.iloc[i + window_size - 1, 2] + sequences_target.append(target) + + # Convert lists to numpy arrays + sequences_input = np.array(sequences_input) + sequences_target = np.array(sequences_target) + + # Reshape the target sequences to match the shape of the input sequences + sequences_target = sequences_target.reshape(-1, 1) + + sequences_input = sequences_input.astype(np.float32) + sequences_target = sequences_target.astype(np.float32) + + return sequences_input, sequences_target + + + +@task(name="Prepare Datasets") +def prepare_datasets(df_encoded): + print("Prepare dataset...") + + # Splitting into training and testing sets (80% train, 20% test) + X_train_val, X_test, y_train_val, y_test = train_test_split(df_encoded.drop(columns=['trading_volume_usd']), df_encoded['trading_volume_usd'], test_size=0.2, random_state=42) + + # Splitting the training set into training and validation sets (80% train, 20% validation) + X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.2, random_state=42) + + print("✅ Datasets prepared successfully") + + return X_train, y_train, X_test, y_test, X_val, y_val + + +@task(name="Prediction with ONNX") +def prediction(X_val): + model = GizaModel(model_path="./wavenet.onnx") + + # Ensure the input data matches the expected shape (1, 30, 14) + if X_val.shape != (1, 30, 14): + print("Invalid input shape. 
Expected shape: (1, 30, 14)") + return None + + result = model.predict(input_feed={"input_1": X_val}, verifiable=False) + + return result.reshape(-1, 1) # Reshape the output to (30, 1) + + + +@action(name="Execution: Prediction with ONNX", log_prints=True) +def execution(): + input_shape, num_filters, num_blocks, kernel_size, num_classes, batch_size, window_size = define_model_hyperparameters() + df_polar = load_data() + df_encoded = preprocess_data(df_polar) + sequences_input, sequences_target = create_sequences(df_encoded, window_size) + + + # Prepare datasets + X_train, y_train, X_test, y_test, X_val, y_val = prepare_datasets(df_encoded) + + + # Subsample the data + num_samples_to_select = 30 # Adjust as needed + if num_samples_to_select > X_val.shape[0]: + print("Number of samples to select exceeds the size of the dataset.") + return None + random_indices = np.random.choice(X_val.shape[0], num_samples_to_select, replace=False) + X_val_subset = X_val.iloc[random_indices] + y_val_subset = y_val.iloc[random_indices] + + selected_columns = ['blockchain_arbitrum', 'blockchain_avalanche_c', 'blockchain_base', 'blockchain_ethereum', 'blockchain_gnosis', 'blockchain_optimism', 'blockchain_polygon', 'token_pair_wstETH-wUSDM', 'token_pair_xSNXa-YFI', 'token_pair_yCURVE-YFI', 'day_of_week', 'month', 'year'] + X_val_selected = X_val_subset[selected_columns] + + # Convert the DataFrame to a NumPy array and ensure it is of type float32 + X_val_array = X_val_selected.values.astype(np.float32) + + print("Shape of X_val_array before reshaping:", X_val_array.shape) + # Assuming X_val_array has shape (30, 13) + # Create an array of zeros with shape (30, 1) + zeros_column = np.zeros((30, 1)) + + # Concatenate the zeros column to X_val_array along the second axis + X_val_array_modified = np.concatenate((X_val_array, zeros_column), axis=1) + + # Convert the input data to float32 + X_val_array_modified = X_val_array_modified.astype(np.float32) + + + # Perform prediction with ONNX + result = prediction(X_val_array_modified.reshape(1, 30, 14)) + if result is not None: + print(f"Predicted Pool Volumes: {result}") + print("✅ Pool Volumes predicted successfully") + + # Calculate R-squared + r_squared = r2_score(y_val_subset, result) + print("R-squared:", r_squared) + + return result + +if __name__ == "__main__": + action_deploy = Action(entrypoint=execution, name="pool-volume-prediction-action") + action_deploy.serve(name="pool-volume-prediction-deployment") diff --git a/awesome-giza-actions/pool/pyproject.toml b/awesome-giza-actions/pool/pyproject.toml new file mode 100644 index 0000000..156dddc --- /dev/null +++ b/awesome-giza-actions/pool/pyproject.toml @@ -0,0 +1,27 @@ +[tool.poetry] +name = "yearn" +version = "0.1.0" +description = "Giza Actions SDK project" +authors = ["Giza "] +readme = "README.md" + +[tool.poetry.dependencies] +python = "^3.11" +giza-actions = "^0.1.3" +giza-datasets = "^0.1.0" +torch = "^2.1.2" +torchvision = "^0.16.2" +scipy = "^1.12.0" +numpy = "^1.26.4" +certifi = "^2024.2.2" +pandas = "^2.2.0" +seaborn = "^0.13.2" +matplotlib = "^3.8.2" +scikit-learn = "^1.4.0" +onnx = "^1.15.0" +tf20nnx ="^1.16.1" +tensorflow = "^2.15.0" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/awesome-giza-actions/pool/train_action.py b/awesome-giza-actions/pool/train_action.py new file mode 100644 index 0000000..d15dde0 --- /dev/null +++ b/awesome-giza-actions/pool/train_action.py @@ -0,0 +1,244 @@ +import numpy as np +from giza_actions.action import 
Action, action +from giza_actions.task import task +from giza_datasets import DatasetsLoader +import os +import certifi +os.environ['SSL_CERT_FILE'] = certifi.where() +import pandas as pd +import polars as pl +import tf2onnx +import matplotlib.pyplot as plt +from sklearn.model_selection import train_test_split +from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score +from sklearn.preprocessing import StandardScaler +from tensorflow.keras.utils import Sequence +import tensorflow as tf +from tensorflow.keras.layers import Input, Conv1D, Activation, Add +import onnx +from typing import List + +@task(name="Define Model Hyperparameters") +def define_model_hyperparameters(): + selected_columns = ['trading_volume_usd', 'blockchain_arbitrum', 'blockchain_avalanche_c', 'blockchain_base', 'blockchain_ethereum', 'blockchain_gnosis', 'blockchain_optimism', 'blockchain_polygon', 'token_pair_wstETH-wUSDM', 'token_pair_xSNXa-YFI', 'token_pair_yCURVE-YFI', 'day_of_week', 'month', 'year'] + window_size = 30 + input_shape = (window_size, len(selected_columns)) + num_filters = 32 + num_blocks = 5 + kernel_size = 8 + num_classes = 1 # Assuming regression task, change to number of classes for classification task + batch_size = 32 + + return input_shape, num_filters, num_blocks, kernel_size, num_classes, batch_size, window_size + +@task(name="Load Data") +def load_data(): + loader = DatasetsLoader() + # Load data from Polar into a DataFrame + df_polar = loader.load('balancer-daily-trade-volume') + return df_polar + +@task(name="Preprocess Data") +def preprocess_data(df_polar): + # Extracting data from the Polar DataFrame + data = { + 'day': df_polar['day'], + 'pool_id': df_polar['pool_id'], + 'blockchain': df_polar['blockchain'], + 'token_pair': df_polar['token_pair'], + 'trading_volume_usd': df_polar['trading_volume_usd'] + } + + # Creating a new Pandas DataFrame + df_pandas = pd.DataFrame(data) + + # Perform one-hot encoding for categorical variables + df_encoded = pd.get_dummies(df_pandas, columns=['blockchain', 'token_pair']) + + # Initialize StandardScaler + standard_scaler = StandardScaler() + + # Perform Standardization on numerical features + df_encoded[['trading_volume_usd']] = standard_scaler.fit_transform(df_encoded[['trading_volume_usd']]) + + # Convert 'day' column to datetime format + df_encoded['day'] = pd.to_datetime(df_encoded['day']) + + # Extract relevant features: day of the week, month, and year + df_encoded['day_of_week'] = df_encoded['day'].dt.dayofweek + df_encoded['month'] = df_encoded['day'].dt.month + df_encoded['year'] = df_encoded['day'].dt.year + + return df_encoded + +@task(name="Create Sequences") +def create_sequences(df_encoded, window_size): + # Calculate the total number of data points + total_data_points = df_encoded.shape[0] + + # Calculate the total number of sequences + total_sequences = total_data_points - window_size + 1 + + # Select only necessary columns from the DataFrame + selected_columns = ['trading_volume_usd', 'blockchain_arbitrum', 'blockchain_avalanche_c', 'blockchain_base', 'blockchain_ethereum', 'blockchain_gnosis', 'blockchain_optimism', 'blockchain_polygon', 'token_pair_wstETH-wUSDM', 'token_pair_xSNXa-YFI', 'token_pair_yCURVE-YFI', 'day_of_week', 'month', 'year'] + df_selected = df_encoded[selected_columns] + + # Slide a window of this length across your time-series data + sequences_input = [] + sequences_target = [] + + for i in range(total_sequences): + # Extract the historical data points as the input sequence + 
input_sequence = df_selected.iloc[i : i + window_size].values + sequences_input.append(input_sequence) + # Extract the next data point as the target for prediction + target = df_selected.iloc[i + window_size - 1, 2] + sequences_target.append(target) + + # Convert lists to numpy arrays + sequences_input = np.array(sequences_input) + sequences_target = np.array(sequences_target) + + # Reshape the target sequences to match the shape of the input sequences + sequences_target = sequences_target.reshape(-1, 1) + + sequences_input = sequences_input.astype(np.float32) + sequences_target = sequences_target.astype(np.float32) + + return sequences_input, sequences_target + + + +@task(name="Prepare Datasets") +def prepare_datasets(df_encoded): + print("Prepare dataset...") + + # Splitting into training and testing sets (80% train, 20% test) + X_train_val, X_test, y_train_val, y_test = train_test_split(df_encoded.drop(columns=['trading_volume_usd']), df_encoded['trading_volume_usd'], test_size=0.2, random_state=42) + + # Splitting the training set into training and validation sets (80% train, 20% validation) + X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.2, random_state=42) + + print("✅ Datasets prepared successfully") + + return X_train, y_train, X_test, y_test + +@task(name=" Data Generator") +class DataGenerator(Sequence): + def __init__(self, sequences_input, sequences_target, batch_size): + self.sequences_input = sequences_input + self.sequences_target = sequences_target + self.batch_size = batch_size + + def __len__(self): + return len(self.sequences_input) // self.batch_size + + def __getitem__(self, idx): + batch_inputs = self.sequences_input[idx * self.batch_size : (idx + 1) * self.batch_size] + batch_targets = self.sequences_target[idx * self.batch_size : (idx + 1) * self.batch_size] + return np.array(batch_inputs), np.array(batch_targets) + +@task(name="Test Model") +def evaluate_model(model, X_test, y_test): + # Evaluate the model + test_loss, test_mae = model.evaluate(X_test, y_test) + print("Test Loss:", test_loss) + print("Test MAE:", test_mae) + + +@task(name="Train model") +def build_wavenet(input_shape, num_filters, num_blocks, kernel_size, num_classes): + def residual_block(x, filters, kernel_size, dilation_rate): + # Dilated causal convolution + conv = Conv1D(filters=filters, kernel_size=kernel_size, dilation_rate=dilation_rate, padding='causal')(x) + conv = Activation('relu')(conv) + + # 1x1 convolution to adjust the number of filters + conv = Conv1D(filters=filters, kernel_size=1)(conv) + + # Skip connection + x = Add()([x, conv]) + return x + + inputs = Input(shape=input_shape) + + # Initial convolution block + x = Conv1D(filters=num_filters, kernel_size=1, padding='causal')(inputs) + + # Dilated causal convolutions + for i in range(num_blocks): + dilation_rate = 2 ** i + x = residual_block(x, num_filters, kernel_size, dilation_rate) + + # Output layer + outputs = Conv1D(filters=num_classes, kernel_size=1, activation='softmax')(x) + + model = tf.keras.Model(inputs, outputs) + + + print("✅ Model trained successfully") + return model + + +@task(name="Convert To ONNX") +def convert_to_onnx(model, onnx_file_path): + """ + Convert a TensorFlow model to ONNX format and save it. + + Args: + model (tf.keras.Model): The TensorFlow model to convert. + onnx_file_path (str): The path to save the ONNX model file. 
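
    Returns:
        None. The ONNX model is serialized and written to ``onnx_file_path``.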
+ """ + # Convert TensorFlow model to ONNX + onnx_model, _ = tf2onnx.convert.from_keras(model) + + # Save the ONNX model + with open(onnx_file_path, "wb") as f: + f.write(onnx_model.SerializeToString()) + + print(f"Model has been converted to ONNX and saved as {onnx_file_path}") + +@action(name="Action: Convert To ONNX", log_prints=True) +def execution(): + + input_shape, num_filters, num_blocks, kernel_size, num_classes, batch_size, window_size = define_model_hyperparameters() + df_polar = load_data() + df_encoded = preprocess_data(df_polar) + sequences_input, sequences_target = create_sequences(df_encoded, window_size) + + + # Prepare datasets + X_train, y_train, X_test, y_test = prepare_datasets(df_encoded) + + # Create data loaders + data_generator = DataGenerator(sequences_input, sequences_target, batch_size) + + # Train the model + # Build the WaveNet model + model = build_wavenet(input_shape, num_filters, num_blocks, kernel_size, num_classes) + + # Compile the model + model.compile(optimizer='adam', loss='mse', metrics=['mae']) + + # Print model summary + model.summary() + + + data_generator = DataGenerator(sequences_input, sequences_target, batch_size) + + # Train the model using the fit method + model.fit(data_generator, epochs=1) + # Evaluate the model + # evaluate_model(model, X_test, y_test) + + + # Convert to ONNX + onnx_file_path = "wavenet.onnx" + convert_to_onnx(model, onnx_file_path) + + + +# Create an Action object and serve it +if __name__ == "__main__": + action_deploy = Action(entrypoint=execution, name="pytorch-trade-action") + action_deploy.serve(name="pytorch-trade-deployment") From 2877667eb558fb6cb155e5a8318e569896f22c89 Mon Sep 17 00:00:00 2001 From: surfiniaburger Date: Thu, 28 Mar 2024 23:15:58 +0100 Subject: [PATCH 2/2] removed output dtype from the workflow --- awesome-giza-actions/pool/README.md | 2 +- awesome-giza-actions/pool/predict_cairo_action.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/awesome-giza-actions/pool/README.md b/awesome-giza-actions/pool/README.md index 17658e2..6f10a8c 100644 --- a/awesome-giza-actions/pool/README.md +++ b/awesome-giza-actions/pool/README.md @@ -270,7 +270,7 @@ def prediction(X_val, model_id, version_id): print("Invalid input shape. Expected shape: (1, 30, 14)") return None - (result, request_id) = model.predict(input_feed={"input_1": X_val}, verifiable=True, output_dtype='Tensor' + (result, request_id) = model.predict(input_feed={"input_1": X_val}, verifiable=True ) return result, request_id diff --git a/awesome-giza-actions/pool/predict_cairo_action.py b/awesome-giza-actions/pool/predict_cairo_action.py index c89b216..62ead60 100644 --- a/awesome-giza-actions/pool/predict_cairo_action.py +++ b/awesome-giza-actions/pool/predict_cairo_action.py @@ -124,7 +124,7 @@ def prediction(X_val, model_id, version_id): print("Invalid input shape. Expected shape: (1, 30, 14)") return None - (result, request_id) = model.predict(input_feed={"input_1": X_val}, verifiable=True, output_dtype='Tensor' + (result, request_id) = model.predict(input_feed={"input_1": X_val}, verifiable=True ) return result, request_id