Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement opensearch index partition creation supplier and PitWorker without processing indices #2821

Merged
merged 2 commits into from
Jun 6, 2023

Conversation

graytaylor0
Copy link
Member

@graytaylor0 graytaylor0 commented Jun 3, 2023

Description

Implements the OpenSearchIndexPartitionSupplier, which will list all indices of the source cluster and filter them by the include and exclude regex patterns.

Also starts implementation for PitWorker to use the scheduling configuration with source coordinator without processing the object. When calling getNextPartition returns empty, the PitWorker will back off and retry in a fixed 30 seconds. The code added for PitWorker here would be essentially the same for ScrollWorker (the interactions with source coordinator will be very similar.

OpenSearchService now uses a scheduled executor service instead of spawning new threads directly. This allows scheduling to start at the start_time. The stop method will let the SearchWorker future know to not grab another partition, and will get 30 seconds to complete processing of the index before the executor service shuts it down completely. This means that if 30 seconds is not enough time to finish processing the current partition, duplicate data processing will occur (unless there is a state that can track where to pick up from, at which point the SearchWorker could saveState and give up the partition gracefully)

Tested and expected partitions are created and "processed" successfully

Issues Resolved

Related to #1985

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…without processing indices

Signed-off-by: Taylor Gray <[email protected]>
cmanning09
cmanning09 previously approved these changes Jun 5, 2023
Copy link
Contributor

@cmanning09 cmanning09 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks fundamentally correct to me. I am offering a few suggestions to improve the code. Take them or leave them.

final long waitTimeBeforeStartMillis = startTime.toEpochMilli() - Instant.now().toEpochMilli() < 0 ? 0L :
startTime.toEpochMilli() - Instant.now().toEpochMilli();

LOG.info("The opensearch source will start processing data at {}. It is currently {}", startTime, Instant.now().toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the .toString() redundant?

return false;
}

final IndexParametersConfiguration indexParametersConfiguration = openSearchSourceConfiguration.getIndexParametersConfiguration();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take or leave it: This and other configurations values can be saved as a class variable instead of creating a new local variable for every index filter. These values are always the same.


final IndexParametersConfiguration indexParametersConfiguration = openSearchSourceConfiguration.getIndexParametersConfiguration();

if (Objects.isNull(openSearchSourceConfiguration.getIndexParametersConfiguration())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be simplified to: if (Objects.isNull(indexParametersConfiguration)) {

.collect(Collectors.toList());
}

private boolean isIndexIncludedInOneOfTheIncludePatternsAndNotExcludedInAnExcludePattern(final IndicesRecord indicesRecord) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take or leave it: A simpler name and one that would extend well if we added other filtering options could be:

  • shouldIndexBeProcessed
  • isIndexSelected

The current method name is very long and a little too specific. I am open to other ideas as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like shouldIndexBeProcessed

Comment on lines 93 to 104
for (final OpenSearchIndex index : includedIndices) {
final Matcher matcher = index.getIndexNamePattern().matcher(indicesRecord.index());

if (matcher.matches()) {
matchesIncludedPattern = true;
break;
}
}

boolean matchesExcludePattern = false;

for (final OpenSearchIndex index : excludedIndices) {
final Matcher matcher = index.getIndexNamePattern().matcher(indicesRecord.index());

if (matcher.matches()) {
matchesExcludePattern = true;
break;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can eliminate code duplication, variable re-assignment and breaks by pull this into a separate function:

    pubic boolean doesIndexMatchPattern(final List<OpenSearchIndex> indices, final IndicesRecord indicesRecord) {
        for (final OpenSearchIndex index : excludedIndices) {
            final Matcher matcher = index.getIndexNamePattern().matcher(indicesRecord.index());

            if (matcher.matches()) {
                return true;
            }
        }
        return false;
    }

Then you can do something simple like:

final boolean matchesIncludedPattern = includedIndices.isEmpty() ? true : doesIndexMatchPattern(includedIndices, indicesRecord);
final boolean matchesExcludePattern = doesIndexMatchPattern(excludedIndices, indicesRecord);
return matchesIncludedPattern && !matchesExcludePattern;

Signed-off-by: Taylor Gray <[email protected]>

final List<PartitionIdentifier> partitionIdentifierList = createObjectUnderTest().apply(Collections.emptyMap());

assertThat(partitionIdentifierList, notNullValue());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think we should verify number of partitions here.

@@ -56,4 +56,9 @@ public SearchScrollResponse searchWithScroll(SearchScrollRequest searchScrollReq
public void deleteScroll(DeleteScrollRequest deleteScrollRequest) {
//todo: implement
}

@Override
public Object getClient() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we implement ClusterClientFactory here if we are returning null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ElasticSearch accessor is not implemented yet

@graytaylor0 graytaylor0 merged commit 10d3984 into opensearch-project:main Jun 6, 2023
MaGonzalMayedo pushed a commit to MaGonzalMayedo/data-prepper that referenced this pull request Jun 21, 2023
…without processing indices (opensearch-project#2821)

Implement opensearch index partition creation supplier and PitWorker without processing indices

Signed-off-by: Taylor Gray <[email protected]>
Signed-off-by: Marcos_Gonzalez_Mayedo <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants