From 2fcddaa154d8fc75d442d71cb15bb7174db35d2a Mon Sep 17 00:00:00 2001
From: David Heryanto
Date: Thu, 26 Dec 2019 09:11:19 +0800
Subject: [PATCH] Always set destination table in BigQuery query config in
 Feast Batch Serving so it can handle large results (#392)

* Update BQ query config to always set destination table, so that it can work
  with large results.

  Refer to: https://cloud.google.com/bigquery/quotas#query_jobs, maximum
  response size bullet point.

* Replace prefix for temp table name

* Set expiry on entity rows table

* Include exception message in error description

  gRPC clients such as the Feast Python SDK usually show only the error
  description, not the error cause.

* Code cleanup

* Update batch-retrieval e2e test. Output rows may not have the same order as
  requested entity rows.
---
 .../service/BigQueryServingService.java       | 46 +++++++++++++------
 .../bigquery/BatchRetrievalQueryRunnable.java | 35 ++++++++++++--
 .../store/bigquery/SubqueryCallable.java      | 14 ++++++
 tests/e2e/bq-batch-retrieval.py               |  5 +-
 4 files changed, 80 insertions(+), 20 deletions(-)

diff --git a/serving/src/main/java/feast/serving/service/BigQueryServingService.java b/serving/src/main/java/feast/serving/service/BigQueryServingService.java
index 701e146ee5..7a950e3c8a 100644
--- a/serving/src/main/java/feast/serving/service/BigQueryServingService.java
+++ b/serving/src/main/java/feast/serving/service/BigQueryServingService.java
@@ -32,6 +32,7 @@
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.Table;
 import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.storage.Storage;
 import feast.core.FeatureSetProto.FeatureSetSpec;
 import feast.serving.ServingAPIProto;
@@ -56,10 +57,13 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 
 public class BigQueryServingService implements ServingService {
+  // Default number of milliseconds for which a temporary table should exist before it is deleted in BigQuery.
+  public static final long TEMP_TABLE_EXPIRY_DURATION_MS = Duration.standardDays(1).getMillis();
 
   private static final Logger log =
       org.slf4j.LoggerFactory.getLogger(BigQueryServingService.class);
 
   private final BigQuery bigquery;
@@ -182,15 +186,15 @@ private Table loadEntities(DatasetSource datasetSource) {
     switch (datasetSource.getDatasetSourceCase()) {
       case FILE_SOURCE:
         try {
-          String tableName = generateTemporaryTableName();
-          log.info("Loading entity dataset to table {}.{}.{}", projectId, datasetId, tableName);
-          TableId tableId = TableId.of(projectId, datasetId, tableName);
-          // Currently only avro supported
+          // Currently only AVRO format is supported
           if (datasetSource.getFileSource().getDataFormat() != DataFormat.DATA_FORMAT_AVRO) {
             throw Status.INVALID_ARGUMENT
-                .withDescription("Invalid file format, only avro supported")
+                .withDescription("Invalid file format, only AVRO is supported.")
                 .asRuntimeException();
           }
+
+          TableId tableId = TableId.of(projectId, datasetId, createTempTableName());
+          log.info("Loading entity rows to: {}.{}.{}", projectId, datasetId, tableId.getTable());
           LoadJobConfiguration loadJobConfiguration =
               LoadJobConfiguration.of(
                   tableId, datasetSource.getFileSource().getFileUrisList(), FormatOptions.avro());
@@ -198,6 +202,13 @@ private Table loadEntities(DatasetSource datasetSource) {
               loadJobConfiguration.toBuilder().setUseAvroLogicalTypes(true).build();
           Job job = bigquery.create(JobInfo.of(loadJobConfiguration));
           job.waitFor();
+          TableInfo expiry =
+              bigquery
+                  .getTable(tableId)
+                  .toBuilder()
+                  .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+                  .build();
+          bigquery.update(expiry);
           loadedEntityTable = bigquery.getTable(tableId);
           if (!loadedEntityTable.exists()) {
             throw new RuntimeException(
@@ -207,7 +218,7 @@ private Table loadEntities(DatasetSource datasetSource) {
         } catch (Exception e) {
           log.error("Exception has occurred in loadEntities method: ", e);
           throw Status.INTERNAL
-              .withDescription("Failed to load entity dataset into store")
+              .withDescription("Failed to load entity dataset into store: " + e.toString())
               .withCause(e)
               .asRuntimeException();
         }
@@ -219,20 +230,23 @@ private Table loadEntities(DatasetSource datasetSource) {
     }
   }
 
-  private String generateTemporaryTableName() {
-    String source = String.format("feastserving%d", System.currentTimeMillis());
-    String guid = UUID.nameUUIDFromBytes(source.getBytes()).toString();
-    String suffix = guid.substring(0, Math.min(guid.length(), 10)).replaceAll("-", "");
-    return String.format("temp_%s", suffix);
-  }
-
   private TableId generateUUIDs(Table loadedEntityTable) {
     try {
       String uuidQuery =
           createEntityTableUUIDQuery(generateFullTableName(loadedEntityTable.getTableId()));
-      QueryJobConfiguration queryJobConfig = QueryJobConfiguration.newBuilder(uuidQuery).build();
+      QueryJobConfiguration queryJobConfig =
+          QueryJobConfiguration.newBuilder(uuidQuery)
+              .setDestinationTable(TableId.of(projectId, datasetId, createTempTableName()))
+              .build();
       Job queryJob = bigquery.create(JobInfo.of(queryJobConfig));
       queryJob.waitFor();
+      TableInfo expiry =
+          bigquery
+              .getTable(queryJobConfig.getDestinationTable())
+              .toBuilder()
+              .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+              .build();
+      bigquery.update(expiry);
       queryJobConfig = queryJob.getConfiguration();
       return queryJobConfig.getDestinationTable();
     } catch (InterruptedException | BigQueryException e) {
@@ -242,4 +256,8 @@ private TableId generateUUIDs(Table loadedEntityTable) {
           .asRuntimeException();
     }
   }
+
+  public static String createTempTableName() {
+    return "_" + UUID.randomUUID().toString().replace("-", "");
+  }
 }
diff --git a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java
index 2d51547d0e..47587e1d0e 100644
--- a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java
+++ b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java
@@ -16,6 +16,8 @@
  */
 package feast.serving.store.bigquery;
 
+import static feast.serving.service.BigQueryServingService.TEMP_TABLE_EXPIRY_DURATION_MS;
+import static feast.serving.service.BigQueryServingService.createTempTableName;
 import static feast.serving.store.bigquery.QueryTemplater.createTimestampLimitQuery;
 
 import com.google.auto.value.AutoValue;
@@ -27,6 +29,8 @@
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.JobInfo;
 import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.bigquery.TableResult;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.Storage;
@@ -175,15 +179,17 @@ Job runBatchQuery(List<String> featureSetQueries)
     ExecutorCompletionService<FeatureSetInfo> executorCompletionService =
         new ExecutorCompletionService<>(executorService);
-
     List<FeatureSetInfo> featureSetInfos = new ArrayList<>();
 
     for (int i = 0; i < featureSetQueries.size(); i++) {
       QueryJobConfiguration queryJobConfig =
-          QueryJobConfiguration.newBuilder(featureSetQueries.get(i)).build();
+          QueryJobConfiguration.newBuilder(featureSetQueries.get(i))
+              .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
+              .build();
       Job subqueryJob = bigquery().create(JobInfo.of(queryJobConfig));
 
       executorCompletionService.submit(
           SubqueryCallable.builder()
+              .setBigquery(bigquery())
               .setFeatureSetInfo(featureSetInfos().get(i))
               .setSubqueryJob(subqueryJob)
               .build());
@@ -191,7 +197,8 @@ Job runBatchQuery(List<String> featureSetQueries)
 
     for (int i = 0; i < featureSetQueries.size(); i++) {
       try {
-        FeatureSetInfo featureSetInfo = executorCompletionService.take().get(SUBQUERY_TIMEOUT_SECS, TimeUnit.SECONDS);
+        FeatureSetInfo featureSetInfo =
+            executorCompletionService.take().get(SUBQUERY_TIMEOUT_SECS, TimeUnit.SECONDS);
         featureSetInfos.add(featureSetInfo);
       } catch (InterruptedException | ExecutionException | TimeoutException e) {
         jobService()
@@ -214,9 +221,20 @@ Job runBatchQuery(List<String> featureSetQueries)
     String joinQuery =
         QueryTemplater.createJoinQuery(
             featureSetInfos, entityTableColumnNames(), entityTableName());
-    QueryJobConfiguration queryJobConfig = QueryJobConfiguration.newBuilder(joinQuery).build();
+    QueryJobConfiguration queryJobConfig =
+        QueryJobConfiguration.newBuilder(joinQuery)
+            .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
+            .build();
     queryJob = bigquery().create(JobInfo.of(queryJobConfig));
     queryJob.waitFor();
+    TableInfo expiry =
+        bigquery()
+            .getTable(queryJobConfig.getDestinationTable())
+            .toBuilder()
+            .setExpirationTime(
+                System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+            .build();
+    bigquery().update(expiry);
 
     return queryJob;
   }
@@ -248,10 +266,19 @@ private FieldValueList getTimestampLimits(String entityTableName) {
     QueryJobConfiguration getTimestampLimitsQuery =
         QueryJobConfiguration.newBuilder(createTimestampLimitQuery(entityTableName))
            .setDefaultDataset(DatasetId.of(projectId(), datasetId()))
+            .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
            .build();
     try {
       Job job = bigquery().create(JobInfo.of(getTimestampLimitsQuery));
       TableResult getTimestampLimitsQueryResult = job.waitFor().getQueryResults();
+      TableInfo expiry =
+          bigquery()
+              .getTable(getTimestampLimitsQuery.getDestinationTable())
+              .toBuilder()
+              .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+              .build();
+      bigquery().update(expiry);
+
       FieldValueList result = null;
       for (FieldValueList fields : getTimestampLimitsQueryResult.getValues()) {
         result = fields;
diff --git a/serving/src/main/java/feast/serving/store/bigquery/SubqueryCallable.java b/serving/src/main/java/feast/serving/store/bigquery/SubqueryCallable.java
index 3c28194e7a..b2a9009a74 100644
--- a/serving/src/main/java/feast/serving/store/bigquery/SubqueryCallable.java
+++ b/serving/src/main/java/feast/serving/store/bigquery/SubqueryCallable.java
@@ -16,13 +16,16 @@
  */
 package feast.serving.store.bigquery;
 
+import static feast.serving.service.BigQueryServingService.TEMP_TABLE_EXPIRY_DURATION_MS;
 import static feast.serving.store.bigquery.QueryTemplater.generateFullTableName;
 
 import com.google.auto.value.AutoValue;
+import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
 import feast.serving.store.bigquery.model.FeatureSetInfo;
 
 import java.util.concurrent.Callable;
@@ -33,6 +36,8 @@
 @AutoValue
 public abstract class SubqueryCallable implements Callable<FeatureSetInfo> {
 
+  public abstract BigQuery bigquery();
+
   public abstract FeatureSetInfo featureSetInfo();
 
   public abstract Job subqueryJob();
@@ -44,6 +49,8 @@ public static Builder builder() {
   @AutoValue.Builder
   public abstract static class Builder {
 
+    public abstract Builder setBigquery(BigQuery bigquery);
+
     public abstract Builder setFeatureSetInfo(FeatureSetInfo featureSetInfo);
 
     public abstract Builder setSubqueryJob(Job subqueryJob);
@@ -57,6 +64,13 @@ public FeatureSetInfo call() throws BigQueryException, InterruptedException {
     subqueryJob().waitFor();
     subqueryConfig = subqueryJob().getConfiguration();
     TableId destinationTable = subqueryConfig.getDestinationTable();
+    TableInfo expiry =
+        bigquery()
+            .getTable(destinationTable)
+            .toBuilder()
+            .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+            .build();
+    bigquery().update(expiry);
     String fullTablePath = generateFullTableName(destinationTable);
 
     return new FeatureSetInfo(featureSetInfo(), fullTablePath);
diff --git a/tests/e2e/bq-batch-retrieval.py b/tests/e2e/bq-batch-retrieval.py
index 067dd14a2f..2d6668eaa8 100644
--- a/tests/e2e/bq-batch-retrieval.py
+++ b/tests/e2e/bq-batch-retrieval.py
@@ -14,6 +14,7 @@
 from feast.type_map import ValueType
 from google.protobuf.duration_pb2 import Duration
 
+pd.set_option('display.max_columns', None)
 
 @pytest.fixture(scope="module")
 def core_url(pytestconfig):
@@ -112,8 +113,8 @@ def test_additional_columns_in_entity_table(client):
     feature_retrieval_job = client.get_batch_features(
         entity_rows=entity_df, feature_ids=["additional_columns:1:feature_value"]
     )
-    output = feature_retrieval_job.to_dataframe()
-    print(output.head())
+    output = feature_retrieval_job.to_dataframe().sort_values(by=["entity_id"])
+    print(output.head(10))
 
     assert np.allclose(output["additional_float_col"], entity_df["additional_float_col"])
output["additional_string_col"].to_list() == entity_df["additional_string_col"].to_list()