diff --git a/matlab/src/cpp/arrow/matlab/error/error.h b/matlab/src/cpp/arrow/matlab/error/error.h index e5a5df6f4bcb6..425e089d9f2f9 100644 --- a/matlab/src/cpp/arrow/matlab/error/error.h +++ b/matlab/src/cpp/arrow/matlab/error/error.h @@ -249,5 +249,7 @@ static const char* IPC_RECORD_BATCH_READER_OPEN_FAILED = "arrow:io:ipc:FailedToOpenRecordBatchReader"; static const char* IPC_RECORD_BATCH_READ_INVALID_INDEX = "arrow:io:ipc:InvalidIndex"; static const char* IPC_RECORD_BATCH_READ_FAILED = "arrow:io:ipc:ReadFailed"; +static const char* IPC_TABLE_READ_FAILED = "arrow:io:ipc:TableReadFailed"; +static const char* IPC_END_OF_STREAM = "arrow:io:ipc:EndOfStream"; } // namespace arrow::matlab::error diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc new file mode 100644 index 0000000000000..f3c833484d38e --- /dev/null +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h" +#include "arrow/io/file.h" +#include "arrow/matlab/error/error.h" +#include "arrow/matlab/tabular/proxy/record_batch.h" +#include "arrow/matlab/tabular/proxy/schema.h" +#include "arrow/matlab/tabular/proxy/table.h" +#include "arrow/util/utf8.h" + +#include "libmexclass/proxy/ProxyManager.h" + +namespace arrow::matlab::io::ipc::proxy { + +RecordBatchStreamReader::RecordBatchStreamReader( + const std::shared_ptr reader) + : reader{std::move(reader)} { + REGISTER_METHOD(RecordBatchStreamReader, getSchema); + REGISTER_METHOD(RecordBatchStreamReader, readRecordBatch); + REGISTER_METHOD(RecordBatchStreamReader, hasNextRecordBatch); + REGISTER_METHOD(RecordBatchStreamReader, readTable); +} + +libmexclass::proxy::MakeResult RecordBatchStreamReader::make( + const libmexclass::proxy::FunctionArguments& constructor_arguments) { + namespace mda = ::matlab::data; + using RecordBatchStreamReaderProxy = + arrow::matlab::io::ipc::proxy::RecordBatchStreamReader; + + const mda::StructArray opts = constructor_arguments[0]; + + const mda::StringArray filename_mda = opts[0]["Filename"]; + const auto filename_utf16 = std::u16string(filename_mda[0]); + MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8, + arrow::util::UTF16StringToUTF8(filename_utf16), + error::UNICODE_CONVERSION_ERROR_ID); + + MATLAB_ASSIGN_OR_ERROR(auto input_stream, arrow::io::ReadableFile::Open(filename_utf8), + error::FAILED_TO_OPEN_FILE_FOR_READ); + + MATLAB_ASSIGN_OR_ERROR(auto reader, + arrow::ipc::RecordBatchStreamReader::Open(input_stream), + error::IPC_RECORD_BATCH_READER_OPEN_FAILED); + + return std::make_shared(std::move(reader)); +} + +void RecordBatchStreamReader::getSchema(libmexclass::proxy::method::Context& context) { + namespace mda = ::matlab::data; + using SchemaProxy = arrow::matlab::tabular::proxy::Schema; + + auto schema = reader->schema(); + + auto schema_proxy = std::make_shared(std::move(schema)); + const auto schema_proxy_id = + libmexclass::proxy::ProxyManager::manageProxy(schema_proxy); + + mda::ArrayFactory factory; + const auto schema_proxy_id_mda = factory.createScalar(schema_proxy_id); + context.outputs[0] = schema_proxy_id_mda; +} + +void RecordBatchStreamReader::readTable(libmexclass::proxy::method::Context& context) { + namespace mda = ::matlab::data; + using TableProxy = arrow::matlab::tabular::proxy::Table; + + MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(auto table, reader->ToTable(), context, + error::IPC_TABLE_READ_FAILED); + auto table_proxy = std::make_shared(table); + const auto table_proxy_id = libmexclass::proxy::ProxyManager::manageProxy(table_proxy); + + mda::ArrayFactory factory; + const auto table_proxy_id_mda = factory.createScalar(table_proxy_id); + context.outputs[0] = table_proxy_id_mda; +} + +void RecordBatchStreamReader::readRecordBatch( + libmexclass::proxy::method::Context& context) { + namespace mda = ::matlab::data; + using RecordBatchProxy = arrow::matlab::tabular::proxy::RecordBatch; + using namespace libmexclass::error; + // If we don't have a "pre-cached" record batch to return, then try reading another + // record batch from the IPC Stream. If there are no more record batches in the stream, + // then error. + if (!nextRecordBatch) { + MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(nextRecordBatch, reader->Next(), context, + error::IPC_RECORD_BATCH_READ_FAILED); + } + // Even if the read was "successful", the resulting record batch may be empty, + // signaling the end of the stream. + if (!nextRecordBatch) { + context.error = + Error{error::IPC_END_OF_STREAM, + "Reached end of Arrow IPC Stream. No more record batches to read."}; + return; + } + auto record_batch_proxy = std::make_shared(nextRecordBatch); + const auto record_batch_proxy_id = + libmexclass::proxy::ProxyManager::manageProxy(record_batch_proxy); + // Once we have "consumed" the next RecordBatch, set nextRecordBatch to nullptr + // so that the next call to hasNextRecordBatch correctly checks whether there are more + // record batches remaining in the IPC Stream. + nextRecordBatch = nullptr; + mda::ArrayFactory factory; + const auto record_batch_proxy_id_mda = factory.createScalar(record_batch_proxy_id); + context.outputs[0] = record_batch_proxy_id_mda; +} + +void RecordBatchStreamReader::hasNextRecordBatch( + libmexclass::proxy::method::Context& context) { + namespace mda = ::matlab::data; + bool has_next_record_batch = true; + if (!nextRecordBatch) { + // Try to read another RecordBatch from the + // IPC Stream. + auto maybe_record_batch = reader->Next(); + if (!maybe_record_batch.ok()) { + has_next_record_batch = false; + } else { + // If we read a RecordBatch successfully, + // then "cache" the RecordBatch + // so that we can return it on the next + // call to readRecordBatch. + nextRecordBatch = *maybe_record_batch; + + // Even if the read was "successful", the resulting + // record batch may be empty, signaling that + // the end of the IPC stream has been reached. + if (!nextRecordBatch) { + has_next_record_batch = false; + } + } + } + + mda::ArrayFactory factory; + context.outputs[0] = factory.createScalar(has_next_record_batch); +} + +} // namespace arrow::matlab::io::ipc::proxy diff --git a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h new file mode 100644 index 0000000000000..56fb293987825 --- /dev/null +++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/ipc/reader.h" +#include "libmexclass/proxy/Proxy.h" + +namespace arrow::matlab::io::ipc::proxy { + +class RecordBatchStreamReader : public libmexclass::proxy::Proxy { + public: + RecordBatchStreamReader(std::shared_ptr reader); + + ~RecordBatchStreamReader() = default; + + static libmexclass::proxy::MakeResult make( + const libmexclass::proxy::FunctionArguments& constructor_arguments); + + protected: + std::shared_ptr reader; + std::shared_ptr nextRecordBatch; + + void getSchema(libmexclass::proxy::method::Context& context); + void readRecordBatch(libmexclass::proxy::method::Context& context); + void hasNextRecordBatch(libmexclass::proxy::method::Context& context); + void readTable(libmexclass::proxy::method::Context& context); +}; + +} // namespace arrow::matlab::io::ipc::proxy diff --git a/matlab/src/cpp/arrow/matlab/proxy/factory.cc b/matlab/src/cpp/arrow/matlab/proxy/factory.cc index a08a7495c00c9..902546fd052f8 100644 --- a/matlab/src/cpp/arrow/matlab/proxy/factory.cc +++ b/matlab/src/cpp/arrow/matlab/proxy/factory.cc @@ -36,6 +36,7 @@ #include "arrow/matlab/io/feather/proxy/writer.h" #include "arrow/matlab/io/ipc/proxy/record_batch_file_reader.h" #include "arrow/matlab/io/ipc/proxy/record_batch_file_writer.h" +#include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h" #include "arrow/matlab/io/ipc/proxy/record_batch_stream_writer.h" #include "arrow/matlab/tabular/proxy/record_batch.h" #include "arrow/matlab/tabular/proxy/schema.h" @@ -113,6 +114,7 @@ libmexclass::proxy::MakeResult Factory::make_proxy( REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchFileReader , arrow::matlab::io::ipc::proxy::RecordBatchFileReader); REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchFileWriter , arrow::matlab::io::ipc::proxy::RecordBatchFileWriter); REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchStreamWriter , arrow::matlab::io::ipc::proxy::RecordBatchStreamWriter); + REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchStreamReader , arrow::matlab::io::ipc::proxy::RecordBatchStreamReader); // clang-format on diff --git a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m new file mode 100644 index 0000000000000..60ca38eba9ad5 --- /dev/null +++ b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m @@ -0,0 +1,83 @@ +%RECORDBATCHSTREAMREADER Class for reading Arrow record batches from the +% Arrow IPC Stream format. + +% Licensed to the Apache Software Foundation (ASF) under one or more +% contributor license agreements. See the NOTICE file distributed with +% this work for additional information regarding copyright ownership. +% The ASF licenses this file to you under the Apache License, Version +% 2.0 (the "License"); you may not use this file except in compliance +% with the License. You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +% implied. See the License for the specific language governing +% permissions and limitations under the License. + +classdef RecordBatchStreamReader < matlab.mixin.Scalar + + properties(SetAccess=private, GetAccess=public, Hidden) + Proxy + end + + properties (Dependent, SetAccess=private, GetAccess=public) + Schema + end + + methods + function obj = RecordBatchStreamReader(filename) + arguments + filename(1, 1) string {mustBeNonzeroLengthText} + end + args = struct(Filename=filename); + proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader"; + obj.Proxy = arrow.internal.proxy.create(proxyName, args); + end + + function schema = get.Schema(obj) + proxyID = obj.Proxy.getSchema(); + proxyName = "arrow.tabular.proxy.Schema"; + proxy = libmexclass.proxy.Proxy(ID=proxyID, Name=proxyName); + schema = arrow.tabular.Schema(proxy); + end + + function tf = hasnext(obj) + tf = obj.Proxy.hasNextRecordBatch(); + end + + function tf = done(obj) + tf = ~obj.Proxy.hasNextRecordBatch(); + end + + function arrowRecordBatch = read(obj) + % NOTE: This function is a "convenience alias" for the readRecordBatch + % method, which has a longer name. This is the exact same implementation + % as readRecordBatch. Since this method might be called in a tight loop, + % it should be slightly more efficient to call the C++ code directly, + % rather than invoking obj.readRecordBatch indirectly. We are intentionally + % trading off code duplication for performance here. + proxyID = obj.Proxy.readRecordBatch(); + proxyName = "arrow.tabular.proxy.RecordBatch"; + proxy = libmexclass.proxy.Proxy(ID=proxyID, Name=proxyName); + arrowRecordBatch = arrow.tabular.RecordBatch(proxy); + end + + function arrowRecordBatch = readRecordBatch(obj) + proxyID = obj.Proxy.readRecordBatch(); + proxyName = "arrow.tabular.proxy.RecordBatch"; + proxy = libmexclass.proxy.Proxy(ID=proxyID, Name=proxyName); + arrowRecordBatch = arrow.tabular.RecordBatch(proxy); + end + + function arrowTable = readTable(obj) + proxyID = obj.Proxy.readTable(); + proxyName = "arrow.tabular.proxy.Table"; + proxy = libmexclass.proxy.Proxy(ID=proxyID, Name=proxyName); + arrowTable = arrow.tabular.Table(proxy); + end + + end + +end diff --git a/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m new file mode 100644 index 0000000000000..6ca67197739ae --- /dev/null +++ b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m @@ -0,0 +1,336 @@ +%TRECORDBATCHSTREAMREADER Unit tests for arrow.io.ipc.RecordBatchStreamReader. + +% Licensed to the Apache Software Foundation (ASF) under one or more +% contributor license agreements. See the NOTICE file distributed with +% this work for additional information regarding copyright ownership. +% The ASF licenses this file to you under the Apache License, Version +% 2.0 (the "License"); you may not use this file except in compliance +% with the License. You may obtain a copy of the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, +% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +% implied. See the License for the specific language governing +% permissions and limitations under the License. +classdef tRecordBatchStreamReader < matlab.unittest.TestCase + + properties + DataFolder + ZeroBatchStreamFile + OneBatchStreamFile + MultipleBatchStreamFile + RandomAccessFile + end + + properties (TestParameter) + RecordBatchReadFcn = {@read, @readRecordBatch} + end + + methods(TestClassSetup) + + function setupDataFolder(testCase) + import matlab.unittest.fixtures.TemporaryFolderFixture + fixture = testCase.applyFixture(TemporaryFolderFixture); + testCase.DataFolder = string(fixture.Folder); + end + + function setupRandomAccessFile(testCase) + fieldA = arrow.field("A", arrow.string()); + fieldB = arrow.field("B", arrow.float32()); + schema = arrow.schema([fieldA, fieldB]); + fname = fullfile(testCase.DataFolder, "RandomAccessFile.arrow"); + writer = arrow.io.ipc.RecordBatchFileWriter(fname, schema); + writer.close(); + testCase.RandomAccessFile = fname; + end + + function setupZeroBatchStreamFile(testCase) + fieldA = arrow.field("A", arrow.string()); + fieldB = arrow.field("B", arrow.float32()); + schema = arrow.schema([fieldA, fieldB]); + fname = fullfile(testCase.DataFolder, "ZeroBatchStreamFile.arrows"); + writer = arrow.io.ipc.RecordBatchStreamWriter(fname, schema); + writer.close(); + testCase.ZeroBatchStreamFile = fname; + end + + function setupOneBatchStreamFile(testCase) + t = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]); + recordBatch = arrow.recordBatch(t); + fname = fullfile(testCase.DataFolder, "OneBatchFile.arrows"); + writer = arrow.io.ipc.RecordBatchStreamWriter(fname, recordBatch.Schema); + writer.writeRecordBatch(recordBatch); + writer.close(); + testCase.OneBatchStreamFile = fname; + end + + function setupMultipleBatchStreamFile(testCase) + t1 = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]); + t2 = table(["Row3"; "Row4"], single([3; 4]), VariableNames=["A", "B"]); + recordBatch1 = arrow.recordBatch(t1); + recordBatch2 = arrow.recordBatch(t2); + fname = fullfile(testCase.DataFolder, "MultipleBatchStreamFile.arrows"); + writer = arrow.io.ipc.RecordBatchStreamWriter(fname, recordBatch1.Schema); + writer.writeRecordBatch(recordBatch1); + writer.writeRecordBatch(recordBatch2); + writer.close(); + testCase.MultipleBatchStreamFile = fname; + end + end + + methods (Test) + + function ZeroLengthFilenameError(testCase) + % Verify RecordBatchStreamReader throws an exception with the + % identifier MATLAB:validators:mustBeNonzeroLengthText if the + % filename input argument given is a zero length string. + fcn = @() arrow.io.ipc.RecordBatchStreamReader(""); + testCase.verifyError(fcn, "MATLAB:validators:mustBeNonzeroLengthText"); + end + + function MissingStringFilenameError(testCase) + % Verify RecordBatchStreamReader throws an exception with the + % identifier MATLAB:validators:mustBeNonzeroLengthText if the + % filename input argument given is a missing string. + fcn = @() arrow.io.ipc.RecordBatchStreamReader(string(missing)); + testCase.verifyError(fcn, "MATLAB:validators:mustBeNonzeroLengthText"); + end + + function FilenameInvalidTypeError(testCase) + % Verify RecordBatchStreamReader throws an exception with the + % identifier MATLAB:validators:UnableToConvert if the filename + % input argument is neither a scalar string nor a char vector. + fcn = @() arrow.io.ipc.RecordBatchStreamReader(table); + testCase.verifyError(fcn, "MATLAB:validation:UnableToConvert"); + end + + function Schema(testCase) + % Verify the getter method for Schema returns the + % expected value. + fieldA = arrow.field("A", arrow.string()); + fieldB = arrow.field("B", arrow.float32()); + expectedSchema = arrow.schema([fieldA fieldB]); + + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + testCase.verifyEqual(reader.Schema, expectedSchema); + + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + testCase.verifyEqual(reader.Schema, expectedSchema); + + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + testCase.verifyEqual(reader.Schema, expectedSchema); + end + + function SchemaNoSetter(testCase) + % Verify the Schema property is not settable. + fieldC = arrow.field("C", arrow.date32()); + schema = arrow.schema(fieldC); + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + testCase.verifyError(@() setfield(reader, "Schema", schema), "MATLAB:class:SetProhibited"); + end + + function ReadErrorIfEndOfStream(testCase, RecordBatchReadFcn) + % Verify read throws an execption with the identifier arrow:io:ipc:EndOfStream + % on an Arrow IPC Stream file containing zero batches. + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + fcn = @() RecordBatchReadFcn(reader); + testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream"); + end + + function ReadOneBatchStreamFile(testCase, RecordBatchReadFcn) + % Verify read can successfully read an Arrow IPC Stream file + % containing one batch. + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + + expectedMatlabTable = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]); + expected = arrow.recordBatch(expectedMatlabTable); + actual = RecordBatchReadFcn(reader); + testCase.verifyEqual(actual, expected); + + fcn = @() RecordBatchReadFcn(reader); + testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream"); + end + + function ReadMultipleBatchStreamFile(testCase, RecordBatchReadFcn) + % Verify read can successfully read an Arrow IPC Stream file + % containing mulitple batches. + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + + expectedMatlabTable1 = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]); + expected1 = arrow.recordBatch(expectedMatlabTable1); + actual1 = RecordBatchReadFcn(reader); + testCase.verifyEqual(actual1, expected1); + + expectedMatlabTable2 = table(["Row3"; "Row4"], single([3; 4]), VariableNames=["A", "B"]); + expected2 = arrow.recordBatch(expectedMatlabTable2); + actual2 = RecordBatchReadFcn(reader); + testCase.verifyEqual(actual2, expected2); + + fcn = @() RecordBatchReadFcn(reader); + testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream"); + end + + function HasNext(testCase, RecordBatchReadFcn) + % Verify that the hasnext method returns true the correct + % number of times depending on the number of record + % batches in an Arrow IPC Stream format. + + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + % hasnext should return true 0 times for a 0 batch file. + iterations = 0; + while reader.hasnext() + RecordBatchReadFcn(reader); + iterations = iterations + 1; + end + testCase.verifyEqual(iterations, 0); + + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + % hasnext should return true 1 time for a 1 batch file. + iterations = 0; + while reader.hasnext() + RecordBatchReadFcn(reader); + iterations = iterations + 1; + end + testCase.verifyEqual(iterations, 1); + + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + % hasnext should return true 2 times for a 2 batch file. + iterations = 0; + while reader.hasnext() + RecordBatchReadFcn(reader); + iterations = iterations + 1; + end + testCase.verifyEqual(iterations, 2); + end + + function Done(testCase, RecordBatchReadFcn) + % Verify that the done method returns false the correct + % number of times depending on the number of record + % batches in an Arrow IPC Stream format. + + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + % done should return false 0 times for a 0 batch file. + iterations = 0; + while ~reader.done() + RecordBatchReadFcn(reader); + iterations = iterations + 1; + end + testCase.verifyEqual(iterations, 0); + + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + % done should return false 1 time for a 1 batch file. + iterations = 0; + while ~reader.done() + RecordBatchReadFcn(reader); + iterations = iterations + 1; + end + testCase.verifyEqual(iterations, 1); + + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + % done should return false 2 times for a 2 batch file. + iterations = 0; + while ~reader.done() + RecordBatchReadFcn(reader); + iterations = iterations + 1; + end + testCase.verifyEqual(iterations, 2); + end + + function ReadTableZeroBatchStreamFile(testCase) + % Verify read can successfully read an Arrow IPC Stream file + % containing zero batches as an arrow.tabular.Table. + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile); + matlabTable = table('Size', [0, 2], 'VariableTypes', ["string", "single"], 'VariableNames', ["A", "B"]); + expected = arrow.table(matlabTable); + actual = reader.readTable(); + testCase.verifyEqual(actual, expected); + end + + function ReadTableOneBatchStreamFile(testCase) + % Verify read can successfully read an Arrow IPC Stream file + % containing one batch as an arrow.tabular.Table. + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile); + matlabTable = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]); + expected = arrow.table(matlabTable); + actual = reader.readTable(); + testCase.verifyEqual(actual, expected); + end + + function ReadTableMultipleBatchStreamFile(testCase) + % Verify read can successfully read an Arrow IPC Stream file + % containing multiple batches as an arrow.tabular.Table. + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + matlabTable = table(["Row1"; "Row2"; "Row3"; "Row4"], single([1; 2; 3; 4]), VariableNames=["A", "B"]); + expected = arrow.table(matlabTable); + actual = reader.readTable(); + testCase.verifyEqual(actual, expected); + end + + function ReadTableAfterReadRecordBatch(testCase, RecordBatchReadFcn) + % Verify readTable returns only the remaining record batches + % in an Arrow IPC Stream file after calling readRecordBatch first. + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + + testCase.verifyTrue(reader.hasnext()); + testCase.verifyFalse(reader.done()); + + expectedRecordBatch = arrow.recordBatch(... + table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", "B"]) ... + ); + actualRecordBatch = RecordBatchReadFcn(reader); + testCase.verifyEqual(actualRecordBatch, expectedRecordBatch); + + expectedTable = arrow.table(... + table(["Row3"; "Row4"], single([3; 4]), VariableNames=["A", "B"]) ... + ); + actualTable = reader.readTable(); + testCase.verifyEqual(actualTable, expectedTable); + + testCase.verifyFalse(reader.hasnext()); + testCase.verifyTrue(reader.done()); + end + + function ReadTableMultipleCalls(testCase) + % Verify readTable returns an empty table if it is called + % multiple times in a row. + reader = arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile); + + expected = arrow.table(... + table(["Row1"; "Row2"; "Row3"; "Row4"], single([1; 2; 3; 4]), VariableNames=["A", "B"]) ... + ); + actual = reader.readTable(); + testCase.verifyEqual(actual, expected); + + testCase.verifyFalse(reader.hasnext()); + testCase.verifyTrue(reader.done()); + + expectedEmpty = arrow.table(... + table('Size', [0, 2], 'VariableTypes', ["string", "single"], 'VariableNames', ["A", "B"]) ... + ); + + actualEmpty = reader.readTable(); + testCase.verifyEqual(actualEmpty, expectedEmpty); + + testCase.verifyFalse(reader.hasnext()); + testCase.verifyTrue(reader.done()); + + actualEmpty = reader.readTable(); + testCase.verifyEqual(actualEmpty, expectedEmpty); + + testCase.verifyFalse(reader.hasnext()); + testCase.verifyTrue(reader.done()); + end + + function ErrorIfNotIpcStreamFile(testCase) + % Verify RecordBatchStreamReader throws an exception with the + % identifier arrow:io:ipc:FailedToOpenRecordBatchReader if + % the provided file is not an Arrow IPC Stream file. + fcn = @() arrow.io.ipc.RecordBatchStreamReader(testCase.RandomAccessFile); + testCase.verifyError(fcn, "arrow:io:ipc:FailedToOpenRecordBatchReader"); + end + + end + +end diff --git a/matlab/tools/cmake/BuildMatlabArrowInterface.cmake b/matlab/tools/cmake/BuildMatlabArrowInterface.cmake index 29a737a6ecf25..27af19676b73b 100644 --- a/matlab/tools/cmake/BuildMatlabArrowInterface.cmake +++ b/matlab/tools/cmake/BuildMatlabArrowInterface.cmake @@ -83,6 +83,7 @@ set(MATLAB_ARROW_LIBMEXCLASS_CLIENT_PROXY_SOURCES "${CMAKE_SOURCE_DIR}/src/cpp/a "${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_reader.cc" "${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.cc" "${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_writer.cc" + "${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc" "${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_writer.cc")