[#1769] feat(partition): Support listPartitions for Hive catalog (#1799)

### What changes were proposed in this pull request?

Support `listPartitions` for the Hive catalog.
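
As a usage sketch (not part of the change itself), the new capability is exercised through the existing partition API, mirroring the integration test added in this PR. The identifiers below are placeholders and the `catalog` handle is assumed to come from the Gravitino Java client; imports are omitted.

```java
// Sketch only: assumes a partitioned Hive table already registered in Gravitino
// and a `catalog` handle obtained from the Gravitino Java client.
Table table =
    catalog
        .asTableCatalog()
        .loadTable(NameIdentifier.of("metalake", "catalog", "schema", "sales"));

// Before this PR, listPartitions() threw UnsupportedOperationException for Hive tables.
Partition[] partitions = table.supportPartitions().listPartitions();
for (Partition partition : partitions) {
  System.out.println(partition.name() + " -> " + partition.properties());
}
```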

### Why are the changes needed?

Fix: #1769 

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?

Tests added (unit tests and integration tests).
mchades authored Jan 31, 2024
1 parent f4b9d1c commit 2a0a388
Showing 8 changed files with 218 additions and 6 deletions.
@@ -18,6 +18,7 @@
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -57,7 +58,31 @@ public String[] listPartitionNames() {

@Override
public Partition[] listPartitions() {
throw new UnsupportedOperationException();
List<String> partitionNames;
List<org.apache.hadoop.hive.metastore.api.Partition> partitions;
try {
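// maxParts = -1 asks the Hive metastore for all partition names (no limit)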
partitionNames =
table
.clientPool()
.run(c -> c.listPartitionNames(table.schemaName(), table.name(), (short) -1));
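// then fetch the full partition metadata for those names in a single batch call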
partitions =
table
.clientPool()
.run(c -> c.getPartitionsByNames(table.schemaName(), table.name(), partitionNames));
} catch (TException | InterruptedException e) {
throw new RuntimeException(e);
}

// Should never happen: the metastore returns one partition per requested name
Preconditions.checkArgument(
partitionNames.size() == partitions.size(),
"Partition names size %s does not match partitions size %s",
partitionNames.size(),
partitions.size());

return IntStream.range(0, partitionNames.size())
.mapToObj(i -> fromHivePartition(partitionNames.get(i), partitions.get(i)))
.toArray(Partition[]::new);
}

@Override
@@ -89,6 +89,14 @@ public void testListPartitionNames() {
&& Arrays.asList(partitionNames).contains(existingPartition.name()));
}

@Test
public void testListPartitions() {
Partition[] partitions = hiveTable.supportPartitions().listPartitions();
// there may be other partitions in the list, so we only check the added partition
Assertions.assertTrue(
partitions.length > 0 && Arrays.asList(partitions).contains(existingPartition));
}

@Test
public void testGetPartition() {
Partition partition = hiveTable.supportPartitions().getPartition(existingPartition.name());
@@ -27,6 +27,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.SneakyThrows;
@@ -124,7 +125,16 @@ public String getPartitionRequestPath() {

@Override
public Partition[] listPartitions() {
throw new UnsupportedOperationException();
Map<String, String> params = new HashMap<>();
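// "details=true" asks the server to return full partition objects instead of just names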
params.put("details", "true");
PartitionListResponse resp =
restClient.get(
getPartitionRequestPath(),
params,
PartitionListResponse.class,
Collections.emptyMap(),
ErrorHandlers.partitionErrorHandler());
return resp.getPartitions();
}

@Override
@@ -148,6 +148,46 @@ public void testListPartitionNames() throws JsonProcessingException {
Assertions.assertEquals("table does not support partition operations", exception.getMessage());
}

@Test
public void testListPartitions() throws JsonProcessingException {
String partitionName = "p1";
RangePartitionDTO partition =
RangePartitionDTO.builder()
.withName(partitionName)
.withLower(
new LiteralDTO.Builder()
.withDataType(Types.IntegerType.get())
.withValue("1")
.build())
.withUpper(
new LiteralDTO.Builder()
.withDataType(Types.IntegerType.get())
.withValue("10")
.build())
.build();
String partitionPath =
withSlash(((RelationalTable) partitionedTable).getPartitionRequestPath());
PartitionListResponse resp = new PartitionListResponse(new PartitionDTO[] {partition});

buildMockResource(Method.GET, partitionPath, null, resp, SC_OK);

Partition[] partitions = partitionedTable.supportPartitions().listPartitions();
Assertions.assertEquals(1, partitions.length);
Assertions.assertTrue(partitions[0] instanceof RangePartition);
Assertions.assertEquals(partition, partitions[0]);

// test throws exception
ErrorResponse errorResp =
ErrorResponse.unsupportedOperation("table does not support partition operations");
buildMockResource(Method.GET, partitionPath, null, errorResp, SC_NOT_IMPLEMENTED);

UnsupportedOperationException exception =
Assertions.assertThrows(
UnsupportedOperationException.class,
() -> partitionedTable.supportPartitions().listPartitions());
Assertions.assertEquals("table does not support partition operations", exception.getMessage());
}

@Test
public void testGetPartition() throws JsonProcessingException {
String partitionName = "p1";
@@ -316,6 +316,13 @@ public static IndexDTO[] toDTOs(Index[] indexes) {
return Arrays.stream(indexes).map(DTOConverters::toDTO).toArray(IndexDTO[]::new);
}

public static PartitionDTO[] toDTOs(Partition[] partitions) {
if (ArrayUtils.isEmpty(partitions)) {
return new PartitionDTO[0];
}
return Arrays.stream(partitions).map(DTOConverters::toDTO).toArray(PartitionDTO[]::new);
}

public static Distribution fromDTO(DistributionDTO distributionDTO) {
if (DistributionDTO.NONE.equals(distributionDTO) || null == distributionDTO) {
return Distributions.NONE;
@@ -79,6 +79,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -670,6 +671,23 @@ public void testCreatePartitionedHiveTable() throws TException, InterruptedException {

@Test
public void testListPartitionNames() throws TException, InterruptedException {
// test empty partitions
ColumnDTO[] columns = createColumns();
NameIdentifier nameIdentifier =
NameIdentifier.of(metalakeName, catalogName, schemaName, tableName);
Table nonPartitionedTable =
catalog
.asTableCatalog()
.createTable(
nameIdentifier,
columns,
TABLE_COMMENT,
ImmutableMap.of(),
Transforms.EMPTY_TRANSFORM);
String[] result = nonPartitionedTable.supportPartitions().listPartitionNames();
Assertions.assertEquals(0, result.length);

// test partitioned table
Table createdTable = preparePartitionedTable();

String[] partitionNames = createdTable.supportPartitions().listPartitionNames();
@@ -678,6 +696,61 @@ public void testListPartitionNames() throws TException, InterruptedException {
partitionNames);
}

@Test
public void testListPartitions() throws TException, InterruptedException {
// test empty partitions
ColumnDTO[] columns = createColumns();
NameIdentifier nameIdentifier =
NameIdentifier.of(metalakeName, catalogName, schemaName, tableName);
Table nonPartitionedTable =
catalog
.asTableCatalog()
.createTable(
nameIdentifier,
columns,
TABLE_COMMENT,
ImmutableMap.of(),
Transforms.EMPTY_TRANSFORM);
Partition[] result = nonPartitionedTable.supportPartitions().listPartitions();
Assertions.assertEquals(0, result.length);

// test partitioned table
Table createdTable = preparePartitionedTable();
String insertTemplate =
"INSERT INTO TABLE %s.%s "
+ "PARTITION (hive_col_name2='2023-01-02', hive_col_name3='gravitino_it_test2') "
+ "VALUES %s, %s";
sparkSession.sql(String.format(insertTemplate, schemaName, createdTable.name(), "(1)", "(2)"));

// update partition statistics so the partitions' properties carry row counts
String partition1 = "hive_col_name2='2023-01-01', hive_col_name3='gravitino_it_test'";
String partition2 = "hive_col_name2='2023-01-02', hive_col_name3='gravitino_it_test2'";
sparkSession.sql(
String.format(
"ANALYZE TABLE %s.%s PARTITION (%s) COMPUTE STATISTICS",
schemaName, createdTable.name(), partition1));
sparkSession.sql(
String.format(
"ANALYZE TABLE %s.%s PARTITION (%s) COMPUTE STATISTICS",
schemaName, createdTable.name(), partition2));

Partition[] partitions = createdTable.supportPartitions().listPartitions();
Assertions.assertEquals(2, partitions.length);
String partition1Name = "hive_col_name2=2023-01-01/hive_col_name3=gravitino_it_test";
String partition2Name = "hive_col_name2=2023-01-02/hive_col_name3=gravitino_it_test2";
Set<String> partitionNames =
Arrays.stream(partitions).map(Partition::name).collect(Collectors.toSet());
Assertions.assertTrue(partitionNames.contains(partition1Name));
Assertions.assertTrue(partitionNames.contains(partition2Name));
for (Partition partition : partitions) {
if (partition.name().equals(partition1Name)) {
Assertions.assertEquals("1", partition.properties().get("spark.sql.statistics.numRows"));
} else if (partition.name().equals(partition2Name)) {
Assertions.assertEquals("2", partition.properties().get("spark.sql.statistics.numRows"));
}
}
}

@Test
public void testGetPartition() throws TException, InterruptedException {
Table createdTable = preparePartitionedTable();
@@ -5,6 +5,7 @@
package com.datastrato.gravitino.server.web.rest;

import static com.datastrato.gravitino.dto.util.DTOConverters.fromDTO;
import static com.datastrato.gravitino.dto.util.DTOConverters.toDTOs;

import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
@@ -23,11 +24,13 @@
import com.google.common.base.Preconditions;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;

@@ -50,15 +53,21 @@ public Response listPartitionNames(
@PathParam("metalake") String metalake,
@PathParam("catalog") String catalog,
@PathParam("schema") String schema,
@PathParam("table") String table) {
@PathParam("table") String table,
@QueryParam("details") @DefaultValue("false") boolean verbose) {
try {
return Utils.doAs(
httpRequest,
() -> {
NameIdentifier tableIdent = NameIdentifier.of(metalake, catalog, schema, table);
Table loadTable = dispatcher.loadTable(tableIdent);
String[] partitionNames = loadTable.supportPartitions().listPartitionNames();
return Utils.ok(new PartitionNameListResponse(partitionNames));
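// "details=true" (verbose) returns full partition info; otherwise only the partition names are returned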
if (verbose) {
Partition[] partitions = loadTable.supportPartitions().listPartitions();
return Utils.ok(new PartitionListResponse(toDTOs(partitions)));
} else {
String[] partitionNames = loadTable.supportPartitions().listPartitionNames();
return Utils.ok(new PartitionNameListResponse(partitionNames));
}
});
} catch (Exception e) {
return ExceptionHandlers.handlePartitionException(OperationType.LIST, "", table, e);
@@ -142,7 +142,7 @@ public String[] listPartitionNames() {

@Override
public Partition[] listPartitions() {
return new Partition[0];
return partitions.values().toArray(new Partition[0]);
}

@Override
Expand Down Expand Up @@ -211,6 +211,46 @@ public void testListPartitionNames() {
Assertions.assertTrue(errorResp2.getMessage().contains("test exception"));
}

@Test
public void testListPartitions() {
Table mockedTable = mockPartitionedTable();

Response resp =
target(partitionPath(metalake, catalog, schema, table))
.queryParam("details", "true")
.request(MediaType.APPLICATION_JSON_TYPE)
.accept("application/vnd.gravitino.v1+json")
.get();

Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());

PartitionListResponse listResp = resp.readEntity(PartitionListResponse.class);
Assertions.assertEquals(0, listResp.getCode());

Partition[] partitions = listResp.getPartitions();
Assertions.assertEquals(2, partitions.length);
Assertions.assertEquals(DTOConverters.toDTO(partition1), partitions[0]);
Assertions.assertEquals(DTOConverters.toDTO(partition2), partitions[1]);

// Test throws exception
doThrow(new RuntimeException("test exception")).when(mockedTable).supportPartitions();
Response resp2 =
target(partitionPath(metalake, catalog, schema, table))
.queryParam("details", "true")
.request(MediaType.APPLICATION_JSON_TYPE)
.accept("application/vnd.gravitino.v1+json")
.get();

Assertions.assertEquals(
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), resp2.getStatus());

ErrorResponse errorResp2 = resp2.readEntity(ErrorResponse.class);
Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, errorResp2.getCode());
Assertions.assertEquals(RuntimeException.class.getSimpleName(), errorResp2.getType());
Assertions.assertTrue(errorResp2.getMessage().contains("test exception"));
}

@Test
public void testGetPartition() {
mockPartitionedTable();
