Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new QueryTask.ResultFinisher interface #1367

Merged
merged 1 commit into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions driver/src/main/java/org/neo4j/driver/QueryTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/
package org.neo4j.driver;

import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import org.neo4j.driver.internal.EagerResultValue;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.driver.util.Experimental;

Expand Down Expand Up @@ -79,12 +82,13 @@
* .execute(mapping(record -> record.get("N").asLong(), maxBy(Long::compare)));
* }
* </pre>
* If there is a need to access {@link ResultSummary} value, another method option is available:
* If there is a need to access {@link Result#keys()} and/or {@link ResultSummary} value, another method option is
* available:
* <pre>
* {@code
* import static java.util.stream.Collectors.*;
*
* private record ResultValue(Set<Long> values, ResultSummary summary) {}
* private record ResultValue(List<String> keys, Set<Long> values, ResultSummary summary) {}
*
* var result = driver.queryTask("UNWIND range(0, 5) as N RETURN N")
* .execute(Collectors.mapping(record -> record.get("N").asLong(), toSet()), ResultValue::new);
Expand Down Expand Up @@ -118,7 +122,9 @@ public interface QueryTask {
*
* @return an instance of result containing all records, keys and result summary
*/
EagerResult execute();
default EagerResult execute() {
return execute(Collectors.toList(), EagerResultValue::new);
}

/**
* Executes query, collects {@link Record} values using the provided {@link Collector} and produces a final result.
Expand All @@ -129,22 +135,49 @@ public interface QueryTask {
* @return the final result value
*/
default <T> T execute(Collector<Record, ?, T> recordCollector) {
return execute(recordCollector, (collectorResult, ignored) -> collectorResult);
return execute(recordCollector, (ignoredKeys, collectorResult, ignoredSummary) -> collectorResult);
}

/**
* Executes query, collects {@link Record} values using the provided {@link Collector} and produces a final result
* by invoking the provided {@link BiFunction} with the collected result and {@link ResultSummary} values.
* <p>
* If any of the arguments throws an exception implementing the
* {@link org.neo4j.driver.exceptions.RetryableException} marker interface, the query is retried automatically in
* the same way as in the transaction functions. Exceptions not implementing the interface trigger transaction
* rollback and are then propagated to the user.
*
* @param recordCollector collector instance responsible for processing {@link Record} values and producing a
* collected result, the collector may be used multiple times if query is retried
* @param finisherWithSummary function accepting both the collected result and {@link ResultSummary} values to
* output the final result, the function may be invoked multiple times if query is
* retried
* @param resultFinisher function accepting the {@link Result#keys()}, collected result and {@link ResultSummary}
* values to output the final result value, the function may be invoked multiple times if
* query is retried
* @param <A> the mutable accumulation type of the collector's reduction operation
* @param <R> the collector's result type
* @param <T> the final result type
* @return the final result value
*/
<A, R, T> T execute(Collector<Record, A, R> recordCollector, BiFunction<R, ResultSummary, T> finisherWithSummary);
<A, R, T> T execute(Collector<Record, A, R> recordCollector, ResultFinisher<R, T> resultFinisher);

/**
* A function accepting the {@link Result#keys()}, collected result and {@link ResultSummary} values to produce a
* final result value.
*
* @param <S> the collected value type
* @param <T> the final value type
* @since 5.5
*/
@Experimental
@FunctionalInterface
interface ResultFinisher<S, T> {
/**
* Accepts the {@link Result#keys()}, collected result and {@link ResultSummary} values to produce the final
* result value.
* @param value the collected value
* @param keys the {@link Result#keys()} value
* @param summary the {@link ResultSummary} value
* @return the final value
*/
T finish(List<String> keys, S value, ResultSummary summary);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,17 @@

import static java.util.Objects.requireNonNull;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import org.neo4j.driver.Driver;
import org.neo4j.driver.EagerResult;
import org.neo4j.driver.Query;
import org.neo4j.driver.QueryConfig;
import org.neo4j.driver.QueryTask;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.TransactionCallback;
import org.neo4j.driver.summary.ResultSummary;

public class InternalQueryTask implements QueryTask {
private static final BiFunction<List<Record>, ResultSummary, EagerResult> EAGER_RESULT_FINISHER =
(records, summary) -> {
var keys = records.stream().findFirst().map(Record::keys).orElseGet(Collections::emptyList);
return new EagerResultValue(keys, records, summary);
};
private final Driver driver;
private final Query query;
private final QueryConfig config;
Expand All @@ -68,13 +57,7 @@ public QueryTask withConfig(QueryConfig config) {
}

@Override
public EagerResult execute() {
return execute(Collectors.toList(), EAGER_RESULT_FINISHER);
}

@Override
public <A, R, T> T execute(
Collector<Record, A, R> recordCollector, BiFunction<R, ResultSummary, T> finisherWithSummary) {
public <A, R, T> T execute(Collector<Record, A, R> recordCollector, ResultFinisher<R, T> resultFinisher) {
var sessionConfigBuilder = SessionConfig.builder();
config.database().ifPresent(sessionConfigBuilder::withDatabase);
config.impersonatedUser().ifPresent(sessionConfigBuilder::withImpersonatedUser);
Expand All @@ -91,7 +74,7 @@ public <A, R, T> T execute(
}
var finishedValue = finisher.apply(container);
var summary = result.consume();
return finisherWithSummary.apply(finishedValue, summary);
return resultFinisher.finish(result.keys(), finishedValue, summary);
};
return switch (config.routing()) {
case WRITERS -> session.executeWrite(txCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
Expand All @@ -39,6 +40,7 @@
import org.neo4j.driver.Driver;
import org.neo4j.driver.Query;
import org.neo4j.driver.QueryConfig;
import org.neo4j.driver.QueryTask;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.RoutingControl;
Expand Down Expand Up @@ -131,6 +133,8 @@ void shouldExecuteAndReturnResult(RoutingControl routingControl) {
});
var result = mock(Result.class);
given(txContext.run(any(Query.class))).willReturn(result);
var keys = List.of("key");
given(result.keys()).willReturn(keys);
given(result.hasNext()).willReturn(true, false);
var record = mock(Record.class);
given(result.next()).willReturn(record);
Expand All @@ -152,9 +156,9 @@ var record = mock(Record.class);
Function<Object, String> finisher = mock(Function.class);
given(finisher.apply(resultContainer)).willReturn(collectorResult);
given(recordCollector.finisher()).willReturn(finisher);
BiFunction<String, ResultSummary, String> finisherWithSummary = mock(BiFunction.class);
QueryTask.ResultFinisher<String, String> finisherWithSummary = mock(QueryTask.ResultFinisher.class);
var expectedExecuteResult = "1";
given(finisherWithSummary.apply(any(String.class), any(ResultSummary.class)))
given(finisherWithSummary.finish(any(List.class), any(String.class), any(ResultSummary.class)))
.willReturn(expectedExecuteResult);
var queryTask = new InternalQueryTask(driver, query, config).withParameters(params);

Expand All @@ -181,7 +185,7 @@ var record = mock(Record.class);
then(accumulator).should().accept(resultContainer, record);
then(recordCollector).should().finisher();
then(finisher).should().apply(resultContainer);
then(finisherWithSummary).should().apply(collectorResult, summary);
then(finisherWithSummary).should().finish(keys, collectorResult, summary);
assertEquals(expectedExecuteResult, executeResult);
}
}