Skip to content

Commit

Permalink
Harden discard logic in ExchangeBuffer (#100636)
Browse files Browse the repository at this point in the history
We can leave pages in the ExchangeBuffer if the noMoreInputs flag is set 
to true after we've checked it but before we add pages to the queue. I
can reliably reproduce the testFromLimit by inserting a delay in
between. This change hardens the discard logic by moving the check after
we've added a Page to the queue. If the noMoreInputs flag is set to
true, we will drain the pages from the queue.
  • Loading branch information
dnhatn authored Oct 11, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 6404723 commit e411b57
Showing 3 changed files with 106 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -41,13 +41,12 @@ final class ExchangeBuffer {
}

void addPage(Page page) {
queue.add(page);
if (queueSize.incrementAndGet() == 1) {
notifyNotEmpty();
}
if (noMoreInputs) {
page.releaseBlocks();
} else {
queue.add(page);
if (queueSize.incrementAndGet() == 1) {
notifyNotEmpty();
}
discardPages();
}
}

@@ -115,13 +114,17 @@ SubscribableListener<Void> waitForReading() {
}
}

private void discardPages() {
Page p;
while ((p = pollPage()) != null) {
p.releaseBlocks();
}
}

void finish(boolean drainingPages) {
noMoreInputs = true;
if (drainingPages) {
Page p;
while ((p = pollPage()) != null) {
p.releaseBlocks();
}
discardPages();
}
notifyNotEmpty();
if (drainingPages || queueSize.get() == 0) {
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.exchange;

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.BasicBlockTests;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.MockBlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.equalTo;

public class ExchangeBufferTests extends ESTestCase {

public void testDrainPages() throws Exception {
ExchangeBuffer buffer = new ExchangeBuffer(randomIntBetween(10, 1000));
var blockFactory = blockFactory();
CountDownLatch latch = new CountDownLatch(1);
Thread[] producers = new Thread[between(1, 4)];
AtomicBoolean stopped = new AtomicBoolean();
AtomicInteger addedPages = new AtomicInteger();
for (int t = 0; t < producers.length; t++) {
producers[t] = new Thread(() -> {
try {
latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
while (stopped.get() == false && addedPages.incrementAndGet() < 10_000) {
buffer.addPage(randomPage(blockFactory));
}
});
producers[t].start();
}
latch.countDown();
try {
int minPage = between(10, 100);
int receivedPage = 0;
while (receivedPage < minPage) {
Page p = buffer.pollPage();
if (p != null) {
p.releaseBlocks();
++receivedPage;
}
}
} finally {
buffer.finish(true);
stopped.set(true);
}
for (Thread t : producers) {
t.join();
}
assertThat(buffer.size(), equalTo(0));
blockFactory.ensureAllBlocksAreReleased();
}

private static MockBlockFactory blockFactory() {
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking();
CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);
return new MockBlockFactory(breaker, bigArrays);
}

private static Page randomPage(BlockFactory blockFactory) {
Block block = BasicBlockTests.randomBlock(
blockFactory,
randomFrom(ElementType.LONG, ElementType.BYTES_REF, ElementType.BOOLEAN),
randomIntBetween(1, 100),
randomBoolean(),
0,
between(1, 2),
0,
between(1, 2)
).block();
return new Page(block);
}
}
Original file line number Diff line number Diff line change
@@ -862,7 +862,6 @@ public void testFromStatsLimit() {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99826")
public void testFromLimit() {
try (EsqlQueryResponse results = run("from test | keep data | limit 2")) {
logger.info(results);

0 comments on commit e411b57

Please sign in to comment.