From 14ec35277715f183f724dc7c674cb66a56f57c98 Mon Sep 17 00:00:00 2001
From: mchades
Date: Sat, 24 Feb 2024 23:35:18 +0800
Subject: [PATCH] fix retry

---
 .../test/container/BaseContainer.java         |  49 +++++-
 .../test/container/HiveContainer.java         | 149 +++++++-----------
 .../test/container/TrinoContainer.java        |  23 +--
 3 files changed, 114 insertions(+), 107 deletions(-)

diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/container/BaseContainer.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/container/BaseContainer.java
index 331efae498e..1d1fa717cf4 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/container/BaseContainer.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/container/BaseContainer.java
@@ -18,6 +18,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -124,7 +125,53 @@ public void start() {
     container.start();
   }
 
-  protected abstract boolean checkContainerStatus(int retryLimit);
+  /**
+   * Checks whether the services inside the container are ready to use.
+   *
+   * @param timeoutMillis the maximum total time, in milliseconds, to wait between retries
+   * @return true if the container became ready within the timeout, false otherwise
+   */
+  protected abstract boolean checkContainerStatus(int timeoutMillis);
+
+  /**
+   * Runs the given checkers in order, retrying a failing checker with exponential backoff until
+   * it succeeds or the wait budget shared by all checkers is used up.
+   *
+   * @param timeoutMillis the maximum total time, in milliseconds, to sleep between retries
+   * @param checker the readiness checks; each returns true once its service is ready
+   * @return true only if every checker succeeded, false on timeout or interruption
+   */
+  protected boolean checkContainerStatusWithRetry(int timeoutMillis, BooleanSupplier... checker) {
+    long totalSleepTimeMillis = 0;
+
+    for (BooleanSupplier check : checker) {
+      int nRetry = 0;
+      long sleepTimeMillis = 5_000;
+
+      // The check runs before the budget test, so it is also evaluated after the final sleep.
+      while (!check.getAsBoolean()) {
+        long remainingMillis = timeoutMillis - totalSleepTimeMillis;
+        if (remainingMillis <= 0) {
+          LOG.warn("container status check timed out after {}ms", timeoutMillis);
+          return false;
+        }
+
+        long currentSleepMillis = Math.min(sleepTimeMillis, remainingMillis);
+        LOG.info("retrying after {}ms (retry count: {})", currentSleepMillis, ++nRetry);
+        try {
+          Thread.sleep(currentSleepMillis);
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag and stop waiting instead of silently spinning on.
+          Thread.currentThread().interrupt();
+          return false;
+        }
+        totalSleepTimeMillis += currentSleepMillis;
+        // Double the delay for the next retry; the remaining budget caps it above.
+        sleepTimeMillis *= 2;
+      }
+    }
+    return true;
+  }
 
   // Execute the command in the container.
   public Container.ExecResult executeInContainer(String... commandAndArgs) {
diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/container/HiveContainer.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/container/HiveContainer.java
index 34d1d3276d7..63d0b29e4b8 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/container/HiveContainer.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/container/HiveContainer.java
@@ -9,7 +9,6 @@
 import com.datastrato.gravitino.catalog.hive.HiveClientPool;
 import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -18,7 +17,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.thrift.TException;
 import org.rnorth.ducttape.Preconditions;
 import org.slf4j.Logger;
@@ -59,94 +57,71 @@ protected void setupContainer() {
   @Override
   public void start() {
     super.start();
-    Preconditions.check("Hive container startup failed!", checkContainerStatus(5));
+    Preconditions.check("Hive container startup failed!", checkContainerStatus(60_000));
   }
 
   @Override
-  protected boolean checkContainerStatus(int retryLimit) {
-    int nRetry = 0;
-    boolean isHiveContainerReady = false;
-    int sleepTimeMillis = 10_000;
-    while (nRetry++ < retryLimit) {
-      try {
-        String[] commandAndArgs = new String[] {"bash", "/tmp/check-status.sh"};
-        Container.ExecResult execResult = executeInContainer(commandAndArgs);
-        if (execResult.getExitCode() != 0) {
-          String message =
-              format(
-                  "Command [%s] exited with %s",
-                  String.join(" ", commandAndArgs), execResult.getExitCode());
-          LOG.error("{}", message);
-          LOG.error("stderr: {}", execResult.getStderr());
-          LOG.error("stdout: {}", execResult.getStdout());
-        } else {
-          LOG.info("Hive container startup success!");
-          isHiveContainerReady = true;
-          break;
-        }
-        LOG.info(
-            "Hive container is not ready, recheck({}/{}) after {}ms",
-            nRetry,
-            retryLimit,
-            sleepTimeMillis);
-        Thread.sleep(sleepTimeMillis);
-      } catch (RuntimeException e) {
-        LOG.error(e.getMessage(), e);
-      } catch (InterruptedException e) {
-        // ignore
-      }
-    }
-
-    // Test hive client if it can connect to hive server
-    boolean isHiveConnectSuccess = false;
-    HiveConf hiveConf = new HiveConf();
-    String hiveMetastoreUris =
-        String.format("thrift://%s:%d", getContainerIpAddress(), HiveContainer.HIVE_METASTORE_PORT);
-    hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveMetastoreUris);
-    HiveClientPool hiveClientPool = new HiveClientPool(1, hiveConf);
-
-    nRetry = 0;
-    while (nRetry++ < retryLimit) {
-      try {
-        List databases = hiveClientPool.run(IMetaStoreClient::getAllDatabases);
-        if (!databases.isEmpty()) {
-          isHiveConnectSuccess = true;
-          break;
-        }
-        Thread.sleep(3000);
-      } catch (TException | InterruptedException e) {
-        LOG.warn("Failed to connect to hive server, retrying...", e);
-      }
-    }
-
-    // Test HDFS client if it can connect to HDFS server
-    boolean isHdfsConnectSuccess = false;
-    nRetry = 0;
-    Configuration conf = new Configuration();
-    conf.set(
-        "fs.defaultFS",
-        String.format("hdfs://%s:%d", getContainerIpAddress(), HiveContainer.HDFS_DEFAULTFS_PORT));
-    while (nRetry++ < retryLimit) {
-      try (FileSystem fs = FileSystem.get(conf)) {
-        Path directoryPath = new Path("/");
-        FileStatus[] fileStatuses = fs.listStatus(directoryPath);
-        if (fileStatuses.length > 0) {
-          isHdfsConnectSuccess = true;
-          break;
-        }
-        Thread.sleep(3000);
-      } catch (IOException | InterruptedException e) {
-        LOG.warn("Failed to connect to HDFS server, retrying...", e);
-      }
-    }
-
-    LOG.info(
-        "Hive container status: isHiveContainerReady={}, isHiveConnectSuccess={}, isHdfsConnectSuccess={}",
-        isHiveContainerReady,
-        isHiveConnectSuccess,
-        isHdfsConnectSuccess);
-
-    return isHiveContainerReady && isHiveConnectSuccess && isHdfsConnectSuccess;
+  protected boolean checkContainerStatus(int timeoutMillis) {
+
+    return checkContainerStatusWithRetry(
+        timeoutMillis,
+        () -> {
+          // Check if the HDFS datanode is ready
+          try {
+            String[] commandAndArgs = new String[] {"bash", "/tmp/check-status.sh"};
+            Container.ExecResult execResult = executeInContainer(commandAndArgs);
+            if (execResult.getExitCode() == 0) {
+              LOG.info("Hive container startup success!");
+              return true;
+            }
+
+            String message =
+                format(
+                    "Command [%s] exited with %s",
+                    String.join(" ", commandAndArgs), execResult.getExitCode());
+            LOG.error("{}", message);
+            LOG.error("stderr: {}", execResult.getStderr());
+            LOG.error("stdout: {}", execResult.getStdout());
+          } catch (RuntimeException e) {
+            LOG.error(e.getMessage(), e);
+          }
+          return false;
+        },
+        () -> {
+          // Test hive client if it can connect to hive server
+          HiveConf hiveConf = new HiveConf();
+          String hiveMetastoreUris =
+              String.format(
+                  "thrift://%s:%d", getContainerIpAddress(), HiveContainer.HIVE_METASTORE_PORT);
+          hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveMetastoreUris);
+          // Close the pool after each probe so failed attempts do not leak metastore connections
+          try (HiveClientPool hiveClientPool = new HiveClientPool(1, hiveConf)) {
+            hiveClientPool.run(c -> c.getDatabases("default"));
+            return true;
+          } catch (TException e) {
+            LOG.warn("Failed to connect to hive server", e);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while connecting to hive server", e);
+          }
+          return false;
+        },
+        () -> {
+          // Test HDFS client if it can connect to HDFS server
+          Configuration conf = new Configuration();
+          conf.set(
+              "fs.defaultFS",
+              String.format(
+                  "hdfs://%s:%d", getContainerIpAddress(), HiveContainer.HDFS_DEFAULTFS_PORT));
+          try (FileSystem fs = FileSystem.get(conf)) {
+            Path directoryPath = new Path("/");
+            FileStatus[] fileStatuses = fs.listStatus(directoryPath);
+            return fileStatuses.length > 0;
+          } catch (IOException e) {
+            LOG.warn("Failed to connect to HDFS server", e);
+          }
+          return false;
+        });
   }
 
   public static class Builder extends BaseContainer.Builder {
diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/container/TrinoContainer.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/container/TrinoContainer.java
index a3f31afe841..77854ae02c0 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/container/TrinoContainer.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/container/TrinoContainer.java
@@ -61,28 +61,12 @@ public void start() {
     super.start();
 
     Preconditions.check("Initialization Trino JDBC connect failed!", initTrinoJdbcConnection());
-    Preconditions.check("Trino container startup failed!", checkContainerStatus(5));
+    Preconditions.check("Trino container startup failed!", checkContainerStatus(60_000));
   }
 
   @Override
-  protected boolean checkContainerStatus(int retryLimit) {
-    int nRetry = 0;
-    boolean isTrinoJdbcConnectionReady = false;
-    int sleepTime = 5000;
-    while (nRetry++ < retryLimit && !isTrinoJdbcConnectionReady) {
-      isTrinoJdbcConnectionReady = testTrinoJdbcConnection();
-      if (isTrinoJdbcConnectionReady) {
-        break;
-      } else {
-        try {
-          Thread.sleep(sleepTime);
-          LOG.warn("Waiting for trino server to be ready... ({}ms)", nRetry * sleepTime);
-        } catch (InterruptedException e) {
-          // ignore
-        }
-      }
-    }
-    return isTrinoJdbcConnectionReady;
+  protected boolean checkContainerStatus(int timeoutMillis) {
+    return checkContainerStatusWithRetry(timeoutMillis, this::testTrinoJdbcConnection);
   }
 
   @Override
@@ -150,6 +134,7 @@ private boolean testTrinoJdbcConnection() {
       return false;
     }
 
+    LOG.info("Trino JDBC connection test success!");
     return true;
   }
 