Add unique ingestion id for all batch ingestions (#656)
* Add unique dataset id for all batch ingestions

* Rename to ingestion_id
Chen Zhiling authored May 3, 2020
1 parent 7a0ff91 commit 3d9bafd
Showing 6 changed files with 48 additions and 6 deletions.
1 change: 1 addition & 0 deletions protos/feast/core/Store.proto
@@ -69,6 +69,7 @@ message Store {
// ====================|==================|================================
// - event_timestamp | TIMESTAMP | event time of the FeatureRow
// - created_timestamp | TIMESTAMP | processing time of the ingestion of the FeatureRow
// - ingestion_id | STRING | unique id identifying groups of rows that have been ingested together
// - job_id | STRING | identifier for the job that writes the FeatureRow to the corresponding BigQuery table
//
// BigQuery table created will be partitioned by the field "event_timestamp"
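The ingestion_id column makes it possible to inspect or clean up a single batch ingestion directly in the BigQuery sink table. A minimal sketch using the google-cloud-bigquery client, not part of this commit; the project, dataset, table, and id values are placeholders:

from google.cloud import bigquery

client = bigquery.Client()

# Count the rows written by one batch ingestion (table name is a placeholder).
query = """
    SELECT COUNT(*) AS row_count
    FROM `my_project.feast_dataset.customer_features_v1`
    WHERE ingestion_id = @ingestion_id
"""
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter("ingestion_id", "STRING", "<ingestion uuid>")
    ]
)
for row in client.query(query, job_config=job_config).result():
    print(row.row_count)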
3 changes: 3 additions & 0 deletions protos/feast/types/FeatureRow.proto
@@ -39,4 +39,7 @@ message FeatureRow {
// <project>/<feature-set-name>:<version>. This value will be used by the feast ingestion job to filter
// rows, and write the values to the correct tables.
string feature_set = 6;

// Identifier tying this feature row to a specific ingestion job.
string ingestion_id = 7;
}
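For illustration, a FeatureRow carrying the new field can be built from the generated Python classes roughly as below. This is a sketch: the feature set reference is a placeholder, and the generated module path may differ between Feast versions.

import uuid

from feast.types.FeatureRow_pb2 import FeatureRow
from google.protobuf.timestamp_pb2 import Timestamp

ts = Timestamp()
ts.GetCurrentTime()

row = FeatureRow(
    event_timestamp=ts,
    feature_set="my_project/customer_features:1",  # placeholder reference
    ingestion_id=str(uuid.uuid4()),  # in the SDK this is generated once per ingest() call
)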
17 changes: 17 additions & 0 deletions sdk/python/feast/client.py
@@ -18,6 +18,7 @@
import shutil
import tempfile
import time
import uuid
from collections import OrderedDict
from math import ceil
from typing import Dict, List, Optional, Tuple, Union
@@ -825,13 +826,15 @@ def ingest(
# Loop optimization declarations
produce = producer.produce
flush = producer.flush
ingestion_id = _generate_ingestion_id(feature_set)

# Transform and push data to Kafka
if feature_set.source.source_type == "Kafka":
for chunk in get_feature_row_chunks(
file=dest_path,
row_groups=list(range(pq_file.num_row_groups)),
fs=feature_set,
ingestion_id=ingestion_id,
max_workers=max_workers,
):

@@ -916,6 +919,20 @@ def _build_feature_references(
return features


def _generate_ingestion_id(feature_set: FeatureSet) -> str:
"""
Generates a UUID from the feature set name, version, and the current time.
Args:
feature_set: Feature set of the dataset to be ingested.
Returns:
UUID unique to current time and the feature set provided.
"""
uuid_str = f"{feature_set.name}_{feature_set.version}_{int(time.time())}"
return str(uuid.uuid3(uuid.NAMESPACE_DNS, uuid_str))


def _read_table_from_source(
source: Union[pd.DataFrame, str], chunk_size: int, max_workers: int
) -> Tuple[str, str]:
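Because uuid3 hashes its input deterministically, two ingest() calls for the same feature set that land in the same second share an ingestion id, while later calls get a new one. A standalone sketch of the same construction (the feature set name and version are placeholders):

import time
import uuid

def generate_ingestion_id(name: str, version: int) -> str:
    # Same construction as _generate_ingestion_id: name, version, and whole-second timestamp.
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, f"{name}_{version}_{int(time.time())}"))

a = generate_ingestion_id("customer_features", 1)
b = generate_ingestion_id("customer_features", 1)
print(a == b)  # True when both calls fall within the same second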
20 changes: 16 additions & 4 deletions sdk/python/feast/loaders/ingest.py
@@ -26,7 +26,7 @@


def _encode_pa_tables(
- file: str, feature_set: str, fields: dict, row_group_idx: int
+ file: str, feature_set: str, fields: dict, ingestion_id: str, row_group_idx: int
) -> List[bytes]:
"""
Helper function to encode a PyArrow table(s) read from parquet file(s) into
@@ -49,6 +49,9 @@ def _encode_pa_tables(
fields (dict[str, enum.Enum.ValueType]):
A mapping of field names to their value types.
ingestion_id (str):
UUID unique to this ingestion job.
row_group_idx(int):
Row group index to read and encode into byte like FeatureRow
protobuf objects.
@@ -81,7 +84,9 @@ def _encode_pa_tables(
# Iterate through the rows
for row_idx in range(table.num_rows):
feature_row = FeatureRow(
- event_timestamp=datetime_col[row_idx], feature_set=feature_set
+ event_timestamp=datetime_col[row_idx],
+ feature_set=feature_set,
+ ingestion_id=ingestion_id,
)
# Loop optimization declaration
ext = feature_row.fields.extend
@@ -97,7 +102,11 @@


def get_feature_row_chunks(
- file: str, row_groups: List[int], fs: FeatureSet, max_workers: int
+ file: str,
+ row_groups: List[int],
+ fs: FeatureSet,
+ ingestion_id: str,
+ max_workers: int,
) -> Iterable[List[bytes]]:
"""
Iterator function to encode a PyArrow table read from a parquet file to
@@ -115,6 +124,9 @@ def get_feature_row_chunks(
fs (feast.feature_set.FeatureSet):
FeatureSet describing parquet files.
ingestion_id (str):
UUID unique to this ingestion job.
max_workers (int):
Maximum number of workers to spawn.
@@ -128,7 +140,7 @@ def get_feature_row_chunks(
field_map = {field.name: field.dtype for field in fs.fields.values()}

pool = Pool(max_workers)
- func = partial(_encode_pa_tables, file, feature_set, field_map)
+ func = partial(_encode_pa_tables, file, feature_set, field_map, ingestion_id)
for chunk in pool.imap(func, row_groups):
yield chunk
return
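The encoder fans work out across processes by pre-binding every argument except the row group index with functools.partial, so Pool.imap only has to stream the indices. A minimal, self-contained sketch of that pattern with a stand-in for the real encoder:

from functools import partial
from multiprocessing import Pool

def encode_row_group(file: str, feature_set: str, ingestion_id: str, row_group_idx: int) -> str:
    # Stand-in for _encode_pa_tables; the last positional argument is supplied by imap.
    return f"{file}:{feature_set}:{ingestion_id}:rowgroup={row_group_idx}"

if __name__ == "__main__":
    func = partial(encode_row_group, "data.parquet", "project/fs:1", "ingestion-uuid")
    with Pool(4) as pool:
        for chunk in pool.imap(func, range(3)):
            print(chunk)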
BigQueryFeatureSink.java
@@ -42,6 +42,8 @@ public abstract class BigQueryFeatureSink implements FeatureSink {
"Event time for the FeatureRow";
public static final String BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION =
"Processing time of the FeatureRow ingestion in Feast\"";
public static final String BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION =
"Unique id identifying groups of rows that have been ingested together";
public static final String BIGQUERY_JOB_ID_FIELD_DESCRIPTION =
"Feast import job ID for the FeatureRow";

@@ -108,10 +110,13 @@ public void prepareWrite(FeatureSetProto.FeatureSet featureSet) {
Table table = bigquery.getTable(tableId);
if (table != null) {
log.info(
"Writing to existing BigQuery table '{}:{}.{}'",
getProjectId(),
"Updating and writing to existing BigQuery table '{}:{}.{}'",
datasetId.getProject(),
datasetId.getDataset(),
tableName);
TableDefinition tableDefinition = createBigQueryTableDefinition(featureSet.getSpec());
TableInfo tableInfo = TableInfo.of(tableId, tableDefinition);
bigquery.update(tableInfo);
return;
}

@@ -166,6 +171,8 @@ private TableDefinition createBigQueryTableDefinition(FeatureSetProto.FeatureSet
"created_timestamp",
Pair.of(
StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION),
"ingestion_id",
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION),
"job_id",
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION));
for (Map.Entry<String, Pair<StandardSQLTypeName, String>> entry :
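prepareWrite now pushes an updated table definition for tables that already exist, so older sink tables pick up the new ingestion_id column on the next deployment. The analogous schema update with the Python BigQuery client looks roughly like this (a sketch, not part of this commit; the table reference is a placeholder):

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("my_project.feast_dataset.customer_features_v1")

# Append the new column if it is missing, then push the updated schema.
if all(field.name != "ingestion_id" for field in table.schema):
    table.schema = list(table.schema) + [
        bigquery.SchemaField(
            "ingestion_id",
            "STRING",
            description="Unique id identifying groups of rows that have been ingested together",
        )
    ]
    client.update_table(table, ["schema"])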
FeatureRowToTableRow.java
@@ -31,6 +31,7 @@
public class FeatureRowToTableRow implements SerializableFunction<FeatureRow, TableRow> {
private static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp";
private static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp";
private static final String INGESTION_ID_COLUMN = "ingestion_id";
private static final String JOB_ID_COLUMN = "job_id";
private final String jobId;

@@ -47,6 +48,7 @@ public TableRow apply(FeatureRow featureRow) {
TableRow tableRow = new TableRow();
tableRow.set(EVENT_TIMESTAMP_COLUMN, Timestamps.toString(featureRow.getEventTimestamp()));
tableRow.set(CREATED_TIMESTAMP_COLUMN, Instant.now().toString());
tableRow.set(INGESTION_ID_COLUMN, featureRow.getIngestionId());
tableRow.set(JOB_ID_COLUMN, jobId);

for (Field field : featureRow.getFieldsList()) {
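The converter copies the proto field into a fixed column alongside the existing timestamps and job id. Sketched in Python against the generated FeatureRow class (an illustration only, not the sink's actual code path; the per-feature value columns are omitted):

from datetime import datetime, timezone

from feast.types.FeatureRow_pb2 import FeatureRow

def feature_row_to_table_row(feature_row: FeatureRow, job_id: str) -> dict:
    # Mirrors the reserved columns set by FeatureRowToTableRow.apply();
    # type-specific feature value columns are left out of this sketch.
    return {
        "event_timestamp": feature_row.event_timestamp.ToJsonString(),
        "created_timestamp": datetime.now(timezone.utc).isoformat(),
        "ingestion_id": feature_row.ingestion_id,
        "job_id": job_id,
    }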
