Skip to content

Commit

Permalink
BigQuery: Added listPartitions. (#4923)
Browse files Browse the repository at this point in the history
* added listPartitions

* added unit test

* modified code

* modified unit test for listPartitions

* added integration test

* Fix table name

* Fix integration test
  • Loading branch information
Praful Makani authored and sduskis committed Apr 9, 2019
1 parent 308bc15 commit babb4ac
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,12 @@ public int hashCode() {
*/
Page<Table> listTables(DatasetId datasetId, TableListOption... options);

/**
* Lists the ids of the partitions of a partitioned table.
*
* @param tableId identity of the partitioned table whose partitions should be listed
* @return A list of the partition ids present in the partitioned table
*/
List<String> listPartitions(TableId tableId);

/**
* Sends an insert all request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -507,6 +508,26 @@ public Page<Table> listTables(DatasetId datasetId, TableListOption... options) {
return listTables(completeDatasetId, getOptions(), optionMap(options));
}

/**
 * Lists the partition ids of a partitioned table by reading the {@code partition_id} column of
 * the table's {@code $__PARTITIONS_SUMMARY__} meta table.
 *
 * @param tableId the partitioned table; when it names a project, the meta table is looked up in
 *     that same project
 * @return the {@code partition_id} value of every meta table row, in the order the rows are
 *     returned
 */
@Override
public List<String> listPartitions(TableId tableId) {
  String metaTableName = tableId.getTable() + "$__PARTITIONS_SUMMARY__";
  // The two-argument TableId.of carries no project, so an explicitly requested project has to be
  // passed through the three-argument form or the lookup would ignore it.
  TableId metaTableId =
      tableId.getProject() == null
          ? TableId.of(tableId.getDataset(), metaTableName)
          : TableId.of(tableId.getProject(), tableId.getDataset(), metaTableName);
  // NOTE(review): getTable presumably returns null for a missing table, which would surface
  // here as a NullPointerException - confirm whether a dedicated error is wanted.
  Table metaTable = getTable(metaTableId);
  Schema metaSchema = metaTable.getDefinition().getSchema();
  TableResult rows = metaTable.list(metaSchema);
  List<String> partitions = new ArrayList<String>();
  for (FieldValueList row : rows.iterateAll()) {
    // The column name is fixed, so it is looked up directly instead of scanning the schema.
    partitions.add(row.get("partition_id").getStringValue());
  }
  return partitions;
}

private static Page<Table> listTables(
final DatasetId datasetId,
final BigQueryOptions serviceOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,128 @@ public class BigQueryImplTest {
private static final BigQuery.TableOption TABLE_OPTION_FIELDS =
BigQuery.TableOption.fields(BigQuery.TableField.SCHEMA, BigQuery.TableField.ETAG);

// Table list partitions: column definitions combined into SCHEMA_PARTITIONS, the schema of the
// table returned by the mocked "$__PARTITIONS_SUMMARY__" lookup. Every column is NULLABLE.
private static final Field PROJECT_ID_FIELD =
Field.newBuilder("project_id", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
private static final Field DATASET_ID_FIELD =
Field.newBuilder("dataset_id", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
private static final Field TABLE_ID_FIELD =
Field.newBuilder("table_id", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
// The column whose values listPartitions collects.
private static final Field PARTITION_ID_FIELD =
Field.newBuilder("partition_id", LegacySQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.build();
// Creation and last-modified times are each declared twice: once as INTEGER and once as
// TIMESTAMP.
private static final Field CREATION_TIME_FIELD =
Field.newBuilder("creation_time", LegacySQLTypeName.INTEGER)
.setMode(Field.Mode.NULLABLE)
.build();
private static final Field CREATION_TIMESTAMP_FIELD =
Field.newBuilder("creation_timestamp", LegacySQLTypeName.TIMESTAMP)
.setMode(Field.Mode.NULLABLE)
.build();
private static final Field LAST_MODIFIED_FIELD =
Field.newBuilder("last_modified_time", LegacySQLTypeName.INTEGER)
.setMode(Field.Mode.NULLABLE)
.build();
private static final Field LAST_MODIFIED_TIMESTAMP_FIELD =
Field.newBuilder("last_modified_timestamp", LegacySQLTypeName.TIMESTAMP)
.setMode(Field.Mode.NULLABLE)
.build();
// Schema of the mocked partitions meta table. The column order here matches the cell order of
// each row in TABLE_DATA_WITH_PARTITIONS (partition_id is the fourth column).
private static final Schema SCHEMA_PARTITIONS =
Schema.of(
PROJECT_ID_FIELD,
DATASET_ID_FIELD,
TABLE_ID_FIELD,
PARTITION_ID_FIELD,
CREATION_TIME_FIELD,
CREATION_TIMESTAMP_FIELD,
LAST_MODIFIED_FIELD,
LAST_MODIFIED_TIMESTAMP_FIELD);
// Table definition carrying SCHEMA_PARTITIONS and a row count of three.
private static final TableDefinition TABLE_DEFINITION_PARTITIONS =
StandardTableDefinition.newBuilder()
.setSchema(SCHEMA_PARTITIONS)
.setNumBytes(0L)
.setNumLongTermBytes(0L)
.setNumRows(3L)
.setLocation("unknown")
.build();
// Table info whose protobuf form is returned by the mocked getTable call in testListPartition.
// It is built on TABLE_ID, not on the "$__PARTITIONS_SUMMARY__" name.
private static final TableInfo TABLE_INFO_PARTITIONS =
TableInfo.newBuilder(TABLE_ID, TABLE_DEFINITION_PARTITIONS)
.setEtag("ETAG")
.setCreationTime(1553689573240L)
.setLastModifiedTime(1553841163438L)
.setNumBytes(0L)
.setNumLongTermBytes(0L)
.setNumRows(BigInteger.valueOf(3L))
.build();
// Cells of the first mocked row. The project/dataset/table cells are declared only once and are
// reused by all three rows of TABLE_DATA_WITH_PARTITIONS.
private static final TableCell TABLE_CELL1_PROJECT_ID = new TableCell().setV(PROJECT);
private static final TableCell TABLE_CELL1_DATASET_ID = new TableCell().setV(DATASET);
private static final TableCell TABLE_CELL1_TABLE_ID = new TableCell().setV(TABLE);
private static final TableCell TABLE_CELL1_PARTITION_ID = new TableCell().setV("20190327");
// NOTE(review): the *_TIME / *_TIMESTAMP pairs look like the same instant expressed in epoch
// milliseconds and in fractional epoch seconds - confirm.
private static final TableCell TABLE_CELL1_CREATION_TIME = new TableCell().setV("1553694932498");
private static final TableCell TABLE_CELL1_CREATION_TIMESTAMP =
new TableCell().setV("1553694932.498");
private static final TableCell TABLE_CELL1_LAST_MODIFIED_TIME =
new TableCell().setV("1553694932989");
private static final TableCell TABLE_CELL1_LAST_MODIFIED_TIMESTAMP =
new TableCell().setV("1553694932.989");

// Cells specific to the second mocked row (partition 20190328).
private static final TableCell TABLE_CELL2_PARTITION_ID = new TableCell().setV("20190328");
private static final TableCell TABLE_CELL2_CREATION_TIME = new TableCell().setV("1553754224760");
private static final TableCell TABLE_CELL2_CREATION_TIMESTAMP =
new TableCell().setV("1553754224.76");
private static final TableCell TABLE_CELL2_LAST_MODIFIED_TIME =
new TableCell().setV("1553754225587");
private static final TableCell TABLE_CELL2_LAST_MODIFIED_TIMESTAMP =
new TableCell().setV("1553754225.587");

// Cells specific to the third mocked row (partition 20190329).
private static final TableCell TABLE_CELL3_PARTITION_ID = new TableCell().setV("20190329");
private static final TableCell TABLE_CELL3_CREATION_TIME = new TableCell().setV("1553841162879");
private static final TableCell TABLE_CELL3_CREATION_TIMESTAMP =
new TableCell().setV("1553841162.879");
private static final TableCell TABLE_CELL3_LAST_MODIFIED_TIME =
new TableCell().setV("1553841163438");
private static final TableCell TABLE_CELL3_LAST_MODIFIED_TIMESTAMP =
new TableCell().setV("1553841163.438");

// Three-row payload returned by the mocked listTableData call in testListPartition. Each row
// lists its cells in SCHEMA_PARTITIONS column order; the rows differ in partition id and in the
// four time cells, and share the project/dataset/table cells.
private static final TableDataList TABLE_DATA_WITH_PARTITIONS =
new TableDataList()
.setTotalRows(3L)
.setRows(
ImmutableList.of(
// Row 1: partition 20190327.
new TableRow()
.setF(
ImmutableList.of(
TABLE_CELL1_PROJECT_ID,
TABLE_CELL1_DATASET_ID,
TABLE_CELL1_TABLE_ID,
TABLE_CELL1_PARTITION_ID,
TABLE_CELL1_CREATION_TIME,
TABLE_CELL1_CREATION_TIMESTAMP,
TABLE_CELL1_LAST_MODIFIED_TIME,
TABLE_CELL1_LAST_MODIFIED_TIMESTAMP)),
// Row 2: partition 20190328.
new TableRow()
.setF(
ImmutableList.of(
TABLE_CELL1_PROJECT_ID,
TABLE_CELL1_DATASET_ID,
TABLE_CELL1_TABLE_ID,
TABLE_CELL2_PARTITION_ID,
TABLE_CELL2_CREATION_TIME,
TABLE_CELL2_CREATION_TIMESTAMP,
TABLE_CELL2_LAST_MODIFIED_TIME,
TABLE_CELL2_LAST_MODIFIED_TIMESTAMP)),
// Row 3: partition 20190329.
new TableRow()
.setF(
ImmutableList.of(
TABLE_CELL1_PROJECT_ID,
TABLE_CELL1_DATASET_ID,
TABLE_CELL1_TABLE_ID,
TABLE_CELL3_PARTITION_ID,
TABLE_CELL3_CREATION_TIME,
TABLE_CELL3_CREATION_TIMESTAMP,
TABLE_CELL3_LAST_MODIFIED_TIME,
TABLE_CELL3_LAST_MODIFIED_TIMESTAMP))));
// Table list options
private static final BigQuery.TableListOption TABLE_LIST_PAGE_SIZE =
BigQuery.TableListOption.pageSize(42L);
Expand Down Expand Up @@ -639,6 +761,20 @@ public void testGetTable() {
assertEquals(new Table(bigquery, new TableInfo.BuilderImpl(TABLE_INFO_WITH_PROJECT)), table);
}

/**
 * Checks that listPartitions fetches the "$__PARTITIONS_SUMMARY__" meta table, lists its rows
 * and returns the partition_id value of each row in row order.
 */
@Test
public void testListPartition() {
  EasyMock.expect(
          bigqueryRpcMock.getTable(
              PROJECT, DATASET, "table$__PARTITIONS_SUMMARY__", EMPTY_RPC_OPTIONS))
      .andReturn(TABLE_INFO_PARTITIONS.toPb());
  // The rows are requested for TABLE because TABLE_INFO_PARTITIONS is built on TABLE_ID.
  EasyMock.expect(bigqueryRpcMock.listTableData(PROJECT, DATASET, TABLE, EMPTY_RPC_OPTIONS))
      .andReturn(TABLE_DATA_WITH_PARTITIONS);
  EasyMock.replay(bigqueryRpcMock);
  bigquery = options.getService();
  List<String> partitions = bigquery.listPartitions(TABLE_ID_WITH_PROJECT);
  assertEquals(3, partitions.size());
  // Pin the values as well as the count, so that reading the wrong column cannot pass.
  assertEquals("20190327", partitions.get(0));
  assertEquals("20190328", partitions.get(1));
  assertEquals("20190329", partitions.get(2));
}

@Test
public void testGetTableNotFoundWhenThrowIsDisabled() {
EasyMock.expect(bigqueryRpcMock.getTable(PROJECT, DATASET, TABLE, EMPTY_RPC_OPTIONS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.junit.Assert.fail;

import com.google.api.gax.paging.Page;
import com.google.cloud.Date;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQuery.DatasetDeleteOption;
Expand Down Expand Up @@ -92,6 +93,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -641,6 +643,35 @@ public void testListTablesWithPartitioning() {
}
}

/**
 * Creates a table partitioned by DATE(_PARTITIONTIME), streams one row into the partition named
 * after the current date and expects listPartitions to report exactly one partition. The table
 * is deleted once the insert and listing checks are done.
 */
@Test
public void testListPartitions() throws InterruptedException {
  final String baseTable = "test_table_partitions";
  // Current date with the dashes stripped, used as the partition decorator of the insert target.
  final String decorator =
      Date.fromJavaUtilDate(new java.util.Date()).toString().replace("-", "");
  final TableId decoratedTableId = TableId.of(DATASET, baseTable + "$" + decorator);
  final String ddl =
      String.format(
          "CREATE OR REPLACE TABLE %s.%s ( StringField STRING )"
              + " PARTITION BY DATE(_PARTITIONTIME) "
              + "OPTIONS( partition_expiration_days=1)",
          DATASET, baseTable);
  QueryJobConfiguration ddlConfig = QueryJobConfiguration.newBuilder(ddl).build();
  Job ddlJob = bigquery.create(JobInfo.of(ddlConfig));
  ddlJob.waitFor();
  assertTrue(ddlJob.isDone());
  try {
    Map<String, Object> content = new HashMap<String, Object>();
    content.put("StringField", "StringValue");
    InsertAllResponse insertResult =
        bigquery.insertAll(InsertAllRequest.newBuilder(decoratedTableId).addRow(content).build());
    assertFalse(insertResult.hasErrors());
    assertEquals(0, insertResult.getInsertErrors().size());
    // The listing is requested for the undecorated table name.
    assertEquals(1, bigquery.listPartitions(TableId.of(DATASET, baseTable)).size());
  } finally {
    bigquery.delete(DATASET, baseTable);
  }
}

@Test
public void testUpdateTable() {
String tableName = "test_update_table";
Expand Down

0 comments on commit babb4ac

Please sign in to comment.