Update several components to use ArchiveReaderAdaptor
gibber9809 committed Jan 8, 2025
1 parent b1af99c commit ae419b0
Showing 10 changed files with 144 additions and 150 deletions.
51 changes: 25 additions & 26 deletions components/core/src/clp_s/ArchiveReader.cpp
@@ -4,6 +4,7 @@
#include <string_view>

#include "archive_constants.hpp"
#include "ArchiveReaderAdaptor.hpp"
#include "InputConfig.hpp"
#include "ReaderUtils.hpp"

@@ -20,35 +21,28 @@ void ArchiveReader::open(Path const& archive_path, NetworkAuthOption const& netw
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
}

if (InputSource::Filesystem != archive_path.source) {
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
}
m_archive_reader_adaptor = std::make_shared<ArchiveReaderAdaptor>(archive_path, network_auth);

if (false == std::filesystem::is_directory(archive_path.path)) {
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
if (auto const rc = m_archive_reader_adaptor->load_archive_metadata(); ErrorCodeSuccess != rc) {
throw OperationFailed(rc, __FILENAME__, __LINE__);
}
auto const archive_path_str = archive_path.path;

m_var_dict = ReaderUtils::get_variable_dictionary_reader(archive_path_str);
m_log_dict = ReaderUtils::get_log_type_dictionary_reader(archive_path_str);
m_array_dict = ReaderUtils::get_array_dictionary_reader(archive_path_str);
m_timestamp_dict = ReaderUtils::get_timestamp_dictionary_reader(archive_path_str);

m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str);
m_schema_map = ReaderUtils::read_schemas(archive_path_str);
m_schema_tree = ReaderUtils::read_schema_tree(*m_archive_reader_adaptor);
m_schema_map = ReaderUtils::read_schemas(*m_archive_reader_adaptor);

m_log_event_idx_column_id = m_schema_tree->get_metadata_field_id(constants::cLogEventIdxName);

m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile);
m_stream_reader.open_packed_streams(archive_path_str + constants::cArchiveTablesFile);
m_var_dict = ReaderUtils::get_variable_dictionary_reader(*m_archive_reader_adaptor);
m_log_dict = ReaderUtils::get_log_type_dictionary_reader(*m_archive_reader_adaptor);
m_array_dict = ReaderUtils::get_array_dictionary_reader(*m_archive_reader_adaptor);
}

void ArchiveReader::read_metadata() {
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB
m_table_metadata_decompressor.open(
m_table_metadata_file_reader,
cDecompressorFileReadBufferCapacity
auto table_metadata_reader = m_archive_reader_adaptor->checkout_reader_for_section(
constants::cArchiveTableMetadataFile
);
m_table_metadata_decompressor.open(*table_metadata_reader, cDecompressorFileReadBufferCapacity);

m_stream_reader.read_metadata(m_table_metadata_decompressor);

@@ -131,14 +125,19 @@ void ArchiveReader::read_metadata() {
- prev_metadata.stream_offset;
m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
m_table_metadata_decompressor.close();

m_archive_reader_adaptor->checkin_reader_for_section(constants::cArchiveTableMetadataFile);
}

void ArchiveReader::read_dictionaries_and_metadata() {
m_var_dict->read_new_entries();
m_log_dict->read_new_entries();
m_array_dict->read_new_entries();
m_timestamp_dict->read_new_entries();
read_metadata();
m_var_dict->read_entries();
m_log_dict->read_entries();
m_array_dict->read_entries();
}

void ArchiveReader::open_packed_streams() {
m_stream_reader.open_packed_streams(m_archive_reader_adaptor);
}

SchemaReader& ArchiveReader::read_schema_table(
@@ -205,7 +204,7 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3
column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_array_dict, true);
break;
case NodeType::DateString:
column_reader = new DateStringColumnReader(column_id, m_timestamp_dict);
column_reader = new DateStringColumnReader(column_id, get_timestamp_dictionary());
break;
// No need to push columns without associated object readers into the SchemaReader.
case NodeType::Metadata:
@@ -288,7 +287,8 @@ void ArchiveReader::initialize_schema_reader(
m_id_to_schema_metadata[schema_id].num_messages,
should_marshal_records
);
auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids();
auto timestamp_column_ids
= get_timestamp_dictionary()->get_authoritative_timestamp_column_ids();
for (size_t i = 0; i < schema.size(); ++i) {
int32_t column_id = schema[i];
if (Schema::schema_entry_is_unordered_object(column_id)) {
@@ -355,10 +355,9 @@ void ArchiveReader::close() {
m_var_dict->close();
m_log_dict->close();
m_array_dict->close();
m_timestamp_dict->close();

m_stream_reader.close();
m_table_metadata_file_reader.close();
m_archive_reader_adaptor.reset();

m_id_to_schema_metadata.clear();
m_schema_ids.clear();
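The new read_metadata() body above follows the adaptor's section-reader protocol: a section is checked out by name, wrapped in a decompressor, consumed, and then checked back in. Below is a minimal sketch of that pattern, assuming the clp_s namespace, a default-constructible ZstdDecompressor, the "ZstdDecompressor.hpp" include path, and a dereferenceable handle returned by checkout_reader_for_section; only the calls visible in this diff are used, and the function name is illustrative.

#include <cstddef>

#include "archive_constants.hpp"
#include "ArchiveReaderAdaptor.hpp"
#include "ZstdDecompressor.hpp"

namespace clp_s {
void read_table_metadata_sketch(ArchiveReaderAdaptor& adaptor) {
    constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024;  // 64 KB

    // Check out a reader positioned at the table-metadata section.
    auto table_metadata_reader
            = adaptor.checkout_reader_for_section(constants::cArchiveTableMetadataFile);

    ZstdDecompressor decompressor;
    decompressor.open(*table_metadata_reader, cDecompressorFileReadBufferCapacity);
    // ... consume the section's contents through `decompressor` ...
    decompressor.close();

    // Every checkout must be paired with a checkin for the same section name.
    adaptor.checkin_reader_for_section(constants::cArchiveTableMetadataFile);
}
}  // namespace clp_s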
26 changes: 11 additions & 15 deletions components/core/src/clp_s/ArchiveReader.hpp
@@ -7,6 +7,7 @@
#include <string_view>
#include <utility>

#include "ArchiveReaderAdaptor.hpp"
#include "DictionaryReader.hpp"
#include "InputConfig.hpp"
#include "PackedStreamReader.hpp"
@@ -41,13 +42,18 @@ class ArchiveReader {
*/
void read_dictionaries_and_metadata();

/**
* Opens packed streams for reading.
*/
void open_packed_streams();

/**
* Reads the variable dictionary from the archive.
* @param lazy
* @return the variable dictionary reader
*/
std::shared_ptr<VariableDictionaryReader> read_variable_dictionary(bool lazy = false) {
m_var_dict->read_new_entries(lazy);
m_var_dict->read_entries(lazy);
return m_var_dict;
}

@@ -57,7 +63,7 @@
* @return the log type dictionary reader
*/
std::shared_ptr<LogTypeDictionaryReader> read_log_type_dictionary(bool lazy = false) {
m_log_dict->read_new_entries(lazy);
m_log_dict->read_entries(lazy);
return m_log_dict;
}

@@ -67,7 +73,7 @@
* @return the array dictionary reader
*/
std::shared_ptr<LogTypeDictionaryReader> read_array_dictionary(bool lazy = false) {
m_array_dict->read_new_entries(lazy);
m_array_dict->read_entries(lazy);
return m_array_dict;
}

@@ -76,15 +82,6 @@
*/
void read_metadata();

/**
* Reads the local timestamp dictionary from the archive.
* @return the timestamp dictionary reader
*/
std::shared_ptr<TimestampDictionaryReader> read_timestamp_dictionary() {
m_timestamp_dict->read_new_entries();
return m_timestamp_dict;
}

/**
* Reads a table from the archive.
* @param schema_id
@@ -113,7 +110,7 @@
std::shared_ptr<LogTypeDictionaryReader> get_array_dictionary() { return m_array_dict; }

std::shared_ptr<TimestampDictionaryReader> get_timestamp_dictionary() {
return m_timestamp_dict;
return m_archive_reader_adaptor->get_timestamp_dictionary();
}

std::shared_ptr<SchemaTree> get_schema_tree() { return m_schema_tree; }
@@ -201,7 +198,7 @@
std::shared_ptr<VariableDictionaryReader> m_var_dict;
std::shared_ptr<LogTypeDictionaryReader> m_log_dict;
std::shared_ptr<LogTypeDictionaryReader> m_array_dict;
std::shared_ptr<TimestampDictionaryReader> m_timestamp_dict;
std::shared_ptr<ArchiveReaderAdaptor> m_archive_reader_adaptor;

std::shared_ptr<SchemaTree> m_schema_tree;
std::shared_ptr<ReaderUtils::SchemaMap> m_schema_map;
@@ -212,7 +209,6 @@
};

PackedStreamReader m_stream_reader;
FileReader m_table_metadata_file_reader;
ZstdDecompressor m_table_metadata_decompressor;
SchemaReader m_schema_reader;
std::shared_ptr<char[]> m_stream_buffer{};
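With the header changes above, the caller no longer reads the timestamp dictionary itself; it is owned by the adaptor and exposed through get_timestamp_dictionary(), while packed streams get an explicit open_packed_streams() step. A rough caller-side sketch of the updated sequence follows, assuming ArchiveReader is default-constructible and lives in the clp_s namespace; the function and variable names are illustrative.

#include "ArchiveReader.hpp"
#include "InputConfig.hpp"

namespace clp_s {
void decompress_archive_sketch(Path const& archive_path, NetworkAuthOption const& network_auth) {
    ArchiveReader archive_reader;
    archive_reader.open(archive_path, network_auth);

    // Loads the dictionaries, table metadata, and (via the adaptor) the
    // timestamp dictionary; read_timestamp_dictionary() no longer exists.
    archive_reader.read_dictionaries_and_metadata();

    // Packed streams are now opened explicitly before any table is read.
    archive_reader.open_packed_streams();

    auto timestamp_dict = archive_reader.get_timestamp_dictionary();
    // ... read tables with read_schema_table(...) and marshal records ...

    archive_reader.close();
}
}  // namespace clp_s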
61 changes: 23 additions & 38 deletions components/core/src/clp_s/DictionaryReader.hpp
@@ -7,6 +7,7 @@

#include <boost/algorithm/string/case_conv.hpp>

#include "ArchiveReaderAdaptor.hpp"
#include "DictionaryEntry.hpp"
#include "Utils.hpp"

@@ -23,7 +24,7 @@ class DictionaryReader {
};

// Constructors
DictionaryReader() : m_is_open(false) {}
DictionaryReader(ArchiveReaderAdaptor& adaptor) : m_is_open(false), m_adaptor(adaptor) {}

// Methods
/**
@@ -38,9 +39,9 @@ class DictionaryReader {
void close();

/**
* Reads any new entries from disk
* Reads all entries from disk
*/
void read_new_entries(bool lazy = false);
void read_entries(bool lazy = false);

/**
* @return All dictionary entries
@@ -82,29 +83,22 @@

protected:
bool m_is_open;
FileReader m_dictionary_file_reader;
ArchiveReaderAdaptor& m_adaptor;
std::string m_dictionary_path;
ZstdDecompressor m_dictionary_decompressor;
std::vector<EntryType> m_entries;
};

class VariableDictionaryReader : public DictionaryReader<uint64_t, VariableDictionaryEntry> {};

class LogTypeDictionaryReader : public DictionaryReader<uint64_t, LogTypeDictionaryEntry> {};
using VariableDictionaryReader = DictionaryReader<uint64_t, VariableDictionaryEntry>;
using LogTypeDictionaryReader = DictionaryReader<uint64_t, LogTypeDictionaryEntry>;

template <typename DictionaryIdType, typename EntryType>
void DictionaryReader<DictionaryIdType, EntryType>::open(std::string const& dictionary_path) {
if (m_is_open) {
throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__);
}

constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB

m_dictionary_file_reader.open(dictionary_path);
// Skip header
m_dictionary_file_reader.seek_from_begin(sizeof(uint64_t));
// Open decompressor
m_dictionary_decompressor.open(m_dictionary_file_reader, cDecompressorFileReadBufferCapacity);

m_dictionary_path = dictionary_path;
m_is_open = true;
}

@@ -113,40 +107,31 @@ void DictionaryReader<DictionaryIdType, EntryType>::close() {
if (false == m_is_open) {
throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__);
}

m_dictionary_decompressor.close();
m_dictionary_file_reader.close();

m_is_open = false;
}

template <typename DictionaryIdType, typename EntryType>
void DictionaryReader<DictionaryIdType, EntryType>::read_new_entries(bool lazy) {
void DictionaryReader<DictionaryIdType, EntryType>::read_entries(bool lazy) {
if (false == m_is_open) {
throw OperationFailed(ErrorCodeNotInit, __FILENAME__, __LINE__);
}

auto dictionary_file_reader_pos = m_dictionary_file_reader.get_pos();
m_dictionary_file_reader.seek_from_begin(0);
uint64_t num_dictionary_entries;
m_dictionary_file_reader.read_numeric_value(num_dictionary_entries, false);
m_dictionary_file_reader.seek_from_begin(dictionary_file_reader_pos);
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB
auto dictionary_reader = m_adaptor.checkout_reader_for_section(m_dictionary_path);

// Validate dictionary header
if (num_dictionary_entries < m_entries.size()) {
throw OperationFailed(ErrorCodeCorrupt, __FILENAME__, __LINE__);
uint64_t num_dictionary_entries;
dictionary_reader->read_numeric_value(num_dictionary_entries, false);
m_dictionary_decompressor.open(*dictionary_reader, cDecompressorFileReadBufferCapacity);

// Read dictionary entries
m_entries.resize(num_dictionary_entries);
for (size_t i = 0; i < num_dictionary_entries; ++i) {
auto& entry = m_entries[i];
entry.read_from_file(m_dictionary_decompressor, i, lazy);
}

// Read new dictionary entries
if (num_dictionary_entries > m_entries.size()) {
auto prev_num_dictionary_entries = m_entries.size();
m_entries.resize(num_dictionary_entries);

for (size_t i = prev_num_dictionary_entries; i < num_dictionary_entries; ++i) {
auto& entry = m_entries[i];
entry.read_from_file(m_dictionary_decompressor, i, lazy);
}
}
m_dictionary_decompressor.close();
m_adaptor.checkin_reader_for_section(m_dictionary_path);
}

template <typename DictionaryIdType, typename EntryType>
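DictionaryReader now takes the adaptor by reference and defers all I/O to read_entries(), which checks the section out of the adaptor, reads the entry count from the header, decompresses every entry, and checks the section back in. A sketch of the new construction/read flow is below; the section-name constant cArchiveVarDictFile is hypothetical (the real constants live in archive_constants.hpp), and in practice these readers are built through ReaderUtils::get_variable_dictionary_reader rather than by hand.

#include <memory>

#include "archive_constants.hpp"
#include "ArchiveReaderAdaptor.hpp"
#include "DictionaryReader.hpp"

namespace clp_s {
std::shared_ptr<VariableDictionaryReader> load_var_dict_sketch(ArchiveReaderAdaptor& adaptor) {
    auto var_dict = std::make_shared<VariableDictionaryReader>(adaptor);

    // open() only records the section name now; no data is touched until
    // read_entries() checks the section out of the adaptor.
    var_dict->open(constants::cArchiveVarDictFile);  // hypothetical constant
    var_dict->read_entries();

    return var_dict;
}
}  // namespace clp_s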
1 change: 1 addition & 0 deletions components/core/src/clp_s/JsonConstructor.cpp
@@ -43,6 +43,7 @@ void JsonConstructor::store() {
"log order. Falling back to out of order decompression.");
}

m_archive_reader->open_packed_streams();
if (false == m_option.ordered || false == m_archive_reader->has_log_order()) {
FileWriter writer;
writer.open(