-
Notifications
You must be signed in to change notification settings - Fork 35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Discard Stream cache and expose StreamSupplier #342
Changes from 1 commit
3bd9115
cca9a31
824f9c7
2c3352e
a72c594
67007e4
ae168f2
156834a
603ba9a
75fd605
98f85f9
d433a19
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,10 @@ | |
import java.util.stream.Stream; | ||
|
||
/** | ||
* A JPAStreamer is responsible for creating Streams from data sources | ||
* A JPAStreamer is responsible for creating Streams from data sources, | ||
* alternatively for creating Streamers that can be reused to create Streams of the same Entity source, | ||
* see {@link JPAStreamer#createStreamSupplier(StreamConfiguration)} | ||
* <p> | ||
* Entity sources can be RDBMSes, files or other data sources. | ||
* | ||
* A JPAStreamer must be thread safe and be able to handle several reading and | ||
|
@@ -138,7 +141,9 @@ public interface JPAStreamer { | |
* | ||
* @param <T> The element type (type of a class token) | ||
* @param streamConfiguration a configuration including an entity class (annotated with {@code @Entity}) | ||
* @return a new stream over all entities in this table in unspecified order | ||
* @return a new {@link Stream} over all entities in the | ||
underlying data source (e.g database) described by the provided | ||
{@code streamConfiguration} | ||
* | ||
* @throws RuntimeException if an error occurs during a Terminal Operation | ||
* (e.g. an SqlException is thrown by the underlying database) | ||
|
@@ -162,7 +167,7 @@ public interface JPAStreamer { | |
* underlying data source (e.g database) of the provided type | ||
* {@code entityClass} | ||
* | ||
* @see JPAStreamer#stream(StreamConfiguration) for furhter details | ||
* @see JPAStreamer#stream(StreamConfiguration) for further details | ||
*/ | ||
default <T> Stream<T> stream(final Class<T> entityClass) { | ||
requireNonNull(entityClass); | ||
|
@@ -174,6 +179,7 @@ default <T> Stream<T> stream(final Class<T> entityClass) { | |
* underlying data source (e.g database) of the {@code entity} specified | ||
* by the provided {@code projection}. | ||
* <p> | ||
* | ||
* This method is a convenience method equivalent to: | ||
* <pre>{@code stream(StreamConfiguration.of(projection.entityClass()).select(projection))}</pre> | ||
* | ||
|
@@ -190,6 +196,91 @@ default <T> Stream<T> stream(final Projection<T> projection) { | |
return stream(StreamConfiguration.of(projection.entityClass()).selecting(projection)); | ||
} | ||
|
||
/** | ||
* Creates and returns a new {@link StreamSupplier} that can create | ||
* {@link Stream}s over all entities in the underlying data source (e.g database) | ||
* of the provided type {@code entityClass}. | ||
* <p> | ||
* The provided {@link StreamSupplier} will <em>not</em> be closed whenever | ||
* the generated {@link Stream} instance is closed. | ||
* <p> | ||
* If you are using the same Stream source frequently e.g. Film.class, | ||
* consider configuring a {@link StreamSupplier} that can supply | ||
* {@link Stream}s from the same source over and over again. | ||
* This save resources and avoids instantiating a new {@link EntityManager} for each new {@link Stream}s. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for each new Stream There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
* <p> | ||
* Here is an example of using a {@link StreamSupplier}: | ||
* <pre>{@code | ||
* final StreamSupplier<Film> streamSupplier = jpaStreamer.createStreamSupplier(Film.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think using Try-With-Resources would be better here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could I provide both examples? I just think this way is clearer, but I totally get that TWR is a better practice. |
||
* List<Film> longFilms = streamSupplier.stream() | ||
* .filter(Film$.name.equal("Casablanca")) | ||
* .collect(toList()); // the terminal operation does not close the Stream Supplier and its Entity Manager | ||
* | ||
* // ... repeated uses of the supplier | ||
* | ||
* streamSupplier.close(); // closes the Entity Manager | ||
* }</pre> | ||
* The above is equal to: | ||
* <pre>{@code | ||
* List<Film> films = jpaStreamer.stream(Film.class) | ||
* .filter(Film$.name.equal("Casablanca")) | ||
* .collect(toList()); // the terminal operation closes the underlying StreamSupplier and its Entity Manager | ||
* }</pre> | ||
* <p> | ||
* | ||
* @param <T> The element type (type of a class token) | ||
* @param streamConfiguration a configuration including an entity class (annotated with {@code @Entity}) | ||
* @return a new {@link StreamSupplier} that can create | ||
* {@link Stream}s over all entities in the | ||
* underlying data source (e.g database) described by the provided | ||
* {@code streamConfiguration} | ||
*/ | ||
<T> StreamSupplier<T> createStreamSupplier(StreamConfiguration<T> streamConfiguration); | ||
|
||
/** | ||
* Creates and returns a new {@link StreamSupplier} that can create | ||
* {@link Stream}s over all entities in the underlying data source (e.g database) | ||
* of the provided type {@code entityClass}. | ||
* <p> | ||
* This method is a convenience method equivalent to: | ||
* <pre>{@code createStreamer(StreamConfiguration.of(entityClass))}</pre> | ||
* | ||
* @param <T> The element type (type of a class token) | ||
* @param entityClass to use in generated {@link Stream}s | ||
* @return a new {@link StreamSupplier} that can create | ||
* {@link Stream}s over all entities in the | ||
* underlying data source (e.g database) of the provided | ||
* type {@code entityClass} | ||
* | ||
* @see JPAStreamer#createStreamSupplier(StreamConfiguration) (StreamConfiguration) for further details | ||
*/ | ||
default <T> StreamSupplier<T> createStreamSupplier(final Class<T> entityClass) { | ||
requireNonNull(entityClass); | ||
return createStreamSupplier(StreamConfiguration.of(entityClass)); | ||
} | ||
|
||
/** | ||
* Creates and returns a new {@link StreamSupplier} that can create | ||
* {@link Stream}s over all entities in the underlying data source (e.g database) | ||
* of the {@code entity} specified by the provided {@code projection}. | ||
* <p> | ||
* This method is a convenience method equivalent to: | ||
* <pre>{@code createStreamer(StreamConfiguration.of(entityClass))}</pre> | ||
* | ||
* @param <T> The element type (type of a class token) | ||
* @param projection to use | ||
* @return a new {@link StreamSupplier} that can create | ||
* {@link Stream}s over all entities in the | ||
* underlying data source (e.g database) of the {@code entity} | ||
* specified by the provided {@code projection}. | ||
* | ||
* @see JPAStreamer#createStreamSupplier(StreamConfiguration) (StreamConfiguration) for further details | ||
*/ | ||
default <T> StreamSupplier<T> createStreamSupplier(final Projection<T> projection) { | ||
requireNonNull(projection); | ||
return createStreamSupplier(StreamConfiguration.of(projection.entityClass()).selecting(projection)); | ||
} | ||
|
||
/** | ||
* Resets the Streamer associated with the provided Entity classes. | ||
* <p> | ||
|
@@ -199,9 +290,12 @@ default <T> Stream<T> stream(final Projection<T> projection) { | |
* In case JPAStreamer was configured with a {@code Supplier<EntityManager>} the lifecycle of the Entity Managers is | ||
* not managed by JPAStreamer, thus use of the method is not permitted and will result in an {@code UnsupportedOperationException}. | ||
* | ||
* @deprecated since 3.0.2, JPAStreamer no longer caches Streamers thus there is no need for a method that resets the cache. | ||
* If you wish to manage the Streamer lifecycle manually, see {@link JPAStreamer#createStreamSupplier(StreamConfiguration)} | ||
* @param entityClasses of the streamer | ||
* @throws UnsupportedOperationException if JPAStreamer is configured with a Supplier, see {@code com.speedment.jpastreamer.application.JPAStreamer#of(java.util.function.Supplier)} | ||
*/ | ||
@Deprecated(since = "3.0.2", forRemoval = true) | ||
void resetStreamer(Class<?>... entityClasses) throws UnsupportedOperationException; | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
package com.speedment.jpastreamer.application; | ||
|
||
import com.speedment.jpastreamer.streamconfiguration.StreamConfiguration; | ||
|
||
import java.util.stream.Stream; | ||
|
||
/** | ||
* A Stream Supplier is responsible for creating Streams from a data source | ||
* An entity source can be RDBMSes, files or other data sources. | ||
* | ||
* A Stream Supplier must be thread safe and be able to handle several reading and | ||
* writing threads at the same time. | ||
* | ||
* @author Per Minborg, Julia Gustafsson | ||
* @since 3.0.1 | ||
*/ | ||
public interface StreamSupplier<T> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean extends |
||
|
||
/** | ||
* Creates and returns a new {@link Stream} over all entities in the | ||
* underlying data source (e.g database) according to the {@code streamConfiguration} associated with this Streamer. | ||
* <p> | ||
* The order in which elements are returned when the stream is eventually | ||
* consumed <em>is unspecified</em>. The order may even change from one | ||
* invocation to another. Thus, it is an error to assume any particular | ||
* element order even though is might appear, for some stream sources, that | ||
* there is a de-facto order. | ||
* <p> | ||
* If a deterministic order is required, then make sure to invoke the | ||
* {@link Stream#sorted(java.util.Comparator)} method on the {@link Stream} | ||
* returned. | ||
* <p> | ||
* Mutable elements are not reused within the stream. More formally, there | ||
* are no pair of mutable stream elements <code>e1</code> and | ||
* <code>e2</code> such that <code>e1 == e2</code>. | ||
* <p> | ||
* The Stream will never contain <code>null</code> elements. | ||
* <p> | ||
* This is <em>an inexpensive O(1) operation</em> that will complete in | ||
* constant time regardless of the number of entities in the underlying | ||
* database. | ||
* <p> | ||
* The returned stream is aware of its own pipeline and will optionally | ||
* <em>optimize its own pipeline</em> whenever it encounters a <em>Terminal | ||
* Operation</em> so that it will only iterate over a minimum set of | ||
* matching entities. | ||
* <p> | ||
* When a Terminal Operation is eventually called on the {@link Stream}, | ||
* that execution time of the Terminal Operation will depend on the | ||
* optimized pipeline and the entities in the underlying database. | ||
* <p> | ||
* The Stream will be automatically | ||
* {@link Stream#onClose(java.lang.Runnable) closed} after the Terminal | ||
* Operation is completed or if an Exception is thrown during the Terminal | ||
* Operation. | ||
* <p> | ||
* | ||
* Some of the <em>Terminal Operations</em> are: | ||
* <ul> | ||
* <li>{@link Stream#forEach(java.util.function.Consumer) forEach(Consumer)} | ||
* <li>{@link Stream#forEachOrdered(java.util.function.Consumer) forEachOrdered(Consumer)} | ||
* <li>{@link Stream#toArray() toArray()} | ||
* <li>{@link Stream#toArray(java.util.function.IntFunction) toArray(IntFunction)} | ||
* <li>{@link Stream#reduce(java.util.function.BinaryOperator) reduce(BinaryOperation} | ||
* <li>{@link Stream#reduce(java.lang.Object, java.util.function.BinaryOperator) reduce(Object, BinaryOperator)} | ||
* <li>{@link Stream#reduce(java.lang.Object, java.util.function.BiFunction, java.util.function.BinaryOperator) reduce(Object, BiFunction, BinaryOperator)} | ||
* <li>{@link Stream#collect(java.util.stream.Collector) collect(Collector)} | ||
* <li>{@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer) collect(Supplier, BiConsumer, BiConsumer)} | ||
* <li>{@link Stream#min(java.util.Comparator) min(Comparator)} | ||
* <li>{@link Stream#max(java.util.Comparator) min(Comparator)} | ||
* <li>{@link Stream#count() count()} | ||
* <li>{@link Stream#anyMatch(java.util.function.Predicate) anyMatch(Predicate)} | ||
* <li>{@link Stream#noneMatch(java.util.function.Predicate) noneMatch(Predicate)} | ||
* <li>{@link Stream#findFirst() findFirst()} | ||
* <li>{@link Stream#findAny() findAny()} | ||
* <li>{@link Stream#iterator() iterator()} | ||
* </ul> | ||
* <p> | ||
* Any Terminating Operation may throw an Exception if the | ||
* underlying database throws an Exception (e.g. an SqlException) | ||
* <p> | ||
* Because the Stream may short-circuit operations in the Stream pipeline, | ||
* methods having side-effects (like | ||
* {@link Stream#peek(java.util.function.Consumer) peek(Consumer)} will | ||
* potentially be affected by the optimization. | ||
* <p> | ||
* Here are some examples of how the stream optimization might work: | ||
* <ul> | ||
* <li> | ||
* <pre>{@code stream(Film.class) | ||
* .filter(Film$.name.equal("Casablanca")) | ||
* .collect(toList());}</pre> | ||
* <pre>{@code -> select * from film where name='Casablanca'}</pre> | ||
* </li> | ||
* <li> | ||
* <pre>{@code stream.count();}</pre> | ||
* <pre>{@code -> select count(*) from film}</pre> | ||
* </li> | ||
* <li> | ||
* <pre>{@code stream(Film.class) | ||
* .filter(Film$.name.startsWith("A")) | ||
* .count();}</pre> | ||
* <pre>{@code -> select count(*) from hares where | ||
* name LIKE 'A%'}</pre> | ||
* <p> | ||
* </li> | ||
* <li> | ||
* <pre>{@code stream.stream(Film.class) | ||
* .filter(Film$.rating.equal("G") | ||
* .filter(Film$.length.greaterThan(100) | ||
* .count();}</pre> | ||
* <pre>{@code -> select count(*) from hares where | ||
* rating ='G' | ||
* and | ||
* length > 100}</pre> | ||
* </li> | ||
* </ul> | ||
* | ||
* @return a new stream over all entities in this table in unspecified order | ||
* | ||
* @throws RuntimeException if an error occurs during a Terminal Operation | ||
* (e.g. an SqlException is thrown by the underlying database) | ||
* | ||
* @see java.util.stream | ||
* @see Stream | ||
*/ | ||
Stream<T> stream(); | ||
|
||
/** | ||
* Closes this Stream Supplier and releases any resources potentially held, such as the underlying Entity Manager. | ||
*/ | ||
void close(); | ||
|
||
/** | ||
* Returns the {@link StreamConfiguration} that describes the stream source of the Streams generated by this Supplier. | ||
* | ||
* @return the configuration of the Streams generated by this Supplier | ||
*/ | ||
StreamConfiguration<T> configuration(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
import com.speedment.jpastreamer.announcer.Announcer; | ||
import com.speedment.jpastreamer.appinfo.ApplicationInformation; | ||
import com.speedment.jpastreamer.application.JPAStreamer; | ||
import com.speedment.jpastreamer.application.StreamSupplier; | ||
import com.speedment.jpastreamer.rootfactory.RootFactory; | ||
import com.speedment.jpastreamer.streamconfiguration.StreamConfiguration; | ||
|
||
|
@@ -33,7 +34,7 @@ final class StandardJPAStreamer implements JPAStreamer { | |
|
||
private final Supplier<EntityManager> entityManagerSupplier; | ||
private final Runnable closeHandler; | ||
private final Map<StreamConfiguration<?>, Streamer<?>> streamerCache; | ||
private final Map<StreamConfiguration<?>, StreamSupplier<?>> streamerCache; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think now we can delete this field. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right! |
||
private final AnalyticsReporter analyticsReporter; | ||
|
||
private final boolean closeEntityManagers; | ||
|
@@ -51,38 +52,28 @@ final class StandardJPAStreamer implements JPAStreamer { | |
} | ||
|
||
@Override | ||
@SuppressWarnings("unchecked") | ||
public <T> Stream<T> stream(final StreamConfiguration<T> streamConfiguration) { | ||
requireNonNull(streamConfiguration); | ||
if (cached(streamConfiguration)) { | ||
return (Stream<T>) streamerCache | ||
.computeIfAbsent(streamConfiguration, ec -> new StandardStreamer<>(streamConfiguration, entityManagerSupplier)) | ||
.stream(); | ||
} else { | ||
final Streamer<T> streamer = new StandardStreamer<>(streamConfiguration, entityManagerSupplier); | ||
return closeEntityManagers ? | ||
final StreamSupplier<T> streamer = new StandardStreamSupplier<>(streamConfiguration, entityManagerSupplier, this.closeEntityManagers); | ||
return closeEntityManagers ? | ||
streamer.stream().onClose(streamer::close) : | ||
streamer.stream(); | ||
} | ||
streamer.stream(); | ||
} | ||
|
||
@Override | ||
public void resetStreamer(Class<?>... entityClasses) throws UnsupportedOperationException{ | ||
if (!closeEntityManagers) { | ||
throw new UnsupportedOperationException("An instance of JPAStreamer.of(Supplier<EntityManager>) is not responsible for the lifecycle of the supplied Entity Managers, and thus cannot reset the Entity Managers."); | ||
} | ||
Arrays.stream(entityClasses) | ||
.map(StreamConfiguration::of) | ||
.filter(streamerCache::containsKey) | ||
.forEach(sc -> { | ||
streamerCache.get(sc).close(); // Close Entity Manager | ||
streamerCache.remove(sc); | ||
}); | ||
public <T> StreamSupplier<T> createStreamSupplier(StreamConfiguration<T> streamConfiguration) { | ||
requireNonNull(streamConfiguration); | ||
return new StandardStreamSupplier<>(streamConfiguration, entityManagerSupplier, this.closeEntityManagers); | ||
} | ||
|
||
@Override | ||
public void resetStreamer(Class<?>... entityClasses) throws UnsupportedOperationException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should not declare There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
// As there no longer exists a Streamer cache, this method has no effect | ||
} | ||
|
||
@Override | ||
public void close() { | ||
streamerCache.values().forEach(Streamer::close); | ||
streamerCache.values().forEach(StreamSupplier::close); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line can be removed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
analyticsReporter.stop(); | ||
closeHandler.run(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
creating
StreamSuppliers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, a relic of my first attempt to introduce Streamers.