Skip to content

Commit

Permalink
update iceberg catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
teo authored and teo committed Mar 19, 2024
1 parent 45da827 commit 97b76e3
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 53 deletions.
178 changes: 178 additions & 0 deletions bin/gravitino-debug.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
#!/bin/bash
#
# Copyright 2023 Datastrato Pvt Ltd.
# This software is licensed under the Apache License version 2.
#
#set -ex
USAGE="-e Usage: bin/gravitino.sh [--config <conf-dir>]\n\t
{start|stop|restart|status}"

if [[ "$1" == "--config" ]]; then
shift
conf_dir="$1"
if [[ ! -d "${conf_dir}" ]]; then
echo "ERROR : ${conf_dir} is not a directory"
echo ${USAGE}
exit 1
else
export GRAVITINO_CONF_DIR="${conf_dir}"
fi
shift
fi

bin="$(dirname "${BASH_SOURCE-$0}")"
bin="$(cd "${bin}">/dev/null; pwd)"

. "${bin}/common.sh"

check_java_version

function check_process_status() {
local pid=$(found_gravitino_server_pid)

if [[ -z "${pid}" ]]; then
echo "Gravitino Server is not running"
else
echo "Gravitino Server is running[PID:$pid]"
fi
}

function found_gravitino_server_pid() {
process_name='GravitinoServer';
RUNNING_PIDS=$(ps x | grep ${process_name} | grep -v grep | awk '{print $1}');

if [[ -z "${RUNNING_PIDS}" ]]; then
return
fi

if ! kill -0 ${RUNNING_PIDS} > /dev/null 2>&1; then
echo "Gravitino Server running but process is dead"
fi

echo "${RUNNING_PIDS}"
}

function wait_for_gravitino_server_to_die() {
timeout=10
timeoutTime=$(date "+%s")
let "timeoutTime+=$timeout"
currentTime=$(date "+%s")
forceKill=1

while [[ $currentTime -lt $timeoutTime ]]; do
local pid=$(found_gravitino_server_pid)
if [[ -z "${pid}" ]]; then
forceKill=0
break
fi

$(kill ${pid} > /dev/null 2> /dev/null)
if kill -0 ${pid} > /dev/null 2>&1; then
sleep 3
else
forceKill=0
break
fi
currentTime=$(date "+%s")
done

if [[ forceKill -ne 0 ]]; then
$(kill -9 ${pid} > /dev/null 2> /dev/null)
fi
}

function start() {
local pid=$(found_gravitino_server_pid)

if [[ ! -z "${pid}" ]]; then
if kill -0 ${pid} >/dev/null 2>&1; then
echo "Gravitino Server is already running"
return 0;
fi
fi

if [[ ! -d "${GRAVITINO_LOG_DIR}" ]]; then
echo "Log dir doesn't exist, create ${GRAVITINO_LOG_DIR}"
mkdir -p "${GRAVITINO_LOG_DIR}"
fi

nohup ${JAVA_RUNNER} ${JAVA_OPTS} ${GRAVITINO_DEBUG_OPTS} -cp ${GRAVITINO_CLASSPATH} ${GRAVITINO_SERVER_NAME} >> "${GRAVITINO_OUTFILE}" 2>&1 &

pid=$!
if [[ -z "${pid}" ]]; then
echo "Gravitino Server start error!"
return 1;
else
echo "Gravitino Server start success!"
fi

sleep 2
check_process_status
}

function stop() {
local pid

pid=$(found_gravitino_server_pid)

if [[ -z "${pid}" ]]; then
echo "Gravitino Server is not running"
else
wait_for_gravitino_server_to_die
echo "Gravitino Server stop"
fi
}

HOSTNAME=$(hostname)
GRAVITINO_OUTFILE="${GRAVITINO_LOG_DIR}/gravitino-server.out"
GRAVITINO_SERVER_NAME=com.datastrato.gravitino.server.GravitinoServer

JAVA_OPTS+=" -Dfile.encoding=UTF-8"
JAVA_OPTS+=" -Dlog4j2.configurationFile=file://${GRAVITINO_CONF_DIR}/log4j2.properties"
JAVA_OPTS+=" -Dgravitino.log.path=${GRAVITINO_LOG_DIR} ${GRAVITINO_MEM}"
JAVA_OPTS+=" -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=7052"
if [ "$JVM_VERSION" -eq 17 ]; then
JAVA_OPTS+=" -XX:+IgnoreUnrecognizedVMOptions"
JAVA_OPTS+=" --add-opens java.base/java.io=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.lang.invoke=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.lang.reflect=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.lang=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.math=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.net=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.nio=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.text=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.time=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.util.concurrent=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.util.regex=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.util=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/jdk.internal.ref=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/jdk.internal.reflect=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.sql/java.sql=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/sun.util.calendar=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/sun.nio.ch=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/sun.nio.cs=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/sun.security.action=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/sun.util.calendar=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.security.jgss/sun.security.krb5=ALL-UNNAMED"
fi

addJarInDir "${GRAVITINO_HOME}/libs"

case "${1}" in
start)
start
;;
stop)
stop
;;
restart)
stop
start
;;
status)
check_process_status
;;
*)
echo ${USAGE}
esac
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class IcebergCatalog extends BaseCatalog<IcebergCatalog> {
/** @return The short name of the catalog. */
@Override
public String shortName() {
return "lakehouse-iceberg";
return "bili-lakehouse-iceberg";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class IcebergCatalog extends BaseCatalog<IcebergCatalog> {
/** @return The short name of the catalog. */
@Override
public String shortName() {
return "com-lakehouse-iceberg";
return "lakehouse-iceberg";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
package com.datastrato.gravitino.catalog.lakehouse.iceberg;

import static com.datastrato.gravitino.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX;
import static com.datastrato.gravitino.utils.OneMetaConstants.CORE_SITE_PATH;
import static com.datastrato.gravitino.utils.OneMetaConstants.HDFS_SITE_PATH;
import static com.datastrato.gravitino.utils.OneMetaConstants.HIVE_SITE_PATH;
import static com.datastrato.gravitino.utils.OneMetaConstants.MOUNT_TABLE_PATH;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
Expand Down Expand Up @@ -48,22 +44,16 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.responses.GetNamespaceResponse;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.sdk.HiveCatalogUtils;
import org.apache.iceberg.sdk.auth.AuthUtils;
import org.apache.iceberg.sdk.auth.HdfsAuthentication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -85,7 +75,6 @@ public class IcebergCatalogOperations implements CatalogOperations, SupportsSche
private final CatalogEntity entity;

private IcebergTableOpsHelper icebergTableOpsHelper;
private Configuration icebergSdkConf = null;

/**
* Constructs a new instance of IcebergCatalogOperations.
Expand Down Expand Up @@ -121,7 +110,6 @@ public void initialize(Map<String, String> conf) throws RuntimeException {
this.icebergTableOpsHelper = icebergTableOps.createIcebergTableOpsHelper();
this.icebergTablePropertiesMetadata = new IcebergTablePropertiesMetadata();
this.icebergSchemaPropertiesMetadata = new IcebergSchemaPropertiesMetadata();
icebergSdkConf = createDefaultConfiguration();
}

/** Closes the Iceberg catalog and releases the associated client pool. */
Expand Down Expand Up @@ -562,43 +550,6 @@ public boolean purgeTable(NameIdentifier tableIdent) throws UnsupportedOperation
return false;
}
}
/**
* Purges a table from the Iceberg.
*
* @param tableIdent The identifier of the table to purge.
* @return true if the table is successfully purged; false if the table does not exist.
* @throws UnsupportedOperationException If the table type is EXTERNAL_TABLE, it cannot be purged.
*/
@Override
public boolean purgeTableOneMeta(NameIdentifier tableIdent) {
// use iceberg sdk
try {
HdfsAuthentication hdfsAuthentication = AuthUtils.createHdfsAuthentication(icebergSdkConf);
hdfsAuthentication.doAs(
() -> {
TableIdentifier identifier = TableIdentifier.of(tableIdent.name(), tableIdent.name());
HiveCatalog hiveCatalog = HiveCatalogUtils.createHiveCatalog(icebergSdkConf);
hiveCatalog.dropTable(identifier, true);
return null;
});
hdfsAuthentication.close();
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
LOG.warn("Iceberg table {} does not exist", tableIdent.name());
return false;
} catch (Throwable e) {
LOG.info("Purge Iceberg table Error : {}", tableIdent.name());
}
return true;
}

private Configuration createDefaultConfiguration() {
Configuration defaultConf = new Configuration();
defaultConf.addResource(new Path(CORE_SITE_PATH));
defaultConf.addResource(new Path(HDFS_SITE_PATH));
defaultConf.addResource(new Path(HIVE_SITE_PATH));
defaultConf.addResource(new Path(MOUNT_TABLE_PATH));
return defaultConf;
}

// TODO. We should figure out a better way to get the current user from servlet container.
private static String currentUser() {
Expand Down
4 changes: 2 additions & 2 deletions docs/open-api/catalogs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ components:
enum:
- hive
- lakehouse-iceberg
- com-lakehouse-iceberg
- bili-lakehouse-iceberg
- jdbc-mysql
- jdbc-postgresql
comment:
Expand Down Expand Up @@ -200,7 +200,7 @@ components:
enum:
- hive
- lakehouse-iceberg
- com-lakehouse-iceberg
- bili-lakehouse-iceberg
- jdbc-mysql
- jdbc-postgresql
comment:
Expand Down

0 comments on commit 97b76e3

Please sign in to comment.