Skip to content

Commit

Permalink
Concatenate files in comma separated list
Browse files Browse the repository at this point in the history
* Accept comma separated lists of files anywhere a filename is accepted
* Treat the list as if the files were concatenated together
* Particularly useful for system like HDFS where a 100G file is
  typically stored as 100 separate 1G files.
* Added Uri struct to support the combination of file lists and
  finding related suffixed files. Only file paths that handle suffixes
  were explicitly converted, everything else uses the implicit
  constructor from const char* and std::string.
  • Loading branch information
ebernhardson committed Feb 20, 2018
1 parent 2a2ac0d commit a761ed0
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 91 deletions.
4 changes: 2 additions & 2 deletions include/LightGBM/dataset_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class DatasetLoader {

private:

Dataset* LoadFromBinFile(const char* data_filename, const char* bin_filename, int rank, int num_machines, int* num_global_data, std::vector<data_size_t>* used_data_indices);
Dataset* LoadFromBinFile(const Uri& uri, int rank, int num_machines, int* num_global_data, std::vector<data_size_t>* used_data_indices);

void SetHeader(const char* filename);

Expand All @@ -52,7 +52,7 @@ class DatasetLoader {
void ExtractFeaturesFromFile(const char* filename, const Parser* parser, const std::vector<data_size_t>& used_data_indices, Dataset* dataset);

/*! \brief Check can load from binary file */
std::string CheckCanLoadFromBin(const char* filename);
Uri CheckCanLoadFromBin(const char* filename);

const IOConfig& io_config_;
/*! \brief Random generator*/
Expand Down
26 changes: 21 additions & 5 deletions include/LightGBM/utils/file_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,22 @@

namespace LightGBM{

struct Uri {
Uri(const char* uri_) : name(uri_), uri(uri_), suffix("") {}
Uri(const std::string& uri_) : name(uri_), uri(uri_), suffix("") {}
Uri(const std::string& uri_, const std::string& suffix_) : name(uri_), uri(uri_), suffix(suffix_) {
name.append(suffix);
}

std::string name;
std::string uri;
std::string suffix;

inline bool empty() const {
return uri.empty();
}
};

/*!
* \brief An interface for writing files from buffers
*/
Expand All @@ -24,16 +40,16 @@ struct VirtualFileWriter {
virtual size_t Write(const void* data, size_t bytes) const = 0;
/*!
* \brief Create appropriate writer for filename
* \param filename Filename of the data
* \param uri Filename of the data
* \return File writer instance
*/
static std::unique_ptr<VirtualFileWriter> Make(const std::string& filename);
static std::unique_ptr<VirtualFileWriter> Make(const Uri& filename);
/*!
* \brief Check filename existence
* \param filename Filename of the data
* \param uri Filename of the data
* \return True when the file exists
*/
static bool Exists(const std::string& filename);
static bool Exists(const Uri& uri);
};

/**
Expand Down Expand Up @@ -62,7 +78,7 @@ struct VirtualFileReader {
* \param filename Filename of the data
* \return File reader instance
*/
static std::unique_ptr<VirtualFileReader> Make(const std::string& filename);
static std::unique_ptr<VirtualFileReader> Make(const Uri& filename);
};

} // namespace LightGBM
Expand Down
6 changes: 3 additions & 3 deletions include/LightGBM/utils/pipeline_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ class PipelineReader {
public:
/*!
* \brief Read data from a file, use pipeline methods
* \param filename Filename of data
* \param uri Filename of data
* \process_fun Process function
*/
static size_t Read(const char* filename, int skip_bytes, const std::function<size_t (const char*, size_t)>& process_fun) {
auto reader = VirtualFileReader::Make(filename);
static size_t Read(const Uri& uri, int skip_bytes, const std::function<size_t (const char*, size_t)>& process_fun) {
auto reader = VirtualFileReader::Make(uri);
if (!reader->Init()) {
return 0;
}
Expand Down
24 changes: 12 additions & 12 deletions include/LightGBM/utils/text_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ class TextReader {
public:
/*!
* \brief Constructor
* \param filename Filename of data
* \param uri Filename of data
* \param is_skip_first_line True if need to skip header
*/
TextReader(const char* filename, bool is_skip_first_line):
filename_(filename), is_skip_first_line_(is_skip_first_line){
TextReader(const Uri& uri, bool is_skip_first_line):
uri_(uri), is_skip_first_line_(is_skip_first_line){
if (is_skip_first_line_) {
auto reader = VirtualFileReader::Make(filename);
auto reader = VirtualFileReader::Make(uri);
if (!reader->Init()) {
Log::Fatal("Could not open %s", filename);
Log::Fatal("Could not open %s", uri.name.c_str());
}
std::stringstream str_buf;
char read_c;
Expand All @@ -52,7 +52,7 @@ class TextReader {
++skip_bytes_;
}
first_line_ = str_buf.str();
Log::Debug("Skipped header \"%s\" in file %s", first_line_.c_str(), filename_);
Log::Debug("Skipped header \"%s\" in file %s", first_line_.c_str(), uri_.name.c_str());
}
}
/*!
Expand Down Expand Up @@ -83,7 +83,7 @@ class TextReader {
INDEX_T ReadAllAndProcess(const std::function<void(INDEX_T, const char*, size_t)>& process_fun) {
last_line_ = "";
INDEX_T total_cnt = 0;
PipelineReader::Read(filename_, skip_bytes_,
PipelineReader::Read(uri_, skip_bytes_,
[this, &total_cnt, &process_fun]
(const char* buffer_process, size_t read_cnt) {
size_t cnt = 0;
Expand Down Expand Up @@ -122,7 +122,7 @@ class TextReader {
});
// if last line of file doesn't contain end of line
if (last_line_.size() > 0) {
Log::Info("Warning: last line of %s has no end of line, still using this line", filename_);
Log::Info("Warning: last line of %s has no end of line, still using this line", uri_.name.c_str());
process_fun(total_cnt, last_line_.c_str(), last_line_.size());
++total_cnt;
last_line_ = "";
Expand All @@ -144,7 +144,7 @@ class TextReader {
std::vector<char> ReadContent(size_t* out_len) {
std::vector<char> ret;
*out_len = 0;
auto reader = VirtualFileReader::Make(filename_);
auto reader = VirtualFileReader::Make(uri_);
if (!reader->Init()) {
return ret;
}
Expand Down Expand Up @@ -229,7 +229,7 @@ class TextReader {
last_line_ = "";
INDEX_T total_cnt = 0;
INDEX_T used_cnt = 0;
PipelineReader::Read(filename_, skip_bytes_,
PipelineReader::Read(uri_, skip_bytes_,
[this, &total_cnt, &process_fun,&used_cnt, &filter_fun]
(const char* buffer_process, size_t read_cnt) {
size_t cnt = 0;
Expand Down Expand Up @@ -277,7 +277,7 @@ class TextReader {
});
// if last line of file doesn't contain end of line
if (last_line_.size() > 0) {
Log::Info("Warning: last line of %s has no end of line, still using this line", filename_);
Log::Info("Warning: last line of %s has no end of line, still using this line", uri_.name.c_str());
if (filter_fun(used_cnt, total_cnt)) {
lines_.push_back(last_line_);
process_fun(used_cnt, lines_);
Expand Down Expand Up @@ -308,7 +308,7 @@ class TextReader {

private:
/*! \brief Filename of text data */
const char* filename_;
const Uri uri_;
/*! \brief Cache the read text data */
std::vector<std::string> lines_;
/*! \brief Buffer for last line */
Expand Down
17 changes: 8 additions & 9 deletions src/io/dataset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,24 +516,23 @@ void Dataset::SaveBinaryFile(const char* bin_filename) {
return;
}
// if not pass a filename, just append ".bin" of original file
std::string bin_filename_str(data_filename_);
if (bin_filename == nullptr || bin_filename[0] == '\0') {
bin_filename_str.append(".bin");
bin_filename = bin_filename_str.c_str();
Uri uri(bin_filename);
if (uri.empty()) {
uri = Uri(data_filename_, ".bin");
}
bool is_file_existed = false;

if (VirtualFileWriter::Exists(bin_filename)) {
if (VirtualFileWriter::Exists(uri)) {
is_file_existed = true;
Log::Warning("File %s existed, cannot save binary to it", bin_filename);
Log::Warning("File %s existed, cannot save binary to it", uri.name.c_str());
}

if (!is_file_existed) {
auto writer = VirtualFileWriter::Make(bin_filename);
auto writer = VirtualFileWriter::Make(uri);
if (!writer->Init()) {
Log::Fatal("Cannot write binary data to %s ", bin_filename);
Log::Fatal("Cannot write binary data to %s ", uri.name.c_str());
}
Log::Info("Saving data to binary file %s", bin_filename);
Log::Info("Saving data to binary file %s", uri.name.c_str());
size_t size_of_token = std::strlen(binary_file_token);
writer->Write(binary_file_token, size_of_token);
// get size of header
Expand Down
37 changes: 18 additions & 19 deletions src/io/dataset_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ Dataset* DatasetLoader::LoadFromFile(const char* filename, const char* initscore
auto dataset = std::unique_ptr<Dataset>(new Dataset());
data_size_t num_global_data = 0;
std::vector<data_size_t> used_data_indices;
auto bin_filename = CheckCanLoadFromBin(filename);
if (bin_filename.size() == 0) {
auto uri = CheckCanLoadFromBin(filename);
if (uri.empty()) {
auto parser = std::unique_ptr<Parser>(Parser::CreateParser(filename, io_config_.has_header, 0, label_idx_));
if (parser == nullptr) {
Log::Fatal("Could not recognize data format of %s", filename);
Expand Down Expand Up @@ -207,7 +207,7 @@ Dataset* DatasetLoader::LoadFromFile(const char* filename, const char* initscore
}
} else {
// load data from binary file
dataset.reset(LoadFromBinFile(filename, bin_filename.c_str(), rank, num_machines, &num_global_data, &used_data_indices));
dataset.reset(LoadFromBinFile(uri, rank, num_machines, &num_global_data, &used_data_indices));
}
// check meta data
dataset->metadata_.CheckOrPartition(num_global_data, used_data_indices);
Expand All @@ -222,8 +222,8 @@ Dataset* DatasetLoader::LoadFromFileAlignWithOtherDataset(const char* filename,
data_size_t num_global_data = 0;
std::vector<data_size_t> used_data_indices;
auto dataset = std::unique_ptr<Dataset>(new Dataset());
auto bin_filename = CheckCanLoadFromBin(filename);
if (bin_filename.size() == 0) {
auto uri = CheckCanLoadFromBin(filename);
if (uri.empty()) {
auto parser = std::unique_ptr<Parser>(Parser::CreateParser(filename, io_config_.has_header, 0, label_idx_));
if (parser == nullptr) {
Log::Fatal("Could not recognize data format of %s", filename);
Expand Down Expand Up @@ -254,20 +254,20 @@ Dataset* DatasetLoader::LoadFromFileAlignWithOtherDataset(const char* filename,
}
} else {
// load data from binary file
dataset.reset(LoadFromBinFile(filename, bin_filename.c_str(), 0, 1, &num_global_data, &used_data_indices));
dataset.reset(LoadFromBinFile(uri, 0, 1, &num_global_data, &used_data_indices));
}
// not need to check validation data
// check meta data
dataset->metadata_.CheckOrPartition(num_global_data, used_data_indices);
return dataset.release();
}

Dataset* DatasetLoader::LoadFromBinFile(const char* data_filename, const char* bin_filename, int rank, int num_machines, int* num_global_data, std::vector<data_size_t>* used_data_indices) {
Dataset* DatasetLoader::LoadFromBinFile(const Uri& uri, int rank, int num_machines, int* num_global_data, std::vector<data_size_t>* used_data_indices) {
auto dataset = std::unique_ptr<Dataset>(new Dataset());
auto reader = VirtualFileReader::Make(bin_filename);
dataset->data_filename_ = data_filename;
auto reader = VirtualFileReader::Make(uri);
dataset->data_filename_ = uri.uri;
if (!reader->Init()) {
Log::Fatal("Could not read binary data from %s", bin_filename);
Log::Fatal("Could not read binary data from %s", uri.name.c_str());
}

// buffer to read binary file
Expand Down Expand Up @@ -1058,17 +1058,16 @@ void DatasetLoader::ExtractFeaturesFromFile(const char* filename, const Parser*
}

/*! \brief Check can load from binary file */
std::string DatasetLoader::CheckCanLoadFromBin(const char* filename) {
std::string bin_filename(filename);
bin_filename.append(".bin");
Uri DatasetLoader::CheckCanLoadFromBin(const char* filename) {
Uri uri(filename, ".bin");

auto reader = VirtualFileReader::Make(bin_filename.c_str());
auto reader = VirtualFileReader::Make(uri);

if (!reader->Init()) {
bin_filename = std::string(filename);
reader = VirtualFileReader::Make(bin_filename.c_str());
uri = Uri(filename);
reader = VirtualFileReader::Make(uri);
if (!reader->Init()) {
Log::Fatal("cannot open data file %s", bin_filename.c_str());
Log::Fatal("cannot open data file %s", uri.name.c_str());
}
}

Expand All @@ -1079,9 +1078,9 @@ std::string DatasetLoader::CheckCanLoadFromBin(const char* filename) {
size_t read_cnt = reader->Read(buffer.data(), size_of_token);
if (read_cnt == size_of_token
&& std::string(buffer.data()) == std::string(Dataset::binary_file_token)) {
return bin_filename;
return uri;
} else {
return std::string();
return Uri("");
}

}
Expand Down
Loading

0 comments on commit a761ed0

Please sign in to comment.