Adds support for bulk create option #1561

Merged
merged 1 commit into from
Jul 30, 2022

Conversation

jzonthemtn
Contributor

@jzonthemtn jzonthemtn commented Jul 2, 2022

Description

Adds support for bulk create option

Issues Resolved

#1457 Create-only actions in OpenSearch bulk requests

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@jzonthemtn jzonthemtn requested a review from a team as a code owner July 2, 2022 14:13
@codecov-commenter

codecov-commenter commented Jul 2, 2022

Codecov Report

Merging #1561 (6556c5b) into main (385cf6d) will increase coverage by 0.04%.
The diff coverage is n/a.

@@             Coverage Diff              @@
##               main    #1561      +/-   ##
============================================
+ Coverage     94.51%   94.55%   +0.04%     
- Complexity     1152     1236      +84     
============================================
  Files           158      162       +4     
  Lines          3297     3509     +212     
  Branches        268      290      +22     
============================================
+ Hits           3116     3318     +202     
- Misses          128      129       +1     
- Partials         53       62       +9     
Impacted Files                                         Coverage Δ
...dataprepper/model/configuration/PipelineModel.java  100.00% <0.00%> (ø)
...h/dataprepper/logstash/mapping/LogstashMapper.java  100.00% <0.00%> (ø)
...pper/peerforwarder/PeerForwarderConfiguration.java  84.52% <0.00%> (ø)
...eerforwarder/PeerForwardingProcessorDecorator.java  100.00% <0.00%> (ø)
...prepper/peerforwarder/discovery/DiscoveryMode.java  100.00% <0.00%> (ø)
...aprepper/peerforwarder/PeerForwarderAppConfig.java  100.00% <0.00%> (ø)
.../com/amazon/dataprepper/parser/PipelineParser.java  93.46% <0.00%> (+3.77%) ⬆️
...prepper/parser/model/DataPrepperConfiguration.java  93.75% <0.00%> (+8.96%) ⬆️

Member

@graytaylor0 graytaylor0 left a comment

This looks great! Thanks for making this contribution. I had a couple comments for some general code improvements, but nothing major.

if (bulkRequest.getOperationsCount() > 0) {
flushBatch(bulkRequest);
} else {
throw new RuntimeException("Invalid action: " + action);
Member

We should never have to throw a RuntimeException for an invalid configuration. I think it would be best to create an enum for the possible BulkActions and have the IndexConfiguration class throw an IllegalArgumentException when it converts the action String passed by the user to the enum type.
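
A minimal sketch of the enum-based conversion suggested above, using only the JDK. The names BulkAction, getOptionValue, and fromOptionValue are hypothetical and are not taken from the PR; the only assumption carried over from the conversation is that "create" and "index" are the two supported actions.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical enum of supported bulk actions (names are illustrative, not from the PR).
enum BulkAction {
    CREATE("create"),
    INDEX("index");

    // The string a user would write in the pipeline configuration.
    private final String optionValue;

    BulkAction(final String optionValue) {
        this.optionValue = optionValue;
    }

    String getOptionValue() {
        return optionValue;
    }

    // Converts the configured string to an enum constant.
    // Unknown or null values are rejected with IllegalArgumentException,
    // so a bad configuration fails when it is read, not while records are flushed.
    static BulkAction fromOptionValue(final String value) {
        for (final BulkAction candidate : values()) {
            if (candidate.optionValue.equals(value)) {
                return candidate;
            }
        }
        final String allowed = Arrays.stream(values())
                .map(BulkAction::getOptionValue)
                .collect(Collectors.joining(", ", "[", "]"));
        throw new IllegalArgumentException(
                "action must be one of " + allowed + " but was: " + value);
    }
}
```

With a conversion like this in the configuration class, the sink code can switch on the enum and no longer needs a catch-all RuntimeException for unexpected strings.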

@@ -260,6 +273,11 @@ public Builder withIsmPolicyFile(final String ismPolicyFile) {
return this;
}

public Builder withAction(final String action) {
this.action = action;
Member

We can add validation on construction by adding the following line here (given that BulkActions is an enum):

checkArgument(EnumUtils.isValidEnum(BulkActions.class, action), "action must be one of the following: " + Arrays.toString(BulkActions.values()));

flushBatch(bulkRequest);
}

} else if(IndexConfiguration.BULK_ACTION_INDEX.equals(action)) {
Member

We can limit the amount of duplicate code by refactoring the "if create, else if index" branches to look something like this. It would also be nice to move creation of the BulkOperation to a Factory class with provideBulkOperationForCreate and provideBulkOperationForIndex methods, but that isn't necessary for this PR.

for (final Record<Object> record : records) {
    final SerializedJson document = getDocument(record.getData());

    final Optional<String> docId = getDocumentIdFromDocument(document);

    final BulkOperation bulkOperation;
    if (action.equals("index")) {
        final IndexOperation.Builder<Object> indexOperationBuilder = new IndexOperation.Builder<>()
            .index(indexManager.getIndexAlias())
            .document(document);

        // Only set an explicit id when the document provides one
        if (docId.isPresent()) {
            indexOperationBuilder.id(docId.get());
        }

        bulkOperation = new BulkOperation.Builder()
            .index(indexOperationBuilder.build())
            .build();

    } else if (action.equals("create")) {
        final CreateOperation.Builder<Object> createOperationBuilder = new CreateOperation.Builder<>()
            .index(indexManager.getIndexAlias())
            .document(document);

        if (docId.isPresent()) {
            createOperationBuilder.id(docId.get());
        }

        bulkOperation = new BulkOperation.Builder()
            .create(createOperationBuilder.build())
            .build();

    } else {
        // Should be unreachable once the action is validated in IndexConfiguration
        throw new IllegalArgumentException("Invalid action: " + action);
    }

    // Flush first if adding this operation would exceed the configured bulk size
    final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(bulkOperation);
    if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest.getOperationsCount() > 0) {
        flushBatch(bulkRequest);
        bulkRequest = bulkRequestSupplier.get();
    }
    bulkRequest.addOperation(bulkOperation);
}

// Flush the remaining requests
if (bulkRequest.getOperationsCount() > 0) {
    flushBatch(bulkRequest);
}

private Optional<String> getDocumentIdFromDocument(final SerializedJson document) {
    final Map documentAsMap;
    try {
        documentAsMap = objectMapper.readValue(document.getSerializedJson(), Map.class);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    if (documentAsMap != null) {
        final String docId = (String) documentAsMap.get(documentIdField);
        if (docId != null) {
            return Optional.of(docId);
        }
    }
    return Optional.empty();
}
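
The optional factory mentioned in this comment could look roughly like the sketch below. It is an illustration only: SketchOperation and SketchAction are stand-in types invented so the example compiles without the OpenSearch client, and the provide dispatcher is hypothetical. Only the method names provideBulkOperationForCreate and provideBulkOperationForIndex come from the comment itself.

```java
import java.util.Optional;

// Stand-in for the client's bulk operation type (hypothetical; keeps the sketch self-contained).
final class SketchOperation {
    final String kind;
    final String index;
    final String document;
    final Optional<String> id;

    SketchOperation(final String kind, final String index,
                    final String document, final Optional<String> id) {
        this.kind = kind;
        this.index = index;
        this.document = document;
        this.id = id;
    }
}

// Stand-in for an enum of supported actions (hypothetical).
enum SketchAction { CREATE, INDEX }

// Factory that owns the per-action construction, so the sink loop has no if/else chain.
final class SketchBulkOperationFactory {
    private final String indexAlias;

    SketchBulkOperationFactory(final String indexAlias) {
        this.indexAlias = indexAlias;
    }

    SketchOperation provideBulkOperationForCreate(final String document, final Optional<String> docId) {
        return new SketchOperation("create", indexAlias, document, docId);
    }

    SketchOperation provideBulkOperationForIndex(final String document, final Optional<String> docId) {
        return new SketchOperation("index", indexAlias, document, docId);
    }

    // Single entry point the sink loop would call for each record.
    SketchOperation provide(final SketchAction action, final String document, final Optional<String> docId) {
        switch (action) {
            case CREATE:
                return provideBulkOperationForCreate(document, docId);
            case INDEX:
                return provideBulkOperationForIndex(document, docId);
            default:
                throw new IllegalArgumentException("Unsupported action: " + action);
        }
    }
}
```

In a real implementation the two provide methods would wrap the client's own operation builders, and the loop body would shrink to one factory call followed by the size check and addOperation.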

@jzonthemtn
Contributor Author

> This looks great! Thanks for making this contribution. I had a couple comments for some general code improvements, but nothing major.

Awesome - thanks! I will get those addressed shortly.

@jzonthemtn
Contributor Author

@graytaylor0 Thanks a lot for those comments. Everything made sense. I pushed changes up.

Member

@dlvenable dlvenable left a comment

Thanks @jzonthemtn for this contribution! I have one request regarding the naming in the pipeline configurations.

@jzonthemtn
Contributor Author

Looks like I messed up in there by merging from main and not signing the merge commit. What's the best way to fix that?

@graytaylor0
Member

> Looks like I messed up in there by merging from main and not signing the merge commit. What's the best way to fix that?

Thanks for making those changes! The best way to fix the DCO check is to rebase your branch against main with git rebase -i main, and then squash all your commits (or at least the one without the DCO sign-off).

Member

@dlvenable dlvenable left a comment

Thanks @jzonthemtn for this contribution! And thank you for the quick responses to our feedback.

@jzonthemtn jzonthemtn closed this Jul 21, 2022
@jzonthemtn jzonthemtn deleted the 1457_create-only-actions branch July 21, 2022 14:47
@graytaylor0
Member

@jzonthemtn Any reason this was closed? If it was an accident we can reopen and merge.

@jzonthemtn jzonthemtn restored the 1457_create-only-actions branch July 30, 2022 11:35
@jzonthemtn
Contributor Author

> @jzonthemtn Any reason this was closed? If it was an accident we can reopen and merge.

No, I don't know why. I can only guess I was in the wrong issue. Sorry for my clumsiness.

@jzonthemtn jzonthemtn reopened this Jul 30, 2022
@graytaylor0 graytaylor0 merged commit 0ea2657 into opensearch-project:main Jul 30, 2022
engechas pushed a commit to engechas/data-prepper that referenced this pull request Sep 12, 2022
4 participants