Fixes assertion check in RollupShardIndexer
This commit removes the assertion in RollupShardIndexer that verifies that
temporary files are deleted. The indexer is only responsible for instructing
the OS to delete the files, and the OS may not do so in a timely manner, which
makes the assertion potentially flaky. Instead, a new unit test is introduced
that introspects the indexer and asserts that it successfully requested
deletion of the files.

Closes elastic#68609.
talevy committed Feb 11, 2021
1 parent 6cf33b1 commit 5eaa197
Showing 3 changed files with 39 additions and 13 deletions.
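
The approach can be sketched in isolation. Below is a minimal, hypothetical stand-alone version of the tracking wrapper (the class name and comments are invented for illustration; FilterDirectory, IndexOutput, and IOContext are real Lucene types): a FilterDirectory subclass records every temp file it creates and every file it is asked to delete, so a test can compare the two sets instead of polling the filesystem.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;

// Hypothetical sketch of the deletion-tracking wrapper this commit adds.
class TempFileTrackingDirectory extends FilterDirectory {
    final Set<String> tmpFiles = new HashSet<>();
    final Set<String> tmpFilesDeleted = new HashSet<>();

    TempFileTrackingDirectory(Directory in) {
        super(in);
    }

    @Override
    public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
        IndexOutput output = super.createTempOutput(prefix, suffix, context);
        tmpFiles.add(output.getName()); // remember each temp file handed out
        return output;
    }

    @Override
    public void deleteFile(String name) throws IOException {
        tmpFilesDeleted.add(name); // remember each deletion we requested
        super.deleteFile(name);
    }
}

Once indexing completes, a test can assert that tmpFilesDeleted equals tmpFiles, which is what the new unit test below does; whether the OS has physically removed the files yet is irrelevant to the check.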
@@ -61,7 +61,6 @@ public void testRollupIndex() throws Exception {
assertBusy(() -> assertTrue(indexExists(index)));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/68609")
public void testRollupIndexAndSetNewRollupPolicy() throws Exception {
createIndexWithSettings(client(), index, alias, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0));
@@ -98,12 +98,15 @@ class RollupShardIndexer {
private final List<FieldValueFetcher> metricsFieldFetchers;

private final CompressingOfflineSorter sorter;
private final Set<String> tmpFiles = new HashSet<>();

private final BulkProcessor bulkProcessor;
private final AtomicLong numSent = new AtomicLong();
private final AtomicLong numIndexed = new AtomicLong();

// for testing: track the temp files created and the deletions requested for them
final Set<String> tmpFiles = new HashSet<>();
final Set<String> tmpFilesDeleted = new HashSet<>();

RollupShardIndexer(Client client,
IndexService indexService,
ShardId shardId,
@@ -131,6 +134,12 @@ public IndexOutput createTempOutput(String prefix, String suffix, IOContext cont
tmpFiles.add(output.getName());
return output;
}

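// record the deletion request before delegating to the wrapped directory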
@Override
public void deleteFile(String name) throws IOException {
tmpFilesDeleted.add(name);
super.deleteFile(name);
}
};
this.searchExecutionContext = indexService.newSearchExecutionContext(
indexShard.shardId().id(),
@@ -192,23 +201,12 @@ public long execute() throws IOException {
do {
bucket = computeBucket(bucket);
} while (bucket != null);
} finally {
assert(checkCleanDirectory(dir, tmpFiles));
}
// TODO: check that numIndexed == numSent, otherwise throw an exception
logger.info("Successfully sent [" + numSent.get() + "], indexed [" + numIndexed.get() + "]");
return numIndexed.get();
}

// check that all temporary files are deleted
private static boolean checkCleanDirectory(Directory dir, Set<String> tmpFiles) throws IOException {
Set<String> allFiles = Arrays.stream(dir.listAll()).collect(Collectors.toSet());
for (String tmpFile : tmpFiles) {
assert(allFiles.contains(tmpFile) == false);
}
return true;
}

private BulkProcessor createBulkProcessor() {
final BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
@@ -19,7 +19,11 @@
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
@@ -89,6 +93,31 @@ public void setup() {
"categorical_1", "type=keyword").get();
}

public void testRollupShardIndexerCleansTempFiles() throws IOException {
// create rollup config and index documents into source index
RollupActionDateHistogramGroupConfig dateHistogramGroupConfig = randomRollupActionDateHistogramGroupConfig("date_1");
SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder().startObject()
.field("date_1", randomDateForInterval(dateHistogramGroupConfig.getInterval()))
.field("categorical_1", randomAlphaOfLength(1))
.field("numeric_1", randomDouble())
.endObject();
RollupActionConfig config = new RollupActionConfig(
new RollupActionGroupConfig(dateHistogramGroupConfig, null, new TermsGroupConfig("categorical_1")),
Collections.singletonList(new MetricConfig("numeric_1", Collections.singletonList("max"))));
bulkIndex(sourceSupplier);

IndicesService indicesService = getInstanceFromNode(IndicesService.class);
Index srcIndex = resolveIndex(index);
IndexService indexService = indicesService.indexServiceSafe(srcIndex);
IndexShard shard = indexService.getShard(0);

// re-use source index as temp index for test
RollupShardIndexer indexer = new RollupShardIndexer(client(), indexService, shard.shardId(), config, index, 2);
indexer.execute();
// assert that every temp file created by the indexer was also deleted
assertThat(indexer.tmpFilesDeleted, equalTo(indexer.tmpFiles));
}

public void testCannotRollupToExistingIndex() throws Exception {
RollupActionDateHistogramGroupConfig dateHistogramGroupConfig = randomRollupActionDateHistogramGroupConfig("date_1");
SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder().startObject()
