From 4e5fb3a59fc91fde9994b21bcad3f6dcdbdafc12 Mon Sep 17 00:00:00 2001 From: chaowangnk1 <71095236+chaowangnk1@users.noreply.github.com> Date: Tue, 13 Oct 2020 17:05:29 -0700 Subject: [PATCH] DynamicResourcePool: flat recursive calls in acquire (#1) --- .../alluxio/resource/DynamicResourcePool.java | 61 ++++++++++++------- 1 file changed, 39 insertions(+), 22 deletions(-) diff --git a/core/common/src/main/java/alluxio/resource/DynamicResourcePool.java b/core/common/src/main/java/alluxio/resource/DynamicResourcePool.java index 9f33beb1e45f..8d051b263a0a 100644 --- a/core/common/src/main/java/alluxio/resource/DynamicResourcePool.java +++ b/core/common/src/main/java/alluxio/resource/DynamicResourcePool.java @@ -277,13 +277,6 @@ public DynamicResourcePool(Options options) { }, options.getInitialDelayMs(), options.getGcIntervalMs(), TimeUnit.MILLISECONDS); } - /** - * @return available resources - */ - public int getAvailableResources() { - return mAvailableResources.size(); - } - /** * Acquires a resource of type {code T} from the pool. * @@ -317,27 +310,31 @@ public T acquire(long time, TimeUnit unit) throws TimeoutException, IOException // Try to take a resource without blocking ResourceInternal resource = poll(); - if (resource != null) { - return checkHealthyAndRetry(resource.mResource, endTimeMs); - } - - if (!isFull()) { - // If the resource pool is empty but capacity is not yet full, create a new resource. - T newResource = createNewResource(); - ResourceInternal resourceInternal = new ResourceInternal<>(newResource); - if (add(resourceInternal)) { - return newResource; - } else { - closeResource(newResource); - } + if (resource != null && checkHealthy(resource.mResource)) { + return resource.mResource; } // Otherwise, try to take a resource from the pool, blocking if none are available. try { mLock.lock(); while (true) { + if (!isFull()) { + // If the resource pool is empty but capacity is not yet full, create a new resource. + T newResource = createNewResource(); + ResourceInternal resourceInternal = new ResourceInternal<>(newResource); + if (add(resourceInternal)) { + resource = resourceInternal; + break; + } else { + closeResource(newResource); + } + } + else { + LOG.warn("{} resources pool is full, waiting for resource released", this.getClass().getName()); + } + resource = poll(); - if (resource != null) { + if (resource != null && checkHealthy(resource.mResource)) { break; } long currTimeMs = mClock.millis(); @@ -358,7 +355,7 @@ public T acquire(long time, TimeUnit unit) throws TimeoutException, IOException mLock.unlock(); } - return checkHealthyAndRetry(resource.mResource, endTimeMs); + return resource.mResource; } /** @@ -488,6 +485,26 @@ private T checkHealthyAndRetry(T resource, long endTimeMs) throws TimeoutExcepti } } + /** + * Checks whether the resource is healthy. If not remove it. When this called, the resource + * is not in mAvailableResources. + * + * @param resource the resource to check + * @param endTimeMs the end time to wait till + * @return the resource + * @throws TimeoutException if it times out to wait for a resource + */ + private boolean checkHealthy(T resource) throws TimeoutException, IOException { + if (isHealthy(resource)) { + return true; + } else { + remove(resource); + closeResource(resource); + return false; + } + } + + // The following functions should be overridden by implementations. /**