Add unique ingestion id for all batch ingestions #656

Merged 2 commits on May 3, 2020
1 change: 1 addition & 0 deletions protos/feast/core/Store.proto
@@ -69,6 +69,7 @@ message Store {
// ====================|==================|================================
// - event_timestamp | TIMESTAMP | event time of the FeatureRow
// - created_timestamp | TIMESTAMP | processing time of the ingestion of the FeatureRow
// - ingestion_id | STRING | unique id identifying groups of rows that have been ingested together
// - job_id | STRING | identifier for the job that writes the FeatureRow to the corresponding BigQuery table
//
// BigQuery table created will be partitioned by the field "event_timestamp"
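With the ingestion_id column in place, every row written by a single ingestion can be retrieved with one filter. A minimal sketch using the google-cloud-bigquery client, with hypothetical project, dataset, table, and id values:

from google.cloud import bigquery

# Hypothetical names; the real project, dataset, and table come from the
# configured Feast BigQuery store.
client = bigquery.Client(project="my-gcp-project")
query = """
    SELECT *
    FROM `my-gcp-project.feast_dataset.driver_features_v2`
    WHERE ingestion_id = @ingestion_id
"""
job = client.query(
    query,
    job_config=bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter(
                "ingestion_id", "STRING", "3f6d1c2a-0000-0000-0000-000000000000"
            )
        ]
    ),
)
for row in job.result():
    print(dict(row))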
3 changes: 3 additions & 0 deletions protos/feast/types/FeatureRow.proto
@@ -39,4 +39,7 @@ message FeatureRow {
// <project>/<feature-set-name>:<version>. This value will be used by the feast ingestion job to filter
// rows, and write the values to the correct tables.
string feature_set = 6;

// Identifier tying this feature row to a specific ingestion job.
string ingestion_id = 7;
}
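For illustration, the new field can be set like any other scalar on the generated message. A small sketch, assuming the protos are compiled into the Python SDK as feast.types.FeatureRow_pb2 (the exact module path depends on how the .proto files are built):

from google.protobuf.timestamp_pb2 import Timestamp

from feast.types.FeatureRow_pb2 import FeatureRow

ts = Timestamp()
ts.GetCurrentTime()

# Hypothetical feature set reference and ingestion id.
row = FeatureRow(
    event_timestamp=ts,
    feature_set="my_project/driver_features:2",
    ingestion_id="3f6d1c2a-0000-0000-0000-000000000000",
)
print(row)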
17 changes: 17 additions & 0 deletions sdk/python/feast/client.py
@@ -18,6 +18,7 @@
import shutil
import tempfile
import time
import uuid
from collections import OrderedDict
from math import ceil
from typing import Dict, List, Optional, Tuple, Union
@@ -825,13 +826,15 @@ def ingest(
# Loop optimization declarations
produce = producer.produce
flush = producer.flush
ingestion_id = _generate_ingestion_id(feature_set)

# Transform and push data to Kafka
if feature_set.source.source_type == "Kafka":
for chunk in get_feature_row_chunks(
file=dest_path,
row_groups=list(range(pq_file.num_row_groups)),
fs=feature_set,
ingestion_id=ingestion_id,
max_workers=max_workers,
):

@@ -916,6 +919,20 @@ def _build_feature_references(
return features


def _generate_ingestion_id(feature_set: FeatureSet) -> str:
"""
Generates a UUID from the feature set name, version, and the current time.

Args:
feature_set: Feature set of the dataset to be ingested.

Returns:
A UUID unique to the current time and the provided feature set.
"""
uuid_str = f"{feature_set.name}_{feature_set.version}_{int(time.time())}"
return str(uuid.uuid3(uuid.NAMESPACE_DNS, uuid_str))


def _read_table_from_source(
source: Union[pd.DataFrame, str], chunk_size: int, max_workers: int
) -> Tuple[str, str]:
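_generate_ingestion_id relies on uuid3, a deterministic, name-based UUID: the same feature set name, version, and second always hash to the same id, and any change in those inputs yields a new one. A small self-contained illustration with hypothetical values:

import time
import uuid

# Hypothetical feature set name and version, mirroring the seed string
# built by _generate_ingestion_id above.
name, version = "driver_features", 2
seed = f"{name}_{version}_{int(time.time())}"

# uuid3 is deterministic: hashing the same seed twice gives the same UUID.
first = uuid.uuid3(uuid.NAMESPACE_DNS, seed)
second = uuid.uuid3(uuid.NAMESPACE_DNS, seed)
assert first == second
print(first)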
20 changes: 16 additions & 4 deletions sdk/python/feast/loaders/ingest.py
@@ -26,7 +26,7 @@


def _encode_pa_tables(
file: str, feature_set: str, fields: dict, row_group_idx: int
file: str, feature_set: str, fields: dict, ingestion_id: str, row_group_idx: int
) -> List[bytes]:
"""
Helper function to encode a PyArrow table(s) read from parquet file(s) into
@@ -49,6 +49,9 @@ def _encode_pa_tables(
fields (dict[str, enum.Enum.ValueType]):
A mapping of field names to their value types.

ingestion_id (str):
UUID unique to this ingestion job.

row_group_idx(int):
Row group index to read and encode into byte like FeatureRow
protobuf objects.
@@ -81,7 +84,9 @@
# Iterate through the rows
for row_idx in range(table.num_rows):
feature_row = FeatureRow(
event_timestamp=datetime_col[row_idx], feature_set=feature_set
event_timestamp=datetime_col[row_idx],
feature_set=feature_set,
ingestion_id=ingestion_id,
)
# Loop optimization declaration
ext = feature_row.fields.extend
@@ -97,7 +102,11 @@


def get_feature_row_chunks(
file: str, row_groups: List[int], fs: FeatureSet, max_workers: int
file: str,
row_groups: List[int],
fs: FeatureSet,
ingestion_id: str,
max_workers: int,
) -> Iterable[List[bytes]]:
"""
Iterator function to encode a PyArrow table read from a parquet file to
@@ -115,6 +124,9 @@
fs (feast.feature_set.FeatureSet):
FeatureSet describing parquet files.

ingestion_id (str):
UUID unique to this ingestion job.

max_workers (int):
Maximum number of workers to spawn.

@@ -128,7 +140,7 @@
field_map = {field.name: field.dtype for field in fs.fields.values()}

pool = Pool(max_workers)
func = partial(_encode_pa_tables, file, feature_set, field_map)
func = partial(_encode_pa_tables, file, feature_set, field_map, ingestion_id)
for chunk in pool.imap(func, row_groups):
yield chunk
return
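Because functools.partial binds the file, feature set, field map, and now the ingestion_id up front, each pool worker receives only a row group index. A self-contained sketch of that pattern with stand-in values (encode here is a stand-in for _encode_pa_tables):

from functools import partial
from multiprocessing import Pool


def encode(file, feature_set, fields, ingestion_id, row_group_idx):
    # Everything except row_group_idx is fixed by partial(); only the row
    # group index varies from task to task.
    return f"{feature_set} group={row_group_idx} ingestion_id={ingestion_id}"


if __name__ == "__main__":
    func = partial(encode, "data.parquet", "project/driver_features", {}, "uuid-1234")
    with Pool(2) as pool:
        for chunk in pool.imap(func, range(3)):
            print(chunk)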
@@ -42,6 +42,8 @@ public abstract class BigQueryFeatureSink implements FeatureSink {
"Event time for the FeatureRow";
public static final String BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION =
"Processing time of the FeatureRow ingestion in Feast\"";
public static final String BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION =
"Unique id identifying groups of rows that have been ingested together";
public static final String BIGQUERY_JOB_ID_FIELD_DESCRIPTION =
"Feast import job ID for the FeatureRow";

@@ -108,10 +110,13 @@ public void prepareWrite(FeatureSetProto.FeatureSet featureSet) {
Table table = bigquery.getTable(tableId);
if (table != null) {
log.info(
"Writing to existing BigQuery table '{}:{}.{}'",
getProjectId(),
"Updating and writing to existing BigQuery table '{}:{}.{}'",
datasetId.getProject(),
datasetId.getDataset(),
tableName);
TableDefinition tableDefinition = createBigQueryTableDefinition(featureSet.getSpec());
TableInfo tableInfo = TableInfo.of(tableId, tableDefinition);
bigquery.update(tableInfo);
return;
}

@@ -166,6 +171,8 @@ private TableDefinition createBigQueryTableDefinition(FeatureSetProto.FeatureSet
"created_timestamp",
Pair.of(
StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION),
"ingestion_id",
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION),
"job_id",
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION));
for (Map.Entry<String, Pair<StandardSQLTypeName, String>> entry :
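The prepareWrite change means an existing table is now updated in place, so tables created before this release pick up the new ingestion_id column instead of keeping their old schema. A rough equivalent of that schema update, sketched with the Python BigQuery client and hypothetical names (the Java sink does the same thing through TableInfo and bigquery.update):

from google.cloud import bigquery

# Hypothetical table reference; the real dataset and table names come from
# the configured Feast store.
client = bigquery.Client(project="my-gcp-project")
table = client.get_table("my-gcp-project.feast_dataset.driver_features_v2")

# Appending a nullable column is an allowed in-place schema change.
schema = list(table.schema)
if not any(field.name == "ingestion_id" for field in schema):
    schema.append(
        bigquery.SchemaField(
            "ingestion_id",
            "STRING",
            description="Unique id identifying groups of rows that have been ingested together",
        )
    )
    table.schema = schema
    client.update_table(table, ["schema"])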
@@ -31,6 +31,7 @@
public class FeatureRowToTableRow implements SerializableFunction<FeatureRow, TableRow> {
private static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp";
private static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp";
private static final String INGESTION_ID_COLUMN = "ingestion_id";
private static final String JOB_ID_COLUMN = "job_id";
private final String jobId;

@@ -47,6 +48,7 @@ public TableRow apply(FeatureRow featureRow) {
TableRow tableRow = new TableRow();
tableRow.set(EVENT_TIMESTAMP_COLUMN, Timestamps.toString(featureRow.getEventTimestamp()));
tableRow.set(CREATED_TIMESTAMP_COLUMN, Instant.now().toString());
tableRow.set(INGESTION_ID_COLUMN, featureRow.getIngestionId());
tableRow.set(JOB_ID_COLUMN, jobId);

for (Field field : featureRow.getFieldsList()) {
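With this change each streamed row carries its ingestion_id alongside the other reserved columns. Roughly, with hypothetical values, a single written row looks like this:

# Hypothetical shape of one row written by FeatureRowToTableRow: reserved
# columns first, then one column per field in the FeatureRow.
table_row = {
    "event_timestamp": "2020-05-03T00:00:00Z",
    "created_timestamp": "2020-05-03T00:00:05.123Z",
    "ingestion_id": "3f6d1c2a-0000-0000-0000-000000000000",
    "job_id": "feast-import-job-1",
    "driver_id": 1001,
    "trips_today": 7,
}
print(table_row)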