diff --git a/protos/feast/core/Store.proto b/protos/feast/core/Store.proto
index ebd5f73afe..f35561467e 100644
--- a/protos/feast/core/Store.proto
+++ b/protos/feast/core/Store.proto
@@ -69,7 +69,7 @@ message Store {
     // ====================|==================|================================
     // - event_timestamp   | TIMESTAMP        | event time of the FeatureRow
     // - created_timestamp | TIMESTAMP        | processing time of the ingestion of the FeatureRow
-    // - dataset_id        | STRING           | identifier of the batch dataset a row belongs to
+    // - ingestion_id      | STRING           | unique id identifying groups of rows that have been ingested together
     // - job_id            | STRING           | identifier for the job that writes the FeatureRow to the corresponding BigQuery table
     //
     // BigQuery table created will be partitioned by the field "event_timestamp"
diff --git a/protos/feast/types/FeatureRow.proto b/protos/feast/types/FeatureRow.proto
index c3614e0027..c19a393fde 100644
--- a/protos/feast/types/FeatureRow.proto
+++ b/protos/feast/types/FeatureRow.proto
@@ -40,7 +40,6 @@ message FeatureRow {
   // rows, and write the values to the correct tables.
   string feature_set = 6;
 
-  // Identifier tying this feature row to a specific ingestion dataset. For
-  // batch loads, this dataset id can be attributed to a single ingestion job.
-  string dataset_id = 7;
+  // Identifier tying this feature row to a specific ingestion job.
+  string ingestion_id = 7;
 }
diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py
index 47c84f18d5..0221a79b4b 100644
--- a/sdk/python/feast/client.py
+++ b/sdk/python/feast/client.py
@@ -826,7 +826,7 @@ def ingest(
         # Loop optimization declarations
         produce = producer.produce
         flush = producer.flush
-        dataset_id = _generate_dataset_id(feature_set)
+        ingestion_id = _generate_ingestion_id(feature_set)
 
         # Transform and push data to Kafka
         if feature_set.source.source_type == "Kafka":
@@ -834,7 +834,7 @@ def ingest(
                 file=dest_path,
                 row_groups=list(range(pq_file.num_row_groups)),
                 fs=feature_set,
-                dataset_id=dataset_id,
+                ingestion_id=ingestion_id,
                 max_workers=max_workers,
             ):
 
@@ -919,7 +919,7 @@ def _build_feature_references(
     return features
 
 
-def _generate_dataset_id(feature_set: FeatureSet) -> str:
+def _generate_ingestion_id(feature_set: FeatureSet) -> str:
     """
     Generates a UUID from the feature set name, version, and the current time.
diff --git a/sdk/python/feast/loaders/ingest.py b/sdk/python/feast/loaders/ingest.py
index 3c4e8b99d0..34d0356ea7 100644
--- a/sdk/python/feast/loaders/ingest.py
+++ b/sdk/python/feast/loaders/ingest.py
@@ -26,7 +26,7 @@
 
 
 def _encode_pa_tables(
-    file: str, feature_set: str, fields: dict, dataset_id: str, row_group_idx: int
+    file: str, feature_set: str, fields: dict, ingestion_id: str, row_group_idx: int
 ) -> List[bytes]:
     """
     Helper function to encode a PyArrow table(s) read from parquet file(s) into
@@ -49,8 +49,8 @@ def _encode_pa_tables(
         fields (dict[str, enum.Enum.ValueType]):
             A mapping of field names to their value types.
 
-        dataset_id (str):
-            UUID unique to this dataset.
+        ingestion_id (str):
+            UUID unique to this ingestion job.
 
        row_group_idx(int):
            Row group index to read and encode into byte like FeatureRow
@@ -86,7 +86,7 @@ def _encode_pa_tables(
         feature_row = FeatureRow(
             event_timestamp=datetime_col[row_idx],
             feature_set=feature_set,
-            dataset_id=dataset_id,
+            ingestion_id=ingestion_id,
         )
         # Loop optimization declaration
         ext = feature_row.fields.extend
@@ -102,7 +102,11 @@
 
 
 def get_feature_row_chunks(
-    file: str, row_groups: List[int], fs: FeatureSet, dataset_id: str, max_workers: int
+    file: str,
+    row_groups: List[int],
+    fs: FeatureSet,
+    ingestion_id: str,
+    max_workers: int,
 ) -> Iterable[List[bytes]]:
     """
     Iterator function to encode a PyArrow table read from a parquet file to
@@ -120,8 +124,8 @@ def get_feature_row_chunks(
         fs (feast.feature_set.FeatureSet):
             FeatureSet describing parquet files.
 
-        dataset_id (str):
-            UUID unique to this dataset.
+        ingestion_id (str):
+            UUID unique to this ingestion job.
 
         max_workers (int):
             Maximum number of workers to spawn.
@@ -136,7 +140,7 @@ def get_feature_row_chunks(
     field_map = {field.name: field.dtype for field in fs.fields.values()}
 
     pool = Pool(max_workers)
-    func = partial(_encode_pa_tables, file, feature_set, field_map, dataset_id)
+    func = partial(_encode_pa_tables, file, feature_set, field_map, ingestion_id)
     for chunk in pool.imap(func, row_groups):
         yield chunk
     return
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java
index 33f79f8d97..5d8f3d25cb 100644
--- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java
@@ -42,8 +42,8 @@ public abstract class BigQueryFeatureSink implements FeatureSink {
       "Event time for the FeatureRow";
   public static final String BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION =
       "Processing time of the FeatureRow ingestion in Feast\"";
-  public static final String BIGQUERY_DATASET_ID_FIELD_DESCRIPTION =
-      "Identifier of the batch dataset a row belongs to";
+  public static final String BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION =
+      "Unique id identifying groups of rows that have been ingested together";
   public static final String BIGQUERY_JOB_ID_FIELD_DESCRIPTION =
       "Feast import job ID for the FeatureRow";
 
@@ -171,8 +171,8 @@ private TableDefinition createBigQueryTableDefinition(FeatureSetProto.FeatureSet
             "created_timestamp",
             Pair.of(
                 StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION),
-            "dataset_id",
-            Pair.of(StandardSQLTypeName.STRING, BIGQUERY_DATASET_ID_FIELD_DESCRIPTION),
+            "ingestion_id",
+            Pair.of(StandardSQLTypeName.STRING, BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION),
             "job_id",
             Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION));
     for (Map.Entry<String, Pair<StandardSQLTypeName, String>> entry :
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java
index 9eaf504558..6a69b96d71 100644
--- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java
@@ -31,7 +31,7 @@ public class FeatureRowToTableRow implements SerializableFunction<FeatureRow, TableRow> {
   private static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp";
   private static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp";
-  private static final String DATASET_ID_COLUMN = "dataset_id";
+  private static final String INGESTION_ID_COLUMN = "ingestion_id";
   private static final String JOB_ID_COLUMN = "job_id";
   private final String jobId;
 
@@ -48,7 +48,7 @@ public TableRow apply(FeatureRow featureRow) {
     TableRow tableRow = new TableRow();
     tableRow.set(EVENT_TIMESTAMP_COLUMN, Timestamps.toString(featureRow.getEventTimestamp()));
     tableRow.set(CREATED_TIMESTAMP_COLUMN, Instant.now().toString());
-    tableRow.set(DATASET_ID_COLUMN, featureRow.getDatasetId());
+    tableRow.set(INGESTION_ID_COLUMN, featureRow.getIngestionId());
     tableRow.set(JOB_ID_COLUMN, jobId);
 
     for (Field field : featureRow.getFieldsList()) {
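
Note: the patch only renames _generate_dataset_id to _generate_ingestion_id; its body is elided, and the docstring in sdk/python/feast/client.py says only that it generates a UUID from the feature set name, version, and the current time. A minimal sketch of what such a helper could look like, assuming a uuid5 over a composed key (the exact UUID scheme and FeatureSet accessors are assumptions, not taken from this patch):

    import time
    import uuid

    from feast.feature_set import FeatureSet


    def _generate_ingestion_id(feature_set: FeatureSet) -> str:
        # Sketch only: compose a key from the feature set name, version, and
        # the current time, then derive a UUID from it, as the docstring in
        # sdk/python/feast/client.py describes.
        key = f"{feature_set.name}_{feature_set.version}_{int(time.time())}"
        return str(uuid.uuid5(uuid.NAMESPACE_DNS, key))

Because the value is generated once per ingest() call and then passed through get_feature_row_chunks into every FeatureRow, all rows written by a single ingestion share the same ingestion_id, which is what the renamed BigQuery column and FeatureRow field carry.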