Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify BigArrays to take name of circuit breaker (#36461) #36501

Merged
merged 3 commits into from
Dec 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,11 @@ public Settings additionalSettings() {
}

@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
networkService, bigArrays, namedWriteableRegistry, circuitBreakerService));
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -104,9 +104,10 @@ public class Netty4Transport extends TcpTransport {
private volatile Bootstrap clientBootstrap;
private volatile NioEventLoopGroup eventLoopGroup;

public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
super("netty", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super("netty", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
this.workerCount = WORKER_COUNT.get(settings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.mocksocket.MockSocket;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -65,8 +64,8 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase {
public void startThreadPool() {
threadPool = new ThreadPool(settings);
NetworkService networkService = new NetworkService(Collections.emptyList());
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, bigArrays,
PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY);
nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, recycler,
new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
nettyTransport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
Expand Down Expand Up @@ -90,13 +89,13 @@ public static final class ExceptionThrowingNetty4Transport extends Netty4Transpo
public static class TestPlugin extends Plugin implements NetworkPlugin {

@Override
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
return Collections.singletonMap("exception-throwing",
() -> new ExceptionThrowingNetty4Transport(settings, threadPool, networkService, bigArrays,
() -> new ExceptionThrowingNetty4Transport(settings, threadPool, networkService, pageCacheRecycler,
namedWriteableRegistry, circuitBreakerService));
}
}
Expand All @@ -105,10 +104,10 @@ public ExceptionThrowingNetty4Transport(
Settings settings,
ThreadPool threadPool,
NetworkService networkService,
BigArrays bigArrays,
PageCacheRecycler recycler,
NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService) {
super(settings, Version.CURRENT, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
super(settings, Version.CURRENT, threadPool, networkService, recycler, namedWriteableRegistry, circuitBreakerService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -36,7 +37,7 @@
public class Netty4UtilsTests extends ESTestCase {

private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false);
private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), CircuitBreaker.REQUEST);

public void testToChannelBufferWithEmptyRef() throws IOException {
ByteBuf buffer = Netty4Utils.toByteBuf(getRandomizedBytesReference(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -118,9 +117,9 @@ public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exc
}

private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY);
TcpTransport transport = new Netty4Transport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()),
bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
transport.start();

assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -55,7 +55,7 @@ public static MockTransportService nettyFromThreadPool(Settings settings, Thread
ClusterSettings clusterSettings, boolean doHandshake) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
Transport transport = new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()),
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {

@Override
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Module;
Expand Down Expand Up @@ -183,8 +184,8 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService);
resourcesToClose.add(bigArrays);
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
resourcesToClose.add(pageCacheRecycler);
modules.add(settingsModule);
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
Expand All @@ -195,6 +196,7 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
UUIDs.randomBase64UUID()), null, Collections.emptySet());
modules.add((b -> {
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
Expand Down Expand Up @@ -375,7 +377,7 @@ public void close() {
closeables.add(plugin);
}
closeables.add(() -> ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS));
closeables.add(injector.getInstance(BigArrays.class));
closeables.add(injector.getInstance(PageCacheRecycler.class));
IOUtils.closeWhileHandlingException(closeables);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlu
registerHttpTransport(entry.getKey(), entry.getValue());
}
}
Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, bigArrays, pageCacheRecycler,
Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
circuitBreakerService, namedWriteableRegistry, networkService);
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
registerTransport(entry.getKey(), entry.getValue());
Expand Down
27 changes: 12 additions & 15 deletions server/src/main/java/org/elasticsearch/common/util/BigArrays.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import java.util.Arrays;

/** Utility class to work with arrays. */
public class BigArrays implements Releasable {
public class BigArrays {

public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(null, null, false);
public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(null, null, CircuitBreaker.REQUEST);

/** Page size in bytes: 16KB */
public static final int PAGE_SIZE_IN_BYTES = 1 << 14;
Expand Down Expand Up @@ -83,11 +83,6 @@ static boolean indexIsInt(long index) {
return index == (int) index;
}

@Override
public void close() {
recycler.close();
}

private abstract static class AbstractArrayWrapper extends AbstractArray implements BigArray {

static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(ByteArrayWrapper.class);
Expand Down Expand Up @@ -369,24 +364,26 @@ public T set(long index, T value) {
}

final PageCacheRecycler recycler;
final CircuitBreakerService breakerService;
final boolean checkBreaker;
private final CircuitBreakerService breakerService;
private final boolean checkBreaker;
private final BigArrays circuitBreakingInstance;
private final String breakerName;

public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService) {
public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, String breakerName) {
// Checking the breaker is disabled if not specified
this(recycler, breakerService, false);
this(recycler, breakerService, breakerName, false);
}

// public for tests
public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, boolean checkBreaker) {
protected BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, String breakerName,
boolean checkBreaker) {
this.checkBreaker = checkBreaker;
this.recycler = recycler;
this.breakerService = breakerService;
this.breakerName = breakerName;
if (checkBreaker) {
this.circuitBreakingInstance = this;
} else {
this.circuitBreakingInstance = new BigArrays(recycler, breakerService, true);
this.circuitBreakingInstance = new BigArrays(recycler, breakerService, breakerName, true);
}
}

Expand All @@ -400,7 +397,7 @@ public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerServi
*/
void adjustBreaker(final long delta, final boolean isDataAlreadyCreated) {
if (this.breakerService != null) {
CircuitBreaker breaker = this.breakerService.getBreaker(CircuitBreaker.REQUEST);
CircuitBreaker breaker = this.breakerService.getBreaker(breakerName);
if (this.checkBreaker) {
// checking breaker means potentially tripping, but it doesn't
// have to if the delta is negative
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ public class PageCacheRecycler implements Releasable {
private final Recycler<long[]> longPage;
private final Recycler<Object[]> objectPage;

public static final PageCacheRecycler NON_RECYCLING_INSTANCE;

static {
NON_RECYCLING_INSTANCE = new PageCacheRecycler(Settings.builder().put(LIMIT_HEAP_SETTING.getKey(), "0%").build());
}

@Override
public void close() {
Releasables.close(true, bytePage, intPage, longPage, objectPage);
Expand Down
8 changes: 5 additions & 3 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Binder;
Expand Down Expand Up @@ -410,7 +411,7 @@ protected Node(

PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
resourcesToClose.add(bigArrays);
resourcesToClose.add(pageCacheRecycler);
modules.add(settingsModule);
List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
NetworkModule.getNamedWriteables().stream(),
Expand Down Expand Up @@ -563,6 +564,7 @@ protected Node(
b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
Expand Down Expand Up @@ -917,7 +919,7 @@ public synchronized void close() throws IOException {


toClose.add(injector.getInstance(NodeEnvironment.class));
toClose.add(injector.getInstance(BigArrays.class));
toClose.add(injector.getInstance(PageCacheRecycler.class));

if (logger.isTraceEnabled()) {
logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint());
Expand Down Expand Up @@ -1000,7 +1002,7 @@ public static CircuitBreakerService createCircuitBreakerService(Settings setting
* This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
*/
BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) {
return new BigArrays(pageCacheRecycler, circuitBreakerService);
return new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,9 @@ default List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegist
* Returns a map of {@link Transport} suppliers.
* See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation.
*/
default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService) {
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
return Collections.emptyMap();
}

Expand Down
Loading