-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Data loader (sampler component) - Kafka/Kinesis samplers #7566
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall lgtm 👍
{ | ||
insertData(generateRecords(TOPIC)); | ||
|
||
KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(DATA_SCHEMA, null, new KafkaSupervisorIOConfig( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: formatting looks off
|
||
replayAll(); | ||
|
||
KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(DATA_SCHEMA, null, new KinesisSupervisorIOConfig( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: formatting
private void assignAndSeek() throws InterruptedException | ||
{ | ||
final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier | ||
.getPartitionIds(ioConfig.getStream()).stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: .stream() should probably be on newline
private final RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier; | ||
|
||
private Iterator<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>> recordIterator; | ||
private Iterator<byte[]> interRecordIterator; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the starting bit of nextRowWithRaw
might be a bit clearer if this variable was named something like recordBytesIterator
or recordDataIterator
?
@Override | ||
public SamplerResponse sample() | ||
{ | ||
return firehoseSampler.sample(new FirehoseFactory() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: formatting
Been testing and using this feature as a "user", seems to work really well. Check it out here: https://youtu.be/tAEp5BXVHYE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm 👍
We really need builders for those indexing config classes, but not this PR
* implement Kafka/Kinesis sampler * add KafkaSamplerSpecTest and KinesisSamplerSpecTest * code review changes
* implement Kafka/Kinesis sampler * add KafkaSamplerSpecTest and KinesisSamplerSpecTest * code review changes
* implement Kafka/Kinesis sampler * add KafkaSamplerSpecTest and KinesisSamplerSpecTest * code review changes
* implement Kafka/Kinesis sampler * add KafkaSamplerSpecTest and KinesisSamplerSpecTest * code review changes
Implementation of the sampler component of #7502.
Depends on #7531.
Adds additional implementations to support sampling from Kafka and Kinesis.