diff --git a/src/main/asciidoc/streams.adoc b/src/main/asciidoc/streams.adoc
index 6df8fc8c379..9fa7aaeab82 100644
--- a/src/main/asciidoc/streams.adoc
+++ b/src/main/asciidoc/streams.adoc
@@ -170,3 +170,15 @@ returns `true` if the write queue is considered full.
Will be called if an exception occurs on the `WriteStream`.
- {@link io.vertx.core.streams.WriteStream#drainHandler}:
The handler will be called if the `WriteStream` is considered no longer full.
+
+=== Reducing streams
+
+Java collectors can reduce a `ReadStream` to a result in a similar fashion `java.util.Stream` does, yet in an asynchronous
+fashion.
+
+[source,$lang]
+----
+{@link examples.StreamsExamples#reduce1}
+----
+
+Note that `collect` overrides any previously handler set on the stream.
diff --git a/src/main/java/examples/StreamsExamples.java b/src/main/java/examples/StreamsExamples.java
index 4911e08f565..d094f0cb2c2 100644
--- a/src/main/java/examples/StreamsExamples.java
+++ b/src/main/java/examples/StreamsExamples.java
@@ -11,7 +11,7 @@
package examples;
-import io.vertx.core.Handler;
+import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
@@ -23,8 +23,10 @@
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.streams.Pipe;
-import io.vertx.core.streams.Pump;
import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.Pump;
+
+import java.util.stream.Collectors;
/**
* @author Julien Viet
@@ -169,4 +171,11 @@ public void pipe9(AsyncFile src, AsyncFile dst) {
dst.end(Buffer.buffer("done"));
});
}
+
+ public void reduce1(ReadStream stream) {
+ // Count the number of elements
+ Future result = stream.collect(Collectors.counting());
+
+ result.onSuccess(count -> System.out.println("Stream emitted " + count + " elements"));
+ }
}
diff --git a/src/main/java/io/vertx/core/streams/ReadStream.java b/src/main/java/io/vertx/core/streams/ReadStream.java
index 6ea543d1329..616e57c7274 100644
--- a/src/main/java/io/vertx/core/streams/ReadStream.java
+++ b/src/main/java/io/vertx/core/streams/ReadStream.java
@@ -12,14 +12,18 @@
package io.vertx.core.streams;
import io.vertx.codegen.annotations.Fluent;
+import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
+import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.streams.impl.PipeImpl;
+import java.util.function.BiConsumer;
+
/**
* Represents a stream of items that can be read from.
*