Rename to ingestion_id
zhilingc committed May 1, 2020
1 parent fc0656d commit 6bedf64
Showing 6 changed files with 24 additions and 21 deletions.
2 changes: 1 addition & 1 deletion protos/feast/core/Store.proto
@@ -69,7 +69,7 @@ message Store {
// ====================|==================|================================
// - event_timestamp | TIMESTAMP | event time of the FeatureRow
// - created_timestamp | TIMESTAMP | processing time of the ingestion of the FeatureRow
- // - dataset_id | STRING | identifier of the batch dataset a row belongs to
+ // - ingestion_id | STRING | unique id identifying groups of rows that have been ingested together
// - job_id | STRING | identifier for the job that writes the FeatureRow to the corresponding BigQuery table
//
// BigQuery table created will be partitioned by the field "event_timestamp"
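The ingestion_id column gives downstream consumers a handle on every row written by a single ingestion run. A minimal sketch of pulling one such batch back out of the sink table with the google-cloud-bigquery client; the project, dataset, and table names are placeholders and the query itself is not part of this commit.

from google.cloud import bigquery

# Hypothetical table name; Feast derives the real one from the feature set.
TABLE = "my-project.feast_dataset.driver_features"

def rows_for_ingestion(ingestion_id: str):
    """Fetch every FeatureRow written by one ingestion run."""
    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("ingestion_id", "STRING", ingestion_id)
        ]
    )
    query = f"SELECT * FROM `{TABLE}` WHERE ingestion_id = @ingestion_id"
    return list(client.query(query, job_config=job_config).result())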
5 changes: 2 additions & 3 deletions protos/feast/types/FeatureRow.proto
@@ -40,7 +40,6 @@ message FeatureRow {
// rows, and write the values to the correct tables.
string feature_set = 6;

- // Identifier tying this feature row to a specific ingestion dataset. For
- // batch loads, this dataset id can be attributed to a single ingestion job.
- string dataset_id = 7;
+ // Identifier tying this feature row to a specific ingestion job.
+ string ingestion_id = 7;
}
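Because the renamed field keeps tag 7 and stays a string, the change is wire-compatible for anything that only handles the serialized bytes. A quick sketch of building the new message from Python; the FeatureRow_pb2 import path is assumed from the SDK's generated proto layout and the values are purely illustrative.

from google.protobuf.timestamp_pb2 import Timestamp
from feast.types.FeatureRow_pb2 import FeatureRow  # import path assumed

row = FeatureRow(
    event_timestamp=Timestamp(seconds=1588291200),
    feature_set="project/driver_features",      # illustrative feature set reference
    ingestion_id="9c1f7b2e-0000-4000-8000-000000000000",  # illustrative UUID
)
payload = row.SerializeToString()  # bytes of the kind the SDK pushes to the source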
6 changes: 3 additions & 3 deletions sdk/python/feast/client.py
@@ -826,15 +826,15 @@ def ingest(
# Loop optimization declarations
produce = producer.produce
flush = producer.flush
- dataset_id = _generate_dataset_id(feature_set)
+ ingestion_id = _generate_ingestion_id(feature_set)

# Transform and push data to Kafka
if feature_set.source.source_type == "Kafka":
for chunk in get_feature_row_chunks(
file=dest_path,
row_groups=list(range(pq_file.num_row_groups)),
fs=feature_set,
- dataset_id=dataset_id,
+ ingestion_id=ingestion_id,
max_workers=max_workers,
):

@@ -919,7 +919,7 @@ def _build_feature_references(
return features


- def _generate_dataset_id(feature_set: FeatureSet) -> str:
+ def _generate_ingestion_id(feature_set: FeatureSet) -> str:
"""
Generates a UUID from the feature set name, version, and the current time.
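The body of _generate_ingestion_id is collapsed in this view. As a rough sketch of what the docstring describes (feature set name, version, and the current time folded into a UUID), with the FeatureSet import path assumed and nothing claimed about the actual implementation:

import time
import uuid

from feast.feature_set import FeatureSet  # import path assumed

def _generate_ingestion_id(feature_set: FeatureSet) -> str:
    # Derive a UUID from the feature set identity plus the current time.
    seed = f"{feature_set.name}_{feature_set.version}_{time.time()}"
    return str(uuid.uuid3(uuid.NAMESPACE_DNS, seed))

The exact format of the id is an implementation detail; the point is that every row produced by one call to ingest() shares it.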
20 changes: 12 additions & 8 deletions sdk/python/feast/loaders/ingest.py
@@ -26,7 +26,7 @@


def _encode_pa_tables(
- file: str, feature_set: str, fields: dict, dataset_id: str, row_group_idx: int
+ file: str, feature_set: str, fields: dict, ingestion_id: str, row_group_idx: int
) -> List[bytes]:
"""
Helper function to encode a PyArrow table(s) read from parquet file(s) into
@@ -49,8 +49,8 @@ def _encode_pa_tables(
fields (dict[str, enum.Enum.ValueType]):
A mapping of field names to their value types.
- dataset_id (str):
- UUID unique to this dataset.
+ ingestion_id (str):
+ UUID unique to this ingestion job.
row_group_idx(int):
Row group index to read and encode into byte like FeatureRow
@@ -86,7 +86,7 @@ def _encode_pa_tables(
feature_row = FeatureRow(
event_timestamp=datetime_col[row_idx],
feature_set=feature_set,
- dataset_id=dataset_id,
+ ingestion_id=ingestion_id,
)
# Loop optimization declaration
ext = feature_row.fields.extend
@@ -102,7 +102,11 @@


def get_feature_row_chunks(
- file: str, row_groups: List[int], fs: FeatureSet, dataset_id: str, max_workers: int
+ file: str,
+ row_groups: List[int],
+ fs: FeatureSet,
+ ingestion_id: str,
+ max_workers: int,
) -> Iterable[List[bytes]]:
"""
Iterator function to encode a PyArrow table read from a parquet file to
@@ -120,8 +124,8 @@ def get_feature_row_chunks(
fs (feast.feature_set.FeatureSet):
FeatureSet describing parquet files.
- dataset_id (str):
- UUID unique to this dataset.
+ ingestion_id (str):
+ UUID unique to this ingestion job.
max_workers (int):
Maximum number of workers to spawn.
@@ -136,7 +140,7 @@
field_map = {field.name: field.dtype for field in fs.fields.values()}

pool = Pool(max_workers)
- func = partial(_encode_pa_tables, file, feature_set, field_map, dataset_id)
+ func = partial(_encode_pa_tables, file, feature_set, field_map, ingestion_id)
for chunk in pool.imap(func, row_groups):
yield chunk
return
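The ingestion_id is bound once per ingestion via functools.partial, so each worker task in the pool only receives a row-group index. A self-contained toy of that pattern; the function body and its first four arguments merely stand in for _encode_pa_tables and its real inputs.

from functools import partial
from multiprocessing import Pool

def _encode(file, feature_set, fields, ingestion_id, row_group_idx):
    # Stand-in for _encode_pa_tables: the first four arguments are fixed
    # for the whole ingestion, only row_group_idx varies per task.
    return f"{ingestion_id}: encoded row group {row_group_idx} of {file}"

if __name__ == "__main__":
    func = partial(_encode, "rows.parquet", "project/driver_features", {}, "uuid-1234")
    with Pool(2) as pool:
        for chunk in pool.imap(func, range(4)):
            print(chunk)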
8 changes: 4 additions & 4 deletions BigQueryFeatureSink.java
@@ -42,8 +42,8 @@ public abstract class BigQueryFeatureSink implements FeatureSink {
"Event time for the FeatureRow";
public static final String BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION =
"Processing time of the FeatureRow ingestion in Feast\"";
- public static final String BIGQUERY_DATASET_ID_FIELD_DESCRIPTION =
- "Identifier of the batch dataset a row belongs to";
+ public static final String BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION =
+ "Unique id identifying groups of rows that have been ingested together";
public static final String BIGQUERY_JOB_ID_FIELD_DESCRIPTION =
"Feast import job ID for the FeatureRow";

@@ -171,8 +171,8 @@ private TableDefinition createBigQueryTableDefinition(FeatureSetProto.FeatureSet
"created_timestamp",
Pair.of(
StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION),
"dataset_id",
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_DATASET_ID_FIELD_DESCRIPTION),
"ingestion_id",
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION),
"job_id",
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION));
for (Map.Entry<String, Pair<StandardSQLTypeName, String>> entry :
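For readers more at home in the Python SDK, the reserved columns the sink now writes could be expressed with google-cloud-bigquery schema fields roughly as follows; the real sink builds these in Java, so this is only an illustrative translation using the descriptions from the diff.

from google.cloud import bigquery

# Reserved Feast columns as they would appear in the BigQuery table schema.
RESERVED_FIELDS = [
    bigquery.SchemaField("event_timestamp", "TIMESTAMP",
                         description="Event time for the FeatureRow"),
    bigquery.SchemaField("created_timestamp", "TIMESTAMP",
                         description="Processing time of the FeatureRow ingestion in Feast"),
    bigquery.SchemaField("ingestion_id", "STRING",
                         description="Unique id identifying groups of rows that have been ingested together"),
    bigquery.SchemaField("job_id", "STRING",
                         description="Feast import job ID for the FeatureRow"),
]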
4 changes: 2 additions & 2 deletions FeatureRowToTableRow.java
@@ -31,7 +31,7 @@
public class FeatureRowToTableRow implements SerializableFunction<FeatureRow, TableRow> {
private static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp";
private static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp";
- private static final String DATASET_ID_COLUMN = "dataset_id";
+ private static final String INGESTION_ID_COLUMN = "ingestion_id";
private static final String JOB_ID_COLUMN = "job_id";
private final String jobId;

@@ -48,7 +48,7 @@ public TableRow apply(FeatureRow featureRow) {
TableRow tableRow = new TableRow();
tableRow.set(EVENT_TIMESTAMP_COLUMN, Timestamps.toString(featureRow.getEventTimestamp()));
tableRow.set(CREATED_TIMESTAMP_COLUMN, Instant.now().toString());
- tableRow.set(DATASET_ID_COLUMN, featureRow.getDatasetId());
+ tableRow.set(INGESTION_ID_COLUMN, featureRow.getIngestionId());
tableRow.set(JOB_ID_COLUMN, jobId);

for (Field field : featureRow.getFieldsList()) {
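Putting the sink pieces together, each FeatureRow lands in BigQuery as a row whose reserved columns now carry ingestion_id instead of dataset_id. A hedged Python sketch of the equivalent mapping, with column names taken from the diff and a plain dict standing in for the protobuf message:

from datetime import datetime, timezone

def feature_row_to_table_row(feature_row: dict, job_id: str) -> dict:
    # Mirror of FeatureRowToTableRow.apply(): reserved columns first,
    # then one column per feature field. feature_row is a plain dict here
    # rather than the protobuf message, purely for illustration.
    table_row = {
        "event_timestamp": feature_row["event_timestamp"],
        "created_timestamp": datetime.now(timezone.utc).isoformat(),
        "ingestion_id": feature_row["ingestion_id"],
        "job_id": job_id,
    }
    table_row.update(feature_row.get("fields", {}))
    return table_row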