Skip to content

Commit

Permalink
[#287] feat(iceberg): add iceberg rest catalog namespace interface (#302
Browse files Browse the repository at this point in the history
)

### What changes were proposed in this pull request?
to support iceberg rest api, we add interfaces:
1. config
2. create&drop&get&list schema


### Why are the changes needed?
iceberg api spec

Fix: #287 

### Does this PR introduce _any_ user-facing change?
add new interfaces


### How was this patch tested?
1. manually tested in local env with `memory catalog` 
2. add ut to test rest interfaces
  • Loading branch information
FANNG1 authored Sep 6, 2023
1 parent 3662ed7 commit f95d6ae
Show file tree
Hide file tree
Showing 14 changed files with 770 additions and 2 deletions.
41 changes: 41 additions & 0 deletions catalog-lakehouse/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
description = "catalog-lakehouse"

plugins {
`maven-publish`
id("java")
id("idea")
id("com.diffplug.spotless")
}

dependencies {
implementation(project(":common"))
implementation(project(":core"))
implementation(libs.jackson.databind)
implementation(libs.jackson.annotations)
implementation(libs.jackson.datatype.jdk8)
implementation(libs.jackson.datatype.jsr310)
implementation(libs.guava)
implementation(libs.bundles.log4j)
implementation(libs.bundles.jetty)
implementation(libs.bundles.jersey)
implementation(libs.bundles.iceberg)

compileOnly(libs.lombok)
annotationProcessor(libs.lombok)

testImplementation(libs.slf4j.jdk14)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.mockito.core)
testImplementation(libs.jersey.test.framework.core) {
exclude(group = "org.junit.jupiter")
}
testImplementation(libs.jersey.test.framework.provider.jetty) {
exclude(group = "org.junit.jupiter")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.ops;

import com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.utils.IcebergCatalogUtil;
import com.google.common.base.Preconditions;
import java.util.Optional;
import javax.ws.rs.NotSupportedException;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.rest.CatalogHandlers;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
import org.apache.iceberg.rest.responses.GetNamespaceResponse;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergTableOps {

private static final Logger LOG = LoggerFactory.getLogger(IcebergTableOps.class);

private Catalog catalog;
private SupportsNamespaces asNamespaceCatalog;
private final String DEFAULT_ICEBERG_CATALOG_TYPE = "memory";

public IcebergTableOps() {
catalog = IcebergCatalogUtil.loadIcebergCatalog(DEFAULT_ICEBERG_CATALOG_TYPE);
if (catalog instanceof SupportsNamespaces) {
asNamespaceCatalog = (SupportsNamespaces) catalog;
}
}

private void validateNamespace(Optional<Namespace> namespace) {
namespace.ifPresent(
n ->
Preconditions.checkArgument(
n.toString().isEmpty() == false, "Namespace couldn't be empty"));
if (asNamespaceCatalog == null) {
throw new NotSupportedException("The underlying catalog doesn't support namespace operation");
}
}

public CreateNamespaceResponse createNamespace(CreateNamespaceRequest request) {
validateNamespace(Optional.of(request.namespace()));
return CatalogHandlers.createNamespace(asNamespaceCatalog, request);
}

public void dropNamespace(Namespace namespace) {
validateNamespace(Optional.of(namespace));
CatalogHandlers.dropNamespace(asNamespaceCatalog, namespace);
}

public GetNamespaceResponse loadNamespace(Namespace namespace) {
validateNamespace(Optional.of(namespace));
return CatalogHandlers.loadNamespace(asNamespaceCatalog, namespace);
}

public ListNamespacesResponse listNamespace(Namespace parent) {
validateNamespace(Optional.empty());
return CatalogHandlers.listNamespaces(asNamespaceCatalog, parent);
}

public UpdateNamespacePropertiesResponse updateNamespaceProperties(
Namespace namespace, UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest) {
validateNamespace(Optional.of(namespace));
return CatalogHandlers.updateNamespaceProperties(
asNamespaceCatalog, namespace, updateNamespacePropertiesRequest);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.utils;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.inmemory.InMemoryCatalog;

public class IcebergCatalogUtil {

private static InMemoryCatalog loadMemoryCatalog() {
InMemoryCatalog memoryCatalog = new InMemoryCatalog();
Map<String, String> properties = new HashMap<>();

memoryCatalog.initialize("memory", properties);
return memoryCatalog;
}

public static Catalog loadIcebergCatalog(String catalogType) {
switch (catalogType.toLowerCase(Locale.ENGLISH)) {
case "memory":
return loadMemoryCatalog();
// todo: add hive, jdbc catalog
default:
throw new RuntimeException(
catalogType
+ " catalog is not supported yet, supported catalogs: [memory]"
+ catalogType);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.web;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotAuthorizedException;
import org.apache.iceberg.exceptions.UnprocessableEntityException;
import org.apache.iceberg.exceptions.ValidationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Provider
public class IcebergExceptionMapper implements ExceptionMapper<Exception> {

private static final Logger LOG = LoggerFactory.getLogger(IcebergExceptionMapper.class);

private static final Map<Class<? extends Exception>, Integer> EXCEPTION_ERROR_CODES =
ImmutableMap.<Class<? extends Exception>, Integer>builder()
.put(IllegalArgumentException.class, 400)
.put(ValidationException.class, 400)
.put(NamespaceNotEmptyException.class, 400)
.put(NotAuthorizedException.class, 401)
.put(ForbiddenException.class, 403)
.put(NoSuchNamespaceException.class, 404)
.put(NoSuchTableException.class, 404)
.put(NoSuchIcebergTableException.class, 404)
.put(UnsupportedOperationException.class, 406)
.put(AlreadyExistsException.class, 409)
.put(CommitFailedException.class, 409)
.put(UnprocessableEntityException.class, 422)
.put(CommitStateUnknownException.class, 500)
.build();

@Override
public Response toResponse(Exception ex) {
int status =
EXCEPTION_ERROR_CODES.getOrDefault(
ex.getClass(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
if (status == Status.INTERNAL_SERVER_ERROR.getStatusCode()) {
LOG.warn("Iceberg REST server unexpected exception:", ex);
} else {
LOG.info(
"Iceberg REST server error maybe caused by user request, response http status: {}, exception: {}, exception message: {}",
ex.getClass(),
ex.getMessage());
}
return IcebergRestUtils.errorResponse(ex, status);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.web;

import com.datastrato.graviton.json.JsonUtils;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import javax.ws.rs.ext.ContextResolver;
import javax.ws.rs.ext.Provider;
import org.apache.iceberg.rest.RESTSerializers;

@Provider
public class IcebergObjectMapperProvider implements ContextResolver<ObjectMapper> {

@Override
public ObjectMapper getContext(Class<?> type) {
ObjectMapper mapper = JsonUtils.objectMapper();
mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.setPropertyNamingStrategy(new PropertyNamingStrategy.KebabCaseStrategy());
RESTSerializers.registerAll(mapper);
return mapper;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.web;

import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.iceberg.rest.responses.ErrorResponse;

public class IcebergRestUtils {

private IcebergRestUtils() {}

public static <T> Response ok(T t) {
return Response.status(Response.Status.OK).entity(t).type(MediaType.APPLICATION_JSON).build();
}

public static <T> Response noContent() {
return Response.status(Status.NO_CONTENT).build();
}

public static Response errorResponse(Exception ex, int httpStatus) {
ErrorResponse errorResponse =
ErrorResponse.builder()
.responseCode(httpStatus)
.withType(ex.getClass().getSimpleName())
.withMessage(ex.getMessage())
.withStackTrace(ex)
.build();
return Response.status(httpStatus)
.entity(errorResponse)
.type(MediaType.APPLICATION_JSON)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.web.rest;

import com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.web.IcebergRestUtils;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.iceberg.rest.responses.ConfigResponse;

@Path("/v1/config")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class IcebergConfig {

@Context private HttpServletRequest httpRequest;

@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getConfig() {
ConfigResponse response = ConfigResponse.builder().build();
return IcebergRestUtils.ok(response);
}
}
Loading

0 comments on commit f95d6ae

Please sign in to comment.