Skip to content

Commit

Permalink
Add a new QueryTask.ResultFinisher interface (#1367)
Browse files Browse the repository at this point in the history
This is to make the `Result.keys()` value available as well.
  • Loading branch information
injectives authored Jan 24, 2023
1 parent 361e779 commit 8115a01
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 30 deletions.
49 changes: 41 additions & 8 deletions driver/src/main/java/org/neo4j/driver/QueryTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/
package org.neo4j.driver;

import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import org.neo4j.driver.internal.EagerResultValue;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.driver.util.Experimental;

Expand Down Expand Up @@ -79,12 +82,13 @@
* .execute(mapping(record -> record.get("N").asLong(), maxBy(Long::compare)));
* }
* </pre>
* If there is a need to access {@link ResultSummary} value, another method option is available:
* If there is a need to access {@link Result#keys()} and/or {@link ResultSummary} value, another method option is
* available:
* <pre>
* {@code
* import static java.util.stream.Collectors.*;
*
* private record ResultValue(Set<Long> values, ResultSummary summary) {}
* private record ResultValue(List<String> keys, Set<Long> values, ResultSummary summary) {}
*
* var result = driver.queryTask("UNWIND range(0, 5) as N RETURN N")
* .execute(Collectors.mapping(record -> record.get("N").asLong(), toSet()), ResultValue::new);
Expand Down Expand Up @@ -118,7 +122,9 @@ public interface QueryTask {
*
* @return an instance of result containing all records, keys and result summary
*/
EagerResult execute();
default EagerResult execute() {
return execute(Collectors.toList(), EagerResultValue::new);
}

/**
* Executes query, collects {@link Record} values using the provided {@link Collector} and produces a final result.
Expand All @@ -129,22 +135,49 @@ public interface QueryTask {
* @return the final result value
*/
default <T> T execute(Collector<Record, ?, T> recordCollector) {
return execute(recordCollector, (collectorResult, ignored) -> collectorResult);
return execute(recordCollector, (ignoredKeys, collectorResult, ignoredSummary) -> collectorResult);
}

/**
* Executes query, collects {@link Record} values using the provided {@link Collector} and produces a final result
* by invoking the provided {@link BiFunction} with the collected result and {@link ResultSummary} values.
* <p>
* If any of the arguments throws an exception implementing the
* {@link org.neo4j.driver.exceptions.RetryableException} marker interface, the query is retried automatically in
* the same way as in the transaction functions. Exceptions not implementing the interface trigger transaction
* rollback and are then propagated to the user.
*
* @param recordCollector collector instance responsible for processing {@link Record} values and producing a
* collected result, the collector may be used multiple times if query is retried
* @param finisherWithSummary function accepting both the collected result and {@link ResultSummary} values to
* output the final result, the function may be invoked multiple times if query is
* retried
* @param resultFinisher function accepting the {@link Result#keys()}, collected result and {@link ResultSummary}
* values to output the final result value, the function may be invoked multiple times if
* query is retried
* @param <A> the mutable accumulation type of the collector's reduction operation
* @param <R> the collector's result type
* @param <T> the final result type
* @return the final result value
*/
<A, R, T> T execute(Collector<Record, A, R> recordCollector, BiFunction<R, ResultSummary, T> finisherWithSummary);
<A, R, T> T execute(Collector<Record, A, R> recordCollector, ResultFinisher<R, T> resultFinisher);

/**
* A function accepting the {@link Result#keys()}, collected result and {@link ResultSummary} values to produce a
* final result value.
*
* @param <S> the collected value type
* @param <T> the final value type
* @since 5.5
*/
@Experimental
@FunctionalInterface
interface ResultFinisher<S, T> {
/**
* Accepts the {@link Result#keys()}, collected result and {@link ResultSummary} values to produce the final
* result value.
* @param value the collected value
* @param keys the {@link Result#keys()} value
* @param summary the {@link ResultSummary} value
* @return the final value
*/
T finish(List<String> keys, S value, ResultSummary summary);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,17 @@

import static java.util.Objects.requireNonNull;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import org.neo4j.driver.Driver;
import org.neo4j.driver.EagerResult;
import org.neo4j.driver.Query;
import org.neo4j.driver.QueryConfig;
import org.neo4j.driver.QueryTask;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.TransactionCallback;
import org.neo4j.driver.summary.ResultSummary;

public class InternalQueryTask implements QueryTask {
private static final BiFunction<List<Record>, ResultSummary, EagerResult> EAGER_RESULT_FINISHER =
(records, summary) -> {
var keys = records.stream().findFirst().map(Record::keys).orElseGet(Collections::emptyList);
return new EagerResultValue(keys, records, summary);
};
private final Driver driver;
private final Query query;
private final QueryConfig config;
Expand All @@ -68,13 +57,7 @@ public QueryTask withConfig(QueryConfig config) {
}

@Override
public EagerResult execute() {
return execute(Collectors.toList(), EAGER_RESULT_FINISHER);
}

@Override
public <A, R, T> T execute(
Collector<Record, A, R> recordCollector, BiFunction<R, ResultSummary, T> finisherWithSummary) {
public <A, R, T> T execute(Collector<Record, A, R> recordCollector, ResultFinisher<R, T> resultFinisher) {
var sessionConfigBuilder = SessionConfig.builder();
config.database().ifPresent(sessionConfigBuilder::withDatabase);
config.impersonatedUser().ifPresent(sessionConfigBuilder::withImpersonatedUser);
Expand All @@ -91,7 +74,7 @@ public <A, R, T> T execute(
}
var finishedValue = finisher.apply(container);
var summary = result.consume();
return finisherWithSummary.apply(finishedValue, summary);
return resultFinisher.finish(result.keys(), finishedValue, summary);
};
return switch (config.routing()) {
case WRITERS -> session.executeWrite(txCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
Expand All @@ -39,6 +40,7 @@
import org.neo4j.driver.Driver;
import org.neo4j.driver.Query;
import org.neo4j.driver.QueryConfig;
import org.neo4j.driver.QueryTask;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.RoutingControl;
Expand Down Expand Up @@ -131,6 +133,8 @@ void shouldExecuteAndReturnResult(RoutingControl routingControl) {
});
var result = mock(Result.class);
given(txContext.run(any(Query.class))).willReturn(result);
var keys = List.of("key");
given(result.keys()).willReturn(keys);
given(result.hasNext()).willReturn(true, false);
var record = mock(Record.class);
given(result.next()).willReturn(record);
Expand All @@ -152,9 +156,9 @@ var record = mock(Record.class);
Function<Object, String> finisher = mock(Function.class);
given(finisher.apply(resultContainer)).willReturn(collectorResult);
given(recordCollector.finisher()).willReturn(finisher);
BiFunction<String, ResultSummary, String> finisherWithSummary = mock(BiFunction.class);
QueryTask.ResultFinisher<String, String> finisherWithSummary = mock(QueryTask.ResultFinisher.class);
var expectedExecuteResult = "1";
given(finisherWithSummary.apply(any(String.class), any(ResultSummary.class)))
given(finisherWithSummary.finish(any(List.class), any(String.class), any(ResultSummary.class)))
.willReturn(expectedExecuteResult);
var queryTask = new InternalQueryTask(driver, query, config).withParameters(params);

Expand All @@ -181,7 +185,7 @@ var record = mock(Record.class);
then(accumulator).should().accept(resultContainer, record);
then(recordCollector).should().finisher();
then(finisher).should().apply(resultContainer);
then(finisherWithSummary).should().apply(collectorResult, summary);
then(finisherWithSummary).should().finish(keys, collectorResult, summary);
assertEquals(expectedExecuteResult, executeResult);
}
}

0 comments on commit 8115a01

Please sign in to comment.