Skip to content

Commit

Permalink
Upgrade guava 16.0.1 -> 20.0
Browse files Browse the repository at this point in the history
  • Loading branch information
mspangdal committed Nov 22, 2016
1 parent 32fd9d8 commit d5dbcf9
Show file tree
Hide file tree
Showing 20 changed files with 60 additions and 117 deletions.
15 changes: 6 additions & 9 deletions driver-core/src/main/java/com/datastax/driver/core/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
import com.datastax.driver.core.policies.*;
import com.datastax.driver.core.utils.MoreFutures;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Functions;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.base.*;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
Expand Down Expand Up @@ -338,15 +335,15 @@ public ListenableFuture<Session> connectAsync(final String keyspace) {
return sessionInitialized;
} else {
final String useQuery = "USE " + keyspace;
ListenableFuture<ResultSet> keyspaceSet = Futures.transform(sessionInitialized, new AsyncFunction<Session, ResultSet>() {
ListenableFuture<ResultSet> keyspaceSet = Futures.transformAsync(sessionInitialized, new AsyncFunction<Session, ResultSet>() {
@Override
public ListenableFuture<ResultSet> apply(Session session) throws Exception {
public ListenableFuture<ResultSet> apply(Session session) {
return session.executeAsync(useQuery);
}
});
ListenableFuture<ResultSet> withErrorHandling = Futures.withFallback(keyspaceSet, new FutureFallback<ResultSet>() {
ListenableFuture<ResultSet> withErrorHandling = Futures.catchingAsync(keyspaceSet, Throwable.class, new AsyncFunction<Throwable, ResultSet>() {
@Override
public ListenableFuture<ResultSet> create(Throwable t) throws Exception {
public ListenableFuture<ResultSet> apply(Throwable t) throws Exception {
session.closeAsync();
if (t instanceof SyntaxError) {
// Give a more explicit message, because it's probably caused by a bad keyspace name
Expand Down Expand Up @@ -2311,7 +2308,7 @@ public void run() {
rs.getExecutionInfo().setSchemaInAgreement(finalSchemaInAgreement);
future.setResult(rs);
}
}, MoreExecutors.sameThreadExecutor());
}, MoreExecutors.newDirectExecutorService());

} catch (Exception e) {
logger.warn("Error while waiting for schema agreement", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,13 @@ public void operationComplete(ChannelFuture future) throws Exception {

Executor initExecutor = factory.manager.configuration.getPoolingOptions().getInitializationExecutor();

ListenableFuture<Void> initializeTransportFuture = Futures.transform(channelReadyFuture,
ListenableFuture<Void> initializeTransportFuture = Futures.transformAsync(channelReadyFuture,
onChannelReady(protocolVersion, initExecutor), initExecutor);

// Fallback on initializeTransportFuture so we can properly propagate specific exceptions.
ListenableFuture<Void> initFuture = Futures.withFallback(initializeTransportFuture, new FutureFallback<Void>() {
ListenableFuture<Void> initFuture = Futures.catchingAsync(initializeTransportFuture, Throwable.class, new AsyncFunction<Throwable, Void>() {
@Override
public ListenableFuture<Void> create(Throwable t) throws Exception {
public ListenableFuture<Void> apply(Throwable t) throws Exception {
SettableFuture<Void> future = SettableFuture.create();
// Make sure the connection gets properly closed.
if (t instanceof ClusterNameMismatchException || t instanceof UnsupportedProtocolVersionException) {
Expand Down Expand Up @@ -225,7 +225,7 @@ private AsyncFunction<Void, Void> onChannelReady(final ProtocolVersion protocolV
public ListenableFuture<Void> apply(Void input) throws Exception {
ProtocolOptions.Compression compression = factory.configuration.getProtocolOptions().getCompression();
Future startupResponseFuture = write(new Requests.Startup(compression));
return Futures.transform(startupResponseFuture,
return Futures.transformAsync(startupResponseFuture,
onStartupResponse(protocolVersion, initExecutor), initExecutor);
}
};
Expand Down Expand Up @@ -285,7 +285,7 @@ private ListenableFuture<Void> checkClusterName(ProtocolVersion protocolVersion,
DefaultResultSetFuture clusterNameFuture = new DefaultResultSetFuture(null, protocolVersion, new Requests.Query("select cluster_name from system.local"));
try {
write(clusterNameFuture);
return Futures.transform(clusterNameFuture,
return Futures.transformAsync(clusterNameFuture,
new AsyncFunction<ResultSet, Void>() {
@Override
public ListenableFuture<Void> apply(ResultSet rs) throws Exception {
Expand All @@ -311,7 +311,7 @@ private ListenableFuture<Void> authenticateV1(Authenticator authenticator, final
Requests.Credentials creds = new Requests.Credentials(((ProtocolV1Authenticator) authenticator).getCredentials());
try {
Future authResponseFuture = write(creds);
return Futures.transform(authResponseFuture,
return Futures.transformAsync(authResponseFuture,
new AsyncFunction<Message.Response, Void>() {
@Override
public ListenableFuture<Void> apply(Message.Response authResponse) throws Exception {
Expand All @@ -337,7 +337,7 @@ private ListenableFuture<Void> authenticateV2(final Authenticator authenticator,

try {
Future authResponseFuture = write(new Requests.AuthResponse(initialResponse));
return Futures.transform(authResponseFuture, onV2AuthResponse(authenticator, protocolVersion, executor), executor);
return Futures.transformAsync(authResponseFuture, onV2AuthResponse(authenticator, protocolVersion, executor), executor);
} catch (Exception e) {
return Futures.immediateFailedFuture(e);
}
Expand All @@ -363,7 +363,7 @@ public ListenableFuture<Void> apply(Message.Response authResponse) throws Except
// Otherwise, send the challenge response back to the server
logger.trace("{} Sending Auth response to challenge", this);
Future nextResponseFuture = write(new Requests.AuthResponse(responseToServer));
return Futures.transform(nextResponseFuture, onV2AuthResponse(authenticator, protocolVersion, executor), executor);
return Futures.transformAsync(nextResponseFuture, onV2AuthResponse(authenticator, protocolVersion, executor), executor);
}
case ERROR:
// This is not very nice, but we're trying to identify if we
Expand Down Expand Up @@ -472,7 +472,7 @@ ListenableFuture<Void> setKeyspaceAsync(final String keyspace) throws Connection
logger.trace("{} Setting keyspace {}", this, keyspace);
// Note: we quote the keyspace below, because the name is the one coming from Cassandra, so it's in the right case already
Future future = write(new Requests.Query("USE \"" + keyspace + '"'));
return Futures.transform(future, new AsyncFunction<Message.Response, Void>() {
return Futures.transformAsync(future, new AsyncFunction<Message.Response, Void>() {
@Override
public ListenableFuture<Void> apply(Message.Response response) throws Exception {
if (response instanceof SetKeyspace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ public void onFailure(Throwable t) {
}

private ListenableFuture<Void> handleErrors(ListenableFuture<Void> connectionInitFuture, Executor executor) {
return Futures.withFallback(connectionInitFuture, new FutureFallback<Void>() {
return Futures.catchingAsync(connectionInitFuture, Throwable.class, new AsyncFunction<Throwable, Void>() {
@Override
public ListenableFuture<Void> create(Throwable t) throws Exception {
public ListenableFuture<Void> apply(Throwable t) throws Exception {
// Propagate these exceptions because they mean no connection will ever succeed. They will be handled
// accordingly in SessionManager#maybeAddPool.
Throwables.propagateIfInstanceOf(t, ClusterNameMismatchException.class);
Expand Down Expand Up @@ -644,7 +644,7 @@ public void run() {
if (connection.state.compareAndSet(OPEN, GONE))
open.decrementAndGet();
}
}, MoreExecutors.sameThreadExecutor());
}, MoreExecutors.newDirectExecutorService());
futures.add(future);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public class PoolingOptions {
*/
public static final int DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 30;

private static final Executor DEFAULT_INITIALIZATION_EXECUTOR = MoreExecutors.sameThreadExecutor();
private static final Executor DEFAULT_INITIALIZATION_EXECUTOR = MoreExecutors.directExecutor();

private volatile Cluster.Manager manager;
private volatile ProtocolVersion protocolVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public ListenableFuture<Session> initAsync() {

Collection<Host> hosts = cluster.getMetadata().allHosts();
ListenableFuture<?> allPoolsCreatedFuture = createPools(hosts);
ListenableFuture<?> allPoolsUpdatedFuture = Futures.transform(allPoolsCreatedFuture,
ListenableFuture<?> allPoolsUpdatedFuture = Futures.transformAsync(allPoolsCreatedFuture,
new AsyncFunction<Object, Object>() {
@Override
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -197,7 +197,7 @@ public Session.State getState() {
}

private ListenableFuture<PreparedStatement> toPreparedStatement(final String query, final Connection.Future future) {
return Futures.transform(future, new AsyncFunction<Response, PreparedStatement>() {
return Futures.transformAsync(future, new AsyncFunction<Response, PreparedStatement>() {
@Override
public ListenableFuture<PreparedStatement> apply(Response response) {
switch (response.type) {
Expand Down Expand Up @@ -437,7 +437,7 @@ ListenableFuture<?> updateCreatedPools() {
// Wait pool creation before removing, so we don't lose connectivity
ListenableFuture<?> allPoolsCreatedFuture = Futures.successfulAsList(poolCreatedFutures);

return Futures.transform(allPoolsCreatedFuture, new AsyncFunction<Object, List<Void>>() {
return Futures.transformAsync(allPoolsCreatedFuture, new AsyncFunction<Object, List<Void>>() {
@Override
public ListenableFuture<List<Void>> apply(Object input) throws Exception {
List<ListenableFuture<Void>> poolRemovedFuture = Lists.newArrayListWithCapacity(toRemove.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.datastax.driver.core.utils.Bytes;
import com.google.common.reflect.TypeToken;

import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
Expand Down Expand Up @@ -608,7 +609,7 @@ public boolean accepts(DataType cqlType) {
* Implementation notes:
* <ol>
* <li>The default implementation is <em>covariant</em> with respect to the passed
* argument (through the usage of {@link TypeToken#isAssignableFrom(TypeToken)}
* argument (through the usage of {@link TypeToken#isSupertypeOf(Type)}
* and <em>it's strongly recommended not to modify this behavior</em>.
* This means that, by default, a codec will accept
* <em>any subtype</em> of the Java type that it has been created for.</li>
Expand All @@ -628,7 +629,7 @@ public boolean accepts(DataType cqlType) {
*/
public boolean accepts(Object value) {
checkNotNull(value, "Parameter value cannot be null");
return this.javaType.isAssignableFrom(TypeToken.of(value.getClass()));
return this.javaType.isSupertypeOf(TypeToken.of(value.getClass()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.datastax.driver.core.AtomicMonotonicTimestampGenerator;
import com.datastax.driver.core.TimestampGenerator;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;

/**
Expand Down Expand Up @@ -281,11 +282,11 @@ public Builder withSpeculativeExecutionPolicy(SpeculativeExecutionPolicy specula
public Policies build() {
return new Policies(
loadBalancingPolicy == null ? Policies.defaultLoadBalancingPolicy() : loadBalancingPolicy,
Objects.firstNonNull(reconnectionPolicy, Policies.defaultReconnectionPolicy()),
Objects.firstNonNull(retryPolicy, Policies.defaultRetryPolicy()),
Objects.firstNonNull(addressTranslator, Policies.defaultAddressTranslator()),
Objects.firstNonNull(timestampGenerator, Policies.defaultTimestampGenerator()),
Objects.firstNonNull(speculativeExecutionPolicy, Policies.defaultSpeculativeExecutionPolicy()));
MoreObjects.firstNonNull(reconnectionPolicy, Policies.defaultReconnectionPolicy()),
MoreObjects.firstNonNull(retryPolicy, Policies.defaultRetryPolicy()),
MoreObjects.firstNonNull(addressTranslator, Policies.defaultAddressTranslator()),
MoreObjects.firstNonNull(timestampGenerator, Policies.defaultTimestampGenerator()),
MoreObjects.firstNonNull(speculativeExecutionPolicy, Policies.defaultSpeculativeExecutionPolicy()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void should_init_cluster_and_session_if_needed() throws Exception {
@Test(groups = "short", dataProvider = "keyspace", enabled = false,
description = "disabled because the blocking USE call in the current pool implementation makes it deadlock")
public void should_chain_query_on_async_session_init_with_same_executor(String keyspace) throws Exception {
ListenableFuture<Integer> resultFuture = connectAndQuery(keyspace, MoreExecutors.sameThreadExecutor());
ListenableFuture<Integer> resultFuture = connectAndQuery(keyspace, MoreExecutors.directExecutor());

Integer result = Uninterruptibles.getUninterruptibly(resultFuture);
assertThat(result).isEqualTo(1);
Expand All @@ -114,7 +114,7 @@ public void should_chain_query_on_async_session_init_with_different_executor(Str

@Test(groups = "short")
public void should_propagate_error_to_chained_query_if_session_init_fails() throws Exception {
ListenableFuture<Integer> resultFuture = connectAndQuery("wrong_keyspace", MoreExecutors.sameThreadExecutor());
ListenableFuture<Integer> resultFuture = connectAndQuery("wrong_keyspace", MoreExecutors.directExecutor());

try {
Uninterruptibles.getUninterruptibly(resultFuture);
Expand Down Expand Up @@ -159,7 +159,7 @@ public Thread apply(ResultSet input) {

private ListenableFuture<Integer> connectAndQuery(String keyspace, Executor executor) {
ListenableFuture<Session> sessionFuture = cluster().connectAsync(keyspace);
ListenableFuture<ResultSet> queryFuture = Futures.transform(sessionFuture, new AsyncFunction<Session, ResultSet>() {
ListenableFuture<ResultSet> queryFuture = Futures.transformAsync(sessionFuture, new AsyncFunction<Session, ResultSet>() {
@Override
public ListenableFuture<ResultSet> apply(Session session) throws Exception {
return session.executeAsync("select v from foo where k = 1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private void should_iterate_result_set_asynchronously(int totalCount, int fetchS
Statement statement = new SimpleStatement("select * from ints").setFetchSize(fetchSize);
ResultsAccumulator results = new ResultsAccumulator();

ListenableFuture<ResultSet> future = Futures.transform(
ListenableFuture<ResultSet> future = Futures.transformAsync(
session().executeAsync(statement),
results);

Expand All @@ -85,7 +85,7 @@ public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {
if (wasLastPage)
return Futures.immediateFuture(rs);
else
return Futures.transform(rs.fetchMoreResults(), this);
return Futures.transformAsync(rs.fetchMoreResults(), this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void should_release_connection_before_completing_future() throws Exceptio
mockFutures.add(session.executeAsync("mock query"));


ListenableFuture<ResultSet> future = Futures.transform(session.executeAsync("select c from test1 where k=1"),
ListenableFuture<ResultSet> future = Futures.transformAsync(session.executeAsync("select c from test1 where k=1"),
new AsyncFunction<ResultSet, ResultSet>() {
@Override
public ListenableFuture<ResultSet> apply(ResultSet result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.datastax.driver.core.policies.DelegatingLoadBalancingPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
Expand Down Expand Up @@ -153,7 +154,7 @@ public int hashCode() {

@Override
public String toString() {
return Objects.toStringHelper(this).add("action", action).add("host", host).toString();
return MoreObjects.toStringHelper(this).add("action", action).add("host", host).toString();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
public class LimitingLoadBalancingPolicy extends DelegatingLoadBalancingPolicy {
private final int maxHosts;
private final int threshold;
private final Set<Host> liveHosts = Sets.newSetFromMap(new ConcurrentHashMap<Host, Boolean>());
private final Set<Host> chosenHosts = Sets.newSetFromMap(new ConcurrentHashMap<Host, Boolean>());
private final Set<Host> liveHosts = Collections.newSetFromMap(new ConcurrentHashMap<Host, Boolean>());
private final Set<Host> chosenHosts = Collections.newSetFromMap(new ConcurrentHashMap<Host, Boolean>());
private final Lock updateLock = new ReentrantLock();

private volatile Cluster cluster;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ public ListenableFuture<Void> saveAsync(T entity, Option... options) {
}

private ListenableFuture<Void> submitVoidQueryAsync(ListenableFuture<BoundStatement> bsFuture) {
ListenableFuture<ResultSet> rsFuture = Futures.transform(bsFuture, new AsyncFunction<BoundStatement, ResultSet>() {
ListenableFuture<ResultSet> rsFuture = Futures.transformAsync(bsFuture, new AsyncFunction<BoundStatement, ResultSet>() {
@Override
public ListenableFuture<ResultSet> apply(BoundStatement bs) throws Exception {
return session().executeAsync(bs);
Expand Down Expand Up @@ -446,7 +446,7 @@ public T get(Object... objects) {
*/
public ListenableFuture<T> getAsync(final Object... objects) {
ListenableFuture<BoundStatement> bsFuture = getQueryAsync(objects);
ListenableFuture<ResultSet> rsFuture = Futures.transform(bsFuture, new AsyncFunction<BoundStatement, ResultSet>() {
ListenableFuture<ResultSet> rsFuture = Futures.transformAsync(bsFuture, new AsyncFunction<BoundStatement, ResultSet>() {
@Override
public ListenableFuture<ResultSet> apply(BoundStatement bs) throws Exception {
return session().executeAsync(bs);
Expand Down
Loading

0 comments on commit d5dbcf9

Please sign in to comment.