
[C++] Make public version of StreamDecoderImpl-like class #37429

Closed
MrLolthe1st opened this issue Aug 28, 2023 · 5 comments · Fixed by #37970


@MrLolthe1st

Describe the enhancement requested

It would be useful to be able to use StreamDecoderImpl outside reader.cc.
For example, suppose a producer writes a stream of data like this:

<SCHEMA>
<RECORD BATCH 0>
...
<RECORD BATCH n - 1>
<EOS> (*)
<SCHEMA>
<DICTIONARY 0>
...
<DICTIONARY k - 1>
<RECORD BATCH 0>
...
<RECORD BATCH n - 1>
<EOS>
<SCHEMA>
<RECORD BATCH 0>
...
<RECORD BATCH n - 1>
<EOS>

With the current StreamDecoder, everything after (*) is ignored. If a StreamDecoderImpl-like class were public, I could use its OnMessageDecoded(...) method, drive message decoding from the outer scope, and replace the StreamDecoderImpl with a fresh one whenever an EOS arrives.
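To make the problem concrete, here is a toy C++ model of the behaviour described above. It is a hypothetical sketch, not Arrow code: `Kind`, `ToyStreamDecoder`, `ConcatenatedStreams`, and `DemoDropsLaterStreams` are illustrative names, and each vector element stands for one already-framed IPC message rather than raw bytes.

```cpp
#include <cassert>
#include <vector>

// Hypothetical toy model, NOT Arrow's API: each element stands for one
// complete IPC message that a decoder would parse out of the byte stream.
enum class Kind { Schema, Dictionary, RecordBatch, EOS };

// Mimics the behaviour described above: once EOS has been seen, every
// later message is silently ignored.
class ToyStreamDecoder {
 public:
  void Consume(Kind message) {
    if (eos_) return;  // everything after the first EOS is dropped
    switch (message) {
      case Kind::Schema: ++schemas_; break;
      case Kind::Dictionary: break;  // dictionaries are not counted here
      case Kind::RecordBatch: ++batches_; break;
      case Kind::EOS: eos_ = true; break;
    }
  }
  int schemas() const { return schemas_; }
  int batches() const { return batches_; }

 private:
  bool eos_ = false;
  int schemas_ = 0;
  int batches_ = 0;
};

// Three concatenated streams shaped like the layout above, with n = 2, k = 1.
std::vector<Kind> ConcatenatedStreams() {
  using K = Kind;
  return {K::Schema, K::RecordBatch, K::RecordBatch, K::EOS,
          K::Schema, K::Dictionary, K::RecordBatch, K::RecordBatch, K::EOS,
          K::Schema, K::RecordBatch, K::RecordBatch, K::EOS};
}

// Feeds all messages to one decoder; only the first stream is observed.
void DemoDropsLaterStreams() {
  ToyStreamDecoder decoder;
  for (Kind m : ConcatenatedStreams()) decoder.Consume(m);
  assert(decoder.schemas() == 1);  // second and third schemas are ignored
  assert(decoder.batches() == 2);  // only the first stream's batches
}
```

Calling `DemoDropsLaterStreams()` from `main` runs the check: the second and third streams never reach the decoder's counters.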

Component(s)

C++

@kou kou changed the title Make public version of StreamDecoderImpl-like class [C++] Make public version of StreamDecoderImpl-like class Aug 28, 2023
@kou
Member

kou commented Aug 28, 2023

How about adding arrow::ipc::StreamDecoder::Reset() instead of exporting StreamDecoderImpl?

@MrLolthe1st
Author

> How about adding arrow::ipc::StreamDecoder::Reset() instead of exporting StreamDecoderImpl?

That's just one approach. In addition to arrow::ipc::StreamDecoder::Reset(), arrow::ipc::StreamDecoder::OnMessageDecoded(...) would also have to be added: arrow::ipc::StreamDecoder::Consume(...) doesn't return the number of bytes it has read, so some higher-level control is needed to feed the StreamDecoder messages one by one.
For example, suppose I feed the StreamDecoder a chunk of payload that looks like this:

<RECORD BATCH n - 1>
<EOS>
<SCHEMA>
<DICTIONARY 0>

In that case I have to determine where the schema message begins, so that I can feed the reset StreamDecoder the new messages starting from that schema message.

Another approach is to make Consume() return not just a status but also the number of bytes read before EOS.
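The byte-count alternative can be sketched with a toy model as well. Everything here is hypothetical: `Kind`, `ToyStreamDecoder`, and `BatchesPerStream` are illustrative names, not Arrow API, and whole messages stand in for bytes, so the "amount consumed" is a message count rather than a byte count. The point is the caller loop: each call reports where the current stream ended, and the remainder goes to a fresh decoder.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical toy model, NOT Arrow's API: one element = one IPC message.
enum class Kind { Schema, Dictionary, RecordBatch, EOS };

// Sketch of the proposed alternative: Consume() reports how much input it
// used, up to and including the first EOS, so the caller knows where the
// next stream starts.
class ToyStreamDecoder {
 public:
  std::size_t Consume(const std::vector<Kind>& messages, std::size_t offset) {
    assert(offset <= messages.size());
    std::size_t i = offset;
    while (i < messages.size()) {
      Kind m = messages[i++];
      if (m == Kind::RecordBatch) ++batches_;
      if (m == Kind::EOS) break;  // stop right after this stream's EOS
    }
    return i - offset;  // number of messages consumed
  }
  int batches() const { return batches_; }

 private:
  int batches_ = 0;
};

// Caller loop: hand the unconsumed remainder to a fresh decoder each time.
std::vector<int> BatchesPerStream(const std::vector<Kind>& messages) {
  std::vector<int> result;
  std::size_t offset = 0;
  while (offset < messages.size()) {
    ToyStreamDecoder decoder;  // new decoder for every stream
    offset += decoder.Consume(messages, offset);
    result.push_back(decoder.batches());
  }
  return result;
}
```

With this shape the caller, not the decoder, decides what happens at a stream boundary, at the cost of every caller having to write the loop.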

@kou
Member

kou commented Aug 30, 2023

How about calling arrow::ipc::StreamDecoder::Reset() in a listener?

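#include <memory>
#include <vector>

#include <arrow/ipc/reader.h>
#include <arrow/status.h>

// Resets every registered decoder when the current stream ends, so the
// same decoder keeps reading the next concatenated stream.
// A real listener would also override OnRecordBatchDecoded() to receive batches.
class EndlessListener : public arrow::ipc::Listener {
 public:
  EndlessListener() = default;

  arrow::Status OnEOS() override {
    for (auto* decoder : decoders_) {
      ARROW_RETURN_NOT_OK(decoder->Reset());
    }
    return arrow::Status::OK();
  }

  void AddDecoder(arrow::ipc::StreamDecoder* decoder) {
    decoders_.push_back(decoder);
  }

 private:
  // Non-owning pointers: each decoder must outlive its use by this listener.
  std::vector<arrow::ipc::StreamDecoder*> decoders_;
};

auto listener = std::make_shared<EndlessListener>();
arrow::ipc::StreamDecoder decoder(listener);
listener->AddDecoder(&decoder);
decoder.Consume(...);  // feed the bytes of the concatenated streams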
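The listener-based approach can also be modelled without Arrow. This is a hypothetical toy, not Arrow's implementation: `Listener`, `ToyStreamDecoder`, `EndlessListener`, `Counts`, and `DecodeAll` are illustrative stand-ins, and the toy decoder only tracks whether it has seen EOS. It shows the control flow: the decoder calls `OnEOS()`, the listener calls `Reset()` on that same decoder, and the next schema starts a new stream.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical toy model, NOT Arrow's API: one element = one IPC message.
enum class Kind { Schema, Dictionary, RecordBatch, EOS };

// Callback interface, loosely mirroring the shape of arrow::ipc::Listener.
class Listener {
 public:
  virtual ~Listener() = default;
  virtual void OnSchema() {}
  virtual void OnRecordBatch() {}
  virtual void OnEOS() {}
};

class ToyStreamDecoder {
 public:
  explicit ToyStreamDecoder(std::shared_ptr<Listener> listener)
      : listener_(std::move(listener)) {}

  void Consume(Kind message) {
    if (eos_) return;  // without Reset(), input after EOS is ignored
    switch (message) {
      case Kind::Schema: listener_->OnSchema(); break;
      case Kind::Dictionary: break;  // not modelled in this toy
      case Kind::RecordBatch: listener_->OnRecordBatch(); break;
      case Kind::EOS:
        eos_ = true;
        listener_->OnEOS();  // the listener may call Reset() here
        break;
    }
  }

  // Back to the initial state: messages are accepted again.
  void Reset() { eos_ = false; }

 private:
  std::shared_ptr<Listener> listener_;
  bool eos_ = false;
};

// Same idea as EndlessListener above: reset registered decoders on EOS.
class EndlessListener : public Listener {
 public:
  void OnSchema() override { ++schemas; }
  void OnRecordBatch() override { ++batches; }
  void OnEOS() override {
    for (auto* decoder : decoders_) decoder->Reset();
  }
  void AddDecoder(ToyStreamDecoder* decoder) {
    assert(decoder != nullptr);
    decoders_.push_back(decoder);  // non-owning pointer
  }

  int schemas = 0;
  int batches = 0;

 private:
  std::vector<ToyStreamDecoder*> decoders_;
};

struct Counts {
  int schemas;
  int batches;
};

// Decodes concatenated streams with a single decoder instance.
Counts DecodeAll(const std::vector<Kind>& messages) {
  auto listener = std::make_shared<EndlessListener>();
  ToyStreamDecoder decoder(listener);
  listener->AddDecoder(&decoder);
  for (Kind m : messages) decoder.Consume(m);
  return {listener->schemas, listener->batches};
}
```

In the toy, resetting from inside the callback is safe because `Consume()` does nothing after `OnEOS()` returns; the sketch says nothing about how Arrow's own `StreamDecoder` handles that re-entrant call.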

@MrLolthe1st
Author

That's awesome!

kou added a commit to kou/arrow that referenced this issue Oct 1, 2023
We can reuse the same StreamDecoder to read multiple streams with this.
@MrLolthe1st
Author

MrLolthe1st commented Oct 6, 2023

(moved to #37970 (comment))

@kou kou closed this as completed in #37970 Nov 2, 2023
kou added a commit that referenced this issue Nov 2, 2023
### Rationale for this change

We can reuse the same StreamDecoder to read multiple streams with this.

### What changes are included in this PR?

Add StreamDecoder::Reset().

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* Closes: #37429

Authored-by: Sutou Kouhei
Signed-off-by: Sutou Kouhei
@kou kou added this to the 15.0.0 milestone Nov 2, 2023