
Add write support for kafka connector #4230

Closed
charlesjmorgan wants to merge 1 commit from the kafka-writes branch

Conversation

@charlesjmorgan (Member) commented Jun 25, 2020

  • Add write support for the Kafka connector
  • Add an encoder to serialize messages into Avro, CSV, JSON, and raw formats (works for primitives and JSON date/time types)
  • Some changes proposed in #4183 are currently included in this PR, but once those get merged I'll rebase (done)

Closes #3980

@cla-bot cla-bot bot added the cla-signed label Jun 25, 2020
@charlesjmorgan charlesjmorgan changed the title Implement inserts for kafka connector Add write support for kafka connector Jun 25, 2020
@charlesjmorgan charlesjmorgan force-pushed the kafka-writes branch 2 times, most recently from a119b37 to 70648c3 Compare June 26, 2020 03:53
@aalbu (Member) left a comment


I reviewed part of it and it looks good! I made a few comments; I will continue reviewing tomorrow.

@charlesjmorgan (Member, Author) commented Jun 26, 2020

Made the revisions suggested by @aalbu:

  • Changed RowEncoder#encodeRow to return a ProducerRecord (see the sketch below)
  • Added serializers for the different data formats
  • Removed encoder tests that no longer work (more will be added in the future)
  • Improved the round-trip test design in TestKafkaIntegrationSmokeTest
  • Rebased and brought in the bug-fix changes for the RawRowDecoder
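
For illustration, a minimal sketch of the Kafka-specific shape described in the first bullet. Only RowEncoder#encodeRow and ProducerRecord come from the discussion; the Page/position parameters are assumptions, not the exact interface in this PR:

```java
import io.prestosql.spi.Page;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch of the Kafka-specific encoder described above;
// the exact shape in the PR may differ.
public interface RowEncoder
{
    // Encode the row at the given position of the page into a record
    // that can be handed directly to a KafkaProducer.
    ProducerRecord<byte[], byte[]> encodeRow(Page page, int position);
}
```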

@findepi (Member) commented Jun 26, 2020

cc @elonazoulay

@charlesjmorgan charlesjmorgan force-pushed the kafka-writes branch 3 times, most recently from d441a7b to b4bd592 Compare June 27, 2020 15:18
@kokosing (Member) left a comment


A few random comments; I need to get back to it later.

@findepi (Member) left a comment


(skimming)

@findepi (Member) commented Jun 29, 2020

One thing that is non-obvious to me:
should we have a similar set of classes, interfaces, factories, and providers for decoders and encoders,
OR should we enhance the decoders' interface to handle encoding as well?

I assume @charlesjmorgan @aalbu you considered that, so perhaps there is an answer ready.

@charlesjmorgan (Member, Author)

@findepi
The original goal was to make an encoder that could be used by different connectors, like the current record decoder. Any interfaces that seem unnecessary are there because of that.

Recently we changed the encoder to be Kafka-specific, so instead of returning a byte[] it returns a ProducerRecord, and the serializers take care of... well, serialization. @aalbu brought up what I think is a good point about keeping encoding and decoding separate and the single-responsibility principle. That is why, while the EncoderColumnHandle might seem functionally equivalent to the DecoderColumnHandle, they are two separate classes (see the sketch below). If we were to combine the encoder and decoder in the future, I think there is a case to be made for using just one column handle.

I'm not sure which is the best option: keeping encoding and decoding separate, or combining them. I'm also not sure if it would be better to have a one-size-fits-all encoder/decoder or connector-specific ones optimized for each use case. Any thoughts you (or anyone else) have on this would be helpful.
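
For illustration, a minimal sketch of the separation described above: two functionally similar handles kept as distinct types so the encoding and decoding sides can evolve independently. The method set shown is an assumption for illustration, not the exact code in this PR:

```java
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.type.Type;

// Hypothetical sketch: the two handles carry similar information, but are
// deliberately separate types (single-responsibility), so either side can
// change without dragging the other along.
interface DecoderColumnHandle
        extends ColumnHandle
{
    String getName();
    Type getType();
    String getMapping();
}

interface EncoderColumnHandle
        extends ColumnHandle
{
    String getName();
    Type getType();
    String getMapping();
}
```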

@aalbu (Member) commented Jun 29, 2020

should we have a similar set of classes, interfaces, factories, and providers for decoders and encoders

Perhaps. I advocated for an iterative approach, where we implement the functionality for the connector at hand, and then consider more generic abstractions. I feel that trying to generalize too early can lead to constraining decisions.

OR should we enhance the decoders' interface to handle encoding as well?

So basically RowDecoder -> RowCodec? That is a possibility, if we end up with symmetric abstractions. I think that the RowDecoder could be generalized more (why do we assume that the source is a byte[]?).
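
For illustration, a sketch of what that generalization might look like, with the source type parameterized instead of fixed to byte[]. The RowCodec name comes from the comment above; everything else is assumed:

```java
import java.util.Map;
import java.util.Optional;

import io.prestosql.decoder.DecoderColumnHandle;
import io.prestosql.decoder.FieldValueProvider;
import io.prestosql.spi.Page;

// Hypothetical sketch: a symmetric codec parameterized over the wire
// type T, so connectors are not forced to go through byte[].
public interface RowCodec<T>
{
    Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodeRow(T data);

    T encodeRow(Page page, int position);
}
```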

@findepi (Member) commented Jun 29, 2020

I'm also not sure if it would be better to have a one-size-fits-all encoder/decoder or connector-specific ones optimized for each use case

That's a very valid question.
I am not sure whether decoder reuse is the optimal choice, but we have it today anyway.

Since encoding seems pretty symmetric to decoding, it seems justified to bundle them in a single class.
Doing so emphasizes the symmetry and may make the implementation easier to understand.

I understand that, to realize this symmetry, we would need Kafka-independent types in the encoder's interfaces.
byte[] or Slice seems like a safe choice. We would leave submitting this byte[] key/value to the Kafka topic to Kafka-specific code.
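
For illustration, a sketch of that split under the stated assumptions: a connector-agnostic encoder that returns byte[], with only the Kafka-specific code knowing about ProducerRecord. All names here are hypothetical:

```java
import io.prestosql.spi.Page;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical: a connector-agnostic encoder returning raw bytes...
interface RowEncoder
{
    byte[] encodeKey(Page page, int position);

    byte[] encodeValue(Page page, int position);
}

// ...while only the Kafka-specific code turns those bytes into a ProducerRecord.
final class KafkaRecordSubmitter
{
    private final KafkaProducer<byte[], byte[]> producer;
    private final String topic;
    private final RowEncoder encoder;

    KafkaRecordSubmitter(KafkaProducer<byte[], byte[]> producer, String topic, RowEncoder encoder)
    {
        this.producer = producer;
        this.topic = topic;
        this.encoder = encoder;
    }

    void submit(Page page, int position)
    {
        // The encoder never sees Kafka types; only this class does.
        producer.send(new ProducerRecord<>(topic, encoder.encodeKey(page, position), encoder.encodeValue(page, position)));
    }
}
```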

I think that the RowDecoder could be generalized more (why do we assume that the source is a byte[]?).

Because that's just convenient. You need to plug in a flexible interface when the cost of copying data is large (i.e., the data is large).
The current code is probably not optimized to avoid that, and I am not sure it should be.

@losipiuk please chime in, since you have worked with this code more than I have.

@losipiuk (Member) left a comment


Partial review.

@aalbu (Member) commented Jun 30, 2020

I understand that, to realize this symmetry, we would need Kafka-independent types in the encoder's interfaces.
byte[] or Slice seems like a safe choice. We would leave submitting this byte[] key/value to the Kafka topic to Kafka-specific code.

But what's the value of that? There isn't much code reuse. It's not an intuitive way of working with target systems. We might make Kafka work with it, but do we know that it's a good fit for other systems? Redis is using the RecordDecoder - can we make Redis writes work by providing it byte arrays? Would it work when you persist a list? I don't think we have given that much thought, so I feel there is no supporting evidence that extending the interface with encoding methods producing a byte[] is appropriate.

@findepi (Member) commented Jun 30, 2020

But what's the value of that? There isn't much code reuse.

This isn't about code reuse, but more about physically bundling things that are coupled.
For example, I would want the Integer#parseInt and Integer#toString conversions to be implemented
in one class, as one is the reverse of the other (and they are not so complex as to preclude being in one class).

Redis is using the RecordDecoder - can we make Redis writes work by providing it byte arrays?

I don't know. IF decoding accepted byte[], THEN it would be reasonable for encoding to return byte[].
However, I just realized it is worse than that, because decoding takes byte[] data, Map<String, String> dataMap
(and the dataMap is specific to Redis; see the signature below).
That suggests Redis is not a good use case for byte[]-based row decoding/encoding at all.
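
For reference, the decoder signature being discussed; the parameters are quoted above, and the rest of the shape is reconstructed from the presto-record-decoder module, so treat it as approximate:

```java
import java.util.Map;
import java.util.Optional;

import io.prestosql.decoder.DecoderColumnHandle;
import io.prestosql.decoder.FieldValueProvider;

// Approximate shape of the existing interface in presto-record-decoder;
// the dataMap parameter is the Redis-specific part called out above.
public interface RowDecoder
{
    Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodeRow(byte[] data, Map<String, String> dataMap);
}
```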

@losipiuk (Member)

This isn't about code reuse, but more about physically bundling things that are coupled

I agree that bundling encoders and decoders in a single class would make the implementation easier to read and understand. However, it would push us further towards reusing the same logic in very different connectors. I think the fact that Kafka and Redis share a decoder implementation does more harm than good (see the example @findepi gave above).

I think (if we find the resources for it) we should untangle the situation and do either:

a) Rework the presto-record-decoder module to make it more connector-agnostic, and keep reusing it where we currently use it. Then we can combine PrestoDecoder with PrestoEncoder in the shared module.

b) Stop using presto-record-decoder in connectors it does not play well with (Redis?). This could potentially result in dropping presto-record-decoder altogether, if it turns out that having a separate implementation for each connector is better. Then we can combine the decoder and encoder logic in the Kafka connector module.

Short term, it probably does not matter much. At least for this PR I would keep them separate so we can focus on merging it sooner. Then, as a next step (which we should not postpone), we can work on the refactoring and move towards either a) or b). WDYT?

@findepi (Member) commented Jun 30, 2020

FWIW, the presto-record-decoder module is currently used in:

  • Kafka
  • Redis
  • Kinesis

At least for this PR I would keep them separate so we can focus on merging it sooner.

@losipiuk I'm not sure what you mean here. How does the choice between separate classes and extending the existing classes matter?

FYI, I do not have a very strong opinion either way, but I expect it may take some work to switch between the approaches.

@losipiuk (Member)

@losipiuk I'm not sure what you mean here. How does the choice between separate classes and extending the existing classes matter?

It matters in terms of how many rounds of review we need. I would prefer not to bloat this PR with extra refactorings. Smaller PRs are much easier to review and work with. And the sooner we merge this beast and continue with smaller, gradual changes, the better, IMO.

@charlesjmorgan (Member, Author) commented Jun 30, 2020

Thank you @aalbu, @findepi, @losipiuk, and @kokosing for all your feedback; it has been very helpful! I hope I got everything (I triple-checked, so it should be good). I intentionally left out changes that I didn't think were in the scope of this PR; I will revisit those in the future. I am going to split this PR into five parts: the first will be the basic functionality for inserts, and the four after that will each cover a specific encoder format. Let me know if you think I should split it up differently.

Base functionality/CSV encoder PR - #4287
