diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java index 14462042e2..4f31e0e232 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheImpl.java @@ -37,6 +37,7 @@ import java.util.Collections; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Stream; @@ -58,7 +59,14 @@ class CuratorCacheImpl implements CuratorCache, CuratorCacheBridge private final boolean clearOnClose; private final StandardListenerManager listenerManager = StandardListenerManager.standard(); private final Consumer exceptionHandler; - private final OutstandingOps outstandingOps = new OutstandingOps(() -> callListeners(CuratorCacheListener::initialized)); + + private final Phaser outstandingOps = new Phaser() { + @Override + protected boolean onAdvance(int phase, int registeredParties) { + callListeners(CuratorCacheListener::initialized); + return true; + } + }; private enum State { @@ -210,10 +218,10 @@ else if ( event.getResultCode() == NONODE.intValue() ) { handleException(event); } - outstandingOps.decrement(); + outstandingOps.arriveAndDeregister(); }; - outstandingOps.increment(); + outstandingOps.register(); client.getChildren().inBackground(callback).forPath(fromPath); } catch ( Exception e ) @@ -245,10 +253,10 @@ else if ( event.getResultCode() == NONODE.intValue() ) { handleException(event); } - outstandingOps.decrement(); + outstandingOps.arriveAndDeregister(); }; - outstandingOps.increment(); + outstandingOps.register(); if ( compressedData ) { client.getData().decompressed().inBackground(callback).forPath(fromPath); diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java deleted file mode 100644 index 4e7b540dba..0000000000 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/OutstandingOps.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.framework.recipes.cache; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -class OutstandingOps -{ - private final AtomicReference completionProc; - private final AtomicLong count = new AtomicLong(0); - private volatile boolean active = true; - - OutstandingOps(Runnable completionProc) - { - this.completionProc = new AtomicReference<>(completionProc); - } - - void increment() - { - if ( active ) - { - count.incrementAndGet(); - } - } - - void decrement() - { - if ( active && (count.decrementAndGet() == 0) ) - { - Runnable proc = completionProc.getAndSet(null); - if ( proc != null ) - { - active = false; - proc.run(); - } - } - } -}