Skip to content

Commit

Permalink
Harden discard logic in ExchangeBuffer (#100636) (#100683)
Browse files Browse the repository at this point in the history
We can leave pages in the ExchangeBuffer if the noMoreInputs flag is set 
to true after we've checked it but before we add pages to the queue. I
can reliably reproduce the testFromLimit by inserting a delay in
between. This change hardens the discard logic by moving the check after
we've added a Page to the queue. If the noMoreInputs flag is set to
true, we will drain the pages from the queue.
  • Loading branch information
dnhatn authored Oct 11, 2023
1 parent 40f0d83 commit 9cf4fc7
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,12 @@ final class ExchangeBuffer {
}

void addPage(Page page) {
queue.add(page);
if (queueSize.incrementAndGet() == 1) {
notifyNotEmpty();
}
if (noMoreInputs) {
page.releaseBlocks();
} else {
queue.add(page);
if (queueSize.incrementAndGet() == 1) {
notifyNotEmpty();
}
discardPages();
}
}

Expand Down Expand Up @@ -115,13 +114,17 @@ SubscribableListener<Void> waitForReading() {
}
}

private void discardPages() {
Page p;
while ((p = pollPage()) != null) {
p.releaseBlocks();
}
}

void finish(boolean drainingPages) {
noMoreInputs = true;
if (drainingPages) {
Page p;
while ((p = pollPage()) != null) {
p.releaseBlocks();
}
discardPages();
}
notifyNotEmpty();
if (drainingPages || queueSize.get() == 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.exchange;

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.BasicBlockTests;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.MockBlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.equalTo;

public class ExchangeBufferTests extends ESTestCase {

public void testDrainPages() throws Exception {
ExchangeBuffer buffer = new ExchangeBuffer(randomIntBetween(10, 1000));
var blockFactory = blockFactory();
CountDownLatch latch = new CountDownLatch(1);
Thread[] producers = new Thread[between(1, 4)];
AtomicBoolean stopped = new AtomicBoolean();
AtomicInteger addedPages = new AtomicInteger();
for (int t = 0; t < producers.length; t++) {
producers[t] = new Thread(() -> {
try {
latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
while (stopped.get() == false && addedPages.incrementAndGet() < 10_000) {
buffer.addPage(randomPage(blockFactory));
}
});
producers[t].start();
}
latch.countDown();
try {
int minPage = between(10, 100);
int receivedPage = 0;
while (receivedPage < minPage) {
Page p = buffer.pollPage();
if (p != null) {
p.releaseBlocks();
++receivedPage;
}
}
} finally {
buffer.finish(true);
stopped.set(true);
}
for (Thread t : producers) {
t.join();
}
assertThat(buffer.size(), equalTo(0));
blockFactory.ensureAllBlocksAreReleased();
}

private static MockBlockFactory blockFactory() {
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking();
CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);
return new MockBlockFactory(breaker, bigArrays);
}

private static Page randomPage(BlockFactory blockFactory) {
Block block = BasicBlockTests.randomBlock(
blockFactory,
randomFrom(ElementType.LONG, ElementType.BYTES_REF, ElementType.BOOLEAN),
randomIntBetween(1, 100),
randomBoolean(),
0,
between(1, 2),
0,
between(1, 2)
).block();
return new Page(block);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,6 @@ public void testFromStatsLimit() {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99826")
public void testFromLimit() {
try (EsqlQueryResponse results = run("from test | keep data | limit 2")) {
logger.info(results);
Expand Down

0 comments on commit 9cf4fc7

Please sign in to comment.