Skip to content

Commit

Permalink
Merged in upstream trunk
Browse files Browse the repository at this point in the history
  • Loading branch information
Geoff Anderson committed Jun 16, 2015
2 parents 5397d3c + 20a31a2 commit 4034dd6
Show file tree
Hide file tree
Showing 230 changed files with 5,372 additions and 2,734 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ Please note for this to work you should create/update `~/.gradle/gradle.properti
signing.password=
signing.secretKeyRingFile=

### Publishing the jars without signing to a local repository ###
./gradlew -Dorg.gradle.project.skipSigning=true -Dorg.gradle.project.mavenUrl=file://path/to/repo uploadArchivesAll

### Building the test jar ###
./gradlew testJar

Expand Down
60 changes: 32 additions & 28 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ allprojects {
}

ext {
gradleVersion = "2.0"
}
gradleVersion = "2.4"

skipSigning = project.hasProperty('skipSigning') && skipSigning.toBoolean()
shouldSign = !skipSigning && !version.endsWith("SNAPSHOT")

def isVerificationRequired(project) {
project.gradle.startParameter.taskNames.any { it.contains("upload") }
mavenUrl = project.hasProperty('mavenUrl') ? project.mavenUrl : ''
mavenUsername = project.hasProperty('mavenUsername') ? project.mavenUsername : ''
mavenPassword = project.hasProperty('mavenPassword') ? project.mavenPassword : ''
}

apply from: file('wrapper.gradle')
Expand All @@ -47,32 +50,33 @@ subprojects {
apply plugin: 'maven'
apply plugin: 'signing'

sourceCompatibility = 1.6

licenseTest.onlyIf { isVerificationRequired(project) }

uploadArchives {
repositories {
signing {
if (isVerificationRequired(project)) {
sign configurations.archives

// To test locally, replace mavenUrl in ~/.gradle/gradle.properties to file://localhost/tmp/myRepo/
mavenDeployer {
beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) }
repository(url: "${mavenUrl}") {
authentication(userName: "${mavenUsername}", password: "${mavenPassword}")
}
afterEvaluate {
pom.artifactId = "${archivesBaseName}"
pom.project {
name 'Apache Kafka'
packaging 'jar'
url 'http://kafka.apache.org'
licenses {
license {
name 'The Apache Software License, Version 2.0'
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
distribution 'repo'
}
required { shouldSign }
sign configurations.archives

// To test locally, replace mavenUrl in ~/.gradle/gradle.properties to file://localhost/tmp/myRepo/
mavenDeployer {
beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) }
repository(url: "${mavenUrl}") {
authentication(userName: "${mavenUsername}", password: "${mavenPassword}")
}
afterEvaluate {
pom.artifactId = "${archivesBaseName}"
pom.project {
name 'Apache Kafka'
packaging 'jar'
url 'http://kafka.apache.org'
licenses {
license {
name 'The Apache Software License, Version 2.0'
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
distribution 'repo'
}
}
}
Expand Down Expand Up @@ -205,7 +209,7 @@ project(':core') {
compile project(':clients')
compile "org.scala-lang:scala-library:$scalaVersion"
compile 'org.apache.zookeeper:zookeeper:3.4.6'
compile 'com.101tec:zkclient:0.3'
compile 'com.101tec:zkclient:0.5'
compile 'com.yammer.metrics:metrics-core:2.2.0'
compile 'net.sf.jopt-simple:jopt-simple:3.2'

Expand All @@ -224,7 +228,7 @@ project(':core') {
}
testRuntime "$slf4jlog4j"

zinc 'com.typesafe.zinc:zinc:0.3.1'
zinc 'com.typesafe.zinc:zinc:0.3.7'
}

configurations {
Expand Down Expand Up @@ -350,7 +354,7 @@ project(':clients') {

dependencies {
compile "org.slf4j:slf4j-api:1.7.6"
compile 'org.xerial.snappy:snappy-java:1.1.1.6'
compile 'org.xerial.snappy:snappy-java:1.1.1.7'
compile 'net.jpountz.lz4:lz4:1.2.0'

testCompile 'com.novocode:junit-interface:0.9'
Expand Down
16 changes: 16 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@
*/
package org.apache.kafka.clients;

import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;

public class ClientUtils {
private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);

public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
Expand All @@ -45,4 +50,15 @@ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> url
throw new ConfigException("No bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
return addresses;
}

public static void closeQuietly(Closeable c, String name, AtomicReference<Throwable> firstException) {
if (c != null) {
try {
c.close();
} catch (Throwable t) {
firstException.compareAndSet(null, t);
log.error("Failed to close " + name, t);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@
*/
final class ClusterConnectionStates {
private final long reconnectBackoffMs;
private final Map<Integer, NodeConnectionState> nodeState;
private final Map<String, NodeConnectionState> nodeState;

public ClusterConnectionStates(long reconnectBackoffMs) {
this.reconnectBackoffMs = reconnectBackoffMs;
this.nodeState = new HashMap<Integer, NodeConnectionState>();
this.nodeState = new HashMap<String, NodeConnectionState>();
}

/**
* Return true iff we can currently initiate a new connection to the given node. This will be the case if we are not
* Return true iff we can currently initiate a new connection. This will be the case if we are not
* connected and haven't been connected for at least the minimum reconnection backoff period.
* @param node The node id to check
* @param id The connection id to check
* @param now The current time in MS
* @return true if we can initiate a new connection
*/
public boolean canConnect(int node, long now) {
NodeConnectionState state = nodeState.get(node);
public boolean canConnect(String id, long now) {
NodeConnectionState state = nodeState.get(id);
if (state == null)
return true;
else
Expand All @@ -45,11 +45,11 @@ public boolean canConnect(int node, long now) {

/**
* Return true if we are disconnected from the given node and can't re-establish a connection yet
* @param node The node to check
* @param id The connection to check
* @param now The current time in ms
*/
public boolean isBlackedOut(int node, long now) {
NodeConnectionState state = nodeState.get(node);
public boolean isBlackedOut(String id, long now) {
NodeConnectionState state = nodeState.get(id);
if (state == null)
return false;
else
Expand All @@ -60,11 +60,11 @@ public boolean isBlackedOut(int node, long now) {
* Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
* disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
* connections.
* @param node The node to check
* @param id The connection to check
* @param now The current time in ms
*/
public long connectionDelay(int node, long now) {
NodeConnectionState state = nodeState.get(node);
public long connectionDelay(String id, long now) {
NodeConnectionState state = nodeState.get(id);
if (state == null) return 0;
long timeWaited = now - state.lastConnectAttemptMs;
if (state.state == ConnectionState.DISCONNECTED) {
Expand All @@ -77,67 +77,67 @@ public long connectionDelay(int node, long now) {
}

/**
* Enter the connecting state for the given node.
* @param node The id of the node we are connecting to
* Enter the connecting state for the given connection.
* @param id The id of the connection
* @param now The current time.
*/
public void connecting(int node, long now) {
nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now));
public void connecting(String id, long now) {
nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now));
}

/**
* Return true iff we have a connection to the give node
* @param node The id of the node to check
* Return true iff a specific connection is connected
* @param id The id of the connection to check
*/
public boolean isConnected(int node) {
NodeConnectionState state = nodeState.get(node);
public boolean isConnected(String id) {
NodeConnectionState state = nodeState.get(id);
return state != null && state.state == ConnectionState.CONNECTED;
}

/**
* Return true iff we are in the process of connecting to the given node
* @param node The id of the node
* Return true iff we are in the process of connecting
* @param id The id of the connection
*/
public boolean isConnecting(int node) {
NodeConnectionState state = nodeState.get(node);
public boolean isConnecting(String id) {
NodeConnectionState state = nodeState.get(id);
return state != null && state.state == ConnectionState.CONNECTING;
}

/**
* Enter the connected state for the given node
* @param node The node we have connected to
* Enter the connected state for the given connection
* @param id The connection identifier
*/
public void connected(int node) {
NodeConnectionState nodeState = nodeState(node);
public void connected(String id) {
NodeConnectionState nodeState = nodeState(id);
nodeState.state = ConnectionState.CONNECTED;
}

/**
* Enter the disconnected state for the given node
* @param node The node we have disconnected from
* @param id The connection we have disconnected
*/
public void disconnected(int node) {
NodeConnectionState nodeState = nodeState(node);
public void disconnected(String id) {
NodeConnectionState nodeState = nodeState(id);
nodeState.state = ConnectionState.DISCONNECTED;
}

/**
* Get the state of our connection to the given node
* @param node The id of the node
* Get the state of a given connection
* @param id The id of the connection
* @return The state of our connection
*/
public ConnectionState connectionState(int node) {
return nodeState(node).state;
public ConnectionState connectionState(String id) {
return nodeState(id).state;
}

/**
* Get the state of a given node
* @param node The node to fetch the state for
* @param id The connection to fetch the state for
*/
private NodeConnectionState nodeState(int node) {
NodeConnectionState state = this.nodeState.get(node);
private NodeConnectionState nodeState(String id) {
NodeConnectionState state = this.nodeState.get(id);
if (state == null)
throw new IllegalStateException("No entry found for node " + node);
throw new IllegalStateException("No entry found for connection " + id);
return state;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,6 @@ public class CommonClientConfigs {
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";

public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config.";
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
final class InFlightRequests {

private final int maxInFlightRequestsPerConnection;
private final Map<Integer, Deque<ClientRequest>> requests = new HashMap<Integer, Deque<ClientRequest>>();
private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();

public InFlightRequests(int maxInFlightRequestsPerConnection) {
this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
}

/**
* Add the given request to the queue for the node it was directed to
* Add the given request to the queue for the connection it was directed to
*/
public void add(ClientRequest request) {
Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
Expand All @@ -45,7 +45,7 @@ public void add(ClientRequest request) {
/**
* Get the request queue for the given node
*/
private Deque<ClientRequest> requestQueue(int node) {
private Deque<ClientRequest> requestQueue(String node) {
Deque<ClientRequest> reqs = requests.get(node);
if (reqs == null || reqs.isEmpty())
throw new IllegalStateException("Response from server for which there are no in-flight requests.");
Expand All @@ -55,15 +55,15 @@ private Deque<ClientRequest> requestQueue(int node) {
/**
* Get the oldest request (the one that that will be completed next) for the given node
*/
public ClientRequest completeNext(int node) {
public ClientRequest completeNext(String node) {
return requestQueue(node).pollLast();
}

/**
* Get the last request we sent to the given node (but don't remove it from the queue)
* @param node The node id
*/
public ClientRequest lastSent(int node) {
public ClientRequest lastSent(String node) {
return requestQueue(node).peekFirst();
}

Expand All @@ -72,7 +72,7 @@ public ClientRequest lastSent(int node) {
* @param node The node the request was sent to
* @return The request
*/
public ClientRequest completeLastSent(int node) {
public ClientRequest completeLastSent(String node) {
return requestQueue(node).pollFirst();
}

Expand All @@ -82,7 +82,7 @@ public ClientRequest completeLastSent(int node) {
* @param node Node in question
* @return true iff we have no requests still being sent to the given node
*/
public boolean canSendMore(int node) {
public boolean canSendMore(String node) {
Deque<ClientRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
Expand All @@ -93,7 +93,7 @@ public boolean canSendMore(int node) {
* @param node The node
* @return The request count.
*/
public int inFlightRequestCount(int node) {
public int inFlightRequestCount(String node) {
Deque<ClientRequest> queue = requests.get(node);
return queue == null ? 0 : queue.size();
}
Expand All @@ -114,7 +114,7 @@ public int inFlightRequestCount() {
* @param node The node
* @return All the in-flight requests for that node that have been removed
*/
public Iterable<ClientRequest> clearAll(int node) {
public Iterable<ClientRequest> clearAll(String node) {
Deque<ClientRequest> reqs = requests.get(node);
if (reqs == null) {
return Collections.emptyList();
Expand Down
Loading

0 comments on commit 4034dd6

Please sign in to comment.