Skip to content

Commit

Permalink
fix retry
Browse files Browse the repository at this point in the history
  • Loading branch information
mchades committed Feb 24, 2024
1 parent ea8cd44 commit 14ec352
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -124,7 +125,33 @@ public void start() {
container.start();
}

protected abstract boolean checkContainerStatus(int retryLimit);
protected abstract boolean checkContainerStatus(int timeoutMillis);

protected boolean checkContainerStatusWithRetry(int timeoutMillis, BooleanSupplier... checker) {
int nRetry = 0;
int sleepTimeMillis = 5_000;
int totalSleepTimeMillis = 0;

// retry until timeout
for (BooleanSupplier check : checker) {
while (totalSleepTimeMillis < timeoutMillis) {
if (check.getAsBoolean()) {
break;
}

sleepTimeMillis = sleepTimeMillis * (int) Math.pow(2, nRetry++);
sleepTimeMillis = Math.min(sleepTimeMillis, timeoutMillis - totalSleepTimeMillis);
LOG.info("retrying after {}ms (retry count: {})", sleepTimeMillis, nRetry);
try {
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
// ignore
}
totalSleepTimeMillis += sleepTimeMillis;
}
}
return totalSleepTimeMillis < timeoutMillis;
}

// Execute the command in the container.
public Container.ExecResult executeInContainer(String... commandAndArgs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.datastrato.gravitino.catalog.hive.HiveClientPool;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -18,7 +17,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.thrift.TException;
import org.rnorth.ducttape.Preconditions;
import org.slf4j.Logger;
Expand Down Expand Up @@ -59,94 +57,68 @@ protected void setupContainer() {
@Override
public void start() {
super.start();
Preconditions.check("Hive container startup failed!", checkContainerStatus(5));
Preconditions.check("Hive container startup failed!", checkContainerStatus(60_000));
}

@Override
protected boolean checkContainerStatus(int retryLimit) {
int nRetry = 0;
boolean isHiveContainerReady = false;
int sleepTimeMillis = 10_000;
while (nRetry++ < retryLimit) {
try {
String[] commandAndArgs = new String[] {"bash", "/tmp/check-status.sh"};
Container.ExecResult execResult = executeInContainer(commandAndArgs);
if (execResult.getExitCode() != 0) {
String message =
format(
"Command [%s] exited with %s",
String.join(" ", commandAndArgs), execResult.getExitCode());
LOG.error("{}", message);
LOG.error("stderr: {}", execResult.getStderr());
LOG.error("stdout: {}", execResult.getStdout());
} else {
LOG.info("Hive container startup success!");
isHiveContainerReady = true;
break;
}
LOG.info(
"Hive container is not ready, recheck({}/{}) after {}ms",
nRetry,
retryLimit,
sleepTimeMillis);
Thread.sleep(sleepTimeMillis);
} catch (RuntimeException e) {
LOG.error(e.getMessage(), e);
} catch (InterruptedException e) {
// ignore
}
}

// Test hive client if it can connect to hive server
boolean isHiveConnectSuccess = false;
HiveConf hiveConf = new HiveConf();
String hiveMetastoreUris =
String.format("thrift://%s:%d", getContainerIpAddress(), HiveContainer.HIVE_METASTORE_PORT);
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveMetastoreUris);
HiveClientPool hiveClientPool = new HiveClientPool(1, hiveConf);

nRetry = 0;
while (nRetry++ < retryLimit) {
try {
List<String> databases = hiveClientPool.run(IMetaStoreClient::getAllDatabases);
if (!databases.isEmpty()) {
isHiveConnectSuccess = true;
break;
}
Thread.sleep(3000);
} catch (TException | InterruptedException e) {
LOG.warn("Failed to connect to hive server, retrying...", e);
}
}

// Test HDFS client if it can connect to HDFS server
boolean isHdfsConnectSuccess = false;
nRetry = 0;
Configuration conf = new Configuration();
conf.set(
"fs.defaultFS",
String.format("hdfs://%s:%d", getContainerIpAddress(), HiveContainer.HDFS_DEFAULTFS_PORT));
while (nRetry++ < retryLimit) {
try (FileSystem fs = FileSystem.get(conf)) {
Path directoryPath = new Path("/");
FileStatus[] fileStatuses = fs.listStatus(directoryPath);
if (fileStatuses.length > 0) {
isHdfsConnectSuccess = true;
break;
}
Thread.sleep(3000);
} catch (IOException | InterruptedException e) {
LOG.warn("Failed to connect to HDFS server, retrying...", e);
}
}

LOG.info(
"Hive container status: isHiveContainerReady={}, isHiveConnectSuccess={}, isHdfsConnectSuccess={}",
isHiveContainerReady,
isHiveConnectSuccess,
isHdfsConnectSuccess);

return isHiveContainerReady && isHiveConnectSuccess && isHdfsConnectSuccess;
protected boolean checkContainerStatus(int timeoutMillis) {

return checkContainerStatusWithRetry(
timeoutMillis,
() -> {
// Check if the HDFS datanode is ready
try {
String[] commandAndArgs = new String[] {"bash", "/tmp/check-status.sh"};
Container.ExecResult execResult = executeInContainer(commandAndArgs);
if (execResult.getExitCode() == 0) {
LOG.info("Hive container startup success!");
return true;
}

String message =
format(
"Command [%s] exited with %s",
String.join(" ", commandAndArgs), execResult.getExitCode());
LOG.error("{}", message);
LOG.error("stderr: {}", execResult.getStderr());
LOG.error("stdout: {}", execResult.getStdout());
} catch (RuntimeException e) {
LOG.error(e.getMessage(), e);
}
return false;
},
() -> {
// Test hive client if it can connect to hive server
try {
HiveConf hiveConf = new HiveConf();
String hiveMetastoreUris =
String.format(
"thrift://%s:%d", getContainerIpAddress(), HiveContainer.HIVE_METASTORE_PORT);
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveMetastoreUris);
HiveClientPool hiveClientPool = new HiveClientPool(1, hiveConf);
hiveClientPool.run(c -> c.getDatabases("default"));
return true;
} catch (TException | InterruptedException e) {
LOG.warn("Failed to connect to hive server", e);
}
return false;
},
() -> {
// Test HDFS client if it can connect to HDFS server
Configuration conf = new Configuration();
conf.set(
"fs.defaultFS",
String.format(
"hdfs://%s:%d", getContainerIpAddress(), HiveContainer.HDFS_DEFAULTFS_PORT));
try (FileSystem fs = FileSystem.get(conf)) {
Path directoryPath = new Path("/");
FileStatus[] fileStatuses = fs.listStatus(directoryPath);
return fileStatuses.length > 0;
} catch (IOException e) {
LOG.warn("Failed to connect to HDFS server", e);
}
return false;
});
}

public static class Builder extends BaseContainer.Builder<HiveContainer.Builder, HiveContainer> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,12 @@ public void start() {
super.start();

Preconditions.check("Initialization Trino JDBC connect failed!", initTrinoJdbcConnection());
Preconditions.check("Trino container startup failed!", checkContainerStatus(5));
Preconditions.check("Trino container startup failed!", checkContainerStatus(60_000));
}

@Override
protected boolean checkContainerStatus(int retryLimit) {
int nRetry = 0;
boolean isTrinoJdbcConnectionReady = false;
int sleepTime = 5000;
while (nRetry++ < retryLimit && !isTrinoJdbcConnectionReady) {
isTrinoJdbcConnectionReady = testTrinoJdbcConnection();
if (isTrinoJdbcConnectionReady) {
break;
} else {
try {
Thread.sleep(sleepTime);
LOG.warn("Waiting for trino server to be ready... ({}ms)", nRetry * sleepTime);
} catch (InterruptedException e) {
// ignore
}
}
}
return isTrinoJdbcConnectionReady;
return checkContainerStatusWithRetry(retryLimit, this::testTrinoJdbcConnection);
}

@Override
Expand Down Expand Up @@ -150,6 +134,7 @@ private boolean testTrinoJdbcConnection() {
return false;
}

LOG.info("Trino JDBC connection test success!");
return true;
}

Expand Down

0 comments on commit 14ec352

Please sign in to comment.