From 97b76e335fe4f2b9702a87f2e3954d911b5e4698 Mon Sep 17 00:00:00 2001
From: teo
Date: Tue, 19 Mar 2024 21:37:13 +0800
Subject: [PATCH] update iceberg catalog

---
 bin/gravitino-debug.sh                        | 178 ++++++++++++++++++
 .../lakehouse/iceberg/IcebergCatalog.java     |   2 +-
 .../lakehouse/iceberg/IcebergCatalog.java     |   2 +-
 .../iceberg/IcebergCatalogOperations.java     |  49 -----
 docs/open-api/catalogs.yaml                   |   4 +-
 5 files changed, 182 insertions(+), 53 deletions(-)
 create mode 100755 bin/gravitino-debug.sh

diff --git a/bin/gravitino-debug.sh b/bin/gravitino-debug.sh
new file mode 100755
index 00000000000..22d4414db4d
--- /dev/null
+++ b/bin/gravitino-debug.sh
@@ -0,0 +1,178 @@
+#!/bin/bash
+#
+# Copyright 2023 Datastrato Pvt Ltd.
+# This software is licensed under the Apache License version 2.
+#
+#set -ex
+USAGE="-e Usage: bin/gravitino.sh [--config <conf dir>]\n\t
+    {start|stop|restart|status}"
+
+if [[ "$1" == "--config" ]]; then
+  shift
+  conf_dir="$1"
+  if [[ ! -d "${conf_dir}" ]]; then
+    echo "ERROR : ${conf_dir} is not a directory"
+    echo ${USAGE}
+    exit 1
+  else
+    export GRAVITINO_CONF_DIR="${conf_dir}"
+  fi
+  shift
+fi
+
+bin="$(dirname "${BASH_SOURCE-$0}")"
+bin="$(cd "${bin}">/dev/null; pwd)"
+
+. "${bin}/common.sh"
+
+check_java_version
+
+function check_process_status() {
+  local pid=$(found_gravitino_server_pid)
+
+  if [[ -z "${pid}" ]]; then
+    echo "Gravitino Server is not running"
+  else
+    echo "Gravitino Server is running[PID:$pid]"
+  fi
+}
+
+function found_gravitino_server_pid() {
+  process_name='GravitinoServer';
+  RUNNING_PIDS=$(ps x | grep ${process_name} | grep -v grep | awk '{print $1}');
+
+  if [[ -z "${RUNNING_PIDS}" ]]; then
+    return
+  fi
+
+  if ! kill -0 ${RUNNING_PIDS} > /dev/null 2>&1; then
+    echo "Gravitino Server running but process is dead"
+  fi
+
+  echo "${RUNNING_PIDS}"
+}
+
+function wait_for_gravitino_server_to_die() {
+  timeout=10
+  timeoutTime=$(date "+%s")
+  let "timeoutTime+=$timeout"
+  currentTime=$(date "+%s")
+  forceKill=1
+
+  while [[ $currentTime -lt $timeoutTime ]]; do
+    local pid=$(found_gravitino_server_pid)
+    if [[ -z "${pid}" ]]; then
+      forceKill=0
+      break
+    fi
+
+    $(kill ${pid} > /dev/null 2> /dev/null)
+    if kill -0 ${pid} > /dev/null 2>&1; then
+      sleep 3
+    else
+      forceKill=0
+      break
+    fi
+    currentTime=$(date "+%s")
+  done
+
+  if [[ forceKill -ne 0 ]]; then
+    $(kill -9 ${pid} > /dev/null 2> /dev/null)
+  fi
+}
+
+function start() {
+  local pid=$(found_gravitino_server_pid)
+
+  if [[ ! -z "${pid}" ]]; then
+    if kill -0 ${pid} >/dev/null 2>&1; then
+      echo "Gravitino Server is already running"
+      return 0;
+    fi
+  fi
+
+  if [[ ! -d "${GRAVITINO_LOG_DIR}" ]]; then
+    echo "Log dir doesn't exist, create ${GRAVITINO_LOG_DIR}"
+    mkdir -p "${GRAVITINO_LOG_DIR}"
+  fi
+
+  nohup ${JAVA_RUNNER} ${JAVA_OPTS} ${GRAVITINO_DEBUG_OPTS} -cp ${GRAVITINO_CLASSPATH} ${GRAVITINO_SERVER_NAME} >> "${GRAVITINO_OUTFILE}" 2>&1 &
+
+  pid=$!
+  if [[ -z "${pid}" ]]; then
+    echo "Gravitino Server start error!"
+    return 1;
+  else
+    echo "Gravitino Server start success!"
+  fi
+
+  sleep 2
+  check_process_status
+}
+
+function stop() {
+  local pid
+
+  pid=$(found_gravitino_server_pid)
+
+  if [[ -z "${pid}" ]]; then
+    echo "Gravitino Server is not running"
+  else
+    wait_for_gravitino_server_to_die
+    echo "Gravitino Server stop"
+  fi
+}
+
+HOSTNAME=$(hostname)
+GRAVITINO_OUTFILE="${GRAVITINO_LOG_DIR}/gravitino-server.out"
+GRAVITINO_SERVER_NAME=com.datastrato.gravitino.server.GravitinoServer
+
+JAVA_OPTS+=" -Dfile.encoding=UTF-8"
+JAVA_OPTS+=" -Dlog4j2.configurationFile=file://${GRAVITINO_CONF_DIR}/log4j2.properties"
+JAVA_OPTS+=" -Dgravitino.log.path=${GRAVITINO_LOG_DIR} ${GRAVITINO_MEM}"
+JAVA_OPTS+=" -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=7052"
+if [ "$JVM_VERSION" -eq 17 ]; then
+  JAVA_OPTS+=" -XX:+IgnoreUnrecognizedVMOptions"
+  JAVA_OPTS+=" --add-opens java.base/java.io=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/java.lang.invoke=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/java.lang.reflect=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/java.lang=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/java.math=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/java.net=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/java.nio=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/java.text=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/java.time=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/java.util.concurrent=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/java.util.regex=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/java.util=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/jdk.internal.ref=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/jdk.internal.reflect=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.sql/java.sql=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/sun.util.calendar=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/sun.nio.ch=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/sun.nio.cs=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/sun.security.action=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.base/sun.util.calendar=ALL-UNNAMED"
+  JAVA_OPTS+=" --add-opens java.security.jgss/sun.security.krb5=ALL-UNNAMED"
+fi
+
+addJarInDir "${GRAVITINO_HOME}/libs"
+
+case "${1}" in
+  start)
+    start
+    ;;
+  stop)
+    stop
+    ;;
+  restart)
+    stop
+    start
+    ;;
+  status)
+    check_process_status
+    ;;
+  *)
+    echo ${USAGE}
+esac
diff --git a/catalogs/catalog-bili-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/bili/lakehouse/iceberg/IcebergCatalog.java b/catalogs/catalog-bili-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/bili/lakehouse/iceberg/IcebergCatalog.java
index fd87bf9d747..b07d355ecca 100644
--- a/catalogs/catalog-bili-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/bili/lakehouse/iceberg/IcebergCatalog.java
+++ b/catalogs/catalog-bili-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/bili/lakehouse/iceberg/IcebergCatalog.java
@@ -16,7 +16,7 @@ public class IcebergCatalog extends BaseCatalog<IcebergCatalog> {
   /** @return The short name of the catalog. */
   @Override
   public String shortName() {
-    return "lakehouse-iceberg";
+    return "bili-lakehouse-iceberg";
   }
 
   /**
diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java
index c4ed71fcdd4..db84b4b3b20 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalog.java
@@ -16,7 +16,7 @@ public class IcebergCatalog extends BaseCatalog<IcebergCatalog> {
   /** @return The short name of the catalog. */
   @Override
   public String shortName() {
-    return "com-lakehouse-iceberg";
+    return "lakehouse-iceberg";
   }
 
   /**
diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
index 779a83990da..cdd09b94bc1 100644
--- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
+++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java
@@ -5,10 +5,6 @@
 package com.datastrato.gravitino.catalog.lakehouse.iceberg;
 
 import static com.datastrato.gravitino.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX;
-import static com.datastrato.gravitino.utils.OneMetaConstants.CORE_SITE_PATH;
-import static com.datastrato.gravitino.utils.OneMetaConstants.HDFS_SITE_PATH;
-import static com.datastrato.gravitino.utils.OneMetaConstants.HIVE_SITE_PATH;
-import static com.datastrato.gravitino.utils.OneMetaConstants.MOUNT_TABLE_PATH;
 
 import com.datastrato.gravitino.NameIdentifier;
 import com.datastrato.gravitino.Namespace;
@@ -48,22 +44,16 @@
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
-import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.rest.requests.RenameTableRequest;
 import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
 import org.apache.iceberg.rest.responses.GetNamespaceResponse;
 import org.apache.iceberg.rest.responses.ListTablesResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
 import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
-import org.apache.iceberg.sdk.HiveCatalogUtils;
-import org.apache.iceberg.sdk.auth.AuthUtils;
-import org.apache.iceberg.sdk.auth.HdfsAuthentication;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,7 +75,6 @@ public class IcebergCatalogOperations implements CatalogOperations, SupportsSche
   private final CatalogEntity entity;
 
   private IcebergTableOpsHelper icebergTableOpsHelper;
-  private Configuration icebergSdkConf = null;
 
   /**
    * Constructs a new instance of IcebergCatalogOperations.
@@ -121,7 +110,6 @@ public void initialize(Map<String, String> conf) throws RuntimeException {
     this.icebergTableOpsHelper = icebergTableOps.createIcebergTableOpsHelper();
     this.icebergTablePropertiesMetadata = new IcebergTablePropertiesMetadata();
     this.icebergSchemaPropertiesMetadata = new IcebergSchemaPropertiesMetadata();
-    icebergSdkConf = createDefaultConfiguration();
   }
 
   /** Closes the Iceberg catalog and releases the associated client pool. */
@@ -562,43 +550,6 @@ public boolean purgeTable(NameIdentifier tableIdent) throws UnsupportedOperation
       return false;
     }
   }
-  /**
-   * Purges a table from the Iceberg.
-   *
-   * @param tableIdent The identifier of the table to purge.
-   * @return true if the table is successfully purged; false if the table does not exist.
-   * @throws UnsupportedOperationException If the table type is EXTERNAL_TABLE, it cannot be purged.
-   */
-  @Override
-  public boolean purgeTableOneMeta(NameIdentifier tableIdent) {
-    // use iceberg sdk
-    try {
-      HdfsAuthentication hdfsAuthentication = AuthUtils.createHdfsAuthentication(icebergSdkConf);
-      hdfsAuthentication.doAs(
-          () -> {
-            TableIdentifier identifier = TableIdentifier.of(tableIdent.name(), tableIdent.name());
-            HiveCatalog hiveCatalog = HiveCatalogUtils.createHiveCatalog(icebergSdkConf);
-            hiveCatalog.dropTable(identifier, true);
-            return null;
-          });
-      hdfsAuthentication.close();
-    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
-      LOG.warn("Iceberg table {} does not exist", tableIdent.name());
-      return false;
-    } catch (Throwable e) {
-      LOG.info("Purge Iceberg table Error : {}", tableIdent.name());
-    }
-    return true;
-  }
-
-  private Configuration createDefaultConfiguration() {
-    Configuration defaultConf = new Configuration();
-    defaultConf.addResource(new Path(CORE_SITE_PATH));
-    defaultConf.addResource(new Path(HDFS_SITE_PATH));
-    defaultConf.addResource(new Path(HIVE_SITE_PATH));
-    defaultConf.addResource(new Path(MOUNT_TABLE_PATH));
-    return defaultConf;
-  }
 
   // TODO. We should figure out a better way to get the current user from servlet container.
   private static String currentUser() {
diff --git a/docs/open-api/catalogs.yaml b/docs/open-api/catalogs.yaml
index ffb0a0e2e10..87abbf79a1a 100644
--- a/docs/open-api/catalogs.yaml
+++ b/docs/open-api/catalogs.yaml
@@ -161,7 +161,7 @@ components:
           enum:
             - hive
             - lakehouse-iceberg
-            - com-lakehouse-iceberg
+            - bili-lakehouse-iceberg
             - jdbc-mysql
             - jdbc-postgresql
         comment:
@@ -200,7 +200,7 @@ components:
           enum:
             - hive
            - lakehouse-iceberg
-            - com-lakehouse-iceberg
+            - bili-lakehouse-iceberg
            - jdbc-mysql
            - jdbc-postgresql
        comment:
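
Note on the debug script: gravitino-debug.sh appends "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=7052" to JAVA_OPTS, so the started server listens for a JDWP debugger on port 7052. A minimal sketch of attaching, assuming the server runs on the local machine and the port is left at 7052:

    # start the server with the debug agent enabled
    bin/gravitino-debug.sh start
    # attach the JDK command-line debugger (an IDE "Remote JVM Debug"
    # configuration pointed at localhost:7052 works the same way)
    jdb -attach localhost:7052

Because suspend=n is set, the server starts without waiting for a debugger, so attaching is optional.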