Skip to content

Promises.all

Richard Hightower edited this page Sep 9, 2016 · 13 revisions

We have all functionality with promises. You can create a promise that waits on all promises passed to it to be async returned.

All example

Promises.all(promise1, promise2, promise3).catchError(returnPromise::reject)
                .then(v -> returnPromise.resolve(true)).invoke()                

If every promise you pass to an all promise is invokeable then the all promise is invokeable as well, and calling invoke on it will invoke all of the children promises.

All example using Guava Reakt with for Cassandra

Promises.all(
 //Call to save Todo item in two table, don't respond until 
 // both calls come back from Cassandra.
 // First call to cassandra.
 futureToPromise(
         session.executeAsync(insertInto("Todo")
                 .value("id", todo.getId())
                 .value("updatedTime", todo.getUpdatedTime())
                 .value("createdTime", todo.getCreatedTime())
                 .value("name", todo.getName())
                 .value("description", todo.getDescription()))
                 ).catchError(error -> recordCassandraError("add.todo", error))
                 .thenSafe(resultSet -> handleResultFromAdd(resultSet, "add.todo")),
 // Second call to cassandra.
 futureToPromise(
         session.executeAsync(insertInto("TodoLookup")
                 .value("id", todo.getId())
                 .value("updatedTime", todo.getUpdatedTime()))
                 ).catchError(error -> recordCassandraError("add.lookup", error))
                 .thenSafe(resultSet -> handleResultFromAdd(resultSet, "add.lookup")
).catchError(returnPromise::reject)
                .then(v -> returnPromise.resolve(true)).invoke()                

Promises.all example

        /** Employee service. */
        EmployeeService employeeService = ...

        /* Promise that expects an employee. */
        Promise<Employee> promise1 = Promises.promise();
        Promise<Employee> promise2 = Promises.promise();


        /* Promise that returns when all employees are returned. */
        final Promise<Void> allPromise = Promises.all(promise1, promise2);


        allPromise.then(nil -> System.out.println("All DONE!"));

        assertFalse("Not done yet", allPromise.complete());

        /** Call service. */
        employeeService.loadEmployee("1", promise1);

        /** Still not done because only one service has been called. */
        assertFalse("Still not done yet", allPromise.complete());

        /** Ok now second service is called. */
        employeeService.loadEmployee("2", promise2);

        /** Wait some time. */
        //...

        assertTrue(allPromise.complete());
        assertTrue(allPromise.success());

We have three types of all promises: callback, blocking and replay callback. (A replay callback is a callback that gets replayed in the caller's thread).

API for creating all promise

...
public interface Promises ...{


    /**
     * All promises must complete.
     * @param promises promises
     * @return return containing promise
     */
    static Promise<Void> all(Promise<?>... promises) {
        return new AllPromise(promises);
    }

    /**
     * All promises must complete.
     * @param promises promises
     * @return return containing promise that is blocking.
     */
    static Promise<Void> allBlocking(Promise<?>... promises) {
        return new AllBlockingPromise(promises);
    }


    /**
     * All promises must complete.
     * @param timeout timeout
     * @param time time
     * @param promises promises
     * @return returns replay promise so promise can be replayed in caller's thread.
     */
    static ReplayPromise<Void> allReplay(final Duration timeout, long time, Promise<?>... promises) {
        return new AllReplayPromise(timeout, time, promises);
    }

    /**
     *
     * All promises must complete.
     * @param timeout timeout
     * @param promises promises
     * @return returns replay promise so promise can be replayed in caller's thread.
     */
    static ReplayPromise<Void> allReplay(final Duration timeout, Promise<?>... promises) {
        return Promises.allReplay(timeout, System.currentTimeMillis(), promises);
    }
...

Given this test service.

    public static class TestService {

        public void simple(Callback<Employee> callback) {
            callback.reply(new Employee("Rick"));
        }


        public void async(final Callback<Employee> callback) {

            new Thread(() -> {
                callback.reply(new Employee("Rick"));
            }).start();
        }


        public void asyncTimeout(final Callback<Employee> callback) {

            new Thread(() -> {
                try {
                    Thread.sleep(10_000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                callback.reply(new Employee("Rick"));
            }).start();
        }

        public void asyncError(final Callback<Employee> callback) {
            new Thread(() -> {
                callback.reject("Rick");
            }).start();
        }



        public void error(Callback<Employee> callback) {
            callback.reject("Error");
        }

        public void exception(Callback<Employee> callback) {
            callback.reject(new IllegalStateException("Error"));
        }
    }

We can have these three example tests.

Blocking example

import io.advantageous.reakt.Callback;
import io.advantageous.reakt.Expected;
import io.advantageous.reakt.Promises.promise;
    @Test
    public void testAllBlocking() throws Exception {

        TestService testService = new TestService();

        Promise<Employee> promise1 = Promises.promise();
        Promise<Employee> promise2 = Promises.promise();

        final Promise<Void> promise = Promises.allBlocking(promise1, promise2);

        assertFalse(promise.complete());

        testService.async(promise1);

        assertFalse(promise.complete());

        testService.async(promise2);

        assertTrue(promise.success());

    }

Callback example

    @Test
    public void testAll() throws Exception {

        /** Test service. */
        TestService testService = new TestService();

        /* Promise that expects an employee. */
        Promise<Employee> promise1 = Promises.promise();
        Promise<Employee> promise2 = Promises.promise();


        /* Promise that returns when all employees are returned. */
        final Promise<Void> promise = Promises.all(promise1, promise2);


        promise.then(nil -> System.out.println("DONE!"));

        assertFalse("Not done yet", promise.complete());

        /** Call service. */
        testService.simple(promise1);

        /** Still not done because only one service has been called. */
        assertFalse(promise.complete());

        /** Ok now second service is called. */
        testService.simple(promise2);

        /** Wait some time. */
        //...

        assertTrue(promise.complete());
        assertTrue(promise.success());

    }

Replay example

    @Test
    public void testAllReplay() throws Exception {

        TestService testService = new TestService();

        Promise<Employee> promise1 = Promises.promise();
        Promise<Employee> promise2 = Promises.promise();

        final ReplayPromise<Void> promise = Promises.allReplay(Duration.ofMillis(1000),
                promise1, promise2);

        assertFalse(promise.complete());

        testService.async(promise1);

        assertFalse(promise.complete());

        testService.async(promise2);



        for (int index=0; index < 10; index++) {
            promise.check(System.currentTimeMillis());
            if (promise.complete()) break;
            Thread.sleep(10);

        }


        assertTrue(promise.complete());
        assertTrue(promise.success());

    }