
Commit 8b62019

Merged in upstream trunk.
Geoff Anderson committed Jul 22, 2015
2 parents 47b7b64 + 2040890 commit 8b62019
Showing 54 changed files with 2,679 additions and 1,060 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -54,7 +54,7 @@ The release file can be found inside ./core/build/distributions/.
### Cleaning the build ###
./gradlew clean

-### Running a task on a particular version of Scala (either 2.9.1, 2.9.2, 2.10.5 or 2.11.6) ###
+### Running a task on a particular version of Scala (either 2.9.1, 2.9.2, 2.10.5 or 2.11.7) ###
#### (If building a jar with a version other than 2.10, need to set SCALA_BINARY_VERSION variable or change it in bin/kafka-run-class.sh to run quick start.) ####
./gradlew -PscalaVersion=2.9.1 jar
./gradlew -PscalaVersion=2.9.1 test
36 changes: 17 additions & 19 deletions build.gradle
@@ -59,7 +59,7 @@ rat {
// And some of the files that we have checked in should also be excluded from this check
excludes.addAll([
'**/.git/**',
-'build/rat/rat-report.xml',
+'build/**',
'gradlew',
'gradlew.bat',
'**/README.md',
@@ -163,7 +163,7 @@ subprojects {
}
}

-for ( sv in ['2_9_1', '2_9_2', '2_10_5', '2_11_6'] ) {
+for ( sv in ['2_9_1', '2_9_2', '2_10_5', '2_11_7'] ) {
String svInDot = sv.replaceAll( "_", ".")

tasks.create(name: "jar_core_${sv}", type: GradleBuild) {
@@ -203,20 +203,20 @@ for ( sv in ['2_9_1', '2_9_2', '2_10_5', '2_11_6'] ) {
}
}

tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_6', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
}

tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_6', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }
tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }

tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_6', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }
tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }

tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_6', 'clients:test', 'log4j-appender:test', 'tools:test']) {
tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test', 'tools:test']) {
}

tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_9_1', 'releaseTarGz_2_9_2', 'releaseTarGz_2_10_5', 'releaseTarGz_2_11_6']) {
tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_9_1', 'releaseTarGz_2_9_2', 'releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) {
}

tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_6', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
}

project(':core') {
@@ -225,8 +225,6 @@ project(':core') {
apply plugin: 'scala'
archivesBaseName = "kafka_${baseScalaVersion}"

-def (major, minor, trivial) = scalaVersion.tokenize('.')

dependencies {
compile project(':clients')
compile project(':log4j-appender')
@@ -235,19 +235,19 @@ project(':core') {
compile 'com.101tec:zkclient:0.5'
compile 'com.yammer.metrics:metrics-core:2.2.0'
compile 'net.sf.jopt-simple:jopt-simple:3.2'
+if (scalaVersion.startsWith('2.11')) {
+compile 'org.scala-lang.modules:scala-xml_2.11:1.0.4'
+compile 'org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4'
+}

testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.0'
testCompile 'org.objenesis:objenesis:1.2'
-if (scalaVersion.startsWith('2.10')) {
-testCompile 'org.scalatest:scalatest_2.10:1.9.1'
-} else if (scalaVersion.startsWith('2.11')) {
-compile 'org.scala-lang.modules:scala-xml_2.11:1.0.3'
-compile 'org.scala-lang.modules:scala-parser-combinators_2.11:1.0.3'
-testCompile "org.scalatest:scalatest_2.11:2.2.0"
-} else {
-testCompile "org.scalatest:scalatest_$scalaVersion:1.8"
-}
+if (scalaVersion.startsWith('2.9'))
+testCompile "org.scalatest:scalatest_$scalaVersion:1.9.1"
+else
+testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5"

testRuntime "$slf4jlog4j"

zinc 'com.typesafe.zinc:zinc:0.3.7'
@@ -63,11 +63,21 @@ public interface Consumer<K, V> extends Closeable {
*/
public void commit(CommitType commitType);

/**
* @see KafkaConsumer#commit(CommitType, ConsumerCommitCallback)
*/
public void commit(CommitType commitType, ConsumerCommitCallback callback);

/**
* @see KafkaConsumer#commit(Map, CommitType)
*/
public void commit(Map<TopicPartition, Long> offsets, CommitType commitType);

/**
* @see KafkaConsumer#commit(Map, CommitType, ConsumerCommitCallback)
*/
public void commit(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback);

/**
* @see KafkaConsumer#seek(TopicPartition, long)
*/
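For readers of the hunk above, here is a minimal sketch of committing an explicitly chosen offset with the offsets-map overload. The helper class name and the CommitType.SYNC constant are assumptions for illustration; only the commit(Map, CommitType) signature comes from this diff.

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.CommitType;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class ManualOffsetCommit {

    /**
     * Commits one explicitly chosen offset for a single partition and blocks
     * until the broker acknowledges the commit.
     */
    public static void commitUpTo(Consumer<String, String> consumer,
                                  TopicPartition partition,
                                  long offset) {
        Map<TopicPartition, Long> offsets = Collections.singletonMap(partition, offset);
        consumer.commit(offsets, CommitType.SYNC);
    }
}
```

The new callback-taking overloads added in this hunk follow the same pattern; an asynchronous example appears after the ConsumerCommitCallback interface below.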
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;

import java.util.Map;

/**
* A callback interface that the user can implement to trigger custom actions when a commit request completes. The callback
* may be executed in any thread calling {@link Consumer#poll(long) poll()}.
*/
public interface ConsumerCommitCallback {

/**
* A callback method the user can implement to provide asynchronous handling of commit request completion.
* This method will be called when the commit request sent to the server has been acknowledged.
*
* @param offsets A map of the offsets that this callback applies to
* @param exception The exception thrown during processing of the request, or null if the commit completed successfully
*/
void onComplete(Map<TopicPartition, Long> offsets, Exception exception);
}
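A minimal sketch of wiring this callback into an asynchronous commit. The class name, the logging, and the CommitType.ASYNC constant are assumptions for illustration; the onComplete signature and the commit(CommitType, ConsumerCommitCallback) overload come from this commit.

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.CommitType;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerCommitCallback;
import org.apache.kafka.common.TopicPartition;

public class LoggingCommitCallback implements ConsumerCommitCallback {

    @Override
    public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
        // Runs inside a later poll() call once the broker has answered the commit request.
        if (exception != null) {
            System.err.println("Commit of " + offsets + " failed: " + exception);
        } else {
            System.out.println("Committed " + offsets);
        }
    }

    /** Fires an asynchronous commit of the consumer's current positions. */
    public static void commitCurrentPositions(Consumer<String, String> consumer) {
        consumer.commit(CommitType.ASYNC, new LoggingCommitCallback());
    }
}
```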
@@ -173,7 +173,8 @@ public class ConsumerConfig extends AbstractConfig {
SESSION_TIMEOUT_MS_DOC)
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Type.STRING,
"blah",
"range",
in("range", "roundrobin"),
Importance.MEDIUM,
PARTITION_ASSIGNMENT_STRATEGY_DOC)
.define(METADATA_MAX_AGE_CONFIG,
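A hedged sketch of a consumer configuration that opts into the validated setting. PARTITION_ASSIGNMENT_STRATEGY_CONFIG and the allowed values come from the hunk above; the other property names and values are placeholders, not part of this diff.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class AssignmentStrategyConfig {

    /** Builds consumer properties that select the round-robin assignor. */
    public static Properties roundRobinConsumerProps() {
        Properties props = new Properties();
        // Placeholder connection and group settings.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        // Must be "range" (the new default) or "roundrobin"; any other value
        // is now rejected by the in(...) validator added above.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "roundrobin");
        return props;
    }
}
```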
@@ -73,7 +73,8 @@ public interface ConsumerRebalanceCallback {
* It is guaranteed that all the processes in a consumer group will execute their
* {@link #onPartitionsRevoked(Consumer, Collection)} callback before any instance executes its
* {@link #onPartitionsAssigned(Consumer, Collection)} callback.
*
* @param consumer Reference to the consumer for convenience
* @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
* assigned to the consumer)
*/
@@ -86,7 +87,8 @@ public interface ConsumerRebalanceCallback {
* custom offset store to prevent duplicate data
* <p>
* For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
*
* @param consumer Reference to the consumer for convenience
* @param partitions The list of partitions that were assigned to the consumer on the last rebalance
*/
public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
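A short sketch of the duplicate-avoidance pattern the Javadoc above describes: commit before partitions are taken away. The class name and CommitType.SYNC are assumptions, and onPartitionsAssigned is assumed to mirror the onPartitionsRevoked signature shown in this hunk.

```java
import java.util.Collection;

import org.apache.kafka.clients.consumer.CommitType;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRevokeCallback implements ConsumerRebalanceCallback {

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // New ownership established; nothing extra needed in this sketch.
        System.out.println("Assigned: " + partitions);
    }

    @Override
    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Commit processed offsets before ownership moves to another consumer,
        // so the next owner does not reprocess the same records.
        consumer.commit(CommitType.SYNC);
    }
}
```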
@@ -105,6 +105,10 @@ public ConsumerRecord<K, V> makeNext() {
}
}

public boolean isEmpty() {
return records.isEmpty();
}

@SuppressWarnings("unchecked")
public static <K, V> ConsumerRecords<K, V> empty() {
return (ConsumerRecords<K, V>) EMPTY;
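A small sketch of the new isEmpty() helper in a poll loop. Only isEmpty() and poll(long) are taken from this commit; the iteration over ConsumerRecords and the ConsumerRecord accessors are assumed from the surrounding API of this branch.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class PollOnce {

    /** Polls once and reports whether anything arrived, using the new isEmpty() helper. */
    public static boolean drainOnce(Consumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (records.isEmpty()) {
            return false; // nothing fetched within the timeout
        }
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.offset() + ": " + record.value());
        }
        return true;
    }
}
```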
