Skip to content

Commit

Permalink
Modify BigArrays to take name of circuit breaker (#36461)
Browse files Browse the repository at this point in the history
This commit modifies BigArrays to take a circuit breaker name and
the circuit breaking service. The default instance of BigArrays that
is passed around everywhere always uses the request breaker. At the
network level, we want to be using the inflight request breaker. So this
change will allow that.

Additionally, as this change moves away from a single instance of
BigArrays, the class is modified to not be a Releasable anymore.
Releasing big arrays was always dispatching to the PageCacheRecycler,
so this change makes the PageCacheRecycler the class that needs to be
managed and torn-down.

Finally, this commit closes #31435 be making the serialization of
transport messages use the inflight request breaker. With this change,
we no longer push the global BigArrays instnace to the network level.
  • Loading branch information
Tim-Brooks authored Dec 11, 2018
1 parent a07fb31 commit 790f810
Show file tree
Hide file tree
Showing 38 changed files with 121 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,11 @@ public Settings additionalSettings() {
}

@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
networkService, bigArrays, namedWriteableRegistry, circuitBreakerService));
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -104,9 +104,10 @@ public class Netty4Transport extends TcpTransport {
private volatile Bootstrap clientBootstrap;
private volatile NioEventLoopGroup eventLoopGroup;

public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
super("netty", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super("netty", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
this.workerCount = WORKER_COUNT.get(settings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.mocksocket.MockSocket;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -65,8 +64,8 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase {
public void startThreadPool() {
threadPool = new ThreadPool(settings);
NetworkService networkService = new NetworkService(Collections.emptyList());
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, bigArrays,
PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY);
nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, recycler,
new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
nettyTransport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
Expand Down Expand Up @@ -90,13 +89,13 @@ public static final class ExceptionThrowingNetty4Transport extends Netty4Transpo
public static class TestPlugin extends Plugin implements NetworkPlugin {

@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
return Collections.singletonMap("exception-throwing",
() -> new ExceptionThrowingNetty4Transport(settings, threadPool, networkService, bigArrays,
() -> new ExceptionThrowingNetty4Transport(settings, threadPool, networkService, pageCacheRecycler,
namedWriteableRegistry, circuitBreakerService));
}
}
Expand All @@ -105,10 +104,10 @@ public ExceptionThrowingNetty4Transport(
Settings settings,
ThreadPool threadPool,
NetworkService networkService,
BigArrays bigArrays,
PageCacheRecycler recycler,
NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super(settings, Version.CURRENT, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
super(settings, Version.CURRENT, threadPool, networkService, recycler, namedWriteableRegistry, circuitBreakerService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -36,7 +37,7 @@
public class Netty4UtilsTests extends ESTestCase {

private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false);
private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), CircuitBreaker.REQUEST);

public void testToChannelBufferWithEmptyRef() throws IOException {
ByteBuf buffer = Netty4Utils.toByteBuf(getRandomizedBytesReference(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -118,9 +117,9 @@ public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exc
}

private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY);
TcpTransport transport = new Netty4Transport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()),
bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
transport.start();

assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -55,7 +55,7 @@ public static MockTransportService nettyFromThreadPool(Settings settings, Thread
ClusterSettings clusterSettings, boolean doHandshake) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
Transport transport = new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()),
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {

@Override
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -66,16 +65,14 @@ public class NioTransport extends TcpTransport {
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "transport.nio.worker_count"), Setting.Property.NodeScope);

protected final PageCacheRecycler pageCacheRecycler;
private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap();
private volatile NioGroup nioGroup;
private volatile Function<DiscoveryNode, TcpChannelFactory> clientChannelFactory;

protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super("nio", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
this.pageCacheRecycler = pageCacheRecycler;
super("nio", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ public List<Setting<?>> getSettings() {
}

@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
return Collections.singletonMap(NIO_TRANSPORT_NAME,
() -> new NioTransport(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler,
namedWriteableRegistry, circuitBreakerService));
() -> new NioTransport(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry,
circuitBreakerService));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
Expand Down Expand Up @@ -90,22 +89,21 @@ public static final class ExceptionThrowingNioTransport extends NioTransport {
public static class TestPlugin extends Plugin implements NetworkPlugin {

@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
return Collections.singletonMap("exception-throwing",
() -> new ExceptionThrowingNioTransport(settings, threadPool, networkService, bigArrays, pageCacheRecycler,
() -> new ExceptionThrowingNioTransport(settings, threadPool, networkService, pageCacheRecycler,
namedWriteableRegistry, circuitBreakerService));
}
}

ExceptionThrowingNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
ExceptionThrowingNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry,
circuitBreakerService);
super(settings, Version.CURRENT, threadPool, networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.Node;
Expand Down Expand Up @@ -58,8 +57,8 @@ public static MockTransportService nioFromThreadPool(Settings settings, ThreadPo
ClusterSettings clusterSettings, boolean doHandshake) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
NetworkService networkService = new NetworkService(Collections.emptyList());
Transport transport = new NioTransport(settings, version, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE,
new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) {
Transport transport = new NioTransport(settings, version, threadPool, networkService, new MockPageCacheRecycler(settings),
namedWriteableRegistry, new NoneCircuitBreakerService()) {

@Override
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Module;
Expand Down Expand Up @@ -182,8 +183,8 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService);
resourcesToClose.add(bigArrays);
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
resourcesToClose.add(pageCacheRecycler);
modules.add(settingsModule);
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
Expand All @@ -194,6 +195,7 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
UUIDs.randomBase64UUID()), null, Collections.emptySet());
modules.add((b -> {
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
Expand Down Expand Up @@ -374,7 +376,7 @@ public void close() {
closeables.add(plugin);
}
closeables.add(() -> ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS));
closeables.add(injector.getInstance(BigArrays.class));
closeables.add(injector.getInstance(PageCacheRecycler.class));
IOUtils.closeWhileHandlingException(closeables);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlu
registerHttpTransport(entry.getKey(), entry.getValue());
}
}
Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, bigArrays, pageCacheRecycler,
Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
circuitBreakerService, namedWriteableRegistry, networkService);
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
registerTransport(entry.getKey(), entry.getValue());
Expand Down
Loading

0 comments on commit 790f810

Please sign in to comment.