fix create_dataset (#208)
* fix create_dataset to create the table inside Feast's dataset

* fix tests for create dataset

* Format code with Google style

* Remove unused clock variable in BigQueryTrainingDatasetCreator

* Update comment for createDataset in BigQueryTrainingDatasetCreator

* Clean up print function for createBqTableDescription()

* fix tests on create_dataset

- remove Clock
- update public dataset test

* Fix typo

* Mount service account during unit tests in Prow so we can test against actual BigQuery

* Add TODO in BigQueryTrainingDatasetCreatorTest
budi authored and feast-ci-bot committed Jun 12, 2019
1 parent d500001 commit 3a29339
Showing 12 changed files with 676 additions and 140 deletions.
22 changes: 22 additions & 0 deletions .prow/config.yaml
@@ -54,16 +54,38 @@ presubmits:
decorate: true
always_run: true
spec:
volumes:
- name: service-account
secret:
secretName: prow-service-account
containers:
- image: maven:3.6-jdk-8
volumeMounts:
- name: service-account
mountPath: /etc/service-account
readOnly: true
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /etc/service-account/service-account.json
command: [".prow/scripts/run_unit_test.sh", "--component", "core"]

- name: unit-test-ingestion
decorate: true
always_run: true
spec:
volumes:
- name: service-account
secret:
secretName: prow-service-account
containers:
- image: maven:3.6-jdk-8
volumeMounts:
- name: service-account
mountPath: /etc/service-account
readOnly: true
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /etc/service-account/service-account.json
command: [".prow/scripts/run_unit_test.sh", "--component", "ingestion"]

- name: unit-test-serving
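
A note on the service-account wiring above: setting GOOGLE_APPLICATION_CREDENTIALS to the mounted key file is all the Java BigQuery client needs, because it resolves credentials through Application Default Credentials. A minimal sketch of how the tests pick this up (not part of this commit; the class name is hypothetical):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;

// Hypothetical smoke check: verifies the client authenticates with the key
// mounted at /etc/service-account/service-account.json by the Prow config above.
public class AdcSmokeCheck {
  public static void main(String[] args) {
    // getDefaultInstance() resolves credentials via Application Default
    // Credentials, i.e. the GOOGLE_APPLICATION_CREDENTIALS variable set in
    // the container spec.
    BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
    System.out.println("Authenticated project: " + bigQuery.getOptions().getProjectId());
  }
}
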
14 changes: 6 additions & 8 deletions core/src/main/java/feast/core/config/TrainingConfig.java
@@ -7,12 +7,11 @@
import com.hubspot.jinjava.Jinjava;
import feast.core.config.StorageConfig.StorageSpecs;
import feast.core.dao.FeatureInfoRepository;
import feast.core.training.BigQueryTraningDatasetCreator;
import feast.core.training.BigQueryDatasetTemplater;
import feast.core.training.BigQueryTraningDatasetCreator;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Clock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -31,18 +30,17 @@ public BigQueryDatasetTemplater getBigQueryTrainingDatasetTemplater(
Resource resource = new ClassPathResource("templates/bq_training.tmpl");
InputStream resourceInputStream = resource.getInputStream();
String tmpl = CharStreams.toString(new InputStreamReader(resourceInputStream, Charsets.UTF_8));
return new BigQueryDatasetTemplater(new Jinjava(), tmpl, storageSpecs.getWarehouseStorageSpec(),
featureInfoRepository);
return new BigQueryDatasetTemplater(
new Jinjava(), tmpl, storageSpecs.getWarehouseStorageSpec(), featureInfoRepository);
}

@Bean
public BigQueryTraningDatasetCreator getBigQueryTrainingDatasetCreator(
BigQueryDatasetTemplater templater, StorageSpecs storageSpecs,
BigQueryDatasetTemplater templater,
StorageSpecs storageSpecs,
@Value("${feast.core.projectId}") String projectId,
@Value("${feast.core.datasetPrefix}") String datasetPrefix) {
BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();
Clock clock = Clock.systemUTC();
return new BigQueryTraningDatasetCreator(templater, clock,
projectId, datasetPrefix);
return new BigQueryTraningDatasetCreator(templater, projectId, datasetPrefix);
}
}
core/src/main/java/feast/core/training/BigQueryTraningDatasetCreator.java
@@ -21,54 +21,59 @@
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.JobException;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import com.google.common.base.Strings;
import com.google.protobuf.Timestamp;
import feast.core.DatasetServiceProto.DatasetInfo;
import feast.core.DatasetServiceProto.FeatureSet;
import feast.core.exception.TrainingDatasetCreationException;
import java.time.Clock;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BigQueryTraningDatasetCreator {

private final BigQueryDatasetTemplater templater;
private final DateTimeFormatter formatter;
private final Clock clock;
private final String projectId;
private final String datasetPrefix;
private transient BigQuery bigQuery;


public BigQueryTraningDatasetCreator(
BigQueryDatasetTemplater templater,
Clock clock,
String projectId,
String datasetPrefix) {
this(templater, clock, projectId, datasetPrefix,
this(templater, projectId, datasetPrefix,
BigQueryOptions.newBuilder().setProjectId(projectId).build().getService());
}

public BigQueryTraningDatasetCreator(
BigQueryDatasetTemplater templater,
Clock clock,
String projectId,
String datasetPrefix,
BigQuery bigQuery) {
this.templater = templater;
this.clock = clock;
this.formatter = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneId.of("UTC"));
this.projectId = projectId;
this.datasetPrefix = datasetPrefix;
this.bigQuery = bigQuery;
}

/**
* Create dataset for a feature set
* Create a training dataset for a feature set for features created between startDate (inclusive)
* and endDate (inclusive)
*
* @param featureSet feature set for which the training dataset should be created
* @param startDate starting date of the training dataset (inclusive)
@@ -85,58 +90,92 @@ public DatasetInfo createDataset(
String namePrefix) {
try {
String query = templater.createQuery(featureSet, startDate, endDate, limit);
String tableName = createBqTableName(startDate, endDate, namePrefix);
String bqDatasetName = createBqDatasetName(featureSet.getEntityName());

createBqDatasetIfMissing(bqDatasetName);

TableId destinationTable =
TableId.of(projectId, createBqDatasetName(featureSet.getEntityName()), tableName);
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(query)
.setAllowLargeResults(true)
.setDestinationTable(destinationTable)
.build();
JobOption jobOption = JobOption.fields();
bigQuery.query(queryConfig, jobOption);
String tableName =
createBqTableName(datasetPrefix, featureSet, startDate, endDate, namePrefix);
String tableDescription = createBqTableDescription(featureSet, startDate, endDate, query);

Map<String, String> options = templater.getStorageSpec().getOptionsMap();
String bq_dataset = options.get("dataset");
TableId destinationTableId = TableId.of(projectId, bq_dataset, tableName);

// Create the BigQuery table that will store the training dataset if not exists
if (bigQuery.getTable(destinationTableId) == null) {
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(query)
.setAllowLargeResults(true)
.setDestinationTable(destinationTableId)
.build();
JobOption jobOption = JobOption.fields();
TableResult res = bigQuery.query(queryConfig, jobOption);
if (res != null) {
Table destinationTable = bigQuery.getTable(destinationTableId);
TableInfo tableInfo =
destinationTable.toBuilder().setDescription(tableDescription).build();
bigQuery.update(tableInfo);
}
}

return DatasetInfo.newBuilder()
.setName(createTrainingDatasetName(namePrefix, featureSet.getEntityName(), tableName))
.setTableUrl(toTableUrl(destinationTable))
.setName(tableName)
.setTableUrl(toTableUrl(destinationTableId))
.build();
} catch (JobException e) {
log.error("Failed creating training dataset", e);
throw new TrainingDatasetCreationException("Failed creating training dataset", e);
} catch (InterruptedException e) {
log.error("Training dataset creation was interrupted", e);
throw new TrainingDatasetCreationException("Training dataset creation was interrupted", e);
throw new TrainingDatasetCreationException("Training dataset creation was interrupted",
e);
}
}

private void createBqDatasetIfMissing(String bqDatasetName) {
if (bigQuery.getDataset(bqDatasetName) != null) {
return;
}
private String createBqTableName(
String datasetPrefix,
FeatureSet featureSet,
Timestamp startDate,
Timestamp endDate,
String namePrefix) {

// create dataset
bigQuery.create(com.google.cloud.bigquery.DatasetInfo.of(bqDatasetName));
}
List<String> features = new ArrayList(featureSet.getFeatureIdsList());
Collections.sort(features);

String datasetId = String.format("%s_%s_%s", features, startDate, endDate);
StringBuilder hashText;

// create hash from datasetId
try {
MessageDigest md = MessageDigest.getInstance("SHA-1");
byte[] messageDigest = md.digest(datasetId.getBytes());
BigInteger no = new BigInteger(1, messageDigest);
hashText = new StringBuilder(no.toString(16));
while (hashText.length() < 32) {
hashText.insert(0, "0");
}
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}

private String createBqTableName(Timestamp startDate, Timestamp endDate, String namePrefix) {
String currentTime = String.valueOf(clock.millis());
if (!Strings.isNullOrEmpty(namePrefix)) {
// only alphanumeric and underscore are allowed
namePrefix = namePrefix.replaceAll("[^a-zA-Z0-9_]", "_");
return String.format(
"%s_%s_%s_%s",
namePrefix, currentTime, formatTimestamp(startDate), formatTimestamp(endDate));
"%s_%s_%s_%s", datasetPrefix, featureSet.getEntityName(), namePrefix,
hashText.toString());
}

return String.format(
"%s_%s_%s", currentTime, formatTimestamp(startDate), formatTimestamp(endDate));
"%s_%s_%s", datasetPrefix, featureSet.getEntityName(), hashText.toString());
}

private String createBqDatasetName(String entity) {
return String.format("%s_%s", datasetPrefix, entity);
private String createBqTableDescription(
FeatureSet featureSet, Timestamp startDate, Timestamp endDate, String query) {
return String.format(
"Feast Dataset for %s features.\nContains data from %s to %s.\n Last edited at %s.\n\n-----\n\n%s",
featureSet.getEntityName(),
formatTimestamp(startDate),
formatTimestamp(endDate),
Instant.now(),
query);
}

private String formatTimestamp(Timestamp timestamp) {
Expand All @@ -148,11 +187,4 @@ private String toTableUrl(TableId tableId) {
return String.format(
"%s.%s.%s", tableId.getProject(), tableId.getDataset(), tableId.getTable());
}

private String createTrainingDatasetName(String namePrefix, String entityName, String tableName) {
if (!Strings.isNullOrEmpty(namePrefix)) {
return tableName;
}
return String.format("%s_%s", entityName, tableName);
}
}
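
The naming change above is the core of the fix: the old clock-based table suffix is replaced by a SHA-1 over the sorted feature ids and the date range, so repeated createDataset calls for the same request resolve to the same table (and the existing-table check can skip the query job) instead of accumulating timestamped copies. A standalone sketch of the idea (simplified: dates are plain strings here, whereas the creator hashes the string form of the protobuf Timestamps):

import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TableNameSketch {

  // Deterministic name: same features + date range -> same hash -> same table.
  static String tableName(String datasetPrefix, String entityName,
      List<String> featureIds, String startDate, String endDate)
      throws NoSuchAlgorithmException {
    List<String> features = new ArrayList<>(featureIds);
    Collections.sort(features); // sort so feature order does not change the hash
    String key = String.format("%s_%s_%s", features, startDate, endDate);
    byte[] digest = MessageDigest.getInstance("SHA-1").digest(key.getBytes());
    StringBuilder hex = new StringBuilder(new BigInteger(1, digest).toString(16));
    while (hex.length() < 32) {
      hex.insert(0, "0"); // left-pad, mirroring the creator
    }
    return String.format("%s_%s_%s", datasetPrefix, entityName, hex);
  }

  public static void main(String[] args) throws NoSuchAlgorithmException {
    // Prints the same name regardless of how the feature list is ordered.
    System.out.println(tableName("feast", "myentity",
        Arrays.asList("myentity.feature2", "myentity.feature1"),
        "2018-01-01T00:00:00Z", "2019-01-01T00:00:00Z"));
  }
}
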
core/src/test/java/feast/core/training/BigQueryTraningDatasetCreatorTest.java
@@ -16,28 +16,35 @@
*/
package feast.core.training;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.cloud.bigquery.BigQuery;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import feast.core.DatasetServiceProto.DatasetInfo;
import feast.core.DatasetServiceProto.FeatureSet;
import feast.core.storage.BigQueryStorageManager;
import feast.specs.StorageSpecProto.StorageSpec;
import java.time.Clock;
import java.time.Instant;
import java.util.Arrays;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

// TODO: Consider testing against "actual" BigQuery instead of mocking it,
// because the mocked BigQuery client is very basic and may miss important functionality,
// such as whether a table / dataset actually gets created.
// The test methods should probably also add a condition so that tests can be skipped if
// the user running them does not have permission to manage BigQuery (although ideally they should).
// Example of conditionally ignoring tests in JUnit 4:
// https://stackoverflow.com/questions/1689242/conditionally-ignoring-tests-in-junit-4

public class BigQueryTraningDatasetCreatorTest {

public static final String projectId = "the-project";
@@ -48,26 +55,23 @@ public class BigQueryTraningDatasetCreatorTest {
private BigQueryDatasetTemplater templater;
@Mock
private BigQuery bq;
@Mock
private Clock clock;

@Before
public void setUp() throws Exception {
public void setUp() {
MockitoAnnotations.initMocks(this);
when(templater.getStorageSpec()).thenReturn(StorageSpec.newBuilder()
.setId("BIGQUERY1")
.setType(BigQueryStorageManager.TYPE)
.putOptions("project", "project")
.putOptions("dataset", "dataset")
.build());
creator = new BigQueryTraningDatasetCreator(templater, clock, projectId, datasetPrefix, bq);
creator = new BigQueryTraningDatasetCreator(templater, projectId, datasetPrefix, bq);

when(templater.createQuery(
any(FeatureSet.class), any(Timestamp.class), any(Timestamp.class), anyLong()))
.thenReturn("SELECT * FROM `project.dataset.table`");
}


@Test
public void shouldCreateCorrectDatasetIfPrefixNotSpecified() {
String entityName = "myentity";
@@ -85,14 +89,15 @@ public void shouldCreateCorrectDatasetIfPrefixNotSpecified() {
long limit = 999;
String namePrefix = "";

DatasetInfo dsInfo =
creator.createDataset(featureSet, startDate, endDate, limit, namePrefix);
assertThat(dsInfo.getName(), equalTo("myentity_0_20180101_20190101"));
DatasetInfo dsInfo = creator.createDataset(featureSet, startDate, endDate, limit, namePrefix);
assertThat(
dsInfo.getName(), equalTo("feast_myentity_b0009f0f7df634ddc130571319e0deb9742eb1da"));
assertThat(
dsInfo.getTableUrl(),
equalTo(
String.format(
"%s.%s_%s.%s", projectId, datasetPrefix, entityName, "0_20180101_20190101")));
"%s.dataset.%s_%s_%s",
projectId, datasetPrefix, entityName, "b0009f0f7df634ddc130571319e0deb9742eb1da")));
}

@Test
@@ -112,15 +117,20 @@ public void shouldCreateCorrectDatasetIfPrefixIsSpecified() {
long limit = 999;
String namePrefix = "mydataset";

DatasetInfo dsInfo =
creator.createDataset(featureSet, startDate, endDate, limit, namePrefix);
DatasetInfo dsInfo = creator.createDataset(featureSet, startDate, endDate, limit, namePrefix);
assertThat(
dsInfo.getTableUrl(),
equalTo(
String.format(
"%s.%s_%s.%s", projectId, datasetPrefix, entityName,
"mydataset_0_20180101_20190101")));
assertThat(dsInfo.getName(), equalTo("mydataset_0_20180101_20190101"));
"%s.dataset.%s_%s_%s_%s",
projectId,
datasetPrefix,
entityName,
namePrefix,
"b0009f0f7df634ddc130571319e0deb9742eb1da")));
assertThat(
dsInfo.getName(),
equalTo("feast_myentity_mydataset_b0009f0f7df634ddc130571319e0deb9742eb1da"));
}

@Test
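
One way to implement the conditional skip mentioned in the TODO above is JUnit 4's Assume, which reports a test as skipped rather than failed when a precondition does not hold. A minimal sketch (not part of this commit; class and method names are hypothetical):

import static org.junit.Assume.assumeTrue;

import org.junit.Test;

public class ConditionalBigQueryTest {

  @Test
  public void shouldRunOnlyWhenCredentialsArePresent() {
    // Skips (rather than fails) when no service account is configured,
    // e.g. for contributors without permission to manage BigQuery.
    assumeTrue(System.getenv("GOOGLE_APPLICATION_CREDENTIALS") != null);
    // ... exercise the real BigQuery client here ...
  }
}
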
(diff for the remaining 8 changed files not shown)
