Skip to content

Commit

Permalink
support start with Gravitino
Browse files Browse the repository at this point in the history
  • Loading branch information
FANNG1 committed Jul 3, 2024
1 parent c27f37a commit 83c1479
Show file tree
Hide file tree
Showing 38 changed files with 318 additions and 560 deletions.
1 change: 1 addition & 0 deletions .github/workflows/backend-integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ jobs:
integration-test/build/trino-ci-container-log/hdfs/*.*
distribution/package/logs/gravitino-server.out
distribution/package/logs/gravitino-server.log
distribution/package/extensions/iceberg-rest-server/logs/*.*
catalogs/**/*.log
catalogs/**/*.tar
distribution/**/*.log
186 changes: 186 additions & 0 deletions bin/iceberg-rest-server.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
#!/bin/bash
#
# Copyright 2024 Datastrato Pvt Ltd.
# This software is licensed under the Apache License version 2.
#
#set -ex
USAGE="-e Usage: bin/iceberg-rest-server.sh [--config <conf-dir>]\n\t
{start|run|stop|restart|status}"

if [[ "$1" == "--config" ]]; then
shift
conf_dir="$1"
if [[ ! -d "${conf_dir}" ]]; then
echo "ERROR : ${conf_dir} is not a directory"
echo ${USAGE}
exit 1
else
export GRAVITINO_CONF_DIR="${conf_dir}"
fi
shift
fi

bin="$(dirname "${BASH_SOURCE-$0}")"
bin="$(cd "${bin}">/dev/null; pwd)"

. "${bin}/common.sh"

check_java_version

function check_process_status() {
local pid=$(found_iceberg_rest_server_pid)

if [[ -z "${pid}" ]]; then
echo "IcebergRESTServer is not running"
else
echo "IcebergRESTServer is running[PID:$pid]"
fi
}

function found_iceberg_rest_server_pid() {
process_name='IcebergRESTServer';
RUNNING_PIDS=$(ps x | grep ${process_name} | grep -v grep | awk '{print $1}');

if [[ -z "${RUNNING_PIDS}" ]]; then
return
fi

if ! kill -0 ${RUNNING_PIDS} > /dev/null 2>&1; then
echo "IcebergRESTServer running but process is dead"
fi

echo "${RUNNING_PIDS}"
}

function wait_for_iceberg_rest_server_to_die() {
timeout=10
timeoutTime=$(date "+%s")
let "timeoutTime+=$timeout"
currentTime=$(date "+%s")
forceKill=1

while [[ $currentTime -lt $timeoutTime ]]; do
local pid=$(found_iceberg_rest_server_pid)
if [[ -z "${pid}" ]]; then
forceKill=0
break
fi

$(kill ${pid} > /dev/null 2> /dev/null)
if kill -0 ${pid} > /dev/null 2>&1; then
sleep 3
else
forceKill=0
break
fi
currentTime=$(date "+%s")
done

if [[ forceKill -ne 0 ]]; then
$(kill -9 ${pid} > /dev/null 2> /dev/null)
fi
}

function start() {
local pid=$(found_iceberg_rest_server_pid)

if [[ ! -z "${pid}" ]]; then
if kill -0 ${pid} >/dev/null 2>&1; then
echo "IcebergRESTServer is already running"
return 0;
fi
fi

if [[ ! -d "${GRAVITINO_LOG_DIR}" ]]; then
echo "Log dir doesn't exist, create ${GRAVITINO_LOG_DIR}"
mkdir -p "${GRAVITINO_LOG_DIR}"
fi

nohup ${JAVA_RUNNER} ${JAVA_OPTS} ${GRAVITINO_DEBUG_OPTS} -cp ${GRAVITINO_CLASSPATH} ${GRAVITINO_SERVER_NAME} >> "${GRAVITINO_OUTFILE}" 2>&1 &

pid=$!
if [[ -z "${pid}" ]]; then
echo "IcebergRESTServer start error!"
return 1;
else
echo "IcebergRESTServer start success!"
fi

sleep 2
check_process_status
}

function run() {
${JAVA_RUNNER} ${JAVA_OPTS} ${GRAVITINO_DEBUG_OPTS} -cp ${GRAVITINO_CLASSPATH} ${GRAVITINO_SERVER_NAME}
}

function stop() {
local pid

pid=$(found_iceberg_rest_server_pid)

if [[ -z "${pid}" ]]; then
echo "IcebergRESTServer is not running"
else
wait_for_iceberg_rest_server_to_die
echo "IcebergRESTServer stop"
fi
}

HOSTNAME=$(hostname)
GRAVITINO_OUTFILE="${GRAVITINO_LOG_DIR}/iceberg-rest-server.out"
GRAVITINO_SERVER_NAME=com.datastrato.gravitino.server.IcebergRESTServer

JAVA_OPTS+=" -Dfile.encoding=UTF-8"
JAVA_OPTS+=" -Dlog4j2.configurationFile=file://${GRAVITINO_CONF_DIR}/log4j2.properties"
JAVA_OPTS+=" -Dgravitino.log.path=${GRAVITINO_LOG_DIR} ${GRAVITINO_MEM}"
if [ "$JVM_VERSION" -eq 17 ]; then
JAVA_OPTS+=" -XX:+IgnoreUnrecognizedVMOptions"
JAVA_OPTS+=" --add-opens java.base/java.io=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.lang.invoke=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.lang.reflect=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.lang=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.math=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.net=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.nio=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.text=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.time=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.util.concurrent=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.util.regex=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/java.util=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/jdk.internal.ref=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/jdk.internal.reflect=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.sql/java.sql=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/sun.util.calendar=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/sun.nio.ch=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/sun.nio.cs=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/sun.security.action=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.base/sun.util.calendar=ALL-UNNAMED"
JAVA_OPTS+=" --add-opens java.security.jgss/sun.security.krb5=ALL-UNNAMED"
fi

#JAVA_OPTS+=" -Djava.securit.krb5.conf=/etc/krb5.conf"

addJarInDir "${GRAVITINO_HOME}/libs"

case "${1}" in
start)
start
;;
run)
run
;;
stop)
stop
;;
restart)
stop
start
;;
status)
check_process_status
;;
*)
echo ${USAGE}
esac
20 changes: 5 additions & 15 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ tasks {
val outputDir = projectDir.dir("distribution")

val compileDistribution by registering {
dependsOn("copySubprojectDependencies", "copyCatalogLibAndConfigs", "copySubprojectLib")
dependsOn("copySubprojectDependencies", "copyCatalogLibAndConfigs", "copySubprojectLib", "copyIcebergRESTServer")

group = "gravitino distribution"
outputs.dir(projectDir.dir("distribution/package"))
Expand Down Expand Up @@ -544,30 +544,20 @@ tasks {
}
}

val compileIcebergRESTServer by registering {
val copyIcebergRESTServer by registering {
dependsOn("iceberg-rest-server:copyLibs")
group = "Iceberg REST server distribution"
outputs.dir(projectDir.dir("distribution/${rootProject.name}-iceberg-rest-server"))
outputs.dir(projectDir.dir("distribution/package/extensions/iceberg-rest-server"))
doLast {
copy {
from(projectDir.dir("conf")) { into("${rootProject.name}-iceberg-rest-server/conf") }
from(projectDir.dir("bin")) { into("${rootProject.name}-iceberg-rest-server/bin") }
from(projectDir.dir("conf")) { into("package/extensions/iceberg-rest-server/conf") }
from(projectDir.dir("bin")) { into("package/extensions/iceberg-rest-server/bin") }
into(outputDir)
rename { fileName ->
fileName.replace(".template", "")
}
fileMode = 0b111101101
}
copy {
from(projectDir.dir("licenses")) { into("${rootProject.name}-iceberg-rest-server/licenses") }
from(projectDir.file("LICENSE.bin")) { into("${rootProject.name}-iceberg-rest-server") }
from(projectDir.file("NOTICE.bin")) { into("${rootProject.name}-iceberg-rest-server") }
from(projectDir.file("README.md")) { into("${rootProject.name}-iceberg-rest-server") }
into(outputDir)
rename { fileName ->
fileName.replace(".bin", "")
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.SchemaChange;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.ops.IcebergTableOps;
import com.datastrato.gravitino.catalog.lakehouse.iceberg.ops.IcebergTableOpsHelper;
import com.datastrato.gravitino.connector.CatalogInfo;
import com.datastrato.gravitino.connector.CatalogOperations;
Expand All @@ -21,6 +20,8 @@
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.iceberg.common.IcebergConfig;
import com.datastrato.gravitino.iceberg.common.ops.IcebergTableOps;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.Table;
Expand All @@ -44,6 +45,7 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
Expand Down Expand Up @@ -96,7 +98,7 @@ public void initialize(
IcebergConfig icebergConfig = new IcebergConfig(resultConf);

this.icebergTableOps = new IcebergTableOps(icebergConfig);
this.icebergTableOpsHelper = icebergTableOps.createIcebergTableOpsHelper();
this.icebergTableOpsHelper = new IcebergTableOpsHelper(icebergTableOps.getCatalog());
}

/** Closes the Iceberg catalog and releases the associated client pool. */
Expand Down Expand Up @@ -387,14 +389,28 @@ public Table alterTable(NameIdentifier tableIdent, TableChange... changes)

private Table internalUpdateTable(NameIdentifier tableIdent, TableChange... changes)
throws NoSuchTableException, IllegalArgumentException {
LoadTableResponse loadTableResponse =
updateTable(tableIdent, icebergTableOpsHelper, icebergTableOps, changes);
return IcebergTable.fromIcebergTable(loadTableResponse.tableMetadata(), tableIdent.name());
}

@VisibleForTesting
public static LoadTableResponse updateTable(
NameIdentifier tableIdent,
IcebergTableOpsHelper icebergTableOpsHelper,
IcebergTableOps icebergTableOps,
TableChange... changes) {
try {
String[] levels = tableIdent.namespace().levels();
IcebergTableOpsHelper.IcebergTableChange icebergTableChange =
icebergTableOpsHelper.buildIcebergTableChanges(
NameIdentifier.of(levels[levels.length - 1], tableIdent.name()), changes);
LoadTableResponse loadTableResponse = icebergTableOps.updateTable(icebergTableChange);
Transaction transaction = icebergTableChange.getTransaction();
transaction.commitTransaction();
LoadTableResponse loadTableResponse =
icebergTableOps.loadTable(icebergTableChange.getTableIdentifier());
loadTableResponse.validate();
return IcebergTable.fromIcebergTable(loadTableResponse.tableMetadata(), tableIdent.name());
return loadTableResponse;
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(e, ICEBERG_TABLE_DOES_NOT_EXIST_MSG, tableIdent.name());
}
Expand Down
Loading

0 comments on commit 83c1479

Please sign in to comment.