Skip to content

Commit

Permalink
Merge pull request googleapis#6 from garrettjonesgoogle/master
Browse files Browse the repository at this point in the history
Threshold bundling logic + new BundlingCallable
  • Loading branch information
pongad committed Feb 9, 2016
2 parents cfe6b29 + f8c3d5a commit 765879e
Show file tree
Hide file tree
Showing 21 changed files with 1,658 additions and 10 deletions.
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ apply plugin: 'java'
apply plugin: 'maven'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'jacoco'

group = "com.google.api"
version = "0.0.0-SNAPSHOT"
Expand All @@ -19,6 +20,7 @@ ext {
jsr305: 'com.google.code.findbugs:jsr305:3.0.0',
autovalue: 'com.google.auto.value:auto-value:1.1',
guice: 'com.google.inject:guice:4.0',
joda: 'joda-time:joda-time:2.8.2',

// Testing
junit: 'junit:junit:4.11',
Expand All @@ -37,7 +39,8 @@ dependencies {
libraries.guava,
libraries.guice,
libraries.jsr305,
libraries.autovalue
libraries.autovalue,
libraries.joda

testCompile libraries.junit,
libraries.mockito,
Expand Down
60 changes: 60 additions & 0 deletions src/main/java/io/gapi/gax/bundling/AccumulatingBundleReceiver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.gapi.gax.bundling;

import java.util.ArrayList;
import java.util.List;

/**
* A simple ThresholdBundleReceiver that just accumulates bundles.
* Not thread-safe.
*/
public class AccumulatingBundleReceiver<T> implements ThresholdBundleReceiver<T> {
private List<List<T>> bundles = new ArrayList<>();

@Override
public void validateItem(T message) {
// no-op
}

@Override
public void processBundle(List<T> bundle) {
bundles.add(bundle);
}

/**
* Returns the accumulated bundles.
*/
public List<List<T>> getBundles() {
return bundles;
}
}
55 changes: 55 additions & 0 deletions src/main/java/io/gapi/gax/bundling/BundlingThreshold.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.gapi.gax.bundling;

/**
* The interface representing a threshold to be used in ThresholdBundler.
* Thresholds do not need to be thread-safe if they are only used inside
* of ThresholdBundler.
*/
public interface BundlingThreshold<E> {

/**
* Presents the element to the threshold for the attribute of interest to be accumulated.
*/
void accumulate(E e);

/**
* @returns whether the threshold has been reached.
*/
boolean isThresholdReached();

/**
* Reset the accumulated value back to zero.
*/
void reset();
}
56 changes: 56 additions & 0 deletions src/main/java/io/gapi/gax/bundling/BundlingThresholds.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.gapi.gax.bundling;

import com.google.common.collect.ImmutableList;

/**
* Factory methods for general-purpose bundling thresholds.
*/
public class BundlingThresholds {

/**
* Creates an ImmutableList containing only a single threshold which counts
* the number of elements. This is helpful for when using ThresholdBundler for
* the simple case, when the element count is the only threshold.
*/
public static <E> ImmutableList<BundlingThreshold<E>> of(long elementThreshold) {
BundlingThreshold<E> bundlingThreshold =
new NumericThreshold<E>(elementThreshold, new ElementCounter<E>() {
@Override
public long count(E e) {
return 1;
}
});
return ImmutableList.<BundlingThreshold<E>>of(bundlingThreshold);
}
}
43 changes: 43 additions & 0 deletions src/main/java/io/gapi/gax/bundling/ElementCounter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.gapi.gax.bundling;

/**
* Interface representing an object that provides a numerical count given
* an object of the parameterized type.
*/
public interface ElementCounter<E> {
/**
* Provides the numerical count associated with the given object.
*/
public long count(E element);
}
68 changes: 68 additions & 0 deletions src/main/java/io/gapi/gax/bundling/NumericThreshold.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.gapi.gax.bundling;

import com.google.common.base.Preconditions;

/**
* A threshold which accumulates a count based on the provided
* ElementCounter.
*/
public class NumericThreshold<E> implements BundlingThreshold<E> {
private final long threshold;
private final ElementCounter<E> extractor;
private long sum;

/**
* Constructs a NumericThreshold.
*/
public NumericThreshold(long threshold, ElementCounter<E> extractor) {
this.threshold = threshold;
this.extractor = Preconditions.checkNotNull(extractor);
this.sum = 0;
}

@Override
public void accumulate(E e) {
sum += extractor.count(e);
}

@Override
public boolean isThresholdReached() {
return sum >= threshold;
}

@Override
public void reset() {
sum = 0;
}
}
53 changes: 53 additions & 0 deletions src/main/java/io/gapi/gax/bundling/ThresholdBundleReceiver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.gapi.gax.bundling;

import java.util.List;

/**
* Interface representing an object that receives bundles from a
* ThresholdBundler and takes action on them.
*/
public interface ThresholdBundleReceiver<T> {

/**
* Validate that the item can be received by this ThresholdBundleReceiver.
* This is called to validate an item before it is queued.
*/
void validateItem(T message);

/**
* Process the given bundle.
*/
void processBundle(List<T> bundle);

}
Loading

0 comments on commit 765879e

Please sign in to comment.