Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed Jun 9, 2023
1 parent a858cee commit 603a1c6
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResults;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -127,18 +127,18 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
}

final SearchConfiguration searchConfiguration = openSearchSourceConfiguration.getSearchConfiguration();
SearchPointInTimeResponse searchPointInTimeResponse = null;
SearchPointInTimeResults searchPointInTimeResults = null;

// todo: Pass query and sort options from SearchConfiguration to the search request
do {
try {
searchPointInTimeResponse = searchAccessor.searchWithPit(SearchPointInTimeRequest.builder()
searchPointInTimeResults = searchAccessor.searchWithPit(SearchPointInTimeRequest.builder()
.withPitId(openSearchIndexProgressState.getPitId())
.withKeepAlive(EXTEND_KEEP_ALIVE_TIME)
.withPaginationSize(searchConfiguration.getBatchSize())
.withSearchAfter(Objects.nonNull(searchPointInTimeResponse) ? searchPointInTimeResponse.getNextSearchAfter() : null)
.withSearchAfter(Objects.nonNull(searchPointInTimeResults) ? searchPointInTimeResults.getNextSearchAfter() : null)
.build());
buffer.writeAll(searchPointInTimeResponse.getDocuments().stream().map(Record::new).collect(Collectors.toList()), BUFFER_TIMEOUT_MILLIS);
buffer.writeAll(searchPointInTimeResults.getDocuments().stream().map(Record::new).collect(Collectors.toList()), BUFFER_TIMEOUT_MILLIS);
} catch (final TimeoutException e) {
// todo: implement backoff and retry, can reuse buffer accumulator code from the s3 source
} catch (final Exception e) {
Expand All @@ -148,7 +148,7 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

// todo: Don't save state on every iteration of paginating, save search_after to state to pick up where left off in case of crash
sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
} while (searchPointInTimeResponse.getDocuments().size() == searchConfiguration.getBatchSize());
} while (searchPointInTimeResults.getDocuments().size() == searchConfiguration.getBatchSize());


// todo: This API call is failing with sigv4 enabled due to a mismatch in the signature. Tracking issue (https://github.com/opensearch-project/opensearch-java/issues/521)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;

Expand All @@ -30,7 +30,7 @@ public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest create
}

@Override
public SearchPointInTimeResponse searchWithPit(SearchPointInTimeRequest searchPointInTimeRequest) {
public SearchPointInTimeResults searchWithPit(SearchPointInTimeRequest searchPointInTimeRequest) {
//todo: implement
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;
import org.slf4j.Logger;
Expand Down Expand Up @@ -93,7 +93,7 @@ public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest create
}

@Override
public SearchPointInTimeResponse searchWithPit(final SearchPointInTimeRequest searchPointInTimeRequest) {
public SearchPointInTimeResults searchWithPit(final SearchPointInTimeRequest searchPointInTimeRequest) {
try {
final SearchResponse<ObjectNode> searchResponse = openSearchClient.search(
SearchRequest.of(builder -> {
Expand All @@ -117,9 +117,13 @@ public SearchPointInTimeResponse searchWithPit(final SearchPointInTimeRequest se
.withEventType(EventType.DOCUMENT.toString()).build())
.collect(Collectors.toList());

return SearchPointInTimeResponse.builder()
final List<String> nextSearchAfter = Objects.nonNull(searchResponse.hits().hits()) && !searchResponse.hits().hits().isEmpty() ?
searchResponse.hits().hits().get(searchResponse.hits().hits().size() - 1).sort() :
null;

return SearchPointInTimeResults.builder()
.withDocuments(documents)
.withNextSearchAfter(searchResponse.hits().hits().get(searchResponse.hits().hits().size() - 1).sort())
.withNextSearchAfter(nextSearchAfter)
.build();
} catch (final IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;

Expand Down Expand Up @@ -42,7 +42,7 @@ public interface SearchAccessor {
* @return
* @since 2.4
*/
SearchPointInTimeResponse searchWithPit(SearchPointInTimeRequest searchPointInTimeRequest);
SearchPointInTimeResults searchWithPit(SearchPointInTimeRequest searchPointInTimeRequest);

/**
* Deletes PITs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import java.util.List;

public class SearchPointInTimeResponse {
public class SearchPointInTimeResults {

private final List<Event> documents;
private final List<String> nextSearchAfter;
Expand All @@ -21,13 +21,13 @@ public List<String> getNextSearchAfter() {
return nextSearchAfter;
}

private SearchPointInTimeResponse(final SearchPointInTimeResponse.Builder builder) {
private SearchPointInTimeResults(final SearchPointInTimeResults.Builder builder) {
this.documents = builder.documents;
this.nextSearchAfter = builder.nextSearchAfter;
}

public static SearchPointInTimeResponse.Builder builder() {
return new SearchPointInTimeResponse.Builder();
public static SearchPointInTimeResults.Builder builder() {
return new SearchPointInTimeResults.Builder();
}

public static class Builder {
Expand All @@ -39,19 +39,19 @@ public Builder() {

}

public SearchPointInTimeResponse.Builder withDocuments(final List<Event> documents) {
public SearchPointInTimeResults.Builder withDocuments(final List<Event> documents) {
this.documents = documents;
return this;
}

public SearchPointInTimeResponse.Builder withNextSearchAfter(final List<String> nextSearchAfter) {
public SearchPointInTimeResults.Builder withNextSearchAfter(final List<String> nextSearchAfter) {
this.nextSearchAfter = nextSearchAfter;
return this;
}


public SearchPointInTimeResponse build() {
return new SearchPointInTimeResponse(this);
public SearchPointInTimeResults build() {
return new SearchPointInTimeResults(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResults;

import java.time.Duration;
import java.util.Collections;
Expand Down Expand Up @@ -116,13 +116,13 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_
when(searchConfiguration.getBatchSize()).thenReturn(2);
when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration);

final SearchPointInTimeResponse searchPointInTimeResponse = mock(SearchPointInTimeResponse.class);
when(searchPointInTimeResponse.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString()));
when(searchPointInTimeResponse.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class), mock(Event.class)))
final SearchPointInTimeResults searchPointInTimeResults = mock(SearchPointInTimeResults.class);
when(searchPointInTimeResults.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString()));
when(searchPointInTimeResults.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class), mock(Event.class)))
.thenReturn(List.of(mock(Event.class))).thenReturn(List.of(mock(Event.class)));

final ArgumentCaptor<SearchPointInTimeRequest> searchPointInTimeRequestArgumentCaptor = ArgumentCaptor.forClass(SearchPointInTimeRequest.class);
when(searchAccessor.searchWithPit(searchPointInTimeRequestArgumentCaptor.capture())).thenReturn(searchPointInTimeResponse);
when(searchAccessor.searchWithPit(searchPointInTimeRequestArgumentCaptor.capture())).thenReturn(searchPointInTimeResults);

doNothing().when(buffer).writeAll(anyCollection(), eq(BUFFER_TIMEOUT_MILLIS));

Expand Down Expand Up @@ -168,7 +168,7 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_
assertThat(searchPointInTimeRequestList.get(1).getPitId(), equalTo(pitId));
assertThat(searchPointInTimeRequestList.get(1).getKeepAlive(), equalTo(EXTEND_KEEP_ALIVE_TIME));
assertThat(searchPointInTimeRequestList.get(1).getPaginationSize(), equalTo(2));
assertThat(searchPointInTimeRequestList.get(1).getSearchAfter(), equalTo(searchPointInTimeResponse.getNextSearchAfter()));
assertThat(searchPointInTimeRequestList.get(1).getSearchAfter(), equalTo(searchPointInTimeResults.getNextSearchAfter()));


final DeletePointInTimeRequest deletePointInTimeRequest = deleteRequestArgumentCaptor.getValue();
Expand All @@ -192,12 +192,12 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create
when(searchConfiguration.getBatchSize()).thenReturn(2);
when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration);

final SearchPointInTimeResponse searchPointInTimeResponse = mock(SearchPointInTimeResponse.class);
when(searchPointInTimeResponse.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString()));
when(searchPointInTimeResponse.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class), mock(Event.class)))
final SearchPointInTimeResults searchPointInTimeResults = mock(SearchPointInTimeResults.class);
when(searchPointInTimeResults.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString()));
when(searchPointInTimeResults.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class), mock(Event.class)))
.thenReturn(List.of(mock(Event.class))).thenReturn(List.of(mock(Event.class)));

when(searchAccessor.searchWithPit(any(SearchPointInTimeRequest.class))).thenReturn(searchPointInTimeResponse);
when(searchAccessor.searchWithPit(any(SearchPointInTimeRequest.class))).thenReturn(searchPointInTimeResults);

doNothing().when(buffer).writeAll(anyCollection(), eq(BUFFER_TIMEOUT_MILLIS));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResults;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -221,12 +221,12 @@ void search_with_pit_returns_expected_SearchPointInTimeResponse(final boolean ha

when(openSearchClient.search(searchRequestArgumentCaptor.capture(), eq(ObjectNode.class))).thenReturn(searchResponse);

final SearchPointInTimeResponse searchPointInTimeResponse = createObjectUnderTest().searchWithPit(searchPointInTimeRequest);
final SearchPointInTimeResults searchPointInTimeResults = createObjectUnderTest().searchWithPit(searchPointInTimeRequest);

assertThat(searchPointInTimeResponse, notNullValue());
assertThat(searchPointInTimeResponse.getDocuments(), notNullValue());
assertThat(searchPointInTimeResponse.getDocuments().size(), equalTo(2));
assertThat(searchPointInTimeResults, notNullValue());
assertThat(searchPointInTimeResults.getDocuments(), notNullValue());
assertThat(searchPointInTimeResults.getDocuments().size(), equalTo(2));

assertThat(searchPointInTimeResponse.getNextSearchAfter(), equalTo(secondHit.sort()));
assertThat(searchPointInTimeResults.getNextSearchAfter(), equalTo(secondHit.sort()));
}
}

0 comments on commit 603a1c6

Please sign in to comment.