from pydantic import BaseModelfrom rialto.runner.config_loader import PipelineConfigfrom rialto.jobs import config
Rialto is a framework for building and deploying machine learning features in a scalable and reusable way. It provides a set of tools that make it easy to define and deploy new features, and it provides a way to orchestrate the execution of these features.
The name Rialto is a reference to the Rialto Bridge in Venice, Italy. The Rialto Bridge was a major marketplace for goods and ideas the Middle Ages. Rialto is intended to be a foundation of a similar marketplace for machine learning features, where users and find and share reusable features.
Sphinx-Generated autodocs pages available here.
pip install rialto
This library currently contains:
Runner is the orchestrator and scheduler of Rialto. It can be used to execute any job but is primarily designed to execute feature maker jobs. The core of runner is execution of a Transformation class that can be extended for any purpose and the execution configuration that defines the handling of i/o, scheduling, dependencies and reporting.
For the details on the interface see the implementation Inside the transformation you have access to a TableReader, date of running, and if provided to Runner, a live spark session and metadata manager. You can either implement your jobs directly via extending the Transformation class, or by using the jobs abstraction.
Bellow is the minimal code necessary to execute the runner.
from rialto.runner import Runner
# Create the runner instance
runner = Runner(spark, config_path=cfg)
# Execute the run
runner()
A runner by default executes all the jobs provided in the configuration file, for all the viable execution dates according to the configuration file for which the job has not yet run successfully (i.e. the date partition doesn't exist on the storage) This behavior can be modified by various parameters and switches available.
- run_date - date at which the runner is triggered (defaults to day of running)
- rerun - rerun all jobs even if they already succeeded in the past runs
- op - run only selected operation / pipeline
- skip_dependencies - ignore dependency checks and run all jobs
- overrides - dictionary of overrides for the configuration
Transformations are not included in the runner itself, it imports them dynamically according to the configuration, therefore it's necessary to have them locally installed.
runner:
watched_period_units: "months" # unit of default run period
watched_period_value: 2 # value of default run period
mail:
to: # a list of email addresses
- [email protected]
- [email protected]
sender: rialto.noreply@domain # what a sender should say
smtp: smtp.server.url # your smtp server
subject: "Rialto report" # Email subject header
pipelines: # a list of pipelines to run
- name: Pipeline1 #Pipeline name & a name of table it creates (table will be converted to have _ instead of uppercase letters)
module: # Python transformation class location
python_module: module_name
python_class: Pipeline1Class
schedule:
frequency: weekly # daily/weekly/monthly
day: 7 # day of the week or month
info_date_shift: #Optional shift in the written information date from the scheduled day
- units: "days" # days/weeks/months/years
value: 5 # subtracted from scheduled day
dependencies: # list of dependent tables
- table: catalog.schema.table1
name: "table1" # Optional table name, used to recall dependency details in transformation
date_col: generation_date # Mandatory date column name
interval: # mandatory availability interval, subtracted from scheduled day
units: "days"
value: 1
- table: catalog.schema.table2
name: "table2"
interval:
units: "months"
value: 1
target:
target_schema: catalog.schema # schema where tables will be created, must exist
target_partition_column: INFORMATION_DATE # date to partition new tables on
metadata_manager: # optional
metadata_schema: catalog.metadata # schema where metadata is stored
feature_loader: # optional
config_path: model_features_config.yaml # path to the feature loader configuration file
feature_schema: catalog.feature_tables # schema where feature tables are stored
metadata_schema: catalog.metadata # schema where metadata is stored
extras: #optional arguments processed as dictionary
some_value: 3
some_other_value: giraffe
- name: PipelineTable1 # will be written as pipeline_table1
module:
python_module: module_name
python_class: Pipeline2Class
schedule:
frequency: monthly
day: 6
info_date_shift: # can be combined as a list
- units: "days"
value: 5
- units: "months"
value: 3
dependencies:
- table: catalog.schema.table3
interval:
units: "days"
value: 6
target:
target_schema: catalog.schema # schema where tables will be created, must exist
target_partition_column: INFORMATION_DATE # date to partition new tables on
The configuration can be dynamically overridden by providing a dictionary of overrides to the runner. All overrides must adhere to configurations schema, with pipeline.extras section available for custom schema. Here are few examples of overrides:
Specify the path to the value in the configuration file as a dot-separated string
Runner(
spark,
config_path="tests/overrider.yaml",
run_date="2023-03-31",
overrides={"runner.watch_period_value": 4},
)
You can refer to list elements by their index (starting with 0)
overrides={"runner.mail.to[1]": "[email protected]"}
You can append to list by using index -1
overrides={"runner.mail.to[-1]": "[email protected]"}
You can use the following syntax to find a specific element in a list by its attribute value
overrides={"pipelines[name=SimpleGroup].target.target_schema": "new_schema"},
You can directly replace a bigger section of the configuration by providing a dictionary When the whole section doesn't exist, it will be added to the configuration, however it needs to be added as a whole. i.e. if the yaml file doesn't specify feature_loader, you can't just add a feature_loader.config_path, you need to add the whole section.
overrides={"pipelines[name=SimpleGroup].feature_loader":
{"config_path": "features_cfg.yaml",
"feature_schema": "catalog.features",
"metadata_schema": "catalog.metadata"}}
You can provide multiple overrides at once, the order of execution is not guaranteed
overrides={"runner.watch_period_value": 4,
"runner.watch_period_units": "weeks",
"pipelines[name=SimpleGroup].target.target_schema": "new_schema",
"pipelines[name=SimpleGroup].feature_loader":
{"config_path": "features_cfg.yaml",
"feature_schema": "catalog.features",
"metadata_schema": "catalog.metadata"}
}
The purpose of (feature) maker is to simplify feature creation, allow for consistent feature implementation that is standardized and easy to test.
Maker provides the utilities to define feature generating functions by wrapping python functions with provided decorators and a framework to execute these functions.
FeatureMaker is pre-initialized class that provides the means to execute pre-defined feature functions. It has 2 modes, sequential features and aggregated features. Features need to be defined in a separate importable module as in the examples.
Since there is always only one FeatureMaker object, you can use its state variables from inside feature functions. Specifically FeatureMaker.make_date and FeatureMaker.key. These variables are set when calling a FeatureMaker.make(...) function to the values of its parameters.
Sequential feature generation can be simply interpreted as appending a new column for every feature to the existing dataframe.
from rialto.maker import FeatureMaker
from my_features import simple_features
features, metadata = FeatureMaker.make(
df=input_data,
key="KEY",
make_date=run_date,
features_module=simple_features,
keep_preexisting=True)
In aggregated generation, the source dataframe is grouped by the key or keys and the features themselves are the aggregations put into the spark agg function.
from rialto.maker import FeatureMaker
from my_features import agg_features
features, metadata = FeatureMaker.make_aggregated(
df=input_data,
key="KEY",
make_date=run_date,
features_module=agg_features)
There are also make_single_feature and make_single_agg_feature available, intended to be used in tests. (See full documentation)
Features, whether sequential or aggregated ar defined as simple python functions that return a PySpark function, that generates the desired column. In the case of sequential, this function is inserted into PySpark withColumn function as following
data_frame.withColumn("feature_name", RETURNED_FUNCTION_GOES_HERE)
in the case of aggregated features, they are used as follows
data_frame.groupBy("key").agg(RETURNED_FUNCTION_GOES_HERE.alias("feature_name"))
All the features in one python module are processed in one call of FeatureMakers make functions, therefore you can't mix aggregated and sequential features into one module.
To define Rialto features, the framework provides a set of decorators.
This registers the function as a feature. Every feature has to be decorated with @feature as the outermost wrapper.
import pyspark.sql.functions as F
import rialto.maker as rfm
from pyspark.sql import Column
from rialto.metadata import ValueType as VT
@rfm.feature(VT.numerical)
def RECENCY() -> Column:
return F.months_between(F.lit(rfm.FeatureMaker.make_date), F.col("DATE"))
@feature takes one parameter, metadata enum of value type.
Provides an option to pass a string describing the feature that is then saved as part of feature metadata.
import pyspark.sql.functions as F
import rialto.maker as rfm
from pyspark.sql import Column
from rialto.metadata import ValueType as VT
@rfm.feature(VT.numerical)
@rfm.desc("Recency of the action")
def RECENCY() -> Column:
return F.months_between(F.lit(rfm.FeatureMaker.make_date), F.col("DATE"))
Inspired by @pytest.mark.parametrize, it has similar interface and fulfills the same role. It allows you to invoke the feature function multiple times with different values of the parameter. If multiple @params are used, the number of final features will be a product of all parameters. The feature function has to expect a parameter with the same name as the @params name.
import pyspark.sql.functions as F
import rialto.maker as fml
from pyspark.sql import Column
from rialto.metadata import ValueType as VT
@fml.feature(VT.numerical)
@fml.param("product", ["A", "B", "C"])
@fml.param("status", ["ACTIVE", "INACTIVE"])
@fml.desc("Number of given products with given status")
def NUM(product: str, status: str) -> Column:
return F.count(F.when((F.col("PRODUCT") == product) & (F.col("STATUS") == status), F.lit(1)))
The above code creates following features
- NUM_PRODUCT_A_STATUS_ACTIVE
- NUM_PRODUCT_A_STATUS_INACTIVE
- NUM_PRODUCT_B_STATUS_ACTIVE
- NUM_PRODUCT_B_STATUS_INACTIVE
- NUM_PRODUCT_C_STATUS_ACTIVE
- NUM_PRODUCT_C_STATUS_INACTIVE
@depends allows for a definition of features that depend on each other / need to be calculated in certain order. Dependency resolution uses the raw feature names (function names) not parameter expanded names.
import pyspark.sql.functions as F
import rialto.maker as rfm
from pyspark.sql import Column
from rialto.metadata import ValueType as VT
@rfm.feature(VT.numerical)
@rfm.desc("Recency of the action")
def RECENCY() -> Column:
return F.months_between(F.lit(rfm.FeatureMaker.make_date), F.col("DATE"))
@rfm.feature(VT.numerical)
@rfm.desc("Action month delay")
@rfm.param("month", [1, 2, 3])
@rfm.depends("RECENCY")
def DELAY(month) -> Column:
recency = F.col("RECENCY")
return F.when((recency < month) & (recency >= month - 1), F.lit(1))
In this example, @depends, ensures that RECENCY is calculated before DELAY
Publicly available as rialto.maker.utils.feature_name(...), this function is used to create final names of all features. It concatenates the name of the python function with any given @params and their values used in that instance of the feature.
Rialto jobs simplify creation of runner transformations. Instead of having to inherit the base Transformation class, this module offers two decorators: datasource, and job.
As the names might suggest,
- datasource registers the function below as a valid datasource, which can be used as dependency
- job registers the decorated function as a Rialto transformation.
The output / return value of both functions should/has to be a python dataframe (or nothing for jobs, more on that later).
Both jobs and datasources can request dependencies as function arguments. Rialto will attempt to resolve those dependencies and provide them during the job run(s).
We have a set of pre-defined dependencies:
- run_date gives you the date on which the job is supposed to run
- spark contains the spark session
- config returns the custom config you can specify with each job run
- dependencies returns a dictionary containing the job dependencies config
- table_reader returns TableReader
- feature_loader provides PysparkFeatureLoader
- metadata_manager provides MetadataManager
Apart from that, each datasource also becomes a fully usable dependency. Note, that this means that datasources can also be dependent on other datasources - just beware of any circular dependencies!
With that sorted out, we can now provide a quick example of the rialto.jobs module's capabilities:
from pyspark.sql import DataFrame
from rialto.common import TableReader
from rialto.jobs import config_parser, job, datasource
from rialto.runner.config_loader import PipelineConfig
from pydantic import BaseModel
class ConfigModel(BaseModel):
some_value: int
some_other_value: str
@config_parser
def my_config(config: PipelineConfig):
return ConfigModel(**config.extras)
@datasource
def my_datasource(run_date: datetime.date, table_reader: TableReader) -> DataFrame:
return table_reader.get_latest("my_catalog.my_schema.my_table", date_until=run_date)
@job
def my_job(my_datasource: DataFrame, my_config: ConfigModel) -> DataFrame:
return my_datasource.withColumn("HelloWorld", F.lit(my_config.some_value))
This piece of code
- creates a rialto transformation called my_job, which is then callable by the rialto runner.
- It sources the my_datasource and then runs my_job on top of that datasource.
- Rialto adds VERSION (of your package) and INFORMATION_DATE (as per config) columns automatically.
- The rialto runner stores the final to a catalog, to a table according to the job's name.
Note, that by default, the rialto job name is your function name. To allow more flexibility, we allow renaming of the job:
@job(custom_name="my_custom_name")
def f(...):
...
Just note that any WeirdCaseNames will be transformed to lower_case_with_underscores.
If you want to disable versioning of your job (adding package VERSION column to your output):
@job(disable_version=True)
def my_job(...):
...
These parameters can be used separately, or combined.
The rules for the dependencies are fairly straightforward. Both jobs and datasources can only depend on pre-defined dependencies and other datasources. Meaning:
- datasource -> datasource -> job is perfectly fine,
- datasource -> job -> datasource will result in an error.
Secondly, the jobs can, but don't necessarily have to output a dataframe. In case your job doesn't output a dataframe, your job will return an artificially-created, one-row dataframe, which will ensure that rialto notices that the job ran successfully. This can be useful in model training.
Finally, remember, that your jobs are still just Rialto Transformations internally. Meaning that at the end of the day, you should always read some data, do some operations on it and either return a pyspark DataFrame, or not return anything and let the framework return the placeholder one.
Datasources required for a job (or another datasource) can be defined in a different module. To register your module as a datasource, you can use the following functions:
from rialto.jobs import register_dependency_callable, register_dependency_module
import my_package.my_datasources as md
import my_package.my_datasources_big as big_md
# Register an entire dependency module
register_dependency_module(md)
# Register a single datasource from a bigger module
register_dependency_callable(big_md.sample_datasource)
@job
def my_job(my_datasource, sample_datasource: DataFrame, ...):
...
Each job/datasource can only resolve datasources it has defined as dependencies.
NOTE: While register_dependency_module
only registers a module as available dependencies, the register_dependency_callable
actually brings the datasource into the targed module - and thus becomes available for export in the dependency chains.
One of the main advantages of the jobs module is simplification of unit tests for your transformations. Rialto provides following tools:
Assuming we have a my_package.test_job_module.py module:
@datasource
def datasource_a(...)
... code...
@job
def my_job(datasource_a, ...)
... code...
The disable_job_decorators context manager, as the name suggests, disables all decorator functionality and lets you access your functions as raw functions - making it super simple to unit-test:
from rialto.jobs.test_utils import disable_job_decorators
import my_package.test_job_module as tjm
# Datasource Testing
def test_datasource_a():
... mocks here...
with disable_job_decorators(tjm):
datasource_a_output = tjm.datasource_a(...mocks...)
...asserts...
# Job Testing
def test_my_job():
datasource_a_mock = ...
...other mocks...
with disable_job_decorators(tjm):
job_output = tjm.my_job(datasource_a_mock, ...mocks...)
...asserts...
In complex use cases, it may happen that the dependencies of a job become quite complex. Or you simply want to be sure that you didn't accidentally misspelled your dependency name:
from rialto.jobs.test_utils import resolver_resolves
import my_job.test_job_module as tjm
def test_my_job_resolves(spark):
assert resolver_resolves(spark, tjm.my_job)
The code above fails if my_job depends on an undefined datasource (even indirectly), and detects cases where there's a circular dependency.
This module is used to load features from feature store into your models and scripts. Loader provides options to load singular features, whole feature groups, as well as a selection of features from multiple groups defined in a config file, and served as a singular dataframe. It also provides interface to access feature metadata.
Two public classes are exposed form this module. DatabricksLoader(DataLoader), PysparkFeatureLoader(FeatureLoaderInterface).
This class needs to be instantiated with an active spark session, data loader and a path to the metadata schema (in the format of "catalog_name.schema_name").
from rialto.loader import PysparkFeatureLoader
feature_loader = PysparkFeatureLoader(spark= spark_instance, feature_schema="catalog.schema", metadata_schema= "catalog.schema2", date_column="information_date")
from rialto.loader import PysparkFeatureLoader
from datetime import datetime
feature_loader = PysparkFeatureLoader(spark, "feature_catalog.feature_schema", "metadata_catalog.metadata_schema")
my_date = datetime.strptime("2020-01-01", "%Y-%m-%d").date()
feature = feature_loader.get_feature(group_name="CustomerFeatures", feature_name="AGE", information_date=my_date)
metadata = feature_loader.get_feature_metadata(group_name="CustomerFeatures", feature_name="AGE")
This method of data access is only recommended for experimentation, as the group schema can evolve over time.
from rialto.loader import PysparkFeatureLoader
from datetime import datetime
feature_loader = PysparkFeatureLoader(spark, "feature_catalog.feature_schema", "metadata_catalog.metadata_schema")
my_date = datetime.strptime("2020-01-01", "%Y-%m-%d").date()
features = feature_loader.get_group(group_name="CustomerFeatures", information_date=my_date)
metadata = feature_loader.get_group_metadata(group_name="CustomerFeatures")
from rialto.loader import PysparkFeatureLoader
from datetime import datetime
feature_loader = PysparkFeatureLoader(spark, "feature_catalog.feature_schema", "metadata_catalog.metadata_schema")
my_date = datetime.strptime("2020-01-01", "%Y-%m-%d").date()
features = feature_loader.get_features_from_cfg(path="local/configuration/file.yaml", information_date=my_date)
metadata = feature_loader.get_metadata_from_cfg(path="local/configuration/file.yaml")
The configuration file is expected to be in a yaml format and has 3 sections: selection, base, maps.
- selection is a list of feature groups and desired features in those groups. Each group also needs a prefix defined, which will prefix a name of every feature from that group
- base is a feature group with a set of keys defined, a unique selection of these keys from this group will form the base of the resulting dataframe and all feature will be left-joined onto that
- maps is an optional list of dataframes that will be joined onto the base as a whole and their purpose is to bridge feature sets with different primary keys, i.e. we have a map linking IDnumbers and AccountNumbers so we can use feature groups based on either of the primary keys.
selection:
- group: Group_A
prefix: A_PREFIX
features:
- Feature_A1
- Feature_A2
- group: Group_B
prefix: B_PREFIX
features:
- Feature_B1
- Feature_B2
base:
group: Group_D
keys:
- Column_Key1
- Column_Key2
maps:
- MapGroup1
- MapGroup2
Rialto metadata module is designed specifically to work with features. It's a support function to pass data from maker to loader, and to have additional metadata available for further features processing.
Metadata module consists of 3 parts: enums, metadata dataclasses and MetadataManager.
There's two enums available used across Rialto, Schedule and ValueType. Schedule defines the frequency of execution (daily, weekly, monthly, yearly) and ValueType defines the feature type (nominal, ordinal, numeric)
There are 2 metadata data classes: FeatureMetadata and GroupMetadata. They are used as containers to pass metadata information around, and are the return types of metadata requests in loader.
FeatureMetadata
value_type: ValueType # type of feature value
name: str # feature name
description: str # feature description
group: GroupMetadata # feature group this feature belongs to
GroupMetadata
name: str # group name (Original name mirroring the transformation class name)
frequency: Schedule # generation frequency
description: str # group description
key: List[str] # group primary keys
fs_name: str = None # actual table name of this feature group in DataBricks
features: List[str] = None # A list of feature names belonging to this group
This class manages the metadata and provides the interface to fetch or write. Metadata is stored in 2 dataframes group_metadata and feature_metadata, in the schema provided as a parameter.
Common houses functions and utilities used throughout the whole framework. Most importantly it defines DataReader class which is implemented as TableReader with two public functions, get_latest(...) and get_table(...).
TableReader can be used as a standalone delta or parquet table loader.
initialization:
from rialto.common import TableReader
reader = TableReader(spark=spark_instance)
usage of get_table:
# get whole table
df = reader.get_table(table="catalog.schema.table", date_column="information_date")
# get a slice of the table
from datetime import datetime
start = datetime.strptime("2020-01-01", "%Y-%m-%d").date()
end = datetime.strptime("2024-01-01", "%Y-%m-%d").date()
df = reader.get_table(table="catalog.schema.table", date_from=start, date_to=end, date_column="information_date")
usage of get_latest:
# most recent partition
df = reader.get_latest(table="catalog.schema.table", date_column="information_date")
# most recent partition until
until = datetime.strptime("2020-01-01", "%Y-%m-%d").date()
df = reader.get_latest(table="catalog.schema.table", date_until=until, date_column="information_date")
For full information on parameters and their optionality see technical documentation.
TableReader needs an active spark session and an information which column is the date column. There are three options how to pass that information on.
Contributing:
- Implement the change in a custom branch split off develop
- Create a pull request from your branch to develop
- Get PR approved, merged to develop
Releasing:
- Create a Release/1.x.y branch off develop
- Bump all necessary versions
- Create PR from release branch to MASTER
- Merge to master with a merge commit
- Actions will fast-forward new commits from master to develop and deploy new version to pypi repository