-
Notifications
You must be signed in to change notification settings - Fork 25k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Tidy up PlainListenableActionFuture (#68735)
As of #53261 today `PlainListenableActionFuture` is the sole implementation of `ListenableActionFuture`, the reason for the `Plain...` prefix is now lost. It also heavily uses variable shadowing in its implementation which makes it quite hard to read, uses a mix of `volatile` fields and mutexes, and keeps hold of the deferred listeners even after completion. Finally it does not have many tests. This commit drops the unnecessary interface and renames the class to simply `ListenableActionFuture`. It reworks the implementation to avoid shadowing variables, drops the deferred listeners on completion, and moves to an entirely mutex-based implementation. Finally, it adds another test that it works as advertised.
- Loading branch information
1 parent
71763ca
commit ccf283e
Showing
9 changed files
with
208 additions
and
142 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
22 changes: 0 additions & 22 deletions
22
server/src/main/java/org/elasticsearch/action/ListenableActionFuture.java
This file was deleted.
Oops, something went wrong.
95 changes: 95 additions & 0 deletions
95
server/src/main/java/org/elasticsearch/action/support/ListenableActionFuture.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
package org.elasticsearch.action.support; | ||
|
||
import org.elasticsearch.action.ActionListener; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
/** | ||
* A {@code Future} and {@link ActionListener} against which which other {@link ActionListener}s can be registered later, to support | ||
* fanning-out a result to a dynamic collection of listeners. | ||
*/ | ||
public class ListenableActionFuture<T> extends AdapterActionFuture<T, T> { | ||
|
||
private Object listeners; | ||
private boolean executedListeners = false; | ||
|
||
/** | ||
* Registers an {@link ActionListener<T>} to be notified when this future is completed. If the future is already completed then the | ||
* listener is notified immediately, on the calling thread. If not, the listener is notified on the thread that completes the listener. | ||
*/ | ||
@SuppressWarnings("unchecked,rawtypes") | ||
public void addListener(final ActionListener<T> listener) { | ||
final boolean executeImmediate; | ||
synchronized (this) { | ||
executeImmediate = executedListeners; | ||
if (executeImmediate == false) { | ||
final Object oldListeners = listeners; | ||
final Object newListeners; | ||
if (oldListeners == null) { | ||
// adding the first listener | ||
newListeners = listener; | ||
} else if (oldListeners instanceof List) { | ||
// adding a listener after the second | ||
((List) oldListeners).add(listener); | ||
newListeners = oldListeners; | ||
} else { | ||
// adding the second listener | ||
newListeners = new ArrayList<>(2); | ||
((List) newListeners).add(oldListeners); | ||
((List) newListeners).add(listener); | ||
} | ||
this.listeners = newListeners; | ||
} | ||
} | ||
if (executeImmediate) { | ||
executeListener(listener); | ||
} | ||
} | ||
|
||
@Override | ||
@SuppressWarnings("unchecked,rawtypes") | ||
protected void done(boolean success) { | ||
super.done(success); | ||
final Object listenersToExecute; | ||
synchronized (this) { | ||
executedListeners = true; | ||
listenersToExecute = listeners; | ||
listeners = null; | ||
} | ||
|
||
if (listenersToExecute != null) { | ||
if (listenersToExecute instanceof List) { | ||
for (final Object listener : (List) listenersToExecute) { | ||
executeListener((ActionListener<T>) listener); | ||
} | ||
} else { | ||
executeListener((ActionListener<T>) listenersToExecute); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
protected T convert(T listenerResponse) { | ||
return listenerResponse; | ||
} | ||
|
||
private void executeListener(final ActionListener<T> listener) { | ||
try { | ||
// we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread. | ||
// here we know we will never block | ||
listener.onResponse(actionGet(0)); | ||
} catch (Exception e) { | ||
listener.onFailure(e); | ||
} | ||
} | ||
|
||
} |
99 changes: 0 additions & 99 deletions
99
server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.