Skip to content

Commit

Permalink
[#1774] feat(server): Add REST API for adding partition (#1775)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Add REST API for adding partition

### Why are the changes needed?

Fix: #1774 

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

UTs added
  • Loading branch information
mchades authored Jan 30, 2024
1 parent e82e2be commit 4c1d7cc
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.dto.requests;

import com.datastrato.gravitino.dto.rel.partitions.PartitionDTO;
import com.datastrato.gravitino.rest.RESTRequest;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.jackson.Jacksonized;

@Getter
@EqualsAndHashCode
@ToString
@Builder
@Jacksonized
public class AddPartitionsRequest implements RESTRequest {

@JsonProperty("partitions")
private final PartitionDTO[] partitions;

public AddPartitionsRequest() {
this(null);
}

public AddPartitionsRequest(PartitionDTO[] partitions) {
this.partitions = partitions;
}

@Override
public void validate() throws IllegalArgumentException {
Preconditions.checkArgument(partitions != null, "partitions must not be null");
Preconditions.checkArgument(
partitions.length == 1, "Haven't yet implemented multiple partitions");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.dto.responses;

import com.datastrato.gravitino.dto.rel.partitions.PartitionDTO;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

@Getter
@ToString
@EqualsAndHashCode(callSuper = true)
public class PartitionListResponse extends BaseResponse {

@JsonProperty("partitions")
private final PartitionDTO[] partitions;

public PartitionListResponse(PartitionDTO[] partitions) {
super(0);
this.partitions = partitions;
}

// This is the constructor that is used by Jackson deserializer
public PartitionListResponse() {
super();
this.partitions = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.datastrato.gravitino.rel.partitions.IdentityPartition;
import com.datastrato.gravitino.rel.partitions.ListPartition;
import com.datastrato.gravitino.rel.partitions.Partition;
import com.datastrato.gravitino.rel.partitions.Partitions;
import com.datastrato.gravitino.rel.partitions.RangePartition;
import java.util.Arrays;
import org.apache.commons.lang3.ArrayUtils;
Expand Down Expand Up @@ -363,6 +364,31 @@ public static Index[] fromDTOs(IndexDTO[] indexDTOS) {
return Arrays.stream(indexDTOS).map(DTOConverters::fromDTO).toArray(Index[]::new);
}

public static Partition fromDTO(PartitionDTO partitionDTO) {
switch (partitionDTO.type()) {
case IDENTITY:
IdentityPartitionDTO identityPartitionDTO = (IdentityPartitionDTO) partitionDTO;
return Partitions.identity(
identityPartitionDTO.name(),
identityPartitionDTO.fieldNames(),
identityPartitionDTO.values(),
identityPartitionDTO.properties());
case RANGE:
RangePartitionDTO rangePartitionDTO = (RangePartitionDTO) partitionDTO;
return Partitions.range(
rangePartitionDTO.name(),
rangePartitionDTO.lower(),
rangePartitionDTO.upper(),
rangePartitionDTO.properties());
case LIST:
ListPartitionDTO listPartitionDTO = (ListPartitionDTO) partitionDTO;
return Partitions.list(
listPartitionDTO.name(), listPartitionDTO.lists(), listPartitionDTO.properties());
default:
throw new IllegalArgumentException("Unsupported partition type: " + partitionDTO.type());
}
}

public static SortOrder fromDTO(SortOrderDTO sortOrderDTO) {
return SortOrders.of(
fromFunctionArg(sortOrderDTO.sortTerm()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,27 @@
*/
package com.datastrato.gravitino.server.web.rest;

import static com.datastrato.gravitino.dto.util.DTOConverters.fromDTO;

import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.catalog.CatalogOperationDispatcher;
import com.datastrato.gravitino.dto.rel.partitions.PartitionDTO;
import com.datastrato.gravitino.dto.requests.AddPartitionsRequest;
import com.datastrato.gravitino.dto.responses.PartitionListResponse;
import com.datastrato.gravitino.dto.responses.PartitionNameListResponse;
import com.datastrato.gravitino.dto.responses.PartitionResponse;
import com.datastrato.gravitino.dto.util.DTOConverters;
import com.datastrato.gravitino.metrics.MetricNames;
import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.rel.partitions.Partition;
import com.datastrato.gravitino.server.web.Utils;
import com.google.common.base.Preconditions;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
Expand Down Expand Up @@ -82,4 +89,32 @@ public Response getPartition(
return ExceptionHandlers.handlePartitionException(OperationType.GET, "", table, e);
}
}

@POST
@Produces("application/vnd.gravitino.v1+json")
@Timed(name = "add-partitions." + MetricNames.HTTP_PROCESS_DURATION, absolute = true)
@ResponseMetered(name = "add-partitions", absolute = true)
public Response addPartitions(
@PathParam("metalake") String metalake,
@PathParam("catalog") String catalog,
@PathParam("schema") String schema,
@PathParam("table") String table,
AddPartitionsRequest request) {
Preconditions.checkArgument(
request.getPartitions().length == 1, "Only one partition is supported");

try {
return Utils.doAs(
httpRequest,
() -> {
NameIdentifier tableIdent = NameIdentifier.of(metalake, catalog, schema, table);
Table loadTable = dispatcher.loadTable(tableIdent);
Partition p =
loadTable.supportPartitions().addPartition(fromDTO(request.getPartitions()[0]));
return Utils.ok(new PartitionListResponse(new PartitionDTO[] {DTOConverters.toDTO(p)}));
});
} catch (Exception e) {
return ExceptionHandlers.handlePartitionException(OperationType.CREATE, "", table, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
import static org.mockito.Mockito.when;

import com.datastrato.gravitino.catalog.CatalogOperationDispatcher;
import com.datastrato.gravitino.dto.rel.partitions.PartitionDTO;
import com.datastrato.gravitino.dto.requests.AddPartitionsRequest;
import com.datastrato.gravitino.dto.responses.ErrorConstants;
import com.datastrato.gravitino.dto.responses.ErrorResponse;
import com.datastrato.gravitino.dto.responses.PartitionListResponse;
import com.datastrato.gravitino.dto.responses.PartitionNameListResponse;
import com.datastrato.gravitino.dto.responses.PartitionResponse;
import com.datastrato.gravitino.dto.util.DTOConverters;
Expand All @@ -35,6 +38,7 @@
import java.io.IOException;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -153,7 +157,11 @@ public Partition getPartition(String partitionName) throws NoSuchPartitionExcept
@Override
public Partition addPartition(Partition partition)
throws PartitionAlreadyExistsException {
return null;
if (partitions.containsKey(partition.name())) {
throw new PartitionAlreadyExistsException(partition.name());
} else {
return partition;
}
}

@Override
Expand Down Expand Up @@ -235,4 +243,50 @@ public void testGetPartition() {
Assertions.assertEquals(NoSuchPartitionException.class.getSimpleName(), errorResp2.getType());
Assertions.assertTrue(errorResp2.getMessage().contains("p3"));
}

@Test
public void testAddPartition() {
mockPartitionedTable();

Partition newPartition =
Partitions.identity(
"p3",
new String[][] {colName},
new Literal[] {Literals.stringLiteral("v3")},
Maps.newHashMap());

AddPartitionsRequest req =
new AddPartitionsRequest(new PartitionDTO[] {DTOConverters.toDTO(newPartition)});
Response resp =
target(partitionPath(metalake, catalog, schema, table))
.request(MediaType.APPLICATION_JSON_TYPE)
.accept("application/vnd.gravitino.v1+json")
.post(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));

Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());

PartitionListResponse partitionResp = resp.readEntity(PartitionListResponse.class);
Assertions.assertEquals(0, partitionResp.getCode());

Partition[] partition = partitionResp.getPartitions();
Assertions.assertEquals(1, partition.length);
Assertions.assertEquals(DTOConverters.toDTO(newPartition), partition[0]);

// Test throws exception
req = new AddPartitionsRequest(new PartitionDTO[] {DTOConverters.toDTO(partition1)});
Response resp2 =
target(partitionPath(metalake, catalog, schema, table))
.request(MediaType.APPLICATION_JSON_TYPE)
.accept("application/vnd.gravitino.v1+json")
.post(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE));

Assertions.assertEquals(Response.Status.CONFLICT.getStatusCode(), resp2.getStatus());

ErrorResponse errorResp2 = resp2.readEntity(ErrorResponse.class);
Assertions.assertEquals(ErrorConstants.ALREADY_EXISTS_CODE, errorResp2.getCode());
Assertions.assertEquals(
PartitionAlreadyExistsException.class.getSimpleName(), errorResp2.getType());
Assertions.assertTrue(errorResp2.getMessage().contains(partition1.name()));
}
}

0 comments on commit 4c1d7cc

Please sign in to comment.