From 208e39f6c613b79fce52c1c87e30d5d1ea4dd273 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Thu, 19 Sep 2024 14:27:23 +0900 Subject: [PATCH] GH-40493: [GLib][Ruby] Add GArrowStreamDecoder This is the bindings of `arrow::ipc::StreamDecoder`. `arrow::ipc::StreamDecoder` is useful to integrate async input. --- c_glib/Gemfile | 2 +- c_glib/arrow-glib/arrow-glib.h | 1 + c_glib/arrow-glib/arrow-glib.hpp | 1 + c_glib/arrow-glib/decoder.cpp | 607 ++++++++++++++++++++ c_glib/arrow-glib/decoder.h | 96 ++++ c_glib/arrow-glib/decoder.hpp | 38 ++ c_glib/arrow-glib/internal-hash-table.hpp | 18 + c_glib/arrow-glib/meson.build | 3 + c_glib/arrow-glib/reader.cpp | 20 +- c_glib/test/test-stream-decoder.rb | 126 ++++ ruby/red-arrow/lib/arrow/loader.rb | 12 + ruby/red-arrow/lib/arrow/stream-decoder.rb | 29 + ruby/red-arrow/lib/arrow/stream-listener.rb | 47 ++ ruby/red-arrow/red-arrow.gemspec | 2 +- ruby/red-arrow/test/test-stream-listener.rb | 60 ++ 15 files changed, 1044 insertions(+), 18 deletions(-) create mode 100644 c_glib/arrow-glib/decoder.cpp create mode 100644 c_glib/arrow-glib/decoder.h create mode 100644 c_glib/arrow-glib/decoder.hpp create mode 100644 c_glib/test/test-stream-decoder.rb create mode 100644 ruby/red-arrow/lib/arrow/stream-decoder.rb create mode 100644 ruby/red-arrow/lib/arrow/stream-listener.rb create mode 100644 ruby/red-arrow/test/test-stream-listener.rb diff --git a/c_glib/Gemfile b/c_glib/Gemfile index d32bc87ba72c6..cc6adecabe230 100644 --- a/c_glib/Gemfile +++ b/c_glib/Gemfile @@ -20,4 +20,4 @@ source "https://rubygems.org/" gem "test-unit" -gem "gobject-introspection", ">= 4.1.1" +gem "gobject-introspection", ">= 4.2.3" diff --git a/c_glib/arrow-glib/arrow-glib.h b/c_glib/arrow-glib/arrow-glib.h index 7ba20882610e8..272b6ba1dae10 100644 --- a/c_glib/arrow-glib/arrow-glib.h +++ b/c_glib/arrow-glib/arrow-glib.h @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include diff --git a/c_glib/arrow-glib/arrow-glib.hpp 
b/c_glib/arrow-glib/arrow-glib.hpp index 79e8dcbcce61a..49571eeae4929 100644 --- a/c_glib/arrow-glib/arrow-glib.hpp +++ b/c_glib/arrow-glib/arrow-glib.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include diff --git a/c_glib/arrow-glib/decoder.cpp b/c_glib/arrow-glib/decoder.cpp new file mode 100644 index 0000000000000..83af6bc484394 --- /dev/null +++ b/c_glib/arrow-glib/decoder.cpp @@ -0,0 +1,607 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +G_BEGIN_DECLS + +/** + * SECTION: decoder + * @section_id: decoder-classes + * @title: Decoder classes + * @include: arrow-glib/arrow-glib.h + * + * #GArrowStreamListener is a class for receiving decoded information + * from #GArrowStreamDecoder. + * + * #GArrowStreamDecoder is a class for decoding record batches in + * stream format from given data chunks. 
+ */ + +struct GArrowStreamListenerPrivate +{ + std::shared_ptr listener; +}; + +G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE(GArrowStreamListener, + garrow_stream_listener, + G_TYPE_OBJECT); + +#define GARROW_STREAM_LISTENER_GET_PRIVATE(object) \ + static_cast( \ + garrow_stream_listener_get_instance_private(GARROW_STREAM_LISTENER(object))) + +G_END_DECLS + +namespace garrow { + class StreamListener : public arrow::ipc::Listener { + public: + StreamListener(GArrowStreamListener *listener) : listener_(listener) + { + g_object_ref(listener_); + } + ~StreamListener() { g_object_unref(listener_); } + + arrow::Status + OnEOS() override + { + if (!klass()->on_eos) { + return arrow::Status::OK(); + } + + GError *error = nullptr; + if (garrow_stream_listener_on_eos(listener_, &error)) { + return arrow::Status::OK(); + } else { + return garrow_error_to_status(error, + arrow::StatusCode::UnknownError, + "[stream-listener][on-eos]"); + } + } + + arrow::Status + OnRecordBatchWithMetadataDecoded( + arrow::RecordBatchWithMetadata arrow_record_batch_with_metadata) override + { + if (!klass()->on_record_batch_decoded) { + return arrow::Status::OK(); + } + + auto record_batch = + garrow_record_batch_new_raw(&(arrow_record_batch_with_metadata.batch)); + GHashTable *metadata = nullptr; + if (arrow_record_batch_with_metadata.custom_metadata) { + metadata = garrow_internal_hash_table_from_metadata( + arrow_record_batch_with_metadata.custom_metadata); + } + GError *error = nullptr; + auto success = garrow_stream_listener_on_record_batch_decoded(listener_, + record_batch, + metadata, + &error); + g_object_unref(record_batch); + if (metadata) { + g_hash_table_unref(metadata); + } + if (success) { + return arrow::Status::OK(); + } else { + return garrow_error_to_status(error, + arrow::StatusCode::UnknownError, + "[stream-listener][on-record-batch-decoded]"); + } + } + + arrow::Status + OnSchemaDecoded(std::shared_ptr arrow_schema, + std::shared_ptr arrow_filtered_schema) override + { + if 
(!klass()->on_schema_decoded) { + return arrow::Status::OK(); + } + + auto schema = garrow_schema_new_raw(&arrow_schema); + auto filtered_schema = garrow_schema_new_raw(&arrow_filtered_schema); + GError *error = nullptr; + auto success = garrow_stream_listener_on_schema_decoded(listener_, + schema, + filtered_schema, + &error); + g_object_unref(schema); + g_object_unref(filtered_schema); + if (success) { + return arrow::Status::OK(); + } else { + return garrow_error_to_status(error, + arrow::StatusCode::UnknownError, + "[stream-listener][on-schema-decoded]"); + } + } + + private: + GArrowStreamListener *listener_; + + GArrowStreamListenerClass * + klass() + { + return GARROW_STREAM_LISTENER_GET_CLASS(listener_); + } + }; +}; // namespace garrow + +G_BEGIN_DECLS + +static void +garrow_stream_listener_finalize(GObject *object) +{ + auto priv = GARROW_STREAM_LISTENER_GET_PRIVATE(object); + priv->listener.~shared_ptr(); + G_OBJECT_CLASS(garrow_stream_listener_parent_class)->finalize(object); +} + +static void +garrow_stream_listener_init(GArrowStreamListener *object) +{ + auto priv = GARROW_STREAM_LISTENER_GET_PRIVATE(object); + new (&priv->listener) + std::shared_ptr(new garrow::StreamListener(object)); +} + +static void +garrow_stream_listener_class_init(GArrowStreamListenerClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + gobject_class->finalize = garrow_stream_listener_finalize; + + klass->on_eos = nullptr; + klass->on_record_batch_decoded = nullptr; + klass->on_schema_decoded = nullptr; +} + +/** + * garrow_stream_listener_on_eos: + * @listener: A #GArrowStreamListener. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Processes an EOS event. + * + * Returns: %TRUE on success, %FALSE on error. 
+ * + * Since: 18.0.0 + */ +gboolean +garrow_stream_listener_on_eos(GArrowStreamListener *listener, GError **error) +{ + auto klass = GARROW_STREAM_LISTENER_GET_CLASS(listener); + if (!(klass && klass->on_eos)) { + g_set_error(error, + GARROW_ERROR, + GARROW_ERROR_NOT_IMPLEMENTED, + "[stream-listener][on-eos] not implemented"); + return false; + } + return klass->on_eos(listener, error); +} + +/** + * garrow_stream_listener_on_record_batch_decoded: + * @listener: A #GArrowStreamListener. + * @record_batch: A decoded #GArrowRecordBatch. + * @metadata: (element-type utf8 utf8) (nullable): A decoded metadata. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Processes a decoded record batch. + * + * Returns: %TRUE on success, %FALSE on error. + * + * Since: 18.0.0 + */ +gboolean +garrow_stream_listener_on_record_batch_decoded(GArrowStreamListener *listener, + GArrowRecordBatch *record_batch, + GHashTable *metadata, + GError **error) +{ + auto klass = GARROW_STREAM_LISTENER_GET_CLASS(listener); + if (!(klass && klass->on_record_batch_decoded)) { + g_set_error(error, + GARROW_ERROR, + GARROW_ERROR_NOT_IMPLEMENTED, + "[stream-listener][on-record-batch-decoded] not implemented"); + return false; + } + return klass->on_record_batch_decoded(listener, record_batch, metadata, error); +} + +/** + * garrow_stream_listener_on_schema_decoded: + * @listener: A #GArrowStreamListener. + * @schema: A decoded #GArrowSchema. + * @filtered_schema: A decoded #GArrowSchema that only has read fields. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Processes a decoded schema. + * + * Returns: %TRUE on success, %FALSE on error. 
+ * + * Since: 18.0.0 + */ +gboolean +garrow_stream_listener_on_schema_decoded(GArrowStreamListener *listener, + GArrowSchema *schema, + GArrowSchema *filtered_schema, + GError **error) +{ + auto klass = GARROW_STREAM_LISTENER_GET_CLASS(listener); + if (!(klass && klass->on_schema_decoded)) { + g_set_error(error, + GARROW_ERROR, + GARROW_ERROR_NOT_IMPLEMENTED, + "[stream-listener][on-schema-decoded] not implemented"); + return false; + } + return klass->on_schema_decoded(listener, schema, filtered_schema, error); +} + +struct GArrowStreamDecoderPrivate +{ + std::shared_ptr decoder; + GArrowStreamListener *listener; +}; + +enum { + PROP_DECODER = 1, + PROP_LISTENER, +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GArrowStreamDecoder, garrow_stream_decoder, G_TYPE_OBJECT); + +#define GARROW_STREAM_DECODER_GET_PRIVATE(object) \ + static_cast( \ + garrow_stream_decoder_get_instance_private(GARROW_STREAM_DECODER(object))) + +static void +garrow_stream_decoder_finalize(GObject *object) +{ + auto priv = GARROW_STREAM_DECODER_GET_PRIVATE(object); + priv->decoder.~shared_ptr(); + G_OBJECT_CLASS(garrow_stream_decoder_parent_class)->finalize(object); +} + +static void +garrow_stream_decoder_dispose(GObject *object) +{ + auto priv = GARROW_STREAM_DECODER_GET_PRIVATE(object); + + if (priv->listener) { + g_object_unref(priv->listener); + priv->listener = nullptr; + } + + G_OBJECT_CLASS(garrow_stream_decoder_parent_class)->dispose(object); +} + +static void +garrow_stream_decoder_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GARROW_STREAM_DECODER_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_DECODER: + priv->decoder = *static_cast *>( + g_value_get_pointer(value)); + break; + case PROP_LISTENER: + priv->listener = GARROW_STREAM_LISTENER(g_value_dup_object(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +garrow_stream_decoder_get_property(GObject 
*object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + auto priv = GARROW_STREAM_DECODER_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_LISTENER: + g_value_set_object(value, priv->listener); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +garrow_stream_decoder_init(GArrowStreamDecoder *object) +{ + auto priv = GARROW_STREAM_DECODER_GET_PRIVATE(object); + new (&priv->decoder) std::shared_ptr; +} + +static void +garrow_stream_decoder_class_init(GArrowStreamDecoderClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + + gobject_class->finalize = garrow_stream_decoder_finalize; + gobject_class->dispose = garrow_stream_decoder_dispose; + gobject_class->set_property = garrow_stream_decoder_set_property; + gobject_class->get_property = garrow_stream_decoder_get_property; + + GParamSpec *spec; + spec = g_param_spec_pointer( + "decoder", + nullptr, + nullptr, + static_cast(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_DECODER, spec); + + /** + * GArrowStreamDecoder:listener: + * + * A listener that receives decoded events. + * + * Since: 18.0.0 + */ + spec = g_param_spec_object( + "listener", + nullptr, + nullptr, + GARROW_TYPE_STREAM_LISTENER, + static_cast(G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_LISTENER, spec); +} + +/** + * garrow_stream_decoder_new: + * @listener: The #GArrowStreamListener that receives decoded events. + * @options: (nullable): The #GArrowReadOptions. + * + * Returns: A newly created #GArrowStreamDecoder. 
+ * + * Since: 18.0.0 + */ +GArrowStreamDecoder * +garrow_stream_decoder_new(GArrowStreamListener *listener, GArrowReadOptions *options) +{ + auto arrow_listener = garrow_stream_listener_get_raw(listener); + arrow::ipc::IpcReadOptions arrow_options; + if (options) { + arrow_options = *garrow_read_options_get_raw(options); + } else { + arrow_options = arrow::ipc::IpcReadOptions::Defaults(); + } + auto arrow_decoder = + std::make_shared(arrow_listener, arrow_options); + return garrow_stream_decoder_new_raw(&arrow_decoder, listener); +} + +/** + * garrow_stream_decoder_consume_bytes: + * @decoder: A #GArrowStreamDecoder. + * @bytes: A #GBytes to be decoded. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Feed data to the decoder as a raw data. + * + * If the decoder can read one or more record batches by the data, the + * decoder calls [vfunc@GArrowStreamListener.on_record_batch_decoded] + * with a decoded record batch multiple times. + * + * Returns: %TRUE on success, %FALSE if there was an error. + * + * Since: 18.0.0 + */ +gboolean +garrow_stream_decoder_consume_bytes(GArrowStreamDecoder *decoder, + GBytes *bytes, + GError **error) +{ + auto arrow_decoder = garrow_stream_decoder_get_raw(decoder); + gsize size; + gconstpointer data = g_bytes_get_data(bytes, &size); + return garrow::check(error, + arrow_decoder->Consume(static_cast(data), size), + "[stream-decoder][consume-bytes]"); +} + +/** + * garrow_stream_decoder_consume_buffer: + * @decoder: A #GArrowStreamDecoder. + * @buffer: A #GArrowBuffer to be decoded. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Feed data to the decoder as a #GArrowBuffer. + * + * If the decoder can read one or more record batches by the data, the + * decoder calls [vfunc@GArrowStreamListener.on_record_batch_decoded] + * with a decoded record batch multiple times. + * + * Returns: %TRUE on success, %FALSE if there was an error. 
+ * + * Since: 18.0.0 + */ +gboolean +garrow_stream_decoder_consume_buffer(GArrowStreamDecoder *decoder, + GArrowBuffer *buffer, + GError **error) +{ + auto arrow_decoder = garrow_stream_decoder_get_raw(decoder); + auto arrow_buffer = garrow_buffer_get_raw(buffer); + return garrow::check(error, + arrow_decoder->Consume(arrow_buffer), + "[stream-decoder][consume-buffer]"); +} + +/** + * garrow_stream_decoder_reset: + * @decoder: A #GArrowStreamDecoder. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Reset the internal status. + * + * You can reuse this decoder for new stream after calling this. + * + * Returns: %TRUE on success, %FALSE if there was an error. + * + * Since: 18.0.0 + */ +gboolean +garrow_stream_decoder_reset(GArrowStreamDecoder *decoder, GError **error) +{ + auto arrow_decoder = garrow_stream_decoder_get_raw(decoder); + return garrow::check(error, arrow_decoder->Reset(), "[stream-decoder][reset]"); +} + +/** + * garrow_stream_decoder_get_schema: + * @decoder: A #GArrowStreamDecoder. + * + * Returns: (nullable) (transfer full): The shared #GArrowSchema of + * the record batches in the stream. + * + * Since: 18.0.0 + */ +GArrowSchema * +garrow_stream_decoder_get_schema(GArrowStreamDecoder *decoder) +{ + auto arrow_decoder = garrow_stream_decoder_get_raw(decoder); + auto arrow_schema = arrow_decoder->schema(); + if (arrow_schema) { + return garrow_schema_new_raw(&arrow_schema); + } else { + return nullptr; + } +} + +/** + * garrow_stream_decoder_get_next_required_size: + * @decoder: A #GArrowStreamDecoder. + * + * This method is provided for users who want to optimize performance. + * Normal users don't need to use this method. + * + * Here is an example usage for normal users: + * + * garrow_stream_decoder_consume_buffer(decoder, buffer1); + * garrow_stream_decoder_consume_buffer(decoder, buffer2); + * garrow_stream_decoder_consume_buffer(decoder, buffer3); + * + * Decoder has internal buffer. 
If consumed data isn't enough to + * advance the state of the decoder, consumed data is buffered to + * the internal buffer. It causes performance overhead. + * + * If you pass garrow_stream_decoder_get_next_required_size() size data + * to each + * garrow_stream_decoder_consume_bytes()/garrow_stream_decoder_consume_buffer() + * call, the decoder doesn't use its internal buffer. It improves + * performance. + * + * Here is an example usage to avoid using internal buffer: + * + * buffer1 = get_data(garrow_stream_decoder_get_next_required_size(decoder)); + * garrow_stream_decoder_consume_buffer(decoder, buffer1); + * buffer2 = get_data(garrow_stream_decoder_get_next_required_size(decoder)); + * garrow_stream_decoder_consume_buffer(decoder, buffer2); + * + * Users can use this method to avoid creating small chunks. Record + * batch data must be contiguous data. If users pass small chunks to + * the decoder, the decoder needs to concatenate small chunks + * internally. It causes performance overhead. + * + * Here is an example usage to reduce small chunks: + * + * GArrowResizableBuffer *buffer = garrow_resizable_buffer_new(1024, NULL); + * while ((small_chunk = get_data(&small_chunk_size))) { + * size_t current_buffer_size = garrow_buffer_get_size(GARROW_BUFFER(buffer)); + * garrow_resizable_buffer_resize(buffer, current_buffer_size + small_chunk_size, + * NULL); + * garrow_mutable_buffer_set_data(GARROW_MUTABLE_BUFFER(buffer), + * current_buffer_size, + * small_chunk, + * small_chunk_size, + * NULL); + * if (garrow_buffer_get_size(GARROW_BUFFER(buffer)) < + * garrow_stream_decoder_get_next_required_size(decoder)) { + * continue; + * } + * garrow_stream_decoder_consume_buffer(decoder, GARROW_BUFFER(buffer), NULL); + * g_object_unref(buffer); + * buffer = garrow_resizable_buffer_new(1024, NULL); + * } + * if (garrow_buffer_get_size(GARROW_BUFFER(buffer)) > 0) { + * garrow_stream_decoder_consume_buffer(decoder, GARROW_BUFFER(buffer), NULL); + * } + * g_object_unref(buffer); + * + * Returns: The 
number of bytes needed to advance the state of + * the decoder. + * + * Since: 18.0.0 + */ +gsize +garrow_stream_decoder_get_next_required_size(GArrowStreamDecoder *decoder) +{ + auto arrow_decoder = garrow_stream_decoder_get_raw(decoder); + return arrow_decoder->next_required_size(); +} + +G_END_DECLS + +std::shared_ptr +garrow_stream_listener_get_raw(GArrowStreamListener *listener) +{ + auto priv = GARROW_STREAM_LISTENER_GET_PRIVATE(listener); + return priv->listener; +} + +GArrowStreamDecoder * +garrow_stream_decoder_new_raw(std::shared_ptr *arrow_decoder, + GArrowStreamListener *listener) +{ + return GARROW_STREAM_DECODER(g_object_new(GARROW_TYPE_STREAM_DECODER, + "decoder", + arrow_decoder, + "listener", + listener, + nullptr)); +} + +std::shared_ptr +garrow_stream_decoder_get_raw(GArrowStreamDecoder *decoder) +{ + auto priv = GARROW_STREAM_DECODER_GET_PRIVATE(decoder); + return priv->decoder; +} diff --git a/c_glib/arrow-glib/decoder.h b/c_glib/arrow-glib/decoder.h new file mode 100644 index 0000000000000..2ac0efbabfc7b --- /dev/null +++ b/c_glib/arrow-glib/decoder.h @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +#include +#include + +G_BEGIN_DECLS + +#define GARROW_TYPE_STREAM_LISTENER (garrow_stream_listener_get_type()) +GARROW_AVAILABLE_IN_18_0 +G_DECLARE_DERIVABLE_TYPE( + GArrowStreamListener, garrow_stream_listener, GARROW, STREAM_LISTENER, GObject) +struct _GArrowStreamListenerClass +{ + GObjectClass parent_class; + + gboolean (*on_eos)(GArrowStreamListener *listener, GError **error); + gboolean (*on_record_batch_decoded)(GArrowStreamListener *listener, + GArrowRecordBatch *record_batch, + GHashTable *metadata, + GError **error); + gboolean (*on_schema_decoded)(GArrowStreamListener *listener, + GArrowSchema *schema, + GArrowSchema *filtered_schema, + GError **error); +}; + +GARROW_AVAILABLE_IN_18_0 +gboolean +garrow_stream_listener_on_eos(GArrowStreamListener *listener, GError **error); + +GARROW_AVAILABLE_IN_18_0 +gboolean +garrow_stream_listener_on_record_batch_decoded(GArrowStreamListener *listener, + GArrowRecordBatch *record_batch, + GHashTable *metadata, + GError **error); + +GARROW_AVAILABLE_IN_18_0 +gboolean +garrow_stream_listener_on_schema_decoded(GArrowStreamListener *listener, + GArrowSchema *schema, + GArrowSchema *filtered_schema, + GError **error); + +#define GARROW_TYPE_STREAM_DECODER (garrow_stream_decoder_get_type()) +GARROW_AVAILABLE_IN_18_0 +G_DECLARE_DERIVABLE_TYPE( + GArrowStreamDecoder, garrow_stream_decoder, GARROW, STREAM_DECODER, GObject) +struct _GArrowStreamDecoderClass +{ + GObjectClass parent_class; +}; + +GARROW_AVAILABLE_IN_18_0 +GArrowStreamDecoder * +garrow_stream_decoder_new(GArrowStreamListener *listener, GArrowReadOptions *options); +GARROW_AVAILABLE_IN_18_0 +gboolean +garrow_stream_decoder_consume_bytes(GArrowStreamDecoder *decoder, + GBytes *bytes, + GError **error); +GARROW_AVAILABLE_IN_18_0 +gboolean +garrow_stream_decoder_consume_buffer(GArrowStreamDecoder *decoder, + GArrowBuffer *buffer, + GError **error); +GARROW_AVAILABLE_IN_18_0 +gboolean +garrow_stream_decoder_reset(GArrowStreamDecoder *decoder, 
GError **error); +GARROW_AVAILABLE_IN_18_0 +GArrowSchema * +garrow_stream_decoder_get_schema(GArrowStreamDecoder *decoder); +GARROW_AVAILABLE_IN_18_0 +gsize +garrow_stream_decoder_get_next_required_size(GArrowStreamDecoder *decoder); + +G_END_DECLS diff --git a/c_glib/arrow-glib/decoder.hpp b/c_glib/arrow-glib/decoder.hpp new file mode 100644 index 0000000000000..24b329867c685 --- /dev/null +++ b/c_glib/arrow-glib/decoder.hpp @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +#include +#include + +#include + +GARROW_EXTERN +std::shared_ptr +garrow_stream_listener_get_raw(GArrowStreamListener *listener); + +GARROW_EXTERN +GArrowStreamDecoder * +garrow_stream_decoder_new_raw(std::shared_ptr *arrow_decoder, + GArrowStreamListener *listener); + +GARROW_EXTERN +std::shared_ptr +garrow_stream_decoder_get_raw(GArrowStreamDecoder *decoder); diff --git a/c_glib/arrow-glib/internal-hash-table.hpp b/c_glib/arrow-glib/internal-hash-table.hpp index 27ec060994c98..2e0a72561a7d8 100644 --- a/c_glib/arrow-glib/internal-hash-table.hpp +++ b/c_glib/arrow-glib/internal-hash-table.hpp @@ -37,3 +37,21 @@ garrow_internal_hash_table_to_metadata(GHashTable *metadata) &arrow_metadata); return arrow_metadata; } + +static inline GHashTable * +garrow_internal_hash_table_from_metadata( + const std::shared_ptr &arrow_metadata) +{ + auto metadata = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free); + const auto &keys = arrow_metadata->keys(); + const auto &values = arrow_metadata->values(); + auto n = arrow_metadata->size(); + for (int64_t i = 0; i < n; ++i) { + const auto &key = keys[i]; + const auto &value = values[i]; + g_hash_table_insert(metadata, + g_strndup(key.data(), key.size()), + g_strndup(value.data(), value.size())); + } + return metadata; +} diff --git a/c_glib/arrow-glib/meson.build b/c_glib/arrow-glib/meson.build index 36a8274513ed2..854988e348986 100644 --- a/c_glib/arrow-glib/meson.build +++ b/c_glib/arrow-glib/meson.build @@ -28,6 +28,7 @@ sources = files( 'composite-data-type.cpp', 'datum.cpp', 'decimal.cpp', + 'decoder.cpp', 'error.cpp', 'expression.cpp', 'field.cpp', @@ -91,6 +92,7 @@ c_headers = files( 'data-type.h', 'datum.h', 'decimal.h', + 'decoder.h', 'error.h', 'expression.h', 'field.h', @@ -153,6 +155,7 @@ cpp_headers = files( 'data-type.hpp', 'datum.hpp', 'decimal.hpp', + 'decoder.hpp', 'error.hpp', 'expression.hpp', 'field.hpp', diff --git a/c_glib/arrow-glib/reader.cpp b/c_glib/arrow-glib/reader.cpp 
index 8a1c3722d4a0f..9fe9d9d1b3199 100644 --- a/c_glib/arrow-glib/reader.cpp +++ b/c_glib/arrow-glib/reader.cpp @@ -668,10 +668,10 @@ garrow_record_batch_file_reader_read_record_batch(GArrowRecordBatchFileReader *r } } -typedef struct GArrowFeatherFileReaderPrivate_ +struct GArrowFeatherFileReaderPrivate { std::shared_ptr feather_reader; -} GArrowFeatherFileReaderPrivate; +}; enum { PROP_FEATHER_READER = 1, @@ -714,22 +714,11 @@ garrow_feather_file_reader_set_property(GObject *object, } } -static void -garrow_feather_file_reader_get_property(GObject *object, - guint prop_id, - GValue *value, - GParamSpec *pspec) -{ - switch (prop_id) { - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); - break; - } -} - static void garrow_feather_file_reader_init(GArrowFeatherFileReader *object) { + auto priv = GARROW_FEATHER_FILE_READER_GET_PRIVATE(object); + new (&priv->feather_reader) std::shared_ptr; } static void @@ -739,7 +728,6 @@ garrow_feather_file_reader_class_init(GArrowFeatherFileReaderClass *klass) gobject_class->finalize = garrow_feather_file_reader_finalize; gobject_class->set_property = garrow_feather_file_reader_set_property; - gobject_class->get_property = garrow_feather_file_reader_get_property; GParamSpec *spec; spec = g_param_spec_pointer( diff --git a/c_glib/test/test-stream-decoder.rb b/c_glib/test/test-stream-decoder.rb new file mode 100644 index 0000000000000..108e687e3aa6b --- /dev/null +++ b/c_glib/test/test-stream-decoder.rb @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +class TestStreamDecoder < Test::Unit::TestCase + include Helper::Buildable + + class Listener < Arrow::StreamListener + type_register + + attr_reader :events + def initialize + super + @events = [] + end + + private + def virtual_do_on_eos + @events << [:eos] + true + end + + def virtual_do_on_record_batch_decoded(record_batch, metadata) + @events << [:record_batch_decoded, record_batch, metadata] + true + end + + def virtual_do_on_schema_decoded(schema, filtered_schema) + @events << [:schema_decoded, schema, filtered_schema] + true + end + end + + def setup + columns = { + "enabled": build_boolean_array([true, false, nil, true]), + } + @record_batch = build_record_batch(columns) + @schema = @record_batch.schema + + @buffer = Arrow::ResizableBuffer.new(0) + output = Arrow::BufferOutputStream.new(@buffer) + stream_writer = Arrow::RecordBatchStreamWriter.new(output, @schema) + stream_writer.write_record_batch(@record_batch) + stream_writer.close + output.close + + @listener = Listener.new + @decoder = Arrow::StreamDecoder.new(@listener) + end + + def test_listener + assert_equal(@listener, @decoder.listener) + end + + def test_consume_bytes + @buffer.data.to_s.each_byte do |byte| + @decoder.consume_bytes(GLib::Bytes.new(byte.chr)) + end + assert_equal([ + [:schema_decoded, @schema, @schema], + [:record_batch_decoded, @record_batch, nil], + [:eos], + ], + @listener.events) + end + + def test_consume_buffer + @buffer.data.to_s.each_byte do |byte| + @decoder.consume_buffer(Arrow::Buffer.new(byte.chr)) + end + assert_equal([ + [:schema_decoded, @schema, 
@schema], + [:record_batch_decoded, @record_batch, nil], + [:eos], + ], + @listener.events) + end + + def test_reset + @decoder.consume_bytes(@buffer.data.to_s[0, 10]) + @decoder.reset + @decoder.consume_bytes(@buffer.data) + assert_equal([ + [:schema_decoded, @schema, @schema], + [:record_batch_decoded, @record_batch, nil], + [:eos], + ], + @listener.events) + end + + def test_schema + assert_nil(@decoder.schema) + @decoder.consume_bytes(@buffer.data) + assert_equal(@schema, @decoder.schema) + end + + def test_next_required_size + data = @buffer.data.to_s + loop do + next_required_size = @decoder.next_required_size + break if next_required_size.zero? + @decoder.consume_bytes(data[0, next_required_size]) + data = data[next_required_size..-1] + end + assert_equal([ + [:schema_decoded, @schema, @schema], + [:record_batch_decoded, @record_batch, nil], + [:eos], + ], + @listener.events) + end +end diff --git a/ruby/red-arrow/lib/arrow/loader.rb b/ruby/red-arrow/lib/arrow/loader.rb index bd0d03930885c..5468b0c78cc99 100644 --- a/ruby/red-arrow/lib/arrow/loader.rb +++ b/ruby/red-arrow/lib/arrow/loader.rb @@ -116,6 +116,8 @@ def require_libraries require "arrow/sparse-union-data-type" require "arrow/string-dictionary-array-builder" require "arrow/string-array-builder" + require "arrow/stream-decoder" + require "arrow/stream-listener" require "arrow/struct-array" require "arrow/struct-array-builder" require "arrow/struct-data-type" @@ -168,6 +170,16 @@ def gc_guard end end + def rubyish_class_name(info) + name = info.name + case name + when "StreamListener" + "StreamListenerRaw" + else + super + end + end + def load_object_info(info) super diff --git a/ruby/red-arrow/lib/arrow/stream-decoder.rb b/ruby/red-arrow/lib/arrow/stream-decoder.rb new file mode 100644 index 0000000000000..a6945215bc646 --- /dev/null +++ b/ruby/red-arrow/lib/arrow/stream-decoder.rb @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license 
agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +module Arrow + class StreamDecoder + def consume(data) + case data + when Buffer + consume_buffer(data) + else + consume_bytes(data) + end + end + end +end diff --git a/ruby/red-arrow/lib/arrow/stream-listener.rb b/ruby/red-arrow/lib/arrow/stream-listener.rb new file mode 100644 index 0000000000000..14a70385842e4 --- /dev/null +++ b/ruby/red-arrow/lib/arrow/stream-listener.rb @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +module Arrow + class StreamListener < StreamListenerRaw + type_register + + def on_eos + end + + def on_record_batch_decoded(record_batch, metadata) + end + + def on_schema_decoded(schema, filtered_schema) + end + + private + def virtual_do_on_eos + on_eos + true + end + + def virtual_do_on_record_batch_decoded(record_batch, metadata) + on_record_batch_decoded(record_batch, metadata) + true + end + + def virtual_do_on_schema_decoded(schema, filtered_schema) + on_schema_decoded(schema, filtered_schema) + true + end + end +end diff --git a/ruby/red-arrow/red-arrow.gemspec b/ruby/red-arrow/red-arrow.gemspec index 9e9c147f76507..67fec2e0907c1 100644 --- a/ruby/red-arrow/red-arrow.gemspec +++ b/ruby/red-arrow/red-arrow.gemspec @@ -49,7 +49,7 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency("bigdecimal", ">= 3.1.0") spec.add_runtime_dependency("csv") spec.add_runtime_dependency("extpp", ">= 0.1.1") - spec.add_runtime_dependency("gio2", ">= 3.5.0") + spec.add_runtime_dependency("gio2", ">= 4.2.3") spec.add_runtime_dependency("native-package-installer") spec.add_runtime_dependency("pkg-config") diff --git a/ruby/red-arrow/test/test-stream-listener.rb b/ruby/red-arrow/test/test-stream-listener.rb new file mode 100644 index 0000000000000..0aed9cb1e2613 --- /dev/null +++ b/ruby/red-arrow/test/test-stream-listener.rb @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +class TestStreamListener < Test::Unit::TestCase + class Listener < Arrow::StreamListener + attr_reader :events + def initialize + super + @events = [] + end + + def on_eos + @events << [:eos] + end + + def on_record_batch_decoded(record_batch, metadata) + @events << [:record_batch_decoded, record_batch, metadata] + end + + def on_schema_decoded(schema, filtered_schema) + @events << [:schema_decoded, schema, filtered_schema] + end + end + + def setup + @record_batch = Arrow::RecordBatch.new(enabled: [true, false, nil, true]) + @schema = @record_batch.schema + + @buffer = Arrow::ResizableBuffer.new(0) + table = Arrow::Table.new(@schema, [@record_batch]) + table.save(@buffer, format: :stream) + + @listener = Listener.new + @decoder = Arrow::StreamDecoder.new(@listener) + end + + def test_consume + @decoder.consume(@buffer) + assert_equal([ + [:schema_decoded, @schema, @schema], + [:record_batch_decoded, @record_batch, nil], + [:eos], + ], + @listener.events) + end +end