From f8c3d5abc3617f8b683ffc35d9c82b940566de0b Mon Sep 17 00:00:00 2001 From: Garrett Jones Date: Tue, 9 Feb 2016 11:46:12 -0800 Subject: [PATCH] Threshold bundling logic + new BundlingCallable --- build.gradle | 5 +- .../bundling/AccumulatingBundleReceiver.java | 60 +++++ .../gapi/gax/bundling/BundlingThreshold.java | 55 +++++ .../gapi/gax/bundling/BundlingThresholds.java | 56 +++++ .../io/gapi/gax/bundling/ElementCounter.java | 43 ++++ .../gapi/gax/bundling/NumericThreshold.java | 68 ++++++ .../gax/bundling/ThresholdBundleReceiver.java | 53 +++++ .../gapi/gax/bundling/ThresholdBundler.java | 224 ++++++++++++++++++ .../bundling/ThresholdBundlingForwarder.java | 111 +++++++++ .../java/io/gapi/gax/grpc/ApiCallable.java | 23 +- .../java/io/gapi/gax/grpc/BundleExecutor.java | 94 ++++++++ .../java/io/gapi/gax/grpc/BundlerFactory.java | 102 ++++++++ .../io/gapi/gax/grpc/BundlingCallable.java | 73 ++++++ .../io/gapi/gax/grpc/BundlingContext.java | 102 ++++++++ .../io/gapi/gax/grpc/BundlingDescriptor.java | 66 ++++++ .../io/gapi/gax/grpc/BundlingSettings.java | 57 +++++ .../gapi/gax/grpc/ChannelBindingCallable.java | 2 +- .../java/io/gapi/gax/grpc/RequestIssuer.java | 55 +++++ .../gax/bundling/ThresholdBundlerTest.java | 175 ++++++++++++++ .../io/gapi/gax/grpc/ApiCallableTest.java | 161 ++++++++++++- .../io/gapi/gax/grpc/BundleExecutorTest.java | 83 +++++++ 21 files changed, 1658 insertions(+), 10 deletions(-) create mode 100644 src/main/java/io/gapi/gax/bundling/AccumulatingBundleReceiver.java create mode 100644 src/main/java/io/gapi/gax/bundling/BundlingThreshold.java create mode 100644 src/main/java/io/gapi/gax/bundling/BundlingThresholds.java create mode 100644 src/main/java/io/gapi/gax/bundling/ElementCounter.java create mode 100644 src/main/java/io/gapi/gax/bundling/NumericThreshold.java create mode 100644 src/main/java/io/gapi/gax/bundling/ThresholdBundleReceiver.java create mode 100644 src/main/java/io/gapi/gax/bundling/ThresholdBundler.java create mode 100644 src/main/java/io/gapi/gax/bundling/ThresholdBundlingForwarder.java create mode 100644 src/main/java/io/gapi/gax/grpc/BundleExecutor.java create mode 100644 src/main/java/io/gapi/gax/grpc/BundlerFactory.java create mode 100644 src/main/java/io/gapi/gax/grpc/BundlingCallable.java create mode 100644 src/main/java/io/gapi/gax/grpc/BundlingContext.java create mode 100644 src/main/java/io/gapi/gax/grpc/BundlingDescriptor.java create mode 100644 src/main/java/io/gapi/gax/grpc/BundlingSettings.java create mode 100644 src/main/java/io/gapi/gax/grpc/RequestIssuer.java create mode 100644 src/test/java/io/gapi/gax/bundling/ThresholdBundlerTest.java create mode 100644 src/test/java/io/gapi/gax/grpc/BundleExecutorTest.java diff --git a/build.gradle b/build.gradle index 93780a97f626..38c27b2f5b72 100644 --- a/build.gradle +++ b/build.gradle @@ -2,6 +2,7 @@ apply plugin: 'java' apply plugin: 'maven' apply plugin: 'eclipse' apply plugin: 'idea' +apply plugin: 'jacoco' group = "com.google.api" version = "0.0.0-SNAPSHOT" @@ -19,6 +20,7 @@ ext { jsr305: 'com.google.code.findbugs:jsr305:3.0.0', autovalue: 'com.google.auto.value:auto-value:1.1', guice: 'com.google.inject:guice:4.0', + joda: 'joda-time:joda-time:2.8.2', // Testing junit: 'junit:junit:4.11', @@ -37,7 +39,8 @@ dependencies { libraries.guava, libraries.guice, libraries.jsr305, - libraries.autovalue + libraries.autovalue, + libraries.joda testCompile libraries.junit, libraries.mockito, diff --git a/src/main/java/io/gapi/gax/bundling/AccumulatingBundleReceiver.java b/src/main/java/io/gapi/gax/bundling/AccumulatingBundleReceiver.java new file mode 100644 index 000000000000..0500f6701094 --- /dev/null +++ b/src/main/java/io/gapi/gax/bundling/AccumulatingBundleReceiver.java @@ -0,0 +1,60 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.bundling; + +import java.util.ArrayList; +import java.util.List; + +/** + * A simple ThresholdBundleReceiver that just accumulates bundles. + * Not thread-safe. + */ +public class AccumulatingBundleReceiver implements ThresholdBundleReceiver { + private List> bundles = new ArrayList<>(); + + @Override + public void validateItem(T message) { + // no-op + } + + @Override + public void processBundle(List bundle) { + bundles.add(bundle); + } + + /** + * Returns the accumulated bundles. + */ + public List> getBundles() { + return bundles; + } +} diff --git a/src/main/java/io/gapi/gax/bundling/BundlingThreshold.java b/src/main/java/io/gapi/gax/bundling/BundlingThreshold.java new file mode 100644 index 000000000000..5fc1a8e78974 --- /dev/null +++ b/src/main/java/io/gapi/gax/bundling/BundlingThreshold.java @@ -0,0 +1,55 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.bundling; + +/** + * The interface representing a threshold to be used in ThresholdBundler. + * Thresholds do not need to be thread-safe if they are only used inside + * of ThresholdBundler. + */ +public interface BundlingThreshold { + + /** + * Presents the element to the threshold for the attribute of interest to be accumulated. + */ + void accumulate(E e); + + /** + * @returns whether the threshold has been reached. + */ + boolean isThresholdReached(); + + /** + * Reset the accumulated value back to zero. + */ + void reset(); +} diff --git a/src/main/java/io/gapi/gax/bundling/BundlingThresholds.java b/src/main/java/io/gapi/gax/bundling/BundlingThresholds.java new file mode 100644 index 000000000000..831b8bbe54ff --- /dev/null +++ b/src/main/java/io/gapi/gax/bundling/BundlingThresholds.java @@ -0,0 +1,56 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.bundling; + +import com.google.common.collect.ImmutableList; + +/** + * Factory methods for general-purpose bundling thresholds. + */ +public class BundlingThresholds { + + /** + * Creates an ImmutableList containing only a single threshold which counts + * the number of elements. This is helpful for when using ThresholdBundler for + * the simple case, when the element count is the only threshold. + */ + public static ImmutableList> of(long elementThreshold) { + BundlingThreshold bundlingThreshold = + new NumericThreshold(elementThreshold, new ElementCounter() { + @Override + public long count(E e) { + return 1; + } + }); + return ImmutableList.>of(bundlingThreshold); + } +} diff --git a/src/main/java/io/gapi/gax/bundling/ElementCounter.java b/src/main/java/io/gapi/gax/bundling/ElementCounter.java new file mode 100644 index 000000000000..d49c4a2dd2c5 --- /dev/null +++ b/src/main/java/io/gapi/gax/bundling/ElementCounter.java @@ -0,0 +1,43 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.bundling; + +/** + * Interface representing an object that provides a numerical count given + * an object of the parameterized type. + */ +public interface ElementCounter { + /** + * Provides the numerical count associated with the given object. + */ + public long count(E element); +} diff --git a/src/main/java/io/gapi/gax/bundling/NumericThreshold.java b/src/main/java/io/gapi/gax/bundling/NumericThreshold.java new file mode 100644 index 000000000000..25a37f8f8f12 --- /dev/null +++ b/src/main/java/io/gapi/gax/bundling/NumericThreshold.java @@ -0,0 +1,68 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.bundling; + +import com.google.common.base.Preconditions; + +/** + * A threshold which accumulates a count based on the provided + * ElementCounter. + */ +public class NumericThreshold implements BundlingThreshold { + private final long threshold; + private final ElementCounter extractor; + private long sum; + + /** + * Constructs a NumericThreshold. + */ + public NumericThreshold(long threshold, ElementCounter extractor) { + this.threshold = threshold; + this.extractor = Preconditions.checkNotNull(extractor); + this.sum = 0; + } + + @Override + public void accumulate(E e) { + sum += extractor.count(e); + } + + @Override + public boolean isThresholdReached() { + return sum >= threshold; + } + + @Override + public void reset() { + sum = 0; + } +} diff --git a/src/main/java/io/gapi/gax/bundling/ThresholdBundleReceiver.java b/src/main/java/io/gapi/gax/bundling/ThresholdBundleReceiver.java new file mode 100644 index 000000000000..78adc9230b72 --- /dev/null +++ b/src/main/java/io/gapi/gax/bundling/ThresholdBundleReceiver.java @@ -0,0 +1,53 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.bundling; + +import java.util.List; + +/** + * Interface representing an object that receives bundles from a + * ThresholdBundler and takes action on them. + */ +public interface ThresholdBundleReceiver { + + /** + * Validate that the item can be received by this ThresholdBundleReceiver. + * This is called to validate an item before it is queued. + */ + void validateItem(T message); + + /** + * Process the given bundle. + */ + void processBundle(List bundle); + +} diff --git a/src/main/java/io/gapi/gax/bundling/ThresholdBundler.java b/src/main/java/io/gapi/gax/bundling/ThresholdBundler.java new file mode 100644 index 000000000000..8a957de746b4 --- /dev/null +++ b/src/main/java/io/gapi/gax/bundling/ThresholdBundler.java @@ -0,0 +1,224 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.bundling; + +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; + +import org.joda.time.Duration; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Queues up elements until either a duration of time has passed or any threshold in a given set of + * thresholds is breached, and then delivers the elements in a bundle to the consumer. + */ +public class ThresholdBundler { + + private final ImmutableList> thresholds; + private final Duration maxDelay; + + private final Lock lock = new ReentrantLock(); + private final Condition bundleCondition = lock.newCondition(); + private boolean bundleReady = false; + + private Stopwatch bundleStopwatch; + private final List data = new ArrayList<>(); + + public ThresholdBundler(ImmutableList> thresholds) { + this(null, thresholds); + } + + public ThresholdBundler(Duration maxDelay) { + this(maxDelay, ImmutableList.>of()); + } + + public ThresholdBundler(Duration maxDelay, ImmutableList> thresholds) { + this.thresholds = Preconditions.checkNotNull(thresholds); + this.maxDelay = maxDelay; + } + + /** + * Adds an element to the bundler. If the element causes the collection to go past any of the + * thresholds, the bundle will be made available to consumers. + */ + public void add(E e) { + final Lock lock = this.lock; + lock.lock(); + try { + boolean signal = false; + // TODO verify invariant: bundleStopwatch == null iff size() == 0 + if (data.size() == 0) { + bundleStopwatch = Stopwatch.createStarted(); + // we want to trigger the signal so that we switch the await from an unbounded + // await to a time-bounded await. + signal = true; + } + data.add(e); + if (!bundleReady) { + for (BundlingThreshold threshold : thresholds) { + threshold.accumulate(e); + if (threshold.isThresholdReached()) { + bundleReady = true; + signal = true; + break; + } + } + } + if (signal) { + bundleCondition.signalAll(); + } + } finally { + lock.unlock(); + } + } + + /** + * Makes the currently contained elements available for consumption, even if no thresholds + * were triggered. + */ + public void flush() { + final Lock lock = this.lock; + lock.lock(); + try { + bundleReady = true; + bundleCondition.signalAll(); + } finally { + lock.unlock(); + } + } + + /** + * Remove all currently contained elements, regardless of whether they have triggered any + * thresholds. All elements are placed into 'bundle'. + * + * @returns the number of items added to 'bundle'. + */ + public int drainTo(Collection bundle) { + final Lock lock = this.lock; + lock.lock(); + try { + int dataSize = data.size(); + + bundle.addAll(data); + data.clear(); + + for (BundlingThreshold threshold : thresholds) { + threshold.reset(); + } + + bundleStopwatch = null; + bundleReady = false; + + return dataSize; + } finally { + lock.unlock(); + } + } + + /** + * Waits until a bundle is available, and returns it once it is. + */ + public List takeBundle() throws InterruptedException { + final Lock lock = this.lock; + lock.lockInterruptibly(); + try { + while (shouldWait()) { + if (data.size() == 0 || maxDelay == null) { + // if an element gets added, this will be signaled, then we will re-check the while-loop + // condition to see if the delay or other thresholds have been exceeded, + // and if none of these are true, then we will arrive at the time-bounded + // await in the else clause. + bundleCondition.await(); + } else { + bundleCondition.await(getDelayLeft().getMillis(), TimeUnit.MILLISECONDS); + } + } + List bundle = new ArrayList<>(); + drainTo(bundle); + return bundle; + } finally { + lock.unlock(); + } + } + + /** + * Returns the number of elements queued up in the bundler. + */ + public int size() { + final Lock lock = this.lock; + lock.lock(); + try { + return data.size(); + } finally { + lock.unlock(); + } + } + + /** + * Returns the elements queued up in the bundler. + */ + public Object[] toArray() { + final Lock lock = this.lock; + lock.lock(); + try { + return data.toArray(); + } finally { + lock.unlock(); + } + } + + private boolean shouldWait() { + if (data.size() == 0) { + return true; + } + if (bundleReady) { + return false; + } + if (maxDelay == null) { + return true; + } + return getDelayLeft().getMillis() > 0; + } + + // pre-condition: data.size() > 0 ( === bundleStopwatch != null) + private Duration getDelayLeft() { + return Duration.millis(maxDelay.getMillis() - bundleStopwatch.elapsed(TimeUnit.MILLISECONDS)); + } +} diff --git a/src/main/java/io/gapi/gax/bundling/ThresholdBundlingForwarder.java b/src/main/java/io/gapi/gax/bundling/ThresholdBundlingForwarder.java new file mode 100644 index 000000000000..8811b636387c --- /dev/null +++ b/src/main/java/io/gapi/gax/bundling/ThresholdBundlingForwarder.java @@ -0,0 +1,111 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.bundling; + +import java.util.ArrayList; +import java.util.List; + +/** + * Accepts individual items and then forwards them in bundles to the given + * ThresholdBundleReceiver for processing. This class essentially converts + * the pull interface of ThresholdBundler into the push interface of + * ThresholdBundleReceiver. + */ +public class ThresholdBundlingForwarder implements AutoCloseable { + private final ThresholdBundler bundler; + private final ThresholdBundleReceiver bundleReceiver; + private final BundleForwardingRunnable forwardingRunnable; + private final Thread forwarderThread; + + /** + * Constructs a ThresholdBundlingForwarder. The start() method must + * be called for the forwarder to start forwarding. + */ + public ThresholdBundlingForwarder(ThresholdBundler bundler, + ThresholdBundleReceiver bundleReceiver) { + this.bundleReceiver = bundleReceiver; + this.bundler = bundler; + forwardingRunnable = new BundleForwardingRunnable(); + forwarderThread = new Thread(forwardingRunnable); + } + + /** + * Start the forwarder thread. + */ + public void start() { + forwarderThread.start(); + } + + /** + * First validates that the receiver can receive the given item (based + * on the inherent characteristics of the item), and then hands it off to + * the bundler. + */ + public void addToNextBundle(T item) { + bundleReceiver.validateItem(item); + bundler.add(item); + } + + @Override + public void close() { + forwarderThread.interrupt(); + try { + forwarderThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private class BundleForwardingRunnable implements Runnable { + @Override + public void run() { + do { + try { + processBundle(bundler.takeBundle()); + } catch (InterruptedException e) { + break; + } + } while (!Thread.currentThread().isInterrupted()); + + List lastBundle = new ArrayList<>(); + bundler.drainTo(lastBundle); + processBundle(lastBundle); + } + + private void processBundle(List bundle) { + if (bundle.size() == 0) { + return; + } + bundleReceiver.processBundle(bundle); + } + } +} diff --git a/src/main/java/io/gapi/gax/grpc/ApiCallable.java b/src/main/java/io/gapi/gax/grpc/ApiCallable.java index 366118d4e2cf..f9a180d195fc 100644 --- a/src/main/java/io/gapi/gax/grpc/ApiCallable.java +++ b/src/main/java/io/gapi/gax/grpc/ApiCallable.java @@ -115,13 +115,17 @@ public void asyncCall(CallContext context, StreamObserver o new FutureCallback() { @Override public void onFailure(Throwable t) { - observer.onError(t); + if (observer != null) { + observer.onError(t); + } } @Override public void onSuccess(ResponseT result) { - observer.onNext(result); - observer.onCompleted(); + if (observer != null) { + observer.onNext(result); + observer.onCompleted(); + } } }); } @@ -142,7 +146,7 @@ public void asyncCall(RequestT request, StreamObserver observer) { */ public static ApiCallable create( MethodDescriptor descriptor) { - return create(new DescriptorClientCallFactory(descriptor)); + return create(new DescriptorClientCallFactory<>(descriptor)); } /** @@ -188,4 +192,15 @@ public ApiCallable> pageStreaming( return new ApiCallable>( new PageStreamingCallable(callable, pageDescriptor)); } + + /** + * Returns a callable which bundles the call, meaning that multiple requests are bundled + * together and sent at the same time. + */ + public ApiCallable bundling( + BundlingDescriptor bundlingDescriptor, + BundlerFactory bundlerFactory) { + return new ApiCallable( + new BundlingCallable(callable, bundlingDescriptor, bundlerFactory)); + } } diff --git a/src/main/java/io/gapi/gax/grpc/BundleExecutor.java b/src/main/java/io/gapi/gax/grpc/BundleExecutor.java new file mode 100644 index 000000000000..85c350f1d758 --- /dev/null +++ b/src/main/java/io/gapi/gax/grpc/BundleExecutor.java @@ -0,0 +1,94 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.grpc; + +import com.google.common.base.Preconditions; + +import io.gapi.gax.bundling.ThresholdBundleReceiver; + +import java.util.ArrayList; +import java.util.List; + +/** + * A bundle receiver which uses a provided bundling descriptor to merge + * the items from the bundle into a single request, invoke the callable from + * the bundling context to issue the request, split the bundle response + * into the components matching each incoming request, and finally send + * the result back to the listener for each request. + */ +public class BundleExecutor + implements ThresholdBundleReceiver> { + + private final BundlingDescriptor bundlingDescriptor; + private final String partitionKey; + + public BundleExecutor(BundlingDescriptor bundlingDescriptor, + String partitionKey) { + this.bundlingDescriptor = Preconditions.checkNotNull(bundlingDescriptor); + this.partitionKey = Preconditions.checkNotNull(partitionKey); + } + + @Override + public void validateItem(BundlingContext item) { + String itemPartitionKey = + bundlingDescriptor.getBundlePartitionKey(item.getCallContext().getRequest()); + if (!itemPartitionKey.equals(partitionKey)) { + String requestClassName = + item.getCallContext().getRequest().getClass().getSimpleName(); + throw new IllegalArgumentException( + String.format("For type %s, invalid partition key: %s, should be: %s", + requestClassName, itemPartitionKey, partitionKey)); + } + } + + @Override + public void processBundle(List> bundle) { + List requests = new ArrayList<>(bundle.size()); + for (BundlingContext message : bundle) { + requests.add(message.getCallContext().getRequest()); + } + RequestT bundleRequest = bundlingDescriptor.mergeRequests(requests); + ApiCallable callable = bundle.get(0).getCallable(); + + try { + ResponseT bundleResponse = callable.call(bundleRequest); + bundlingDescriptor.splitResponse(bundleResponse, bundle); + } catch (Throwable exception) { + bundlingDescriptor.splitException(exception, bundle); + } + + for (BundlingContext message : bundle) { + message.sendResult(); + } + } + +} diff --git a/src/main/java/io/gapi/gax/grpc/BundlerFactory.java b/src/main/java/io/gapi/gax/grpc/BundlerFactory.java new file mode 100644 index 000000000000..21d240340a22 --- /dev/null +++ b/src/main/java/io/gapi/gax/grpc/BundlerFactory.java @@ -0,0 +1,102 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.grpc; + +import io.gapi.gax.bundling.ThresholdBundler; +import io.gapi.gax.bundling.ThresholdBundlingForwarder; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A Factory class which, for each unique partitionKey, creates a trio including + * a ThresholdBundler, BundleExecutor, and ThresholdBundlingForwarder. The + * ThresholdBundlingForwarder pulls items from the ThresholdBundler and forwards + * them to the BundleExecutor for processing. + */ +public class BundlerFactory implements AutoCloseable { + private final Map>> forwarders = + new ConcurrentHashMap<>(); + private final BundlingDescriptor bundlingDescriptor; + private final BundlingSettings bundlingSettings; + private final Object lock = new Object(); + + public BundlerFactory(BundlingDescriptor bundlingDescriptor, + BundlingSettings bundlingSettings) { + this.bundlingDescriptor = bundlingDescriptor; + this.bundlingSettings = bundlingSettings; + } + + /** + * Provides the ThresholdBundlingForwarder corresponding to the give + * partitionKey, or constructs one if it doesn't exist yet. The implementation + * is thread-safe. + */ + public ThresholdBundlingForwarder> + getForwarder(String partitionKey) { + ThresholdBundlingForwarder> forwarder = + forwarders.get(partitionKey); + if (forwarder == null) { + synchronized(lock) { + forwarder = forwarders.get(partitionKey); + if (forwarder == null) { + forwarder = createForwarder(partitionKey); + forwarders.put(partitionKey, forwarder); + forwarder.start(); + } + } + } + return forwarder; + } + + private ThresholdBundlingForwarder> + createForwarder(String partitionKey) { + ThresholdBundler> bundler = + new ThresholdBundler<>(bundlingSettings.getDelayThreshold(), + bundlingSettings.getThresholds()); + BundleExecutor processor = + new BundleExecutor<>(bundlingDescriptor, partitionKey); + return new ThresholdBundlingForwarder<>(bundler, processor); + } + + @Override + public void close() { + synchronized(lock) { + for (ThresholdBundlingForwarder> forwarder : + forwarders.values()) { + forwarder.close(); + } + forwarders.clear(); + } + } +} diff --git a/src/main/java/io/gapi/gax/grpc/BundlingCallable.java b/src/main/java/io/gapi/gax/grpc/BundlingCallable.java new file mode 100644 index 000000000000..cf8ffc6471b3 --- /dev/null +++ b/src/main/java/io/gapi/gax/grpc/BundlingCallable.java @@ -0,0 +1,73 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.grpc; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +import io.gapi.gax.bundling.ThresholdBundlingForwarder; + +/** + * FutureCallable which will bundle requests based on the given bundling + * descriptor and bundler factory. The bundler factory provides a + * distinct bundler for each partition as specified by the + * bundling descriptor. An example of a bundling partition would be a + * pubsub topic. + */ +class BundlingCallable implements FutureCallable { + private FutureCallable callable; + private BundlingDescriptor bundlingDescriptor; + private BundlerFactory bundlerFactory; + + public BundlingCallable(FutureCallable callable, + BundlingDescriptor bundlingDescriptor, + BundlerFactory bundlerFactory) { + this.callable = Preconditions.checkNotNull(callable); + this.bundlingDescriptor = Preconditions.checkNotNull(bundlingDescriptor); + this.bundlerFactory = Preconditions.checkNotNull(bundlerFactory); + } + + @Override + public ListenableFuture futureCall(CallContext context) { + SettableFuture result = SettableFuture.create(); + ApiCallable apiCallable = new ApiCallable<>(callable); + BundlingContext bundlableMessage = + new BundlingContext(context, apiCallable, result); + String partitionKey = bundlingDescriptor.getBundlePartitionKey(context.getRequest()); + ThresholdBundlingForwarder> forwarder = + bundlerFactory.getForwarder(partitionKey); + forwarder.addToNextBundle(bundlableMessage); + return result; + } + +} diff --git a/src/main/java/io/gapi/gax/grpc/BundlingContext.java b/src/main/java/io/gapi/gax/grpc/BundlingContext.java new file mode 100644 index 000000000000..3ae591118ddc --- /dev/null +++ b/src/main/java/io/gapi/gax/grpc/BundlingContext.java @@ -0,0 +1,102 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.grpc; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.SettableFuture; + +/** + * Holds the complete context to issue a call and notify the call's + * listener. This includes a CallContext object, which contains the call + * objects, the channel, and the request; a Callable object to issue + * the request; and a SettableFuture object, to notify the response + * listener. + */ +public class BundlingContext + implements RequestIssuer { + private final CallContext context; + private final ApiCallable callable; + private final SettableFuture settableFuture; + private ResponseT responseToSend; + private Throwable throwableToSend; + + public BundlingContext(CallContext context, + ApiCallable callable, + SettableFuture settableFuture) { + this.context = context; + this.callable = callable; + this.settableFuture = settableFuture; + this.responseToSend = null; + this.throwableToSend = null; + } + + public CallContext getCallContext() { + return context; + } + + public ApiCallable getCallable() { + return callable; + } + + @Override + public RequestT getRequest() { + return context.getRequest(); + } + + @Override + public void setResponse(ResponseT response) { + Preconditions.checkState(throwableToSend == null, + "Cannot set both exception and response"); + responseToSend = response; + } + + @Override + public void setException(Throwable throwable) { + Preconditions.checkState(throwableToSend == null, + "Cannot set both exception and response"); + throwableToSend = throwable; + } + + /** + * Sends back the result that was stored by either setResponse or setException + */ + public void sendResult() { + if (responseToSend != null) { + settableFuture.set(responseToSend); + } else if (throwableToSend != null) { + settableFuture.setException(throwableToSend); + } else { + throw new IllegalStateException( + "Neither response nor exception were set in BundlingContext"); + } + } +} diff --git a/src/main/java/io/gapi/gax/grpc/BundlingDescriptor.java b/src/main/java/io/gapi/gax/grpc/BundlingDescriptor.java new file mode 100644 index 000000000000..941399024f75 --- /dev/null +++ b/src/main/java/io/gapi/gax/grpc/BundlingDescriptor.java @@ -0,0 +1,66 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.grpc; + +import java.util.Collection; + +/** + * Interface which represents an object that transforms request/response + * data for the purposes of bundling. + */ +public interface BundlingDescriptor { + + /** + * Returns the value of the partition key for the given request. + */ + String getBundlePartitionKey(RequestT request); + + /** + * Merges the requests from the given collection into a single request + * (which serves as the bundle). + */ + RequestT mergeRequests(Collection requests); + + /** + * Splits the result from a bundled call into an individual setResponse + * call on each RequestIssuer. + */ + void splitResponse(ResponseT bundleResponse, + Collection> bundle); + + /** + * Splits the exception that resulted from a bundled call into + * an individual setException call on each RequestIssuer. + */ + void splitException(Throwable throwable, + Collection> bundle); +} diff --git a/src/main/java/io/gapi/gax/grpc/BundlingSettings.java b/src/main/java/io/gapi/gax/grpc/BundlingSettings.java new file mode 100644 index 000000000000..08484aa5df6c --- /dev/null +++ b/src/main/java/io/gapi/gax/grpc/BundlingSettings.java @@ -0,0 +1,57 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.grpc; + +import com.google.common.collect.ImmutableList; + +import io.gapi.gax.bundling.BundlingThreshold; + +import org.joda.time.Duration; + +/** + * Interface which represents the bundling settings to use for a + * ThresholdBundler. + */ +public interface BundlingSettings { + + /** + * Get the delay threshold to use for the ThresholdBundler. + */ + Duration getDelayThreshold(); + + /** + * Get the bundling thresholds to use for the ThresholdBundler. + * The thresholds returned should always be newly-instantiated + * thresholds, since thresholds are stateful. + */ + ImmutableList>> getThresholds(); +} diff --git a/src/main/java/io/gapi/gax/grpc/ChannelBindingCallable.java b/src/main/java/io/gapi/gax/grpc/ChannelBindingCallable.java index 98a44a0eecf5..cf792e1ea54e 100644 --- a/src/main/java/io/gapi/gax/grpc/ChannelBindingCallable.java +++ b/src/main/java/io/gapi/gax/grpc/ChannelBindingCallable.java @@ -64,6 +64,6 @@ public ListenableFuture futureCall(CallContext context) { @Override public String toString() { - return String.format("bind(%s)", callable); + return String.format("bind-channel(%s)", callable); } } diff --git a/src/main/java/io/gapi/gax/grpc/RequestIssuer.java b/src/main/java/io/gapi/gax/grpc/RequestIssuer.java new file mode 100644 index 000000000000..6d203f55d56b --- /dev/null +++ b/src/main/java/io/gapi/gax/grpc/RequestIssuer.java @@ -0,0 +1,55 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.grpc; + +/** + * Interface that encapsulates a request/response interaction. + */ +public interface RequestIssuer { + + /** + * Get the request from the issuer. + */ + RequestT getRequest(); + + /** + * Set the response that resulted from executing the request. + * Only one of response or exception should be set. + */ + void setResponse(ResponseT response); + + /** + * Set the exception that resulted from executing the request. + * Only one of response or exception should be set. + */ + void setException(Throwable throwable); +} diff --git a/src/test/java/io/gapi/gax/bundling/ThresholdBundlerTest.java b/src/test/java/io/gapi/gax/bundling/ThresholdBundlerTest.java new file mode 100644 index 000000000000..a7458fe05b29 --- /dev/null +++ b/src/test/java/io/gapi/gax/bundling/ThresholdBundlerTest.java @@ -0,0 +1,175 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.bundling; + +import com.google.common.truth.Truth; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.joda.time.Duration; +import org.junit.Test; + +public class ThresholdBundlerTest { + + @Test + public void testEmptyAddAndDrain() { + ThresholdBundler bundler = + new ThresholdBundler(BundlingThresholds.of(5)); + List resultBundle = new ArrayList<>(); + Truth.assertThat(bundler.size()).isEqualTo(0); + Truth.assertThat(bundler.toArray()).isEqualTo(new Integer[]{}); + + int drained = bundler.drainTo(resultBundle); + Truth.assertThat(drained).isEqualTo(0); + Truth.assertThat(resultBundle).isEqualTo(new ArrayList<>()); + } + + @Test + public void testAddAndDrain() { + ThresholdBundler bundler = + new ThresholdBundler(BundlingThresholds.of(5)); + bundler.add(14); + Truth.assertThat(bundler.size()).isEqualTo(1); + Truth.assertThat(bundler.toArray()).isEqualTo(new Integer[]{14}); + + List resultBundle = new ArrayList<>(); + int drained = bundler.drainTo(resultBundle); + Truth.assertThat(drained).isEqualTo(1); + Truth.assertThat(resultBundle).isEqualTo(Arrays.asList(14)); + Truth.assertThat(bundler.size()).isEqualTo(0); + Truth.assertThat(bundler.toArray()).isEqualTo(new Integer[]{}); + + List resultBundle2 = new ArrayList<>(); + int drained2 = bundler.drainTo(resultBundle2); + Truth.assertThat(drained2).isEqualTo(0); + Truth.assertThat(resultBundle2).isEqualTo(new ArrayList<>()); + } + + @Test + public void testBundling() throws Exception { + ThresholdBundler bundler = + new ThresholdBundler(BundlingThresholds.of(2)); + AccumulatingBundleReceiver receiver = + new AccumulatingBundleReceiver(); + ThresholdBundlingForwarder forwarder = + new ThresholdBundlingForwarder(bundler, receiver); + + try { + forwarder.start(); + bundler.add(3); + bundler.add(5); + // Give time for the forwarder thread to catch the bundle + Thread.sleep(100); + + bundler.add(7); + bundler.add(9); + // Give time for the forwarder thread to catch the bundle + Thread.sleep(100); + + bundler.add(11); + + } finally { + forwarder.close(); + } + + List> expected = + Arrays.asList( + Arrays.asList(3, 5), + Arrays.asList(7, 9), + Arrays.asList(11)); + Truth.assertThat(receiver.getBundles()).isEqualTo(expected); + } + + @Test + public void testBundlingWithDelay() throws Exception { + ThresholdBundler bundler = + new ThresholdBundler(Duration.millis(100)); + AccumulatingBundleReceiver receiver = + new AccumulatingBundleReceiver(); + ThresholdBundlingForwarder forwarder = + new ThresholdBundlingForwarder(bundler, receiver); + + try { + forwarder.start(); + bundler.add(3); + bundler.add(5); + // Give time for the forwarder thread to catch the bundle + Thread.sleep(500); + + bundler.add(11); + + } finally { + forwarder.close(); + } + + List> expected = + Arrays.asList( + Arrays.asList(3, 5), + Arrays.asList(11)); + Truth.assertThat(receiver.getBundles()).isEqualTo(expected); + } + + @Test + public void testFlush() throws Exception { + ThresholdBundler bundler = + new ThresholdBundler(BundlingThresholds.of(2)); + AccumulatingBundleReceiver receiver = + new AccumulatingBundleReceiver(); + ThresholdBundlingForwarder forwarder = + new ThresholdBundlingForwarder(bundler, receiver); + + try { + forwarder.start(); + bundler.add(3); + // flush before the threshold is met + bundler.flush(); + // Give time for the forwarder thread to catch the bundle + Thread.sleep(100); + + bundler.add(7); + bundler.add(9); + // Give time for the forwarder thread to catch the bundle + Thread.sleep(100); + + } finally { + forwarder.close(); + } + + List> expected = + Arrays.asList( + Arrays.asList(3), + Arrays.asList(7, 9)); + Truth.assertThat(receiver.getBundles()).isEqualTo(expected); + } +} diff --git a/src/test/java/io/gapi/gax/grpc/ApiCallableTest.java b/src/test/java/io/gapi/gax/grpc/ApiCallableTest.java index a9c31f2fb6c6..716fcedaed88 100644 --- a/src/test/java/io/gapi/gax/grpc/ApiCallableTest.java +++ b/src/test/java/io/gapi/gax/grpc/ApiCallableTest.java @@ -31,23 +31,32 @@ package io.gapi.gax.grpc; -import io.grpc.Channel; -import io.grpc.Status; - +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.truth.Truth; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.UncheckedExecutionException; +import io.gapi.gax.bundling.BundlingThreshold; +import io.gapi.gax.bundling.BundlingThresholds; +import io.grpc.Channel; +import io.grpc.Status; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutionException; +import org.joda.time.Duration; +import org.junit.Assert; import org.junit.Rule; +import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.junit.Test; import org.mockito.Mockito; /** * Tests for {@link ApiCallable}. @@ -174,4 +183,148 @@ public void pageStreaming() { .containsExactly(0, 1, 2, 3, 4) .inOrder(); } + + // Bundling + // ======== + private static class LabeledIntList { + public String label; + public List ints; + public LabeledIntList(String label, Integer... numbers) { + this(label, Arrays.asList(numbers)); + } + public LabeledIntList(String label, List ints) { + this.label = label; + this.ints = ints; + } + } + + private static FutureCallable> callLabeledIntSquarer = + new FutureCallable>() { + @Override + public ListenableFuture> futureCall(CallContext context) { + List result = new ArrayList<>(); + for (Integer i : context.getRequest().ints) { + result.add(i*i); + } + return Futures.immediateFuture(result); + } + }; + + private static BundlingDescriptor> SQUARER_BUNDLING_DESC = + new BundlingDescriptor>() { + + @Override + public String getBundlePartitionKey(LabeledIntList request) { + return request.label; + } + + @Override + public LabeledIntList mergeRequests(Collection requests) { + LabeledIntList firstRequest = requests.iterator().next(); + + List messages = new ArrayList<>(); + for (LabeledIntList request : requests) { + messages.addAll(request.ints); + } + + LabeledIntList bundleRequest = new LabeledIntList(firstRequest.label, messages); + return bundleRequest; + } + + @Override + public void splitResponse(List bundleResponse, + Collection>> bundle) { + int bundleMessageIndex = 0; + for (RequestIssuer> responder : bundle) { + List messageIds = new ArrayList<>(); + int messageCount = responder.getRequest().ints.size(); + for (int i = 0; i < messageCount; i++) { + messageIds.add(bundleResponse.get(bundleMessageIndex)); + bundleMessageIndex += 1; + } + responder.setResponse(messageIds); + } + } + + @Override + public void splitException(Throwable throwable, + Collection>> bundle) { + for (RequestIssuer> responder : bundle) { + responder.setException(throwable); + } + } + }; + + private BundlingSettings createBundlingSettings(int messageCountThreshold) { + return new BundlingSettings() { + @Override + public Duration getDelayThreshold() { + return Duration.standardSeconds(1); + } + @Override + public ImmutableList>> getThresholds() { + return BundlingThresholds.of(messageCountThreshold); + } + }; + } + + @Test + public void bundling() throws Exception { + BundlingSettings> bundlingSettings = + createBundlingSettings(2); + BundlerFactory> bundlerFactory = + new BundlerFactory<>(SQUARER_BUNDLING_DESC, bundlingSettings); + try { + ApiCallable> callable = + new ApiCallable<>(callLabeledIntSquarer) + .bundling(SQUARER_BUNDLING_DESC, bundlerFactory); + ListenableFuture> f1 = + callable.futureCall(new LabeledIntList("one", 1, 2)); + ListenableFuture> f2 = + callable.futureCall(new LabeledIntList("one", 3, 4)); + Truth.assertThat(f1.get()).isEqualTo(Arrays.asList(1, 4)); + Truth.assertThat(f2.get()).isEqualTo(Arrays.asList(9, 16)); + } finally { + bundlerFactory.close(); + } + } + + private static FutureCallable> callLabeledIntExceptionThrower = + new FutureCallable>() { + @Override + public ListenableFuture> futureCall(CallContext context) { + return Futures.immediateFailedFuture(new IllegalArgumentException("I FAIL!!")); + } + }; + + @Test + public void bundlingException() throws Exception { + BundlingSettings> bundlingSettings = + createBundlingSettings(2); + BundlerFactory> bundlerFactory = + new BundlerFactory<>(SQUARER_BUNDLING_DESC, bundlingSettings); + try { + ApiCallable> callable = + new ApiCallable<>(callLabeledIntExceptionThrower) + .bundling(SQUARER_BUNDLING_DESC, bundlerFactory); + ListenableFuture> f1 = + callable.futureCall(new LabeledIntList("one", 1, 2)); + ListenableFuture> f2 = + callable.futureCall(new LabeledIntList("one", 3, 4)); + try { + f1.get(); + Assert.fail("Expected exception from bundling call"); + } catch (ExecutionException e) { + // expected + } + try { + f2.get(); + Assert.fail("Expected exception from bundling call"); + } catch (ExecutionException e) { + // expected + } + } finally { + bundlerFactory.close(); + } + } } diff --git a/src/test/java/io/gapi/gax/grpc/BundleExecutorTest.java b/src/test/java/io/gapi/gax/grpc/BundleExecutorTest.java new file mode 100644 index 000000000000..4665eb6463d0 --- /dev/null +++ b/src/test/java/io/gapi/gax/grpc/BundleExecutorTest.java @@ -0,0 +1,83 @@ +/* + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.gapi.gax.grpc; + +import java.util.Collection; + +import org.junit.Test; + +public class BundleExecutorTest { + + BundlingDescriptor integerDescriptor = + new BundlingDescriptor() { + + @Override + public String getBundlePartitionKey(Integer request) { + return new Integer(request % 2).toString(); + } + + @Override + public Integer mergeRequests(Collection requests) { + return null; + } + + @Override + public void splitResponse(Integer bundleResponse, + Collection> bundle) { + } + + @Override + public void splitException(Throwable throwable, + Collection> bundle) { + } + }; + + @Test + public void testValidate() { + BundleExecutor executor = + new BundleExecutor(integerDescriptor, "0"); + CallContext callContextOk = CallContext.of(2); + BundlingContext bundlingContextOk = + new BundlingContext(callContextOk, null, null); + executor.validateItem(bundlingContextOk); + } + + @Test(expected=IllegalArgumentException.class) + public void testValidateFailure() { + BundleExecutor executor = + new BundleExecutor(integerDescriptor, "0"); + CallContext callContextOk = CallContext.of(3); + BundlingContext bundlingContextOk = + new BundlingContext(callContextOk, null, null); + executor.validateItem(bundlingContextOk); + } +}