Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#287] feat(iceberg): add iceberg rest catalog namespace interface #302

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions catalog-lakehouse/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
description = "catalog-lakehouse"

plugins {
`maven-publish`
id("java")
id("idea")
id("com.diffplug.spotless")
}

dependencies {
implementation(project(":common"))
implementation(project(":core"))
implementation(libs.jackson.databind)
implementation(libs.jackson.annotations)
implementation(libs.jackson.datatype.jdk8)
implementation(libs.jackson.datatype.jsr310)
implementation(libs.guava)
implementation(libs.bundles.log4j)
implementation(libs.bundles.jetty)
implementation(libs.bundles.jersey)
implementation(libs.bundles.iceberg)

compileOnly(libs.lombok)
annotationProcessor(libs.lombok)

testImplementation(libs.slf4j.jdk14)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.mockito.core)
testImplementation(libs.jersey.test.framework.core) {
exclude(group = "org.junit.jupiter")
}
testImplementation(libs.jersey.test.framework.provider.jetty) {
exclude(group = "org.junit.jupiter")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.ops;

import com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.utils.IcebergCatalogUtil;
import com.google.common.base.Preconditions;
import java.util.Optional;
import javax.ws.rs.NotSupportedException;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.rest.CatalogHandlers;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
import org.apache.iceberg.rest.responses.GetNamespaceResponse;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergTableOps {

private static final Logger LOG = LoggerFactory.getLogger(IcebergTableOps.class);

private Catalog catalog;
private SupportsNamespaces asNamespaceCatalog;
private final String DEFAULT_ICEBERG_CATALOG_TYPE = "memory";

public IcebergTableOps() {
catalog = IcebergCatalogUtil.loadIcebergCatalog(DEFAULT_ICEBERG_CATALOG_TYPE);
if (catalog instanceof SupportsNamespaces) {
asNamespaceCatalog = (SupportsNamespaces) catalog;
}
}

private void validateNamespace(Optional<Namespace> namespace) {
namespace.ifPresent(
n ->
Preconditions.checkArgument(
n.toString().isEmpty() == false, "Namespace couldn't be empty"));
if (asNamespaceCatalog == null) {
throw new NotSupportedException("The underlying catalog doesn't support namespace operation");
}
}

public CreateNamespaceResponse createNamespace(CreateNamespaceRequest request) {
validateNamespace(Optional.of(request.namespace()));
return CatalogHandlers.createNamespace(asNamespaceCatalog, request);
}

public void dropNamespace(Namespace namespace) {
validateNamespace(Optional.of(namespace));
CatalogHandlers.dropNamespace(asNamespaceCatalog, namespace);
}

public GetNamespaceResponse loadNamespace(Namespace namespace) {
validateNamespace(Optional.of(namespace));
return CatalogHandlers.loadNamespace(asNamespaceCatalog, namespace);
}

public ListNamespacesResponse listNamespace(Namespace parent) {
validateNamespace(Optional.empty());
return CatalogHandlers.listNamespaces(asNamespaceCatalog, parent);
}

public UpdateNamespacePropertiesResponse updateNamespaceProperties(
Namespace namespace, UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest) {
validateNamespace(Optional.of(namespace));
return CatalogHandlers.updateNamespaceProperties(
asNamespaceCatalog, namespace, updateNamespacePropertiesRequest);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.utils;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.inmemory.InMemoryCatalog;

public class IcebergCatalogUtil {

private static InMemoryCatalog loadMemoryCatalog() {
InMemoryCatalog memoryCatalog = new InMemoryCatalog();
Map<String, String> properties = new HashMap<>();

memoryCatalog.initialize("memory", properties);
return memoryCatalog;
}

public static Catalog loadIcebergCatalog(String catalogType) {
switch (catalogType.toLowerCase(Locale.ENGLISH)) {
case "memory":
return loadMemoryCatalog();
// todo: add hive, jdbc catalog
default:
throw new RuntimeException(
catalogType
+ " catalog is not supported yet, supported catalogs: [memory]"
+ catalogType);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.web;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotAuthorizedException;
import org.apache.iceberg.exceptions.UnprocessableEntityException;
import org.apache.iceberg.exceptions.ValidationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Provider
public class IcebergExceptionMapper implements ExceptionMapper<Exception> {

private static final Logger LOG = LoggerFactory.getLogger(IcebergExceptionMapper.class);

private static final Map<Class<? extends Exception>, Integer> EXCEPTION_ERROR_CODES =
ImmutableMap.<Class<? extends Exception>, Integer>builder()
.put(IllegalArgumentException.class, 400)
.put(ValidationException.class, 400)
.put(NamespaceNotEmptyException.class, 400)
.put(NotAuthorizedException.class, 401)
.put(ForbiddenException.class, 403)
.put(NoSuchNamespaceException.class, 404)
.put(NoSuchTableException.class, 404)
.put(NoSuchIcebergTableException.class, 404)
.put(UnsupportedOperationException.class, 406)
.put(AlreadyExistsException.class, 409)
.put(CommitFailedException.class, 409)
.put(UnprocessableEntityException.class, 422)
.put(CommitStateUnknownException.class, 500)
.build();

@Override
public Response toResponse(Exception ex) {
int status =
EXCEPTION_ERROR_CODES.getOrDefault(
ex.getClass(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
if (status == Status.INTERNAL_SERVER_ERROR.getStatusCode()) {
LOG.warn("Iceberg REST server unexpected exception:", ex);
} else {
LOG.info(
"Iceberg REST server error maybe caused by user request, response http status: {}, exception: {}, exception message: {}",
ex.getClass(),
ex.getMessage());
}
return IcebergRestUtils.errorResponse(ex, status);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.web;

import com.datastrato.graviton.json.JsonUtils;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import javax.ws.rs.ext.ContextResolver;
import javax.ws.rs.ext.Provider;
import org.apache.iceberg.rest.RESTSerializers;

@Provider
public class IcebergObjectMapperProvider implements ContextResolver<ObjectMapper> {

@Override
public ObjectMapper getContext(Class<?> type) {
ObjectMapper mapper = JsonUtils.objectMapper();
mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.setPropertyNamingStrategy(new PropertyNamingStrategy.KebabCaseStrategy());
RESTSerializers.registerAll(mapper);
return mapper;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.web;

import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.iceberg.rest.responses.ErrorResponse;

public class IcebergRestUtils {

private IcebergRestUtils() {}

public static <T> Response ok(T t) {
return Response.status(Response.Status.OK).entity(t).type(MediaType.APPLICATION_JSON).build();
}

public static <T> Response noContent() {
return Response.status(Status.NO_CONTENT).build();
}

public static Response errorResponse(Exception ex, int httpStatus) {
ErrorResponse errorResponse =
ErrorResponse.builder()
.responseCode(httpStatus)
.withType(ex.getClass().getSimpleName())
.withMessage(ex.getMessage())
.withStackTrace(ex)
.build();
return Response.status(httpStatus)
.entity(errorResponse)
.type(MediaType.APPLICATION_JSON)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.web.rest;

import com.datastrato.graviton.catalog.lakehouse.iceberg.iceberg.web.IcebergRestUtils;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.iceberg.rest.responses.ConfigResponse;

@Path("/v1/config")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class IcebergConfig {

@Context private HttpServletRequest httpRequest;

@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getConfig() {
ConfigResponse response = ConfigResponse.builder().build();
return IcebergRestUtils.ok(response);
}
}
Loading