diff --git a/application/src/main/java/com/speedment/jpastreamer/application/JPAStreamer.java b/application/src/main/java/com/speedment/jpastreamer/application/JPAStreamer.java index 78aa8fba7..d84e2190f 100644 --- a/application/src/main/java/com/speedment/jpastreamer/application/JPAStreamer.java +++ b/application/src/main/java/com/speedment/jpastreamer/application/JPAStreamer.java @@ -25,7 +25,10 @@ import java.util.stream.Stream; /** - * A JPAStreamer is responsible for creating Streams from data sources + * A JPAStreamer is responsible for creating Streams from data sources, + * alternatively for creating {@link StreamSupplier}s that can be reused to create Streams of the same Entity source, + * see {@link JPAStreamer#createStreamSupplier(StreamConfiguration)} + *

* Entity sources can be RDBMSes, files or other data sources. * * A JPAStreamer must be thread safe and be able to handle several reading and @@ -138,7 +141,9 @@ public interface JPAStreamer { * * @param The element type (type of a class token) * @param streamConfiguration a configuration including an entity class (annotated with {@code @Entity}) - * @return a new stream over all entities in this table in unspecified order + * @return a new {@link Stream} over all entities in the + underlying data source (e.g database) described by the provided + {@code streamConfiguration} * * @throws RuntimeException if an error occurs during a Terminal Operation * (e.g. an SqlException is thrown by the underlying database) @@ -162,7 +167,7 @@ public interface JPAStreamer { * underlying data source (e.g database) of the provided type * {@code entityClass} * - * @see JPAStreamer#stream(StreamConfiguration) for furhter details + * @see JPAStreamer#stream(StreamConfiguration) for further details */ default Stream stream(final Class entityClass) { requireNonNull(entityClass); @@ -174,6 +179,7 @@ default Stream stream(final Class entityClass) { * underlying data source (e.g database) of the {@code entity} specified * by the provided {@code projection}. *

+ * * This method is a convenience method equivalent to: *

{@code stream(StreamConfiguration.of(projection.entityClass()).select(projection))}
* @@ -190,6 +196,91 @@ default Stream stream(final Projection projection) { return stream(StreamConfiguration.of(projection.entityClass()).selecting(projection)); } + /** + * Creates and returns a new {@link StreamSupplier} that can create + * {@link Stream}s over all entities in the underlying data source (e.g database) + * of the provided type {@code entityClass}. + *

+ * The provided {@link StreamSupplier} will not be closed whenever + * the generated {@link Stream} instance is closed. + *

+ * If you are using the same Stream source frequently e.g. Film.class, + * consider configuring a {@link StreamSupplier} that can supply + * {@link Stream}s from the same source over and over again. + * This save resources and avoids instantiating a new {@link EntityManager} for each new {@link Stream}. + *

+ * Here is an example of using a {@link StreamSupplier}: + *

{@code
+     *    final StreamSupplier streamSupplier = jpaStreamer.createStreamSupplier(Film.class);
+     *    List longFilms = streamSupplier.stream()
+     *       .filter(Film$.name.equal("Casablanca"))
+     *       .collect(toList()); // the terminal operation does not close the Stream Supplier and its Entity Manager
+     *     
+     *    // ... repeated uses of the supplier 
+     *      
+     *    streamSupplier.close(); // closes the Entity Manager
+     * }
+ * The above is equal to: + *
{@code
+     *    List films = jpaStreamer.stream(Film.class) 
+     *       .filter(Film$.name.equal("Casablanca"))
+     *       .collect(toList()); // the terminal operation closes the underlying StreamSupplier and its Entity Manager
+     * }
+ *

+ * + * @param The element type (type of a class token) + * @param streamConfiguration a configuration including an entity class (annotated with {@code @Entity}) + * @return a new {@link StreamSupplier} that can create + * {@link Stream}s over all entities in the + * underlying data source (e.g database) described by the provided + * {@code streamConfiguration} + */ + StreamSupplier createStreamSupplier(StreamConfiguration streamConfiguration); + + /** + * Creates and returns a new {@link StreamSupplier} that can create + * {@link Stream}s over all entities in the underlying data source (e.g database) + * of the provided type {@code entityClass}. + *

+ * This method is a convenience method equivalent to: + *

{@code createStreamer(StreamConfiguration.of(entityClass))}
+ * + * @param The element type (type of a class token) + * @param entityClass to use in generated {@link Stream}s + * @return a new {@link StreamSupplier} that can create + * {@link Stream}s over all entities in the + * underlying data source (e.g database) of the provided + * type {@code entityClass} + * + * @see JPAStreamer#createStreamSupplier(StreamConfiguration) (StreamConfiguration) for further details + */ + default StreamSupplier createStreamSupplier(final Class entityClass) { + requireNonNull(entityClass); + return createStreamSupplier(StreamConfiguration.of(entityClass)); + } + + /** + * Creates and returns a new {@link StreamSupplier} that can create + * {@link Stream}s over all entities in the underlying data source (e.g database) + * of the {@code entity} specified by the provided {@code projection}. + *

+ * This method is a convenience method equivalent to: + *

{@code createStreamer(StreamConfiguration.of(entityClass))}
+ * + * @param The element type (type of a class token) + * @param projection to use + * @return a new {@link StreamSupplier} that can create + * {@link Stream}s over all entities in the + * underlying data source (e.g database) of the {@code entity} + * specified by the provided {@code projection}. + * + * @see JPAStreamer#createStreamSupplier(StreamConfiguration) (StreamConfiguration) for further details + */ + default StreamSupplier createStreamSupplier(final Projection projection) { + requireNonNull(projection); + return createStreamSupplier(StreamConfiguration.of(projection.entityClass()).selecting(projection)); + } + /** * Resets the Streamer associated with the provided Entity classes. *

@@ -199,10 +290,13 @@ default Stream stream(final Projection projection) { * In case JPAStreamer was configured with a {@code Supplier} the lifecycle of the Entity Managers is * not managed by JPAStreamer, thus use of the method is not permitted and will result in an {@code UnsupportedOperationException}. * + * @deprecated since 3.0.2, JPAStreamer no longer caches Streamers thus there is no need for a method that resets the cache. + * If you wish to manage the Streamer lifecycle manually, see {@link JPAStreamer#createStreamSupplier(StreamConfiguration)} * @param entityClasses of the streamer * @throws UnsupportedOperationException if JPAStreamer is configured with a Supplier, see {@code com.speedment.jpastreamer.application.JPAStreamer#of(java.util.function.Supplier)} */ - void resetStreamer(Class... entityClasses) throws UnsupportedOperationException; + @Deprecated(since = "3.0.2", forRemoval = true) + void resetStreamer(Class... entityClasses); /** * Closes this JPAStreamer and releases any resources potentially held. diff --git a/application/src/main/java/com/speedment/jpastreamer/application/StreamSupplier.java b/application/src/main/java/com/speedment/jpastreamer/application/StreamSupplier.java new file mode 100644 index 000000000..f6d63be0c --- /dev/null +++ b/application/src/main/java/com/speedment/jpastreamer/application/StreamSupplier.java @@ -0,0 +1,141 @@ +package com.speedment.jpastreamer.application; + +import com.speedment.jpastreamer.streamconfiguration.StreamConfiguration; + +import java.util.stream.Stream; + +/** + * A Stream Supplier is responsible for creating Streams from a data source + * An entity source can be RDBMSes, files or other data sources. + * + * A Stream Supplier must be thread safe and be able to handle several reading and + * writing threads at the same time. + * + * @author Per Minborg, Julia Gustafsson + * @since 3.0.1 + */ +public interface StreamSupplier extends AutoCloseable { + + /** + * Creates and returns a new {@link Stream} over all entities in the + * underlying data source (e.g database) according to the {@code streamConfiguration} associated with this Streamer. + *

+ * The order in which elements are returned when the stream is eventually + * consumed is unspecified. The order may even change from one + * invocation to another. Thus, it is an error to assume any particular + * element order even though is might appear, for some stream sources, that + * there is a de-facto order. + *

+ * If a deterministic order is required, then make sure to invoke the + * {@link Stream#sorted(java.util.Comparator)} method on the {@link Stream} + * returned. + *

+ * Mutable elements are not reused within the stream. More formally, there + * are no pair of mutable stream elements e1 and + * e2 such that e1 == e2. + *

+ * The Stream will never contain null elements. + *

+ * This is an inexpensive O(1) operation that will complete in + * constant time regardless of the number of entities in the underlying + * database. + *

+ * The returned stream is aware of its own pipeline and will optionally + * optimize its own pipeline whenever it encounters a Terminal + * Operation so that it will only iterate over a minimum set of + * matching entities. + *

+ * When a Terminal Operation is eventually called on the {@link Stream}, + * that execution time of the Terminal Operation will depend on the + * optimized pipeline and the entities in the underlying database. + *

+ * The Stream will be automatically + * {@link Stream#onClose(java.lang.Runnable) closed} after the Terminal + * Operation is completed or if an Exception is thrown during the Terminal + * Operation. + *

+ * + * Some of the Terminal Operations are: + *

    + *
  • {@link Stream#forEach(java.util.function.Consumer) forEach(Consumer)} + *
  • {@link Stream#forEachOrdered(java.util.function.Consumer) forEachOrdered(Consumer)} + *
  • {@link Stream#toArray() toArray()} + *
  • {@link Stream#toArray(java.util.function.IntFunction) toArray(IntFunction)} + *
  • {@link Stream#reduce(java.util.function.BinaryOperator) reduce(BinaryOperation} + *
  • {@link Stream#reduce(java.lang.Object, java.util.function.BinaryOperator) reduce(Object, BinaryOperator)} + *
  • {@link Stream#reduce(java.lang.Object, java.util.function.BiFunction, java.util.function.BinaryOperator) reduce(Object, BiFunction, BinaryOperator)} + *
  • {@link Stream#collect(java.util.stream.Collector) collect(Collector)} + *
  • {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer) collect(Supplier, BiConsumer, BiConsumer)} + *
  • {@link Stream#min(java.util.Comparator) min(Comparator)} + *
  • {@link Stream#max(java.util.Comparator) min(Comparator)} + *
  • {@link Stream#count() count()} + *
  • {@link Stream#anyMatch(java.util.function.Predicate) anyMatch(Predicate)} + *
  • {@link Stream#noneMatch(java.util.function.Predicate) noneMatch(Predicate)} + *
  • {@link Stream#findFirst() findFirst()} + *
  • {@link Stream#findAny() findAny()} + *
  • {@link Stream#iterator() iterator()} + *
+ *

+ * Any Terminating Operation may throw an Exception if the + * underlying database throws an Exception (e.g. an SqlException) + *

+ * Because the Stream may short-circuit operations in the Stream pipeline, + * methods having side-effects (like + * {@link Stream#peek(java.util.function.Consumer) peek(Consumer)} will + * potentially be affected by the optimization. + *

+ * Here are some examples of how the stream optimization might work: + *

    + *
  • + *
    {@code stream(Film.class)
    +     *   .filter(Film$.name.equal("Casablanca"))
    +     *   .collect(toList());}
    + *
    {@code -> select * from film where name='Casablanca'}
    + *
  • + *
  • + *
    {@code stream.count();}
    + *
    {@code -> select count(*) from film}
    + *
  • + *
  • + *
    {@code stream(Film.class)
    +     *   .filter(Film$.name.startsWith("A"))
    +     *   .count();}
    + *
    {@code -> select count(*) from hares where
    +     *   name LIKE 'A%'}
    + *

    + *

  • + *
  • + *
    {@code stream.stream(Film.class)
    +     *   .filter(Film$.rating.equal("G")
    +     *   .filter(Film$.length.greaterThan(100)
    +     *   .count();}
    + *
    {@code -> select count(*) from hares where
    +     *          rating ='G'
    +     *        and
    +     *          length > 100}
    + *
  • + *
+ * + * @return a new stream over all entities in this table in unspecified order + * + * @throws RuntimeException if an error occurs during a Terminal Operation + * (e.g. an SqlException is thrown by the underlying database) + * + * @see java.util.stream + * @see Stream + */ + Stream stream(); + + /** + * Closes this Stream Supplier and releases any resources potentially held, such as the underlying Entity Manager. + */ + void close(); + + /** + * Returns the {@link StreamConfiguration} that describes the stream source of the Streams generated by this Supplier. + * + * @return the configuration of the Streams generated by this Supplier + */ + StreamConfiguration configuration(); + +} diff --git a/docs/modules/fetching-data/pages/fetching-data.adoc b/docs/modules/fetching-data/pages/fetching-data.adoc index adc494fe8..d8c843b7e 100644 --- a/docs/modules/fetching-data/pages/fetching-data.adoc +++ b/docs/modules/fetching-data/pages/fetching-data.adoc @@ -5,7 +5,7 @@ author: Julia Gustafsson :navtitle: Fetching Data :source-highlighter: highlight.js -To start fetching data with JPAsStreamer you need to initialize an instance of `JPAStreamer`. This section describes how that is accomplished. +This section details how to create a basic Stream from your datasource. To initiate the data fetching process, you will need to initialize an instance of `JPAStreamer`. Once you have obtained access to `JPAStreamer`, you can leverage two main methods to obtain a Stream from your datasource: `stream()` and `createStreamSupplier()`. These methods provide efficient and customizable approaches to interact with your data. In this chapter, we will discuss what differentiates the two approaches. == Obtaining a JPAStreamer instance === From persistence unit name @@ -102,60 +102,122 @@ public class FilmRepository implements PanacheRepository { } ---- -NOTE: When using a Supplier, JPAStreamer is not responsible for the lifecycle of the Entity Managers, thus `JPAStreamer::close` will not close any supplied Entity Managers. +NOTE: When using a Supplier, JPAStreamer is not responsible for the lifecycle of the Entity Managers, thus `JPAStreamer::close` will not close any supplied Entity Managers. -== Resetting the Streamer -Calling `jpaStreamer.stream(Class entityClass)` creates a dedicated Streamer for the provided Entity class. The Streamer instance is reused for subsequent calls on `jpaStreamer.stream()` on the same Entity, see example below: +== Creating a Stream +To create a Stream, you first need to describe the stream source as a `StreamConfiguration`. The `StreamConfiguration` declares what entity to use as the base for the query, and e.g. if any joins or projections should be performed. The `StreamConfiguration` is then passed to xref:_stream[`JPAStreamer.stream(StreamConfiguration streamConfiguration)`] or xref:_createstreamsupplier[`JPAStreamer.createStreamSupplier(StreamConfiguration streamConfiguration)`]. -[source, Java] ----- -JPAStreamer jpaStreamer = JPAStreamer.createJPAStreamerBuilder("sakila") - .build(); +When deciding which of these methods to use, consider whether you will reuse the same stream source frequently or not, and if you are expecting that the source will be updated by an external application in between streams. A more detailed explanation follows in the subsequent sections on each method below. -long count = jpaStreamer.stream(Film.class) <1> - .filter(Film$.title.startsWith("A")) - .count(); +JPAStreamer also offers convenience methods for simple `StreamConfigurations`: -long count2 = jpaStreamer.stream(Film.class) <2> - .filter(Film$.title.startsWith("A")) - .count(); +- `JPAStreamer.stream(Class entityClass)` +- `JPAStreamer.stream(Projection projection)` +- `JPAStreamer.createStreamSupplier(Class entityClass)` +- `JPAStreamer.createStreamSupplier(Projection projection)` + +The simplest way of creating a `Stream` is to provide a single entity class, creating a `Stream` of the single table associated with that JPA entity: +[source, java] ---- -<1> The first call to `jpaStreamer.stream(Film.class)` will create a `Streamer` of `Film` entities -<2> The second call will reuse the previously configured `Streamer` +Stream stream = jpaStreamer.stream(Film.class); <1> +---- +<1> Creates a Stream over the Film-table. Passing the entity class `Film.class` is equivalent of passing `StreamConfiguration.of(Film.class)`. + +All options available for `StreamConfiguration` is laid out in the table below: + +.`StreamConfiguration` +[cols="1,1,1"] +|=== +| Modifer and type | Method | Description + +| `StreamConfiguration` +| `of(Class entityClass)` +| Creates and returns a new StreamConfiguration that can be used to configure streams. + +| `Class` +| `entityClass()` +| Returns the entity class that is to appear in a future Stream. + +| `Set` +| `joins()` +| Returns the fields that shall be joined in a future stream. + +| `StreamConfiguration` +| `joining(Field field)` +| Creates and returns a new `StreamConfiguration` configured with the provided field so that it will be eagerly joined when producing elements in the future Stream using join type left. + +| `StreamConfiguration` +| `joining(Field field, JoinType joinType)` +| Creates and returns a new `StreamConfiguration` configured with the provided field so that it will be eagerly joined when producing elements in the future Stream using the provided join type. -A `Streamer` instance hold a `javax.persistence.EntityManager` which has its own first-layer cache. Thus by default, database changes performed by another application, or made directly on the database, will not be detected. In the example above, the addition of the film "Avatar" to the database between the first and second count query therefore goes unnoticed and `count` will equal `count2`. +| `Optional>` +| `selections()` +| Returns the projected columns to use when creating entities or `Optional.empty()` if no projection should be used. + +| `StreamConfiguration` +| `selecting(Projection projection)` +| Selects the projected columns to initialize when creating initial entities in a future stream. -To ensure that database updates performed by another application are detected, you must reset the Streamer between queries. This will effectively remove the existing Streamer for the specified Entity and close its associated `EntityManager`. The next query will create a new Streamer with a new `EntityManager`, resetting the first-level cache associated with the Entity. +| `StreamConfiguration` +| `withHint(String hintName, Object value)` +| Adds a query hint. -You can reset the Streamer for one or more Entity classes with the following command: +| `Map` +| `hints()` +| Returns the map with the query hints that will be configured in a future Stream. -[source, Java] +|=== + +TIP: There are many examples of how to use a `StreamConfiguration` in xref:stream-examples.adoc[Stream Examples]. + +[#_stream] +=== `stream()` +Calls to `JPAStreamer.stream(StreamConfiguration streamConfiguration)` will lead to the creation of a `StreamSupplier`. The `StreamSupplier` obtains a JPA `EntityManager` that JPAStreamer uses internally to issue JPA Criteria Queries. + +Whenever the `Stream` is terminated with a terminal operation, e.g. `collect()`, the underlying `StreamSupplier` and the `EntityManager` is closed and can no longer be used. + +[source,java] ---- -jpaStreamer.resetStreamer(Class... entityClasses); +final JPAStreamer jpaStreamer = JPAStreamer.of("sakila"); + +final List films = jpaStreamer.stream(Film.class) <1> + .filter(Film$.name.equal("Casablanca")) + .collect(toList()); <2> ---- +<1> Creates a `StreamSupplier` that returns a `Stream` over the Film-table +<2> The terminal operation closes the underlying `StreamSupplier` and its Entity Manager -WARNING: JPAStreamer instances configured with a `Supplier - .filter(Film$.title.startsWith("A")) - .count(); +This means repeated calls to `StreamSupplier.stream()` will reuse the same `EntityManager`. We recommend using a try-with-resources block to automatically close the `StreamSupplier` when done with the operations: -jpaStreamer.resetStreamer(Film.class); <2> +[source,java] +---- +final JPAStreamer jpaStreamer = JPAStreamer.of("sakila"); -long count2 = jpaStreamer.stream(Film.class) <3> - .filter(Film$.title.startsWith("A")) - .count(); +try (final StreamSupplier streamSupplier = jpaStreamer.createStreamSupplier()) { + final List shortFilms = streamSupplier.stream(Film.class) + .filter(Film$.length.lessThan(60)) + .collect(toList()); <1> + + final List longFilms = streamSupplier.stream(Film.class) + .filter(Film$.length.greatherThanOrEqual(61)) + .collect(toList()); <1> +} <2> ---- -<1> Creates a Streamer of `Film` entities -<2> Resets (removes) the Streamer of `Film` entities. This resets the first-level cache. -<3> Creates a new Streamer of `Film` entities +<1> The `StreamSupplier` and the underlying `EntityManager` stays open when executing the terminal operation +<2> The `StreamSupplier` and the underlying `EntityManager` is closed + +NOTE: The `javax.persistence.EntityManager` associated with the `StreamSupplier` has a first-layer cache. Thus by default, database changes performed by another application, or made directly on the database, may not be detected between calls to `StreamSupplier.stream()`. To ensure that the cache is cleared between each fetch, use xref:_stream[JPAStreamer.stream()] instead. + +WARNING: If you instantiate `JPAStreamer` with a `Supplier` as described xref:_supplier[here], JPAStreamer will not close the underlying Entity Manager. In that case the lifecycle of the obtained Entity Managers is managed by the supplier. == Using Query Hints In complex scenarios or when dealing with specific database systems, it may be necessary to provide additional guidance to the underlying JPA provider for optimal query execution. This is where query hints come into play, allowing developers to control and influence various aspects of the query execution process. The query hints influence e.g. the execution plan chosen by the JPA provider, potentially leading to improved query performance or tailored behavior based on specific requirements. @@ -183,7 +245,7 @@ List films = jpaStreamer.stream(sc) .collect(Collectors::toList); ---- -WARNING: While query hints can be powerful tools for query optimization, it's important to use them carefully and with a clear understanding of their impact. Misusing or overusing query hints can lead to unintended consequences. +WARNING: While query hints can be powerful tools for query optimization, it's important to use them carefully and with a clear understanding of their impact. Misusing or overusing query hints can lead to unintended consequences. == What's Next The xref:sql-equivalents.adoc[next section] demonstrates how to use the available Stream operators and how they map to SQL constructs. diff --git a/docs/modules/fetching-data/pages/stream-examples.adoc b/docs/modules/fetching-data/pages/stream-examples.adoc index f8b4638ef..61cd5e0fd 100644 --- a/docs/modules/fetching-data/pages/stream-examples.adoc +++ b/docs/modules/fetching-data/pages/stream-examples.adoc @@ -57,17 +57,16 @@ long is true has 457 films There are several Stream operations that limit the number of rows obtained from the database (e.g. `filter()` and `limit()`). Although, to limit the number of columns we retrieve, JPAstreamer uses `Projections`. Projections can be used either initially, to limit the Stream source, or in combination with a `map()`-operation. The following sections describe both methods. === Initial Stream configuration -By making a `StreamConfiguration`, we can initially limit the Stream to include only selected columns. +Using a `Projection`, we can initially limit the Stream to include only selected columns. [source, java] ---- -StreamConfiguration sc = StreamConfiguration.of(Film.class).selecting(Projection.select(Film$.filmId, Film$.title)); - -jpaStreamer.stream(sc) +jpaStreamer.stream(Projection.select(Film$.filmId, Film$.title)) <1> .sorted(Film$.length.reversed()) .limit(3) .forEach(System.out::println); ---- +<1> Short for `StreamConfiguration.of(Film.class).selecting(Projection.select(Film$.filmId, Film$.title))` The `Projection` is translated by JpaStreamer to a `SELECT` statement that is applied to the resulting query. This way we limit the given result set to only include values from column `film_id` and `title`. diff --git a/integration/spring/spring-boot-jpastreamer-autoconfigure/src/main/java/module-info.java b/integration/spring/spring-boot-jpastreamer-autoconfigure/src/main/java/module-info.java index 5ae53f068..1bcc9a23d 100644 --- a/integration/spring/spring-boot-jpastreamer-autoconfigure/src/main/java/module-info.java +++ b/integration/spring/spring-boot-jpastreamer-autoconfigure/src/main/java/module-info.java @@ -11,7 +11,7 @@ * See: https://github.com/speedment/jpa-streamer/blob/master/LICENSE */ module jpastreamer.integration.spring.autoconfigure { - requires transitive jpastreamer.application; + requires jpastreamer.application; requires transitive spring.boot.autoconfigure; requires transitive spring.context; diff --git a/provider/application-standard/src/main/java/com/speedment/jpastreamer/application/standard/internal/StandardJPAStreamer.java b/provider/application-standard/src/main/java/com/speedment/jpastreamer/application/standard/internal/StandardJPAStreamer.java index d9cc6b230..b4ed30b6e 100644 --- a/provider/application-standard/src/main/java/com/speedment/jpastreamer/application/standard/internal/StandardJPAStreamer.java +++ b/provider/application-standard/src/main/java/com/speedment/jpastreamer/application/standard/internal/StandardJPAStreamer.java @@ -19,13 +19,13 @@ import com.speedment.jpastreamer.announcer.Announcer; import com.speedment.jpastreamer.appinfo.ApplicationInformation; import com.speedment.jpastreamer.application.JPAStreamer; +import com.speedment.jpastreamer.application.StreamSupplier; import com.speedment.jpastreamer.rootfactory.RootFactory; import com.speedment.jpastreamer.streamconfiguration.StreamConfiguration; import jakarta.persistence.EntityManager; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; import java.util.stream.Stream; @@ -33,7 +33,6 @@ final class StandardJPAStreamer implements JPAStreamer { private final Supplier entityManagerSupplier; private final Runnable closeHandler; - private final Map, Streamer> streamerCache; private final AnalyticsReporter analyticsReporter; private final boolean closeEntityManagers; @@ -42,7 +41,6 @@ final class StandardJPAStreamer implements JPAStreamer { this.closeHandler = requireNonNull(closeHandler); this.entityManagerSupplier = requireNonNull(entityManagerSupplier); this.closeEntityManagers = closeEntityManagers; - streamerCache = new ConcurrentHashMap<>(); final ApplicationInformation applicationInformation = RootFactory.getOrThrow(ApplicationInformation.class, ServiceLoader::load); final AnalyticsReporterFactory analyticsReporterFactory = RootFactory.getOrThrow(AnalyticsReporterFactory.class, ServiceLoader::load); analyticsReporter = analyticsReporterFactory.createAnalyticsReporter(applicationInformation.implementationVersion(), demoMode); @@ -51,48 +49,31 @@ final class StandardJPAStreamer implements JPAStreamer { } @Override - @SuppressWarnings("unchecked") public Stream stream(final StreamConfiguration streamConfiguration) { requireNonNull(streamConfiguration); - if (cached(streamConfiguration)) { - return (Stream) streamerCache - .computeIfAbsent(streamConfiguration, ec -> new StandardStreamer<>(streamConfiguration, entityManagerSupplier)) - .stream(); - } else { - final Streamer streamer = new StandardStreamer<>(streamConfiguration, entityManagerSupplier); - return closeEntityManagers ? + final StreamSupplier streamer = new StandardStreamSupplier<>(streamConfiguration, entityManagerSupplier, this.closeEntityManagers); + return closeEntityManagers ? streamer.stream().onClose(streamer::close) : - streamer.stream(); - } + streamer.stream(); } @Override - public void resetStreamer(Class... entityClasses) throws UnsupportedOperationException{ - if (!closeEntityManagers) { - throw new UnsupportedOperationException("An instance of JPAStreamer.of(Supplier) is not responsible for the lifecycle of the supplied Entity Managers, and thus cannot reset the Entity Managers."); - } - Arrays.stream(entityClasses) - .map(StreamConfiguration::of) - .filter(streamerCache::containsKey) - .forEach(sc -> { - streamerCache.get(sc).close(); // Close Entity Manager - streamerCache.remove(sc); - }); + public StreamSupplier createStreamSupplier(StreamConfiguration streamConfiguration) { + requireNonNull(streamConfiguration); + return new StandardStreamSupplier<>(streamConfiguration, entityManagerSupplier, this.closeEntityManagers); + } + + @Override + public void resetStreamer(Class... entityClasses) { + // As there no longer exists a Streamer cache, this method has no effect } @Override public void close() { - streamerCache.values().forEach(Streamer::close); analyticsReporter.stop(); closeHandler.run(); } - // Only cache simple configurations to limit the number of objects held - // See https://github.com/speedment/jpa-streamer/issues/56 - private boolean cached(final StreamConfiguration streamConfiguration) { - return streamConfiguration.joins().isEmpty() && !streamConfiguration.selections().isPresent(); - } - private void printGreeting(final ApplicationInformation info) { final String greeting = String.format("%s%n" + ":: %s %s :: %s%n" + diff --git a/provider/application-standard/src/main/java/com/speedment/jpastreamer/application/standard/internal/StandardStreamer.java b/provider/application-standard/src/main/java/com/speedment/jpastreamer/application/standard/internal/StandardStreamSupplier.java similarity index 67% rename from provider/application-standard/src/main/java/com/speedment/jpastreamer/application/standard/internal/StandardStreamer.java rename to provider/application-standard/src/main/java/com/speedment/jpastreamer/application/standard/internal/StandardStreamSupplier.java index 512b70b57..cb8d40e39 100644 --- a/provider/application-standard/src/main/java/com/speedment/jpastreamer/application/standard/internal/StandardStreamer.java +++ b/provider/application-standard/src/main/java/com/speedment/jpastreamer/application/standard/internal/StandardStreamSupplier.java @@ -14,6 +14,7 @@ import static java.util.Objects.requireNonNull; +import com.speedment.jpastreamer.application.StreamSupplier; import com.speedment.jpastreamer.autoclose.AutoCloseFactory; import com.speedment.jpastreamer.builder.BuilderFactory; import com.speedment.jpastreamer.renderer.Renderer; @@ -25,36 +26,41 @@ import jakarta.persistence.EntityManagerFactory; import java.util.ServiceLoader; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Stream; -final class StandardStreamer implements Streamer { +final class StandardStreamSupplier implements StreamSupplier { private final Renderer renderer; private final BuilderFactory builderFactory; private final AutoCloseFactory autoCloseFactory; private final StreamConfiguration streamConfiguration; + private final boolean closeEntityManager; + private static final AtomicBoolean closed = new AtomicBoolean(false); - StandardStreamer(final StreamConfiguration streamConfiguration, final EntityManagerFactory entityManagerFactory) { - this(streamConfiguration, entityManagerFactory::createEntityManager); + StandardStreamSupplier(final StreamConfiguration streamConfiguration, final EntityManagerFactory entityManagerFactory, boolean closeEntityManager) { + this(streamConfiguration, entityManagerFactory::createEntityManager, closeEntityManager); } - StandardStreamer(final StreamConfiguration streamConfiguration, final Supplier entityManagerSupplier) { + StandardStreamSupplier(final StreamConfiguration streamConfiguration, final Supplier entityManagerSupplier, boolean closeEntityManager) { this.streamConfiguration = requireNonNull(streamConfiguration); requireNonNull(entityManagerSupplier); this.builderFactory = RootFactory.getOrThrow(BuilderFactory.class, ServiceLoader::load); this.autoCloseFactory = RootFactory.getOrThrow(AutoCloseFactory.class, ServiceLoader::load); this.renderer = RootFactory.getOrThrow(RendererFactory.class, ServiceLoader::load) .createRenderer(entityManagerSupplier); + this.closeEntityManager = closeEntityManager; } - StandardStreamer(final StreamConfiguration streamConfiguration, final EntityManager entityManager) { + StandardStreamSupplier(final StreamConfiguration streamConfiguration, final EntityManager entityManager, boolean closeEntityManager) { this.streamConfiguration = requireNonNull(streamConfiguration); requireNonNull(entityManager); this.builderFactory = RootFactory.getOrThrow(BuilderFactory.class, ServiceLoader::load); this.autoCloseFactory = RootFactory.getOrThrow(AutoCloseFactory.class, ServiceLoader::load); this.renderer = RootFactory.getOrThrow(RendererFactory.class, ServiceLoader::load) .createRenderer(entityManager); + this.closeEntityManager = closeEntityManager; } @Override @@ -64,7 +70,16 @@ public Stream stream() { @Override public void close() { - //System.out.println("Closing Streamer<" + entityClass.getSimpleName() + ">"); - renderer.close(); + if (this.closeEntityManager) { + renderer.close(); + } else if (closed.compareAndSet(false, true)) { + System.out.println("JPAStreamer does not close Entity Managers obtained by a given Supplier."); + } } + + @Override + public StreamConfiguration configuration() { + return this.streamConfiguration; + } + } diff --git a/provider/application-standard/src/main/java/com/speedment/jpastreamer/application/standard/internal/Streamer.java b/provider/application-standard/src/main/java/com/speedment/jpastreamer/application/standard/internal/Streamer.java deleted file mode 100644 index d10ba424a..000000000 --- a/provider/application-standard/src/main/java/com/speedment/jpastreamer/application/standard/internal/Streamer.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * JPAstreamer - Express JPA queries with Java Streams - * Copyright (c) 2020-2022, Speedment, Inc. All Rights Reserved. - * - * License: GNU Lesser General Public License (LGPL), version 2.1 or later. - * - * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; - * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU Lesser General Public License for more details. - * - * See: https://github.com/speedment/jpa-streamer/blob/master/LICENSE - */ -package com.speedment.jpastreamer.application.standard.internal; - -import java.util.stream.Stream; - -interface Streamer { - - Stream stream(); - - void close(); -}