Skip to content

Commit

Permalink
DynamicResourcePool: flat recursive calls in acquire (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaowangnk1 authored Oct 14, 2020
1 parent 3db7684 commit 4e5fb3a
Showing 1 changed file with 39 additions and 22 deletions.
61 changes: 39 additions & 22 deletions core/common/src/main/java/alluxio/resource/DynamicResourcePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,6 @@ public DynamicResourcePool(Options options) {
}, options.getInitialDelayMs(), options.getGcIntervalMs(), TimeUnit.MILLISECONDS);
}

/**
* @return available resources
*/
public int getAvailableResources() {
return mAvailableResources.size();
}

/**
* Acquires a resource of type {code T} from the pool.
*
Expand Down Expand Up @@ -317,27 +310,31 @@ public T acquire(long time, TimeUnit unit) throws TimeoutException, IOException

// Try to take a resource without blocking
ResourceInternal<T> resource = poll();
if (resource != null) {
return checkHealthyAndRetry(resource.mResource, endTimeMs);
}

if (!isFull()) {
// If the resource pool is empty but capacity is not yet full, create a new resource.
T newResource = createNewResource();
ResourceInternal<T> resourceInternal = new ResourceInternal<>(newResource);
if (add(resourceInternal)) {
return newResource;
} else {
closeResource(newResource);
}
if (resource != null && checkHealthy(resource.mResource)) {
return resource.mResource;
}

// Otherwise, try to take a resource from the pool, blocking if none are available.
try {
mLock.lock();
while (true) {
if (!isFull()) {
// If the resource pool is empty but capacity is not yet full, create a new resource.
T newResource = createNewResource();
ResourceInternal<T> resourceInternal = new ResourceInternal<>(newResource);
if (add(resourceInternal)) {
resource = resourceInternal;
break;
} else {
closeResource(newResource);
}
}
else {
LOG.warn("{} resources pool is full, waiting for resource released", this.getClass().getName());
}

resource = poll();
if (resource != null) {
if (resource != null && checkHealthy(resource.mResource)) {
break;
}
long currTimeMs = mClock.millis();
Expand All @@ -358,7 +355,7 @@ public T acquire(long time, TimeUnit unit) throws TimeoutException, IOException
mLock.unlock();
}

return checkHealthyAndRetry(resource.mResource, endTimeMs);
return resource.mResource;
}

/**
Expand Down Expand Up @@ -488,6 +485,26 @@ private T checkHealthyAndRetry(T resource, long endTimeMs) throws TimeoutExcepti
}
}

/**
* Checks whether the resource is healthy. If not remove it. When this called, the resource
* is not in mAvailableResources.
*
* @param resource the resource to check
* @param endTimeMs the end time to wait till
* @return the resource
* @throws TimeoutException if it times out to wait for a resource
*/
private boolean checkHealthy(T resource) throws TimeoutException, IOException {
if (isHealthy(resource)) {
return true;
} else {
remove(resource);
closeResource(resource);
return false;
}
}


// The following functions should be overridden by implementations.

/**
Expand Down

0 comments on commit 4e5fb3a

Please sign in to comment.