diff --git a/protos/feast/core/Store.proto b/protos/feast/core/Store.proto
index 0aa4c8cd42..f35561467e 100644
--- a/protos/feast/core/Store.proto
+++ b/protos/feast/core/Store.proto
@@ -69,6 +69,7 @@ message Store {
     // ====================|==================|================================
     // - event_timestamp | TIMESTAMP | event time of the FeatureRow
     // - created_timestamp | TIMESTAMP | processing time of the ingestion of the FeatureRow
+    // - ingestion_id | STRING | unique id identifying groups of rows that have been ingested together
     // - job_id | STRING | identifier for the job that writes the FeatureRow to the corresponding BigQuery table
     //
     // BigQuery table created will be partitioned by the field "event_timestamp"
diff --git a/protos/feast/types/FeatureRow.proto b/protos/feast/types/FeatureRow.proto
index c170cd5d50..c19a393fde 100644
--- a/protos/feast/types/FeatureRow.proto
+++ b/protos/feast/types/FeatureRow.proto
@@ -39,4 +39,7 @@ message FeatureRow {
     // /:. This value will be used by the feast ingestion job to filter
     // rows, and write the values to the correct tables.
     string feature_set = 6;
+
+    // Identifier tying this feature row to a specific ingestion job.
+    string ingestion_id = 7;
 }
diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py
index 0a38236a51..0221a79b4b 100644
--- a/sdk/python/feast/client.py
+++ b/sdk/python/feast/client.py
@@ -18,6 +18,7 @@
 import shutil
 import tempfile
 import time
+import uuid
 from collections import OrderedDict
 from math import ceil
 from typing import Dict, List, Optional, Tuple, Union
@@ -825,6 +826,7 @@ def ingest(
        # Loop optimization declarations
        produce = producer.produce
        flush = producer.flush
+        ingestion_id = _generate_ingestion_id(feature_set)

        # Transform and push data to Kafka
        if feature_set.source.source_type == "Kafka":
@@ -832,6 +834,7 @@ def ingest(
                file=dest_path,
                row_groups=list(range(pq_file.num_row_groups)),
                fs=feature_set,
+                ingestion_id=ingestion_id,
                max_workers=max_workers,
            ):

@@ -916,6 +919,20 @@ def _build_feature_references(
    return features


+def _generate_ingestion_id(feature_set: FeatureSet) -> str:
+    """
+    Generates a UUID from the feature set name, version, and the current time.
+
+    Args:
+        feature_set: Feature set of the dataset to be ingested.
+
+    Returns:
+        UUID unique to current time and the feature set provided.
+    """
+    uuid_str = f"{feature_set.name}_{feature_set.version}_{int(time.time())}"
+    return str(uuid.uuid3(uuid.NAMESPACE_DNS, uuid_str))
+
+
 def _read_table_from_source(
    source: Union[pd.DataFrame, str], chunk_size: int, max_workers: int
 ) -> Tuple[str, str]:
diff --git a/sdk/python/feast/loaders/ingest.py b/sdk/python/feast/loaders/ingest.py
index 4d215cc990..34d0356ea7 100644
--- a/sdk/python/feast/loaders/ingest.py
+++ b/sdk/python/feast/loaders/ingest.py
@@ -26,7 +26,7 @@


 def _encode_pa_tables(
-    file: str, feature_set: str, fields: dict, row_group_idx: int
+    file: str, feature_set: str, fields: dict, ingestion_id: str, row_group_idx: int
 ) -> List[bytes]:
    """
    Helper function to encode a PyArrow table(s) read from parquet file(s) into
@@ -49,6 +49,9 @@ def _encode_pa_tables(
        fields (dict[str, enum.Enum.ValueType]):
            A mapping of field names to their value types.

+        ingestion_id (str):
+            UUID unique to this ingestion job.
+
        row_group_idx(int):
            Row group index to read and encode into byte like
            FeatureRow protobuf objects.
@@ -81,7 +84,9 @@ def _encode_pa_tables(
    # Iterate through the rows
    for row_idx in range(table.num_rows):
        feature_row = FeatureRow(
-            event_timestamp=datetime_col[row_idx], feature_set=feature_set
+            event_timestamp=datetime_col[row_idx],
+            feature_set=feature_set,
+            ingestion_id=ingestion_id,
        )
        # Loop optimization declaration
        ext = feature_row.fields.extend
@@ -97,7 +102,11 @@


 def get_feature_row_chunks(
-    file: str, row_groups: List[int], fs: FeatureSet, max_workers: int
+    file: str,
+    row_groups: List[int],
+    fs: FeatureSet,
+    ingestion_id: str,
+    max_workers: int,
 ) -> Iterable[List[bytes]]:
    """
    Iterator function to encode a PyArrow table read from a parquet file to
@@ -115,6 +124,9 @@ def get_feature_row_chunks(
        fs (feast.feature_set.FeatureSet):
            FeatureSet describing parquet files.

+        ingestion_id (str):
+            UUID unique to this ingestion job.
+
        max_workers (int):
            Maximum number of workers to spawn.

@@ -128,7 +140,7 @@ def get_feature_row_chunks(
    field_map = {field.name: field.dtype for field in fs.fields.values()}

    pool = Pool(max_workers)
-    func = partial(_encode_pa_tables, file, feature_set, field_map)
+    func = partial(_encode_pa_tables, file, feature_set, field_map, ingestion_id)
    for chunk in pool.imap(func, row_groups):
        yield chunk
    return
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java
index d155d3f1f5..5d8f3d25cb 100644
--- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java
@@ -42,6 +42,8 @@ public abstract class BigQueryFeatureSink implements FeatureSink {
      "Event time for the FeatureRow";
  public static final String BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION =
      "Processing time of the FeatureRow ingestion in Feast\"";
+  public static final String BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION =
+      "Unique id identifying groups of rows that have been ingested together";
  public static final String BIGQUERY_JOB_ID_FIELD_DESCRIPTION =
      "Feast import job ID for the FeatureRow";

@@ -108,10 +110,13 @@ public void prepareWrite(FeatureSetProto.FeatureSet featureSet) {
    Table table = bigquery.getTable(tableId);
    if (table != null) {
      log.info(
-          "Writing to existing BigQuery table '{}:{}.{}'",
-          getProjectId(),
+          "Updating and writing to existing BigQuery table '{}:{}.{}'",
+          datasetId.getProject(),
          datasetId.getDataset(),
          tableName);
+      TableDefinition tableDefinition = createBigQueryTableDefinition(featureSet.getSpec());
+      TableInfo tableInfo = TableInfo.of(tableId, tableDefinition);
+      bigquery.update(tableInfo);
      return;
    }

@@ -166,6 +171,8 @@ private TableDefinition createBigQueryTableDefinition(FeatureSetProto.FeatureSet
            "created_timestamp",
            Pair.of(
                StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION),
+            "ingestion_id",
+            Pair.of(StandardSQLTypeName.STRING, BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION),
            "job_id",
            Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION));
    for (Map.Entry> entry :
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java
index 12833b31b8..6a69b96d71 100644
--- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java
@@ -31,6 +31,7 @@ public class FeatureRowToTableRow implements SerializableFunction {

  private static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp";
  private static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp";
+  private static final String INGESTION_ID_COLUMN = "ingestion_id";
  private static final String JOB_ID_COLUMN = "job_id";

  private final String jobId;
@@ -47,6 +48,7 @@ public TableRow apply(FeatureRow featureRow) {
    TableRow tableRow = new TableRow();
    tableRow.set(EVENT_TIMESTAMP_COLUMN, Timestamps.toString(featureRow.getEventTimestamp()));
    tableRow.set(CREATED_TIMESTAMP_COLUMN, Instant.now().toString());
+    tableRow.set(INGESTION_ID_COLUMN, featureRow.getIngestionId());
    tableRow.set(JOB_ID_COLUMN, jobId);

    for (Field field : featureRow.getFieldsList()) {
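For reviewers: the ingestion id introduced above is a deterministic UUID derived from the feature set name, version, and the wall-clock second at which ingest() starts, so every FeatureRow chunk pushed by a single ingestion call carries the same value in FeatureRow.ingestion_id and in the new BigQuery "ingestion_id" column. Below is a minimal standalone sketch of that scheme; the make_ingestion_id helper and the "driver_stats" feature set name are illustrative only, not part of this change.

import time
import uuid


def make_ingestion_id(feature_set_name: str, feature_set_version: int) -> str:
    # Mirrors _generate_ingestion_id in sdk/python/feast/client.py:
    # hash "name_version_epochseconds" into a uuid3 under NAMESPACE_DNS,
    # so all rows pushed by one ingest() call share a single id.
    seed = f"{feature_set_name}_{feature_set_version}_{int(time.time())}"
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, seed))


if __name__ == "__main__":
    # Hypothetical feature set; rows ingested together could later be
    # selected in BigQuery by filtering on the new ingestion_id column.
    print(make_ingestion_id("driver_stats", 1))

Note that the seed only has one-second resolution, so two ingest() calls for the same feature set within the same second would share an id; that appears acceptable for the purpose of grouping rows that were ingested together.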