
Changes to work with kafka 0.8 #7

Open · wants to merge 8 commits into master

Conversation

@whitedr commented Feb 13, 2014

These changes are based on an initial set of changes from clippPR to get the river working with Kafka 0.8. I made a few changes to account for the consumer offset becoming logical rather than physical, as well as some subtle differences in exception handling between 0.7 and 0.8. I also fixed the dumpStats utility method so that the river reconnects when connection issues occur. One final change adds a new river configuration param named 'startFromNewestOffset' (defaults to false), which lets you configure the river to start from either the oldest or the newest offset.
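For context, a minimal sketch of how a 'startFromNewestOffset' flag could map onto the sentinel timestamps Kafka 0.8 uses in an OffsetRequest (kafka.api.OffsetRequest.LatestTime() is -1, EarliestTime() is -2; these are the -1/-2 values visible in the PartitionOffsetRequestInfo calls in the test output later in this thread). This is a hypothetical illustration, not the PR's actual code:

```java
// Hypothetical sketch (not the PR's code): choosing which Kafka 0.8
// offset-time sentinel to resolve when no stored offset is usable.
public class StartOffsetSketch {
    // Kafka 0.8 sentinels: kafka.api.OffsetRequest.LatestTime() == -1,
    // kafka.api.OffsetRequest.EarliestTime() == -2.
    static final long LATEST_TIME = -1L;
    static final long EARLIEST_TIME = -2L;

    /** Pick the offset-request "time" when starting fresh or after OffsetOutOfRangeException. */
    static long initialOffsetTime(boolean startFromNewestOffset) {
        return startFromNewestOffset ? LATEST_TIME : EARLIEST_TIME;
    }

    public static void main(String[] args) {
        System.out.println(initialOffsetTime(true));   // newest offset (-1)
        System.out.println(initialOffsetTime(false));  // oldest offset (-2, the default)
    }
}
```

The resolved sentinel would then go into a PartitionOffsetRequestInfo, which is why the traces below show PartitionOffsetRequestInfo(-1,1) for the newest offset and PartitionOffsetRequestInfo(-2,1) for the oldest.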

Warner Onstine and others added 3 commits January 14, 2014 12:35
…etch to translate and throw exceptions on error in the FetchResponse. Add support for a new river configuration named 'startFromNewestOffset'. This flag allows the river to be set up to start from either the newest or the oldest partition offset if/when the river encounters an OffsetOutOfRangeException. Fixed a couple of tests that were failing due to the new clientName param in the KafkaClient connect method.
@damienclaveau

+1
This update is very good news.

Dave White and others added 3 commits March 4, 2014 12:04
@adewahyu123

I got this error when I tested:

Test set: org.elasticsearch.river.kafka.KafkaClientTest

Tests run: 9, Failures: 0, Errors: 3, Skipped: 0, Time elapsed: 0.456 sec <<< FAILURE!
testGetNewestOffset(org.elasticsearch.river.kafka.KafkaClientTest) Time elapsed: 0.425 sec <<< ERROR!
java.lang.AssertionError:
Unexpected method call getOffsetsBefore(Name: OffsetRequest; Version: 0; CorrelationId: 0; ClientId: null; RequestInfo: [my_replicated_topic,0] -> PartitionOffsetRequestInfo(-1,1); ReplicaId: -1):
at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:45)
at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:73)
at org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:92)
at kafka.javaapi.consumer.SimpleConsumer$$EnhancerByCGLIB$$abaf5997.getOffsetsBefore()
at org.elasticsearch.river.kafka.KafkaClient.getNewestOffset(KafkaClient.java:107)
at org.elasticsearch.river.kafka.KafkaClientTest.testGetNewestOffset(KafkaClientTest.java:119)
testGetOldestOffset(org.elasticsearch.river.kafka.KafkaClientTest) Time elapsed: 0 sec <<< ERROR!
java.lang.AssertionError:
Unexpected method call getOffsetsBefore(Name: OffsetRequest; Version: 0; CorrelationId: 0; ClientId: null; RequestInfo: [my_replicated_topic,0] -> PartitionOffsetRequestInfo(-2,1); ReplicaId: -1):
at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:45)
at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:73)
at org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:92)
at kafka.javaapi.consumer.SimpleConsumer$$EnhancerByCGLIB$$abaf5997.getOffsetsBefore()
at org.elasticsearch.river.kafka.KafkaClient.getOldestOffset(KafkaClient.java:117)
at org.elasticsearch.river.kafka.KafkaClientTest.testGetOldestOffset(KafkaClientTest.java:133)
testFetch(org.elasticsearch.river.kafka.KafkaClientTest) Time elapsed: 0.008 sec <<< ERROR!
java.lang.AssertionError:
Unexpected method call fetch(Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId: null; ReplicaId: -1; MaxWait: 0 ms; MinBytes: 0 bytes; RequestInfo: [my_replicated_topic,0] -> PartitionFetchInfo(1717,1024)):
at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:45)
at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:73)
at org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:92)
at kafka.javaapi.consumer.SimpleConsumer$$EnhancerByCGLIB$$abaf5997.fetch()
at org.elasticsearch.river.kafka.KafkaClient.fetch(KafkaClient.java:127)
at org.elasticsearch.river.kafka.KafkaClientTest.testFetch(KafkaClientTest.java:225)

Can you give me information on how to solve this error?
Many thanks.

@damienclaveau

Hi, I think the 0.8 version submitted by dtabwhite is OK, but there is still a problem with the unit tests in Maven.
By the way, I found that the implementation of the KafkaClient still doesn't support dynamic rebalancing of the partition leaders.
The guidelines for supporting it are here: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
I will try to work on it.
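The core of the linked SimpleConsumer example is a loop that asks each seed broker for topic metadata and takes the first leader it reports. Here is a Kafka-free sketch of that pattern; Broker discovery and the MetadataSource interface are stand-ins for SimpleConsumer.send(TopicMetadataRequest), not real kafka API names:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, kafka-free sketch of the leader-discovery loop from the
// 0.8.0 SimpleConsumer example: query each seed broker for partition
// metadata and take the first leader reported. MetadataSource stands in
// for SimpleConsumer.send(new TopicMetadataRequest(...)).
public class LeaderFinderSketch {
    interface MetadataSource {
        /** Returns the leader broker id for topic/partition, or null if this seed doesn't know. */
        Integer leaderFor(String seedBroker, String topic, int partition);
    }

    static Integer findLeader(List<String> seedBrokers, String topic, int partition,
                              MetadataSource metadata) {
        for (String seed : seedBrokers) {
            Integer leader = metadata.leaderFor(seed, topic, partition);
            if (leader != null) {
                return leader; // first seed broker that knows the leader wins
            }
            // on a real cluster: close the consumer for this seed and try the next one
        }
        return null; // no seed broker could report a leader; caller should retry later
    }

    public static void main(String[] args) {
        // Fake cluster: seed "b1" has no metadata, seed "b2" reports broker 3 as leader.
        MetadataSource fake = (seed, topic, part) ->
                "b2".equals(seed) ? Integer.valueOf(3) : null;
        System.out.println(findLeader(Arrays.asList("b1", "b2"), "my_replicated_topic", 0, fake));
    }
}
```

On leader rebalancing, the same loop would be re-run whenever a fetch fails with a NotLeaderForPartition-style error, so the consumer follows the leader as it moves between brokers.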

Dave White added 2 commits April 24, 2014 14:11
…ncorporating the configured river name in the location in zk where the offsets are stored.
  - Have the river discover the set of brokers that exist from zk
  - Discover the leader broker for a given topic/partition
  - Don't store the brokerUrl in the zk offset path
  - On startup keep trying to discover/connect to kafka until there is a registered broker
  - On startup when no offsets are stored in zk, set the initial offset based on the 'startFromNewestOffset' setting
@whitedr (Author) commented May 3, 2014

FYI - My latest commit in my fork makes the river more resilient in a clustered Kafka setup. Here is the set of changes it introduces:

  • Have the river discover the set of brokers that exist from zk
  • Discover the leader broker for a given topic/partition
  • Don't store the brokerUrl in the zk offset path
  • On startup keep trying to discover/connect to kafka until there is a registered broker
  • On startup when no offsets are stored in zk, set the initial offset based on the 'startFromNewestOffset' setting

So essentially, the broker_host and broker_port settings go away with this change and are dynamically (re)discovered for the specified topic/partition. This is essential when running Kafka in a clustered setup, as leadership for a given topic/partition can change and the river should be able to deal with these changes on the fly.
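The "keep trying to discover/connect to kafka until there is a registered broker" behavior described above amounts to a bounded retry loop around the zk broker lookup. A minimal sketch, where discoverBroker is a hypothetical stand-in for reading the registered broker list from ZooKeeper:

```java
import java.util.function.Supplier;

// Hypothetical sketch of the startup behavior described above:
// keep polling for a registered broker, backing off between checks.
// discoverBroker stands in for reading /brokers/ids from ZooKeeper.
public class BrokerWaitSketch {
    static String waitForBroker(Supplier<String> discoverBroker,
                                int maxAttempts, long sleepMillis) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String broker = discoverBroker.get(); // null until a broker registers in zk
            if (broker != null) {
                return broker;
            }
            Thread.sleep(sleepMillis); // back off before re-checking zk
        }
        throw new IllegalStateException("no kafka broker registered after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake discovery: a broker appears on the third check.
        int[] calls = {0};
        Supplier<String> fake = () -> (++calls[0] >= 3) ? "broker-1:9092" : null;
        System.out.println(waitForBroker(fake, 10, 1));
    }
}
```

The actual river presumably blocks indefinitely rather than capping attempts; the bound here just keeps the sketch testable.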

@jplock commented Jul 9, 2014

+1

@santthosh

+1

@mfirry commented Nov 10, 2014

+1

7 participants