// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/dataset/file_parquet.h"
#include <memory>
#include <mutex>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "arrow/compute/cast.h"
#include "arrow/compute/exec.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/parquet_encryption_config.h"
#include "arrow/dataset/scanner.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/table.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
#include "arrow/util/tracing_internal.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption.h"
#include "parquet/encryption/kms_client.h"
#include "parquet/file_reader.h"
#include "parquet/properties.h"
#include "parquet/statistics.h"
namespace arrow {
using internal::checked_cast;
using internal::checked_pointer_cast;
using internal::Iota;
namespace dataset {
using parquet::arrow::SchemaField;
using parquet::arrow::SchemaManifest;
using parquet::arrow::StatisticsAsScalars;
using compute::Cast;
namespace {
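// Builds parquet::ReaderProperties for a scan from the fragment scan options,
// including decryption properties when encryption support is compiled in.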
parquet::ReaderProperties MakeReaderProperties(
const ParquetFileFormat& format, ParquetFragmentScanOptions* parquet_scan_options,
const std::string& path = "", std::shared_ptr<fs::FileSystem> filesystem = nullptr,
MemoryPool* pool = default_memory_pool()) {
// Can't mutate pool after construction
parquet::ReaderProperties properties(pool);
if (parquet_scan_options->reader_properties->is_buffered_stream_enabled()) {
properties.enable_buffered_stream();
} else {
properties.disable_buffered_stream();
}
properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());
#ifdef PARQUET_REQUIRE_ENCRYPTION
auto parquet_decrypt_config = parquet_scan_options->parquet_decryption_config;
if (parquet_decrypt_config != nullptr) {
auto file_decryption_prop =
parquet_decrypt_config->crypto_factory->GetFileDecryptionProperties(
*parquet_decrypt_config->kms_connection_config,
*parquet_decrypt_config->decryption_config, path, filesystem);
parquet_scan_options->reader_properties->file_decryption_properties(
std::move(file_decryption_prop));
}
#else
if (parquet_scan_options->parquet_decryption_config != nullptr) {
parquet::ParquetException::NYI("Encryption is not supported in this build.");
}
#endif
properties.file_decryption_properties(
parquet_scan_options->reader_properties->file_decryption_properties());
properties.set_thrift_string_size_limit(
parquet_scan_options->reader_properties->thrift_string_size_limit());
properties.set_thrift_container_size_limit(
parquet_scan_options->reader_properties->thrift_container_size_limit());
properties.set_page_checksum_verification(
parquet_scan_options->reader_properties->page_checksum_verification());
return properties;
}
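// Builds parquet::ArrowReaderProperties from the format's reader options,
// enabling dictionary reads for the requested columns.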
parquet::ArrowReaderProperties MakeArrowReaderProperties(
const ParquetFileFormat& format, const parquet::FileMetaData& metadata) {
parquet::ArrowReaderProperties properties(/* use_threads = */ false);
for (const std::string& name : format.reader_options.dict_columns) {
auto column_index = metadata.schema()->ColumnIndex(name);
properties.set_read_dictionary(column_index, true);
}
properties.set_coerce_int96_timestamp_unit(
format.reader_options.coerce_int96_timestamp_unit);
return properties;
}
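// Overload used when scanning: additionally applies batch size, pre-buffering,
// caching, IO context and threading settings from the scan options.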
parquet::ArrowReaderProperties MakeArrowReaderProperties(
const ParquetFileFormat& format, const parquet::FileMetaData& metadata,
const ScanOptions& options, const ParquetFragmentScanOptions& parquet_scan_options) {
auto arrow_properties = MakeArrowReaderProperties(format, metadata);
arrow_properties.set_batch_size(options.batch_size);
// Must be set here since the sync ScanTask handles pre-buffering itself
arrow_properties.set_pre_buffer(
parquet_scan_options.arrow_reader_properties->pre_buffer());
arrow_properties.set_cache_options(
parquet_scan_options.arrow_reader_properties->cache_options());
arrow_properties.set_io_context(
parquet_scan_options.arrow_reader_properties->io_context());
arrow_properties.set_use_threads(options.use_threads);
return arrow_properties;
}
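// Builds a SchemaManifest mapping the Parquet schema to Arrow fields.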
Result<std::shared_ptr<SchemaManifest>> GetSchemaManifest(
const parquet::FileMetaData& metadata,
const parquet::ArrowReaderProperties& properties) {
auto manifest = std::make_shared<SchemaManifest>();
const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata =
metadata.key_value_metadata();
RETURN_NOT_OK(SchemaManifest::Make(metadata.schema(), key_value_metadata, properties,
manifest.get()));
return manifest;
}
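// Returns true if `value` is a valid float or double scalar holding NaN.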
bool IsNan(const Scalar& value) {
if (value.is_valid) {
if (value.type->id() == Type::FLOAT) {
const FloatScalar& float_scalar = checked_cast<const FloatScalar&>(value);
return std::isnan(float_scalar.value);
} else if (value.type->id() == Type::DOUBLE) {
const DoubleScalar& double_scalar = checked_cast<const DoubleScalar&>(value);
return std::isnan(double_scalar.value);
}
}
return false;
}
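// Converts a leaf column's chunk statistics into a guarantee expression, or
// std::nullopt if no statistics are available.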
std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(
const FieldRef& field_ref, const SchemaField& schema_field,
const parquet::RowGroupMetaData& metadata) {
  // For the remainder of this function, failures to extract or parse
  // statistics are ignored by returning std::nullopt. The goal is twofold:
  // first, avoid an optimization which breaks the computation; second, allow
  // the remaining columns to still succeed in extracting their statistics.
// For now, only leaf (primitive) types are supported.
if (!schema_field.is_leaf()) {
return std::nullopt;
}
auto column_metadata = metadata.ColumnChunk(schema_field.column_index);
auto statistics = column_metadata->statistics();
const auto& field = schema_field.field;
if (statistics == nullptr) {
return std::nullopt;
}
return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, field_ref,
*statistics);
}
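// Recursively appends the leaf column indices under `schema_field` to
// `column_projection`.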
void AddColumnIndices(const SchemaField& schema_field,
std::vector<int>* column_projection) {
if (schema_field.is_leaf()) {
column_projection->push_back(schema_field.column_index);
} else {
    // The following ensures that complex types, e.g. struct, are materialized.
for (const auto& child : schema_field.children) {
AddColumnIndices(child, column_projection);
}
}
}
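// Resolves a single FieldRef against the schema manifest, appending the
// matching leaf column indices to `columns_selection`. References to fields
// absent from the file ("virtual" columns) are ignored, since projection will
// pad the batch with null columns for them.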
Status ResolveOneFieldRef(
const SchemaManifest& manifest, const FieldRef& field_ref,
const std::unordered_map<std::string, const SchemaField*>& field_lookup,
const std::unordered_set<std::string>& duplicate_fields,
std::vector<int>* columns_selection) {
if (const std::string* name = field_ref.name()) {
auto it = field_lookup.find(*name);
if (it != field_lookup.end()) {
AddColumnIndices(*it->second, columns_selection);
} else if (duplicate_fields.find(*name) != duplicate_fields.end()) {
// We shouldn't generally get here because SetProjection will reject such references
return Status::Invalid("Ambiguous reference to column '", *name,
"' which occurs more than once");
}
// "Virtual" column: field is not in file but is in the ScanOptions.
// Ignore it here, as projection will pad the batch with a null column.
return Status::OK();
}
const SchemaField* toplevel = nullptr;
const SchemaField* field = nullptr;
if (const std::vector<FieldRef>* refs = field_ref.nested_refs()) {
// Only supports a sequence of names
for (const auto& ref : *refs) {
if (const std::string* name = ref.name()) {
if (!field) {
// First lookup, top-level field
auto it = field_lookup.find(*name);
if (it != field_lookup.end()) {
field = it->second;
toplevel = field;
} else if (duplicate_fields.find(*name) != duplicate_fields.end()) {
return Status::Invalid("Ambiguous reference to column '", *name,
"' which occurs more than once");
} else {
// Virtual column
return Status::OK();
}
} else {
const SchemaField* result = nullptr;
for (const auto& child : field->children) {
if (child.field->name() == *name) {
if (!result) {
result = &child;
} else {
return Status::Invalid("Ambiguous nested reference to column '", *name,
"' which occurs more than once in field ",
field->field->ToString());
}
}
}
if (!result) {
// Virtual column
return Status::OK();
}
field = result;
}
continue;
}
return Status::NotImplemented("Inferring column projection from FieldRef ",
field_ref.ToString());
}
} else {
return Status::NotImplemented("Inferring column projection from FieldRef ",
field_ref.ToString());
}
if (field) {
// TODO(ARROW-1888): support fine-grained column projection. We should be
// able to materialize only the child fields requested, and not the entire
// top-level field.
// Right now, if enabled, projection/filtering will fail when they cast the
// physical schema to the dataset schema.
AddColumnIndices(*toplevel, columns_selection);
}
return Status::OK();
}
// Converts a field ref into a position-independent ref (containing only a sequence of
// names) based on the dataset schema. Returns the ref unchanged if no conversion was
// needed.
Result<FieldRef> MaybeConvertFieldRef(FieldRef ref, const Schema& dataset_schema) {
if (ARROW_PREDICT_TRUE(ref.IsNameSequence())) {
return std::move(ref);
}
ARROW_ASSIGN_OR_RAISE(auto path, ref.FindOne(dataset_schema));
std::vector<FieldRef> named_refs;
named_refs.reserve(path.indices().size());
const FieldVector* child_fields = &dataset_schema.fields();
for (auto index : path) {
const auto& child_field = *(*child_fields)[index];
named_refs.emplace_back(child_field.name());
child_fields = &child_field.type()->fields();
}
return named_refs.size() == 1 ? std::move(named_refs[0])
: FieldRef(std::move(named_refs));
}
// Compute the column projection based on the scan options
Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader& reader,
const ScanOptions& options) {
auto manifest = reader.manifest();
// Checks if the field is needed in either the projection or the filter.
auto field_refs = options.MaterializedFields();
// Build a lookup table from top level field name to field metadata.
// This is to avoid quadratic-time mapping of projected fields to
// column indices, in the common case of selecting top level
// columns. For nested fields, we will pay the cost of a linear scan
// assuming for now that this is relatively rare, but this can be
// optimized. (Also, we don't want to pay the cost of building all
// the lookup tables up front if they're rarely used.)
std::unordered_map<std::string, const SchemaField*> field_lookup;
std::unordered_set<std::string> duplicate_fields;
for (const auto& schema_field : manifest.schema_fields) {
const auto it = field_lookup.emplace(schema_field.field->name(), &schema_field);
if (!it.second) {
duplicate_fields.emplace(schema_field.field->name());
}
}
std::vector<int> columns_selection;
for (auto& ref : field_refs) {
// In the (unlikely) absence of a known dataset schema, we require that all
// materialized refs are named.
if (options.dataset_schema) {
ARROW_ASSIGN_OR_RAISE(
ref, MaybeConvertFieldRef(std::move(ref), *options.dataset_schema));
}
RETURN_NOT_OK(ResolveOneFieldRef(manifest, ref, field_lookup, duplicate_fields,
&columns_selection));
}
return columns_selection;
}
Status WrapSourceError(const Status& status, const std::string& path) {
return status.WithMessage("Could not open Parquet input source '", path,
"': ", status.message());
}
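// Checks whether the source can be opened as a Parquet file; returns false
// (rather than an error) if the file is invalid or corrupted.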
Result<bool> IsSupportedParquetFile(const ParquetFileFormat& format,
const FileSource& source) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
try {
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
ARROW_ASSIGN_OR_RAISE(
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(
kParquetTypeName, nullptr, format.default_fragment_scan_options));
auto reader = parquet::ParquetFileReader::Open(
std::move(input), MakeReaderProperties(format, parquet_scan_options.get()));
std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
return metadata != nullptr && metadata->can_decompress();
} catch (const ::parquet::ParquetInvalidOrCorruptedFileException& e) {
ARROW_UNUSED(e);
return false;
}
END_PARQUET_CATCH_EXCEPTIONS
}
} // namespace
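// Converts a column's min/max statistics into an expression that is guaranteed
// to hold for every row in the row group, handling all-null, NaN and nullable
// cases; used to simplify filter predicates against row group statistics.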
std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpression(
const Field& field, const FieldRef& field_ref,
const parquet::Statistics& statistics) {
auto field_expr = compute::field_ref(field_ref);
  // Optimize for the corner case where all values are null
if (statistics.num_values() == 0 && statistics.null_count() > 0) {
return is_null(std::move(field_expr));
}
std::shared_ptr<Scalar> min, max;
if (!StatisticsAsScalars(statistics, &min, &max).ok()) {
return std::nullopt;
}
auto maybe_min = Cast(min, field.type());
auto maybe_max = Cast(max, field.type());
if (maybe_min.ok() && maybe_max.ok()) {
min = maybe_min.MoveValueUnsafe().scalar();
max = maybe_max.MoveValueUnsafe().scalar();
if (min->Equals(*max)) {
auto single_value = compute::equal(field_expr, compute::literal(std::move(min)));
if (statistics.null_count() == 0) {
return single_value;
}
return compute::or_(std::move(single_value), is_null(std::move(field_expr)));
}
auto lower_bound = compute::greater_equal(field_expr, compute::literal(min));
auto upper_bound = compute::less_equal(field_expr, compute::literal(max));
compute::Expression in_range;
    // If both the minimum and maximum values are NaN, no useful statistics
    // can be extracted for checking the presence of a value within range
if (IsNan(*min) && IsNan(*max)) {
return std::nullopt;
}
// If either minimum or maximum is NaN, it should be ignored for the
// range computation
if (IsNan(*min)) {
in_range = std::move(upper_bound);
} else if (IsNan(*max)) {
in_range = std::move(lower_bound);
} else {
in_range = compute::and_(std::move(lower_bound), std::move(upper_bound));
}
if (statistics.null_count() != 0) {
return compute::or_(std::move(in_range), compute::is_null(field_expr));
}
return in_range;
}
return std::nullopt;
}
std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpression(
const Field& field, const parquet::Statistics& statistics) {
const auto field_name = field.name();
return EvaluateStatisticsAsExpression(field, FieldRef(std::move(field_name)),
statistics);
}
ParquetFileFormat::ParquetFileFormat()
: FileFormat(std::make_shared<ParquetFragmentScanOptions>()) {}
bool ParquetFileFormat::Equals(const FileFormat& other) const {
if (other.type_name() != type_name()) return false;
const auto& other_reader_options =
checked_cast<const ParquetFileFormat&>(other).reader_options;
// FIXME implement comparison for decryption options
return (reader_options.dict_columns == other_reader_options.dict_columns &&
reader_options.coerce_int96_timestamp_unit ==
other_reader_options.coerce_int96_timestamp_unit);
}
ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties)
: FileFormat(std::make_shared<ParquetFragmentScanOptions>()) {
auto* default_scan_opts =
static_cast<ParquetFragmentScanOptions*>(default_fragment_scan_options.get());
*default_scan_opts->reader_properties = reader_properties;
}
Result<bool> ParquetFileFormat::IsSupported(const FileSource& source) const {
auto maybe_is_supported = IsSupportedParquetFile(*this, source);
if (!maybe_is_supported.ok()) {
return WrapSourceError(maybe_is_supported.status(), source.path());
}
return maybe_is_supported;
}
Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(
const FileSource& source) const {
auto scan_options = std::make_shared<ScanOptions>();
ARROW_ASSIGN_OR_RAISE(auto reader, GetReader(source, scan_options));
std::shared_ptr<Schema> schema;
RETURN_NOT_OK(reader->GetSchema(&schema));
return schema;
}
Result<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
const FileSource& source, const std::shared_ptr<ScanOptions>& options) const {
return GetReader(source, options, /*metadata=*/nullptr);
}
Result<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
const FileSource& source, const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<parquet::FileMetaData>& metadata) const {
ARROW_ASSIGN_OR_RAISE(
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), "", nullptr, options->pool);
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
  // `parquet::ParquetFileReader::Open` will not wrap exceptions as a Status,
  // so use `open_parquet_file` to catch and wrap them.
auto open_parquet_file = [&]() -> Result<std::unique_ptr<parquet::ParquetFileReader>> {
BEGIN_PARQUET_CATCH_EXCEPTIONS
auto reader = parquet::ParquetFileReader::Open(std::move(input),
std::move(properties), metadata);
return reader;
END_PARQUET_CATCH_EXCEPTIONS
};
auto reader_opt = open_parquet_file();
if (!reader_opt.ok()) {
return WrapSourceError(reader_opt.status(), source.path());
}
auto reader = std::move(reader_opt).ValueOrDie();
std::shared_ptr<parquet::FileMetaData> reader_metadata = reader->metadata();
auto arrow_properties =
MakeArrowReaderProperties(*this, *reader_metadata, *options, *parquet_scan_options);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(
options->pool, std::move(reader), std::move(arrow_properties), &arrow_reader));
return std::move(arrow_reader);
}
Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
const FileSource& source, const std::shared_ptr<ScanOptions>& options) const {
return GetReaderAsync(source, options, nullptr);
}
Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
const FileSource& source, const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<parquet::FileMetaData>& metadata) const {
ARROW_ASSIGN_OR_RAISE(
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
default_fragment_scan_options));
auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), source.path(),
source.filesystem(), options->pool);
auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());
return source.OpenAsync().Then(
[=](const std::shared_ptr<io::RandomAccessFile>& input) mutable {
return parquet::ParquetFileReader::OpenAsync(input, std::move(properties),
metadata)
.Then(
[=](const std::unique_ptr<parquet::ParquetFileReader>& reader) mutable
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
auto arrow_properties = MakeArrowReaderProperties(
*self, *reader->metadata(), *options, *parquet_scan_options);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(
options->pool,
// TODO(ARROW-12259): workaround since we have Future<(move-only
// type)> It *wouldn't* be safe to const_cast reader except that
// here we know there are no other waiters on the reader.
std::move(const_cast<std::unique_ptr<parquet::ParquetFileReader>&>(
reader)),
std::move(arrow_properties), &arrow_reader));
return std::move(arrow_reader);
},
[path = source.path()](const Status& status)
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
return WrapSourceError(status, path);
});
});
}
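// An async generator that wraps a RecordBatchGenerator and re-slices its
// output into batches of at most `batch_size` rows.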
struct SlicingGenerator {
SlicingGenerator(RecordBatchGenerator source, int64_t batch_size)
: state(std::make_shared<State>(source, batch_size)) {}
Future<std::shared_ptr<RecordBatch>> operator()() {
if (state->current) {
return state->SliceOffABatch();
} else {
auto state_capture = state;
return state->source().Then(
[state_capture](const std::shared_ptr<RecordBatch>& next) {
if (IsIterationEnd(next)) {
return next;
}
state_capture->current = next;
return state_capture->SliceOffABatch();
});
}
}
struct State {
State(RecordBatchGenerator source, int64_t batch_size)
: source(std::move(source)), current(), batch_size(batch_size) {}
std::shared_ptr<RecordBatch> SliceOffABatch() {
if (current->num_rows() <= batch_size) {
auto sliced = current;
current = nullptr;
return sliced;
}
auto slice = current->Slice(0, batch_size);
current = current->Slice(batch_size);
return slice;
}
RecordBatchGenerator source;
std::shared_ptr<RecordBatch> current;
int64_t batch_size;
};
std::shared_ptr<State> state;
};
Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& file) const {
auto parquet_fragment = checked_pointer_cast<ParquetFileFragment>(file);
std::vector<int> row_groups;
bool pre_filtered = false;
  // If RowGroup metadata is cached completely, we can pre-filter RowGroups before opening
// a FileReader, potentially avoiding IO altogether if all RowGroups are excluded due to
// prior statistics knowledge. In the case where a RowGroup doesn't have statistics
// metadata, it will not be excluded.
if (parquet_fragment->metadata() != nullptr) {
ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRowGroups(options->filter));
pre_filtered = true;
if (row_groups.empty()) return MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
}
// Open the reader and pay the real IO cost.
auto make_generator =
[this, options, parquet_fragment, pre_filtered,
row_groups](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable
-> Result<RecordBatchGenerator> {
// Ensure that parquet_fragment has FileMetaData
RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get()));
if (!pre_filtered) {
// row groups were not already filtered; do this now
ARROW_ASSIGN_OR_RAISE(row_groups,
parquet_fragment->FilterRowGroups(options->filter));
if (row_groups.empty()) return MakeEmptyGenerator<std::shared_ptr<RecordBatch>>();
}
ARROW_ASSIGN_OR_RAISE(auto column_projection,
InferColumnProjection(*reader, *options));
ARROW_ASSIGN_OR_RAISE(
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(
kParquetTypeName, options.get(), default_fragment_scan_options));
int batch_readahead = options->batch_readahead;
int64_t rows_to_readahead = batch_readahead * options->batch_size;
ARROW_ASSIGN_OR_RAISE(auto generator,
reader->GetRecordBatchGenerator(
reader, row_groups, column_projection,
::arrow::internal::GetCpuThreadPool(), rows_to_readahead));
RecordBatchGenerator sliced =
SlicingGenerator(std::move(generator), options->batch_size);
if (batch_readahead == 0) {
return sliced;
}
RecordBatchGenerator sliced_readahead =
MakeSerialReadaheadGenerator(std::move(sliced), batch_readahead);
return sliced_readahead;
};
auto generator = MakeFromFuture(
GetReaderAsync(parquet_fragment->source(), options, parquet_fragment->metadata())
.Then(std::move(make_generator)));
WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(
generator, "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next");
return generator;
}
Future<std::optional<int64_t>> ParquetFileFormat::CountRows(
const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
const std::shared_ptr<ScanOptions>& options) {
auto parquet_file = checked_pointer_cast<ParquetFileFragment>(file);
if (parquet_file->metadata()) {
ARROW_ASSIGN_OR_RAISE(auto maybe_count,
parquet_file->TryCountRows(std::move(predicate)));
return Future<std::optional<int64_t>>::MakeFinished(maybe_count);
} else {
return DeferNotOk(options->io_context.executor()->Submit(
[parquet_file, predicate]() -> Result<std::optional<int64_t>> {
RETURN_NOT_OK(parquet_file->EnsureCompleteMetadata());
return parquet_file->TryCountRows(predicate);
}));
}
}
Result<std::shared_ptr<ParquetFileFragment>> ParquetFileFormat::MakeFragment(
FileSource source, compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema, std::vector<int> row_groups) {
return std::shared_ptr<ParquetFileFragment>(new ParquetFileFragment(
std::move(source), shared_from_this(), std::move(partition_expression),
std::move(physical_schema), std::move(row_groups)));
}
Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
FileSource source, compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema) {
return std::shared_ptr<FileFragment>(new ParquetFileFragment(
std::move(source), shared_from_this(), std::move(partition_expression),
std::move(physical_schema), std::nullopt));
}
//
// ParquetFileWriter, ParquetFileWriteOptions
//
std::shared_ptr<FileWriteOptions> ParquetFileFormat::DefaultWriteOptions() {
std::shared_ptr<ParquetFileWriteOptions> options(
new ParquetFileWriteOptions(shared_from_this()));
options->writer_properties = parquet::default_writer_properties();
options->arrow_writer_properties = parquet::default_arrow_writer_properties();
return options;
}
Result<std::shared_ptr<FileWriter>> ParquetFileFormat::MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options,
fs::FileLocator destination_locator) const {
if (!Equals(*options->format())) {
return Status::TypeError("Mismatching format/write options");
}
auto parquet_options = checked_pointer_cast<ParquetFileWriteOptions>(options);
std::unique_ptr<parquet::arrow::FileWriter> parquet_writer;
#ifdef PARQUET_REQUIRE_ENCRYPTION
auto parquet_encrypt_config = parquet_options->parquet_encryption_config;
if (parquet_encrypt_config != nullptr) {
auto file_encryption_prop =
parquet_encrypt_config->crypto_factory->GetFileEncryptionProperties(
*parquet_encrypt_config->kms_connection_config,
*parquet_encrypt_config->encryption_config, destination_locator.path,
destination_locator.filesystem);
auto writer_properties =
parquet::WriterProperties::Builder(*parquet_options->writer_properties)
.encryption(std::move(file_encryption_prop))
->build();
ARROW_ASSIGN_OR_RAISE(
parquet_writer, parquet::arrow::FileWriter::Open(
*schema, writer_properties->memory_pool(), destination,
writer_properties, parquet_options->arrow_writer_properties));
}
#else
if (parquet_options->parquet_encryption_config != nullptr) {
return Status::NotImplemented("Encryption is not supported in this build.");
}
#endif
if (parquet_writer == nullptr) {
ARROW_ASSIGN_OR_RAISE(parquet_writer,
parquet::arrow::FileWriter::Open(
*schema, parquet_options->writer_properties->memory_pool(),
destination, parquet_options->writer_properties,
parquet_options->arrow_writer_properties));
}
return std::shared_ptr<FileWriter>(
new ParquetFileWriter(std::move(destination), std::move(parquet_writer),
std::move(parquet_options), std::move(destination_locator)));
}
ParquetFileWriter::ParquetFileWriter(std::shared_ptr<io::OutputStream> destination,
std::shared_ptr<parquet::arrow::FileWriter> writer,
std::shared_ptr<ParquetFileWriteOptions> options,
fs::FileLocator destination_locator)
: FileWriter(writer->schema(), std::move(options), std::move(destination),
std::move(destination_locator)),
parquet_writer_(std::move(writer)) {}
Status ParquetFileWriter::Write(const std::shared_ptr<RecordBatch>& batch) {
ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches(batch->schema(), {batch}));
return parquet_writer_->WriteTable(*table, batch->num_rows());
}
Future<> ParquetFileWriter::FinishInternal() {
return DeferNotOk(destination_locator_.filesystem->io_context().executor()->Submit(
[this]() { return parquet_writer_->Close(); }));
}
//
// ParquetFileFragment
//
ParquetFileFragment::ParquetFileFragment(FileSource source,
std::shared_ptr<FileFormat> format,
compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema,
std::optional<std::vector<int>> row_groups)
: FileFragment(std::move(source), std::move(format), std::move(partition_expression),
std::move(physical_schema)),
parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
row_groups_(std::move(row_groups)) {}
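// Loads and caches the file's FileMetaData and schema manifest if not already
// present, opening a fresh reader when none is supplied, and validates that
// the file schema matches any pre-declared physical schema.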
Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
auto lock = physical_schema_mutex_.Lock();
if (metadata_ != nullptr) {
return Status::OK();
}
if (reader == nullptr) {
lock.Unlock();
auto scan_options = std::make_shared<ScanOptions>();
ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_, scan_options));
return EnsureCompleteMetadata(reader.get());
}
std::shared_ptr<Schema> schema;
RETURN_NOT_OK(reader->GetSchema(&schema));
if (physical_schema_ && !physical_schema_->Equals(*schema)) {
return Status::Invalid("Fragment initialized with physical schema ",
*physical_schema_, " but ", source_.path(), " has schema ",
*schema);
}
physical_schema_ = std::move(schema);
if (!row_groups_) {
row_groups_ = Iota(reader->num_row_groups());
}
ARROW_ASSIGN_OR_RAISE(
auto manifest,
GetSchemaManifest(*reader->parquet_reader()->metadata(), reader->properties()));
return SetMetadata(reader->parquet_reader()->metadata(), std::move(manifest));
}
Status ParquetFileFragment::SetMetadata(
std::shared_ptr<parquet::FileMetaData> metadata,
std::shared_ptr<parquet::arrow::SchemaManifest> manifest,
std::shared_ptr<parquet::FileMetaData> original_metadata) {
DCHECK(row_groups_.has_value());
metadata_ = std::move(metadata);
manifest_ = std::move(manifest);
original_metadata_ = original_metadata ? std::move(original_metadata) : metadata_;
// The SchemaDescriptor needs to be owned by a FileMetaData instance,
// because SchemaManifest only stores a raw pointer (GH-39562).
DCHECK_EQ(manifest_->descr, original_metadata_->schema())
<< "SchemaDescriptor should be owned by the original FileMetaData";
statistics_expressions_.resize(row_groups_->size(), compute::literal(true));
statistics_expressions_complete_.resize(manifest_->descr->num_columns(), false);
for (int row_group : *row_groups_) {
    // Ensure that each cached row group index references a valid row group in the file.
if (row_group < metadata_->num_row_groups()) continue;
return Status::IndexError("ParquetFileFragment references row group ", row_group,
" but ", source_.path(), " only has ",
metadata_->num_row_groups(), " row groups");
}
return Status::OK();
}
Result<FragmentVector> ParquetFileFragment::SplitByRowGroup(
compute::Expression predicate) {
RETURN_NOT_OK(EnsureCompleteMetadata());
ARROW_ASSIGN_OR_RAISE(auto row_groups, FilterRowGroups(predicate));
FragmentVector fragments(row_groups.size());
int i = 0;
for (int row_group : row_groups) {
ARROW_ASSIGN_OR_RAISE(auto fragment,
parquet_format_.MakeFragment(source_, partition_expression(),
physical_schema_, {row_group}));
RETURN_NOT_OK(fragment->SetMetadata(metadata_, manifest_,
/*original_metadata=*/original_metadata_));
fragments[i++] = std::move(fragment);
}
return fragments;
}
Result<std::shared_ptr<Fragment>> ParquetFileFragment::Subset(
compute::Expression predicate) {
RETURN_NOT_OK(EnsureCompleteMetadata());
ARROW_ASSIGN_OR_RAISE(auto row_groups, FilterRowGroups(predicate));
return Subset(std::move(row_groups));
}
Result<std::shared_ptr<Fragment>> ParquetFileFragment::Subset(
std::vector<int> row_groups) {
RETURN_NOT_OK(EnsureCompleteMetadata());
ARROW_ASSIGN_OR_RAISE(auto new_fragment, parquet_format_.MakeFragment(
source_, partition_expression(),
physical_schema_, std::move(row_groups)));
RETURN_NOT_OK(new_fragment->SetMetadata(metadata_, manifest_));
return new_fragment;
}
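// Conjoins `r` onto `*l`, replacing an initial `literal(true)` outright so the
// folded expression stays small.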
inline void FoldingAnd(compute::Expression* l, compute::Expression r) {
if (*l == compute::literal(true)) {
*l = std::move(r);
} else {
*l = and_(std::move(*l), std::move(r));
}
}
Result<std::vector<int>> ParquetFileFragment::FilterRowGroups(
compute::Expression predicate) {
std::vector<int> row_groups;
ARROW_ASSIGN_OR_RAISE(auto expressions, TestRowGroups(std::move(predicate)));
auto lock = physical_schema_mutex_.Lock();
DCHECK(expressions.empty() || (expressions.size() == row_groups_->size()));
for (size_t i = 0; i < expressions.size(); i++) {
if (expressions[i].IsSatisfiable()) {
row_groups.push_back(row_groups_->at(i));
}
}
return row_groups;
}
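// Simplifies `predicate` against each selected row group's statistics,
// returning one (possibly unsatisfiable) expression per row group. Statistics
// expressions for newly referenced columns are folded in lazily and cached.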
Result<std::vector<compute::Expression>> ParquetFileFragment::TestRowGroups(
compute::Expression predicate) {
auto lock = physical_schema_mutex_.Lock();
DCHECK_NE(metadata_, nullptr);
ARROW_ASSIGN_OR_RAISE(
predicate, SimplifyWithGuarantee(std::move(predicate), partition_expression_));
if (!predicate.IsSatisfiable()) {
return std::vector<compute::Expression>{};
}
for (const FieldRef& ref : FieldsInExpression(predicate)) {
ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOneOrNone(*physical_schema_));
if (match.empty()) continue;
const SchemaField* schema_field = &manifest_->schema_fields[match[0]];
for (size_t i = 1; i < match.indices().size(); ++i) {
if (schema_field->field->type()->id() != Type::STRUCT) {
return Status::Invalid("nested paths only supported for structs");
}
schema_field = &schema_field->children[match[i]];
}
if (!schema_field->is_leaf()) continue;
if (statistics_expressions_complete_[schema_field->column_index]) continue;
statistics_expressions_complete_[schema_field->column_index] = true;
int i = 0;
for (int row_group : *row_groups_) {
auto row_group_metadata = metadata_->RowGroup(row_group);
if (auto minmax = ColumnChunkStatisticsAsExpression(ref, *schema_field,
*row_group_metadata)) {
FoldingAnd(&statistics_expressions_[i], std::move(*minmax));
ARROW_ASSIGN_OR_RAISE(statistics_expressions_[i],
statistics_expressions_[i].Bind(*physical_schema_));
}
++i;
}
}
std::vector<compute::Expression> row_groups(row_groups_->size());
for (size_t i = 0; i < row_groups_->size(); ++i) {
ARROW_ASSIGN_OR_RAISE(auto row_group_predicate,
SimplifyWithGuarantee(predicate, statistics_expressions_[i]));
row_groups[i] = std::move(row_group_predicate);
}
return row_groups;
}
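// Attempts to count rows from metadata alone: row groups proven unsatisfiable
// by their statistics contribute zero rows, fully included row groups
// contribute their metadata row count, and any partially included row group
// forces a full scan by returning std::nullopt.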
Result<std::optional<int64_t>> ParquetFileFragment::TryCountRows(
compute::Expression predicate) {
DCHECK_NE(metadata_, nullptr);
if (ExpressionHasFieldRefs(predicate)) {
ARROW_ASSIGN_OR_RAISE(auto expressions, TestRowGroups(std::move(predicate)));
int64_t rows = 0;
for (size_t i = 0; i < row_groups_->size(); i++) {
// If the row group is entirely excluded, exclude it from the row count
if (!expressions[i].IsSatisfiable()) continue;
// Unless the row group is entirely included, bail out of fast path
if (expressions[i] != compute::literal(true)) return std::nullopt;
BEGIN_PARQUET_CATCH_EXCEPTIONS
rows += metadata()->RowGroup((*row_groups_)[i])->num_rows();
END_PARQUET_CATCH_EXCEPTIONS
}
return rows;
}
return metadata()->num_rows();
}
//
// ParquetFragmentScanOptions
//
ParquetFragmentScanOptions::ParquetFragmentScanOptions() {
reader_properties = std::make_shared<parquet::ReaderProperties>();
arrow_reader_properties =
std::make_shared<parquet::ArrowReaderProperties>(/*use_threads=*/false);
}
//
// ParquetDatasetFactory
//
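// Extracts the file path recorded in a RowGroup's column chunk metadata,
// optionally validating that all column chunks reference the same file.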
static inline Result<std::string> FileFromRowGroup(
fs::FileSystem* filesystem, const std::string& base_path,
const parquet::RowGroupMetaData& row_group, bool validate_column_chunk_paths) {
constexpr auto prefix = "Extracting file path from RowGroup failed. ";
if (row_group.num_columns() == 0) {
return Status::Invalid(prefix,
"RowGroup must have a least one column to extract path.");
}