diff --git a/src/BUSData.cpp b/src/BUSData.cpp index 7b3ae85..c97f80f 100644 --- a/src/BUSData.cpp +++ b/src/BUSData.cpp @@ -71,6 +71,36 @@ uint64_t stringToBinary(const char* s, const size_t len, uint32_t &flag) { return r; } +int identifyParseHeader(std::istream &inf, BUSHeader &header, compressed_BUSHeader &comp_header) +{ + int ret = -1; + char magic[5]; + + magic[4] = '\0'; + + inf.read(magic, 4); + for (int i = 0; i < 4; ++i) + { + inf.putback(magic[3 - i]); + } + + if (std::strcmp(magic, "BUS\0") == 0) + { + return BUSFILE_TYPE::BUSFILE * parseHeader(inf, header); + } + else if (std::strcmp(magic, "BUS\1") == 0) + { + return BUSFILE_TYPE::BUSFILE_COMPRESED * parseCompressedHeader(inf, comp_header); + } + else if(std::strcmp(magic, "BZI\0") == 0){ + return BUSFILE_TYPE::BUSZ_INDEX; + } + else if (std::strcmp(magic, "BEC\0") == 0) + { + return BUSFILE_TYPE::EC_MATRIX_COMPRESSED; + } + return BUSFILE_TYPE::EC_MATRIX; +} bool parseHeader(std::istream &inf, BUSHeader &header) { char magic[4]; @@ -95,7 +125,80 @@ bool parseHeader(std::istream &inf, BUSHeader &header) { return true; } +bool parseCompressedHeader(std::istream &inf, compressed_BUSHeader &compheader) +{ + char magic[5]; + magic[4] = '\0'; + + BUSHeader &header = compheader.bus_header; + inf.read(magic, 4); + if (std::strcmp(magic, "BUS\1") != 0) + { + std::cerr << "Invalid header magic\n"; + return false; + } + inf.read((char *)(&header.version), sizeof(header.version)); + if (header.version != BUSFORMAT_VERSION) + { + return false; + } + inf.read((char *)(&header.bclen), sizeof(header.bclen)); + inf.read((char *)(&header.umilen), sizeof(header.umilen)); + uint32_t tlen = 0; + inf.read((char *)(&tlen), sizeof(tlen)); + char *t = new char[tlen + 1]; + inf.read(t, tlen); + t[tlen] = '\0'; + header.text.assign(t); + delete[] t; + + // We store the compressed_header-specific information after the regular header + inf.read((char *)&compheader.chunk_size, sizeof(compheader.chunk_size)); + inf.read((char *)&compheader.pfd_blocksize, sizeof(compheader.pfd_blocksize)); + inf.read((char *)&compheader.lossy_umi, sizeof(compheader.lossy_umi)); + return true; +} + +bool parseECs_stream(std::istream &in, BUSHeader &header) +{ + auto &ecs = header.ecs; + std::string line, t; + line.reserve(10000); + + std::vector c; + + int i = 0; + bool has_reached = false; + while (std::getline(in, line)) + { + c.clear(); + int ec = -1; + if (line.size() == 0) + { + continue; + } + std::stringstream ss(line); + ss >> ec; + assert(ec == i); + while (std::getline(ss, t, ',')) + { + c.push_back(std::stoi(t)); + } + if (!has_reached) + { + has_reached |= !(c.size() == 1 && c[0] == i); + if (has_reached) + { + std::cerr << "first line is " << i << '\n'; + } + } + + ecs.push_back(std::move(c)); + ++i; + } + return true; +} bool parseECs(const std::string &filename, BUSHeader &header) { auto &ecs = header.ecs; @@ -294,3 +397,24 @@ bool writeHeader(std::ostream &outf, const BUSHeader &header) { return true; } + +bool writeCompressedHeader(std::ostream &outf, const compressed_BUSHeader &compheader) +{ + outf.write("BUS\1", 4); + + // We start writing out the contents of the general header + const auto header = compheader.bus_header; + outf.write((char *)(&header.version), sizeof(header.version)); + outf.write((char *)(&header.bclen), sizeof(header.bclen)); + outf.write((char *)(&header.umilen), sizeof(header.umilen)); + uint32_t tlen = header.text.size(); + outf.write((char *)(&tlen), sizeof(tlen)); + outf.write((char *)header.text.c_str(), tlen); + + // We end by writing out the compressed-header-specific data + outf.write((char *)(&compheader.chunk_size), sizeof(compheader.chunk_size)); + outf.write((char *)&compheader.pfd_blocksize, sizeof(compheader.pfd_blocksize)); + outf.write((char *)(&compheader.lossy_umi), sizeof(compheader.lossy_umi)); + + return true; +} \ No newline at end of file diff --git a/src/BUSData.h b/src/BUSData.h index 97c5232..5c697f7 100644 --- a/src/BUSData.h +++ b/src/BUSData.h @@ -28,6 +28,15 @@ struct BUSHeader { BUSHeader() : version(0), bclen(0), umilen(0) {} }; +struct compressed_BUSHeader +{ + uint32_t chunk_size; + uint32_t lossy_umi; + uint32_t pfd_blocksize = 512; + BUSHeader bus_header; + compressed_BUSHeader() : chunk_size(0), lossy_umi(0) {} +}; + struct BUSData { uint64_t barcode; uint64_t UMI; @@ -38,11 +47,23 @@ struct BUSData { BUSData() : barcode(0), UMI(0), ec(-1), count(0), flags(0), pad(0) {} }; +enum BUSFILE_TYPE +{ + BUSFILE = 1, + BUSFILE_COMPRESED = 2, + BUSZ_INDEX = 3, + EC_MATRIX = 4, + EC_MATRIX_COMPRESSED = 5 +}; bool parseHeader(std::istream &inf, BUSHeader &header); bool writeHeader(std::ostream &outf, const BUSHeader &header); +bool parseCompressedHeader(std::istream &inf, compressed_BUSHeader &header); +bool writeCompressedHeader(std::ostream &inf, const compressed_BUSHeader &header); +int identifyParseHeader(std::istream &inf, BUSHeader &header, compressed_BUSHeader &comp_header); +bool parseECs_stream(std::istream &in, BUSHeader &header); bool parseECs(const std::string &filename, BUSHeader &header); bool writeECs(const std::string &filename, const BUSHeader &header); bool writeGenes(const std::string &filename, const std::unordered_map &genenames); diff --git a/src/Common.hpp b/src/Common.hpp index 7e85d75..f67ee5c 100644 --- a/src/Common.hpp +++ b/src/Common.hpp @@ -102,6 +102,12 @@ struct Bustools_opt /* linker */ int start, end; + /* Compression */ + std::string busz_index; + uint32_t chunk_size = 100000; + uint32_t lossy_umi = 0; + uint32_t pfd_blocksize = 512; + Bustools_opt() : threads(1), max_memory(1ULL << 32), type(0), threshold(0), start(-1), end(-1) {} }; diff --git a/src/bustools_compress.cpp b/src/bustools_compress.cpp new file mode 100644 index 0000000..a864063 --- /dev/null +++ b/src/bustools_compress.cpp @@ -0,0 +1,969 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Common.hpp" +#include "BUSData.h" +#include "bustools_compress.h" + +size_t pfd_blocksize = 512; + +/** + * @brief Encode `num` using fibonacci encoding into buf, starting at bitpos. + * @pre the next 128 bits in `buf` are 0 starting from `bitpos`, and wrapped around 192. + * 0 <= bit_pos < 3*64 == 192 + * + * @pre num > 0 + * @post buf now contains the fibo encoding of num from [pre(bitpos); post(bitpos)]. + * bit_pos has changed, 0 <= bitpos < 3*64 == 192. + * 0 or more elements in buf have been concentrated with fibo-encoded numbers and thus written to obuf. + * + * @note The largest fibonacci number that fits in a uint64_t is the 91st (0-indexed) fibo number, i.e. the first 92 fibonacci numbers fit in a 64-bit uint. + * Fibonacci encoding appends a single bit as a stop code. Hence, the longest fibonacci encoding uses 93 bits. + * + * @param num the number to encode, num > 0 + * @param bufsize the size of the output buffer. + * @param buf array of `bufsize` uint64_t. num is fibonacci-encoded in buf. + * @param bitpos the bit position in buf where the next fibo encoding of num starts. + * @param obuf the ostream buffer to write to. + * @return bool true iff encoding the current number does not write outside of buf + */ + +const auto fibo_begin = fibo64.begin(); +const auto fibo_end = fibo64.end(); + +template +bool fiboEncode(const uint64_t num, const size_t bufsize, BUF_t *buf, size_t &bitpos) +{ + constexpr uint32_t word_size = sizeof(BUF_t) * 8; + + const size_t max_bitpos = bufsize * word_size; + constexpr BUF_t ONE{1}; + + const uint32_t curr_byte_pos = bitpos / word_size; + + uint64_t remainder = num; + + // the ith fibonacci number is the largest fibo not greater than remainder + auto i = std::upper_bound(fibo_begin, fibo_end, remainder) - 1; + + const uint32_t n_bits = (i - fibo_begin) + 2; + + // Encoding the current number would write out of bounds of buf. + if (bitpos + n_bits > max_bitpos) + return false; + + uint32_t next_bit_pos = bitpos + n_bits - 1, + bit_offset = next_bit_pos % word_size, + buf_offset = (next_bit_pos / word_size) % bufsize; + + // Set the stop bit. + buf[buf_offset] |= ONE << (word_size - 1 - bit_offset); + + ++i; + while (remainder > 0) + { + i = std::upper_bound(fibo_begin, i, remainder) - 1; + next_bit_pos = bitpos + (i - fibo_begin); + buf_offset = (next_bit_pos / word_size) % bufsize; + bit_offset = next_bit_pos % word_size; + + buf[buf_offset] |= ONE << (word_size - 1 - bit_offset); + remainder -= *i; + } + bitpos += n_bits; + return true; +} + +/** + * @brief pack elem into buf starting at bitpos, using exactly b_bits bits. + * @pre num is representable using `b_bits` bits. + * @pre buf has at least one element + * @pre buf has at least two elements if bitpos + b_bits > 8*sizeof(buf). + * + * @param b_bits The number of bits to represent elem. + * @param elem The number to pack, must be representable with at most `b_bits` bits. + * @param buf The int64_t array where elem should be packed into. + * @param bitpos The starting point in bits of where to back elem. + * @return bool: true iff packing of elem saturates buf[0]. + */ +template +bool pack_int( + const uint32_t b_bits, + T elem, + T *buf, + uint32_t &bitpos) +{ + constexpr int32_t dest_wordsize = sizeof(T) * 8; + + // | | | + // ^ ^ + // bitpos dest_wordsize + int32_t shift = dest_wordsize - bitpos - b_bits; + T carryover = 0; + constexpr T ONE{1}; + + if (shift < 0) + { + // b_bits > (dest_wordsize - bitpos) + // -> number covers two elements + + carryover = elem & (ONE << -shift) - 1; + *(buf + 1) = carryover << dest_wordsize + shift; + elem >>= -shift; + } + + // shift by max(0, shift) + *buf |= elem << (shift > 0) * shift; + + bitpos = (dest_wordsize - shift) % dest_wordsize; + return (shift <= 0); +} + +/** + * @brief Encode a block of integers using NewPFD. + * @pre BUF has a size of `pfd_block`.size() / 64 * `b_bits` elements. + * + * @param pfd_block The numbers to encode. + * @param index_gaps Output: delta encoded indices of exceptions in `pfd_block`. + * @param exceptions Output: The most significant bits of each exception. + * @param b_bits The number of bits to use per int for the block. + * @param min_element The smallest element in `pfd_block`. + * @param BUF The buffer to pack the elements of pfd_block into. + */ +template +void encode_pfd_block( + std::vector &pfd_block, + std::vector &index_gaps, + std::vector &exceptions, + const uint32_t b_bits, + const SRC_T min_element, + DEST_T *BUF) +{ + index_gaps.clear(); + exceptions.clear(); + + SRC_T max_elem_bit_mask = (1ULL << b_bits) - 1; + uint32_t bitpos = 0, + idx = 0, + last_ex_idx = 0; + + bool do_increment_pointer = 0; + + // Store the elements in the primary block in pfd_buf using `b_bits` bits each. + for (SRC_T &elem : pfd_block) + { + elem -= min_element; + if (elem > max_elem_bit_mask) + { + // store the overflowing, most significant bits as exceptions. + exceptions.push_back(elem >> b_bits); + index_gaps.push_back(idx - last_ex_idx); + last_ex_idx = idx; + + // store the least b significant bits in the frame. + elem &= max_elem_bit_mask; + } + + do_increment_pointer = pack_int(b_bits, elem, BUF, bitpos); + BUF += do_increment_pointer; + ++idx; + } +} + +/** + * @brief Encode a block of `block_size` elements in `pfd_block` and write to of. + * + * @param block_size The number of elements in the block. + * @param pfd_block The elements to encode. + * @param index_gaps Vector to store delta-encoded indices of exceptions. + * @param exceptions Vector to store the most significant bits of exceptions. + * @param fibonacci_buf array used for storing fibonacci encoding. + * @param b_bits The number of bits each num in `pfd_block` is represented with. + * @param min_element The smallest element in `pfd_block`. + * @param of The ostream to write out the encoding. + */ + +template +size_t new_pfd( + const size_t block_size, + std::vector &pfd_block, + std::vector &index_gaps, + std::vector &exceptions, + DEST_T *fibonacci_buf, + const size_t b_bits, + const SRC_T min_element, + size_t fibonacci_bufsize, + DEST_T *PFD_buf) +{ + bool success = true; + constexpr size_t wordsize = sizeof(DEST_T) * 8; + + size_t buf_size = (block_size * b_bits) / wordsize; + std::fill(PFD_buf, PFD_buf + buf_size, 0ULL); + + size_t bitpos{0}; + + encode_pfd_block(pfd_block, index_gaps, exceptions, b_bits, min_element, PFD_buf); + + size_t n_exceptions = index_gaps.size(); + // For more compact fibonacci encoding, we pack the fibo encoded values together + success &= fiboEncode(b_bits + 1, fibonacci_bufsize, fibonacci_buf, bitpos); + success &= fiboEncode(min_element + 1, fibonacci_bufsize, fibonacci_buf, bitpos); + success &= fiboEncode(n_exceptions + 1, fibonacci_bufsize, fibonacci_buf, bitpos); + + for (const auto &i : index_gaps) + { + // we must increment by one, since the first index gap can be zero + success &= fiboEncode(i + 1, fibonacci_bufsize, fibonacci_buf, bitpos); + } + for (const auto &ex : exceptions) + { + // These are always > 0 since they contain the most significant bits of exception + success &= fiboEncode(ex, fibonacci_bufsize, fibonacci_buf, bitpos); + } + + size_t n_elems = bitpos / wordsize + (bitpos % wordsize > 0); + success &= (n_elems + buf_size <= fibonacci_bufsize); + + std::memcpy(fibonacci_buf + n_elems, PFD_buf, buf_size * sizeof(DEST_T) * success); + n_elems += buf_size; + + return n_elems * success; +} + +/** + * @brief Compute the smallest element, and the number of bits required to encode at least 90% of + * the numbers in pfd_scratch. + * @pre pfd_scratch contains the numbers to encode in a block + * @post pfd_scratch may have been reordered. + * @param block_size The number of elements in pfd_scratch, i.e. the block size. + * @param pfd_scratch The numbers to encode in a single block using NewPFD. + * @param min_element Output: The smallest element in `pfd_scratch`. + * @param b_bits Output: The minimum number of bits that are enough to encode at least ~90% of the elements in `pfd_scratch`. + */ + +template +void compute_pfd_params( + const size_t block_size, + std::vector &pfd_scratch, + const T &min_element, + uint32_t &b_bits) +{ + const size_t nth_elem_idx = (size_t)((double)block_size * 0.9); + + std::nth_element(pfd_scratch.begin(), pfd_scratch.begin() + nth_elem_idx, pfd_scratch.end()); + T nth_max_element = pfd_scratch[nth_elem_idx]; + + b_bits = sizeof(T) * 8 - 1 - __builtin_clrsb(nth_max_element - min_element); + b_bits = b_bits ?: 1; +} + +/** + * @brief Compress barcodes of rows using delta-runlen(0)-fibonacci encoding and write to `of`. + * + * @param rows BUSData array, contains at least `row_count` elements + * @param row_count The number of barcodes to compress. + * @param of The ostream for writing the encoding to. + * @return bool true iff encoding does not go out of bounds of obuf + */ +template +bool compress_barcodes(BUSData const *const rows, const int row_count, char *obuf, const size_t &obuf_size, size_t &global_bufpos) +{ + bool success = true; + uint64_t barcode = 0, + last_bc = 0, + runlen = 0; + size_t wordsize = sizeof(T) * 8; + + const size_t fibonacci_bufsize = (obuf_size - global_bufpos) / sizeof(T); + T *fibonacci_buf = (T *)(obuf + global_bufpos); + size_t bitpos{0}; + + for (int i = 0; i < row_count && success; ++i) + { + barcode = rows[i].barcode; + + // delta encoding + barcode -= last_bc; + + // Runlength encoding of zeros + if (barcode == 0) + { + ++runlen; + } + else + { + // Increment values as fibo cannot encode 0 + if (runlen) + { + success &= fiboEncode(1ULL, fibonacci_bufsize, fibonacci_buf, bitpos); + success &= fiboEncode(runlen, fibonacci_bufsize, fibonacci_buf, bitpos); + runlen = 0; + } + + if (barcode + 1 == 0) + { + std::cerr << "This input file needs sorting. Please sort this file and try again." << std::endl; + throw std::runtime_error("Input needs sorting prior to compression"); + } + + success &= fiboEncode(barcode + 1, fibonacci_bufsize, fibonacci_buf, bitpos); + } + last_bc = rows[i].barcode; + } + + // Take care of the last run of zeros in the delta-encoded barcodes + if (runlen) + { + success &= fiboEncode(1ULL, fibonacci_bufsize, fibonacci_buf, bitpos); + success &= fiboEncode(runlen, fibonacci_bufsize, fibonacci_buf, bitpos); + } + + global_bufpos += (bitpos / wordsize + (bitpos % wordsize > 0)) * sizeof(T); + return success; +} + +/** + * @brief Compress UMIs of rows using periodic_delta-runlen(0)-fibonacci encoding and write to `of`. + * + * @param rows BUSData array, contains at least `row_count` elements + * @param row_count The number of UMIs to compress. + * @param of The ostream for writing the encoding to. + * @return bool true iff encoding does not go out of bounds of obuf + */ +template +bool lossless_compress_umis(BUSData const *const rows, const int row_count, char *obuf, const size_t &obuf_size, size_t &global_bufpos) +{ + bool success = true; + uint64_t last_bc = rows[0].barcode + 1, + last_umi = 0, + bc, umi, diff; + + size_t wordsize = sizeof(T) * 8; + + const size_t fibonacci_bufsize = (obuf_size - global_bufpos) / sizeof(T); + T *fibonacci_buf = (T *)(obuf + global_bufpos); + size_t bitpos{0}; + + uint64_t runlen{0}; + const uint32_t RLE_val{0ULL}; + + for (int i = 0; i < row_count && success; ++i) + { + bc = rows[i].barcode; + + // We must increment umi, since a UMI==0 will confuse the runlength decoder. + umi = rows[i].UMI + 1; + + if (last_bc != bc) + { + last_umi = 0; + } + + diff = umi - last_umi; + + if (diff == RLE_val) + { + ++runlen; + } + else + { + // Increment values for fibonacci encoding. + if (runlen) + { + success &= fiboEncode(RLE_val + 1, fibonacci_bufsize, fibonacci_buf, bitpos); + success &= fiboEncode(runlen, fibonacci_bufsize, fibonacci_buf, bitpos); + runlen = 0; + } + + if(diff + 1 == 0){ + std::cerr << "This input file needs sorting. Please sort this file and try again." << std::endl; + throw std::runtime_error("Input needs sorting prior to compression"); + } + + success &= fiboEncode(diff + 1, fibonacci_bufsize, fibonacci_buf, bitpos); + } + + last_umi = umi; + last_bc = bc; + } + + // Take care of the last run of zeros. + if (runlen) + { + success &= fiboEncode(RLE_val + 1, fibonacci_bufsize, fibonacci_buf, bitpos); + success &= fiboEncode(runlen, fibonacci_bufsize, fibonacci_buf, bitpos); + } + + global_bufpos += (bitpos / wordsize + (bitpos % wordsize > 0)) * sizeof(T); + + return success; +} + +template +bool lossy_compress_umis(BUSData const *const rows, const int row_count, char *obuf, const size_t &obuf_size, size_t &global_bufpos) +{ + bool success = true; + std::cerr << "BEWARE: Lossy compression\n"; + return success; +} + +/** + * @brief Compress ECs of rows using NewPFD-fibonacci encoding and write to `of`. + * + * @param rows BUSData array, contains at least `row_count` elements + * @param row_count The number of ECs to compress. + * @param of The ostream for writing the encoding to. + * @return bool true iff encoding does not go out of bounds of obuf + */ +template +bool compress_ecs(BUSData const *const rows, const int row_count, char *obuf, const size_t &obuf_size, size_t &global_bufpos) +{ + bool success = true; + size_t BLOCK_SIZE{pfd_blocksize}; + size_t wordsize = sizeof(T) * 8; + size_t buf_offset = 0; + + std::vector index_gaps; + std::vector pfd_scratch, + pfd_block, + exceptions; + + // todo: We might be able to speed up by creating the primary array here. + size_t max_size_block = BLOCK_SIZE * sizeof(int32_t); + T *primary_block = new T[max_size_block]; + + exceptions.reserve(BLOCK_SIZE); + index_gaps.reserve(BLOCK_SIZE); + pfd_scratch.reserve(BLOCK_SIZE); + pfd_block.reserve(BLOCK_SIZE); + + const size_t fibonacci_bufsize = (obuf_size - global_bufpos) / sizeof(T); + T *fibonacci_buf = (T *)(obuf + global_bufpos); + + int row_index{0}; + int pfd_row_index{0}; + size_t elems_written = 0; + size_t byte_count = 0; + while (row_index < row_count && success) + { + pfd_row_index = 0; + pfd_scratch.clear(); + pfd_block.clear(); + int32_t min_element = rows[row_index].ec, + curr_el; + + while (pfd_row_index < BLOCK_SIZE && row_index < row_count) + { + curr_el = rows[row_index].ec; + pfd_scratch.push_back(curr_el); + pfd_block.push_back(curr_el); + + min_element = (min_element < curr_el) * min_element + (curr_el <= min_element) * curr_el; + ++pfd_row_index; + ++row_index; + } + + uint32_t b_bits = 0; + compute_pfd_params(pfd_row_index, pfd_scratch, min_element, b_bits); + + // we don't want to reset the fibonacci bytes, as this is our primary buffer. + elems_written = new_pfd( + BLOCK_SIZE, + pfd_block, + index_gaps, + exceptions, + fibonacci_buf + buf_offset, + b_bits, + min_element, + fibonacci_bufsize - buf_offset, + primary_block); + + success &= (elems_written > 0); + buf_offset += elems_written; + byte_count += elems_written * sizeof(T); + } + + global_bufpos += byte_count; + + delete[] primary_block; + return success; +} + +/** + * @brief Compress counts of rows using runlength(1)-fibonacci encoding and write to `of`. + * + * @param rows BUSData array, contains at least `row_count` elements + * @param row_count The number of counts to compress. + * @param of The ostream for writing the encoding to. + * @return bool true iff encoding does not go out of bounds of obuf + */ +template +bool compress_counts(BUSData const *const rows, const int row_count, char *obuf, const size_t &obuf_size, size_t &global_bufpos) +{ + bool success = true; + const uint32_t RLE_val{1UL}; + uint32_t count; + uint64_t runlen{0}; + + size_t wordsize = sizeof(T) * 8; + + const size_t fibonacci_bufsize = (obuf_size - global_bufpos) / sizeof(T); + T *fibonacci_buf = (T *)(obuf + global_bufpos); + size_t bitpos{0}; + + for (int i = 0; i < row_count && success; ++i) + { + count = rows[i].count; + if (count == RLE_val) + { + ++runlen; + } + else + { + if (runlen) + { + // Runlength-encode 1s. + success &= fiboEncode(RLE_val, fibonacci_bufsize, fibonacci_buf, bitpos); + success &= fiboEncode(runlen, fibonacci_bufsize, fibonacci_buf, bitpos); + runlen = 0; + } + success &= fiboEncode(count, fibonacci_bufsize, fibonacci_buf, bitpos); + } + } + if (runlen) + { + // Runlength-encode last run of 1s. + success &= fiboEncode(RLE_val, fibonacci_bufsize, fibonacci_buf, bitpos); + success &= fiboEncode(runlen, fibonacci_bufsize, fibonacci_buf, bitpos); + } + + global_bufpos += (bitpos / wordsize + (bitpos % wordsize > 0)) * sizeof(T); + return success; +} + +/** + * @brief Compress flags of rows using runlength(0)-fibonacci encoding and write to `of`. + * + * @param rows BUSData array, contains at least `row_count` elements + * @param row_count The number of counts to compress. + * @param of The ostream for writing the encoding to. + * @return bool true iff encoding does not go out of bounds of obuf + */ +template +bool compress_flags(BUSData const *const rows, const int row_count, char *obuf, const size_t &obuf_size, size_t &global_bufpos) +{ + bool success = true; + const uint32_t RLE_val{0UL}; + uint32_t flag, + runlen{0}; + + size_t wordsize = sizeof(T) * 8; + + const size_t fibonacci_bufsize = (obuf_size - global_bufpos) / sizeof(T); + T *fibonacci_buf = (T *)(obuf + global_bufpos); + size_t bitpos{0}; + // don't need to fill with zeros as that is done prior. + + for (int i = 0; i < row_count && success; ++i) + { + flag = rows[i].flags; + + if (flag == RLE_val) + { + ++runlen; + } + else + { + // Increment values as fibo cannot encode 0 + if (runlen) + { + // Runlength-encode 0s (incremented). + success &= fiboEncode(RLE_val + 1, fibonacci_bufsize, fibonacci_buf, bitpos); + success &= fiboEncode(runlen, fibonacci_bufsize, fibonacci_buf, bitpos); + runlen = 0; + } + success &= fiboEncode(flag + 1, fibonacci_bufsize, fibonacci_buf, bitpos); + } + } + + if (runlen) + { + // Runlength-encode last run of 0s (incremented). + success &= fiboEncode(RLE_val + 1, fibonacci_bufsize, fibonacci_buf, bitpos); + success &= fiboEncode(runlen, fibonacci_bufsize, fibonacci_buf, bitpos); + } + + global_bufpos += (bitpos / wordsize + (bitpos % wordsize > 0)) * sizeof(T); + return success; +} + +typedef bool (*compress_ptr)(BUSData const *, const int, char *, const size_t &, size_t &); + +void compress_busfile(const Bustools_opt &opt, std::ostream &outf, std::istream &in, BUSHeader &h) +{ + constexpr size_t ROW_SIZE = sizeof(BUSData); + + size_t N = opt.max_memory / ROW_SIZE; + const size_t chunk_size = (N < opt.chunk_size) ? N : opt.chunk_size; + + compress_ptr compressors[5]{ + &compress_barcodes, + (opt.lossy_umi ? &lossy_compress_umis : &lossless_compress_umis), + &compress_ecs, + &compress_counts, + &compress_flags, + }; + + compressed_BUSHeader comp_h; + comp_h.chunk_size = chunk_size; + comp_h.lossy_umi = opt.lossy_umi; + + comp_h.bus_header.text = h.text; + comp_h.bus_header.version = h.version; + comp_h.bus_header.bclen = h.bclen; + comp_h.bus_header.umilen = h.umilen; + + pfd_blocksize = opt.pfd_blocksize; + comp_h.pfd_blocksize = pfd_blocksize; + + writeCompressedHeader(outf, comp_h); + + std::vector block_sizes; + + // 6 * chunk_size is usually good enough, but we make it a multiple of 8; + size_t bufsize = (6 * chunk_size / 8) * 8; + size_t bufpos = 0; + size_t buf_checkpoint = 0; + + uint64_t block_header = 0; + uint64_t row_count = 0; + + BUSData *busdata; + char *buffer; + BUSZIndex busz_index(h, opt); + + try + { + busdata = new BUSData[chunk_size]; + buffer = new char[bufsize]; + + std::fill(buffer, buffer + bufsize, 0); + while (in.good()) + { + + in.read((char *)busdata, chunk_size * ROW_SIZE); + busz_index_add(busz_index, busdata[0].barcode, outf.tellp()); + row_count = in.gcount() / ROW_SIZE; + + for (int i_col = 0; i_col < 5; ++i_col) + { + bool success = compressors[i_col](busdata, row_count, buffer, bufsize, bufpos); + if (!success) + { + bufsize *= 2; + + char *newbuf = new char[bufsize]; + std::memcpy(newbuf, buffer, buf_checkpoint); + std::fill(newbuf + buf_checkpoint, newbuf + bufsize, 0); + + delete[] buffer; + buffer = newbuf; + + bufpos = buf_checkpoint; + --i_col; + } + buf_checkpoint = bufpos; + } + + block_header = bufpos << 30; + block_header |= row_count; + + outf.write((char *)&block_header, sizeof(block_header)); + outf.write(buffer, bufpos); + + std::fill(buffer, buffer + bufpos, 0); + + block_sizes.push_back(bufpos); + buf_checkpoint = 0; + bufpos = 0; + } + block_header = 0; + outf.write((char *)&block_header, sizeof(block_header)); + + delete[] busdata; + delete[] buffer; + } + catch (const std::bad_alloc &ex) + { + std::cerr << "Unable to allocate buffer\n" + << ex.what() << std::endl; + delete[] busdata; + delete[] buffer; + exit(1); + } + catch(const std::runtime_error &ex){ + delete[] busdata; + delete[] buffer; + exit(-1); + } + + if (!opt.busz_index.empty()) + { + std::ofstream ofindex; + ofindex.open(opt.busz_index); + busz_index.last_block = row_count ?: chunk_size; + write_BuszIndex(busz_index, ofindex); + } +} + +void busz_index_add(BUSZIndex &index, uint64_t bc, uint64_t pos) +{ + index.barcodes.push_back(bc); + index.positions.push_back(pos); + ++index.n_blocks; +} +void write_BuszIndex(BUSZIndex &index, std::ostream &of) +{ + + if(index.n_blocks > 1 && index.barcodes[index.n_blocks-2] == index.barcodes.back()){ + index.barcodes.pop_back(); + index.positions.pop_back(); + --index.n_blocks; + } + + of.write("BZI\0", 4); + of.write((char *)&index.n_blocks, sizeof(index.n_blocks)); + of.write((char *)&index.block_size, sizeof(index.block_size)); + of.write((char *)&index.last_block, sizeof(index.last_block)); + + for (int i = 0; i < index.n_blocks; ++i) + { + of.write((char *)&index.barcodes[i], sizeof(index.barcodes[i])); + of.write((char *)&index.positions[i], sizeof(index.positions[i])); + } +} + +template +bool pack_ec_row_to_file( + const std::vector &ecs, + const size_t bufsize, + T *buf, + size_t &bitpos, + std::ostream &of) +{ + bool success = true; + // Diffs must be incremented since ec == 0 is valid, although rare. + constexpr int32_t RL_VAL{2}; + + size_t n_elems{ecs.size()}; + + uint32_t diff, + last_ec{0}, + runlen{0}; + + success &= fiboEncode(n_elems, bufsize, buf, bitpos); + + const auto ecs_end = ecs.end(); + for (auto ec_it = ecs.begin(); success && ec_it != ecs_end; ++ec_it) + { + auto &ec = *ec_it; + diff = ec - last_ec + 1; + if (diff == RL_VAL) + { + ++runlen; + } + else + { + if (runlen) + { + success &= fiboEncode(RL_VAL, bufsize, buf, bitpos); + success &= fiboEncode(runlen, bufsize, buf, bitpos); + + runlen = 0; + } + success &= fiboEncode(diff, bufsize, buf, bitpos); + } + last_ec = ec; + } + if (runlen && success) + { + success &= fiboEncode(RL_VAL, bufsize, buf, bitpos); + success &= fiboEncode(runlen, bufsize, buf, bitpos); + } + + return success; +} + +template +void compress_ec_matrix(std::istream &in, BUSHeader &h, const Bustools_opt &opt, std::ostream &of) +{ + // first m rows map to themselves. In the file, we count how many such rows there are. + // For the rest, we don't need the ids, only the delta+runlen-1+int-coding of the ECs + parseECs_stream(in, h); + std::cerr << "Done parsing ecs" << std::endl; + auto &ecs = h.ecs; + + uint32_t lo = 0, hi = ecs.size() - 1, mid; + + // Assume all i->i rows are at the beginning of file. + while (lo < hi) + { + // | ecs[i] = [i] | ? | ecs[i] != [i] | + // ^ ^ ^ ^ + // 0 lo hi ecs.size() + mid = lo + (hi - lo + 1) / 2; + if (ecs.at(mid)[0] != mid || ecs.at(mid).size() != 1) + { + hi = mid - 1; + } + else + { + lo = mid; + } + } + + size_t bufsize{600000}; + T* fibonacci_buf = new T[bufsize]; + std::fill(fibonacci_buf, fibonacci_buf + bufsize, 0); + const size_t wordsize = sizeof(T) * 8; + + uint32_t num_identities = lo + 1; + uint32_t num_other = ecs.size() - num_identities; + + of.write("BEC\0", 4); + of.write((char *)&num_identities, sizeof(num_identities)); + of.write((char *)&num_other, sizeof(num_other)); + + size_t bitpos{0}; + uint32_t row_count = 0; + size_t checkpoint = 0; + uint64_t block_header = 0; + + const auto ecs_end = ecs.end(); + auto ecs_it = ecs.begin() + lo + 1; + int i_row = 0; + uint64_t total_rows = 0; + try + { + while (ecs_it < ecs_end) + { + while(ecs_it < ecs_end && row_count < opt.chunk_size){ + auto &ecs_list = *ecs_it; + bool success = pack_ec_row_to_file(ecs_list, bufsize, fibonacci_buf, bitpos, of); + if(!success) + { + size_t checkpoint_elem = checkpoint / wordsize + (checkpoint % wordsize > 0); + T *tmp_buf = new T[bufsize * 2]; + + std::fill(tmp_buf, tmp_buf + bufsize*2, 0); + std::memcpy(tmp_buf, fibonacci_buf, checkpoint_elem * sizeof(T)); + + bufsize *= 2; + + delete[] fibonacci_buf; + fibonacci_buf = tmp_buf; + bitpos = checkpoint; + continue; + } + + checkpoint = bitpos; + ++ecs_it; + ++row_count; + ++i_row; + + } + total_rows += row_count; + uint64_t n_bytes = (bitpos / wordsize + (bitpos % wordsize > 0)) * sizeof(T); + + + block_header = n_bytes << 30; + block_header |= row_count; + + of.write((char *)&block_header, sizeof(block_header)); + of.write((char *)fibonacci_buf, n_bytes); + + std::fill(fibonacci_buf, fibonacci_buf + bufsize, 0); + row_count = 0; + bitpos = 0; + } + } + catch (const std::bad_alloc &ex){ + std::cerr << "Unable to allocate " << bufsize << " sized buffer\n"; + std::cerr << ex.what() << std::endl; + } + block_header = 0; + of.write((char *)&block_header, sizeof(block_header)); + + delete[] fibonacci_buf; +} + +void bustools_compress(const Bustools_opt &opt) +{ + BUSHeader h; + compressed_BUSHeader comph; + + std::ofstream of; + std::streambuf *buf = nullptr; + + if (opt.stream_out) + { + buf = std::cout.rdbuf(); + } + else + { + of.open(opt.output); + buf = of.rdbuf(); + } + + std::ostream outf(buf); + + for (const auto &infn : opt.files) + { + std::streambuf *inbuf; + std::ifstream inf; + + if (opt.stream_in) + { + inbuf = std::cin.rdbuf(); + } + else + { + inf.open(infn.c_str(), std::ios::binary); + inbuf = inf.rdbuf(); + } + + std::istream in(inbuf); + int target_file_type = identifyParseHeader(in, h, comph); + + switch (target_file_type) + { + case BUSFILE_TYPE::BUSFILE: + // Compress a BUS file + compress_busfile(opt, outf, in, h); + break; + case BUSFILE_TYPE::BUSFILE_COMPRESED: + // decompress busz file + std::cerr << "Warning: The file " << infn << " is a compressed BUS file. Skipping.\n"; + break; + case BUSFILE_TYPE::EC_MATRIX_COMPRESSED: + // Decompress matrix.ecz + std::cerr << "Warning: The file " << infn << " is a compressed EC matrix file. Skipping.\n"; + break; + case BUSFILE_TYPE::EC_MATRIX: + // Compress matrix.ec + std::cerr << "Compressing matrix file " << infn << '\n'; + compress_ec_matrix(in, h, opt, outf); + break; + case 0: + std::cerr << "Error: Unable to parse file" << std::endl; + break; + } + } +} diff --git a/src/bustools_compress.h b/src/bustools_compress.h new file mode 100644 index 0000000..5cfed03 --- /dev/null +++ b/src/bustools_compress.h @@ -0,0 +1,36 @@ +#ifndef BUSTOOLS_COMPRESS_H +#define BUSTOOLS_COMPRESS_H +#include "Common.hpp" + +void bustools_compress(const Bustools_opt &opt); + +const std::vector fibo64{1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352, 24157817, 39088169, 63245986, 102334155, 165580141, 267914296, 433494437, 701408733, 1134903170, 1836311903, 2971215073, 4807526976, 7778742049, 12586269025, 20365011074, 32951280099, 53316291173, 86267571272, 139583862445, 225851433717, 365435296162, 591286729879, 956722026041, 1548008755920, 2504730781961, 4052739537881, 6557470319842, 10610209857723, 17167680177565, 27777890035288, 44945570212853, 72723460248141, 117669030460994, 190392490709135, 308061521170129, 498454011879264, 806515533049393, 1304969544928657, 2111485077978050, 3416454622906707, 5527939700884757, 8944394323791464, 14472334024676221, 23416728348467685, 37889062373143906, 61305790721611591, 99194853094755497, 160500643816367088, 259695496911122585, 420196140727489673, 679891637638612258, 1100087778366101931, 1779979416004714189, 2880067194370816120, 4660046610375530309ULL, 7540113804746346429ULL, 12200160415121876738ULL}; + +struct BUSZIndex { + public: + uint32_t bclen; + uint64_t n_blocks = 0; + uint64_t block_size; + uint64_t last_block; + std::vector positions; + std::vector barcodes; + + BUSZIndex(const BUSHeader &h, const Bustools_opt &opt) + { + bclen = h.bclen; + block_size = opt.chunk_size; + positions.reserve(100); + barcodes.reserve(100); + } + BUSZIndex(const uint32_t _bclen, const int _block_size){ + bclen = _bclen; + block_size = _block_size; + positions.reserve(100); + barcodes.reserve(100); + } +}; + +void write_BuszIndex(BUSZIndex &, std::ostream &); +void busz_index_add(BUSZIndex &, uint64_t , uint64_t); + +#endif \ No newline at end of file diff --git a/src/bustools_decompress.cpp b/src/bustools_decompress.cpp new file mode 100644 index 0000000..17e83da --- /dev/null +++ b/src/bustools_decompress.cpp @@ -0,0 +1,818 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "Common.hpp" +#include "BUSData.h" +#include "bustools_compress.h" +#include "bustools_decompress.h" +size_t d_pfd_blocksize = 512; +/** + * @brief Decode a single fibonacci number from a buffer + * @pre buf contains at least n_buf elements. + * @pre 0 <= bitpos < 64 + * @pre 0 <= bufpos < n_buf + * + * @param buf The buf to fibo-decode from. + * @param n_buf maximum number of elements in the buffer. + * @param bitpos a bit offset from the left within buf[bufpos], where the next bit will be read. + * @param bufpos offset from the beginning of buf. + * + * @return uint64_t num; the fibonacci decoded number. If buf is exhausted before the termination bit of + * a fibonacci number, num represents a partially decoded number. + */ +template +DEST_T fiboDecodeSingle(T const *const buf, const size_t n_buf, size_t &bitpos, size_t &bufpos) +{ + uint32_t i_fibo = 0; + size_t SIZE = sizeof(T) * 8; + size_t buf_offset = bufpos; + size_t bitshift = SIZE - (bitpos % SIZE) - 1; + DEST_T num{0}; + + int bit = (buf[buf_offset] >> bitshift) & 1; + int last_bit = 0; + + ++bitpos; + + while (last_bit + bit < 2 && buf_offset < n_buf) + { + last_bit = bit; + num += bit * (fibo64[i_fibo]); + + buf_offset = bufpos + bitpos / SIZE; + bitshift = SIZE - (bitpos % SIZE) - 1; + + bit = (buf[buf_offset] >> bitshift) & 1; + + ++bitpos; + ++i_fibo; + } + + assert(last_bit + bit == 2); + assert(buf_offset <= n_buf); + + bufpos += (bitpos / SIZE); + bitpos %= SIZE; + return num; +} + + +template +inline T read_bit(const size_t bitpos, const T num) +{ + return (num >> (wordsize - 1 - bitpos)) & 1; +} + +/** + * @brief Decodes a single fibonacci-encoded number from buffered stream. + * @invariant: 0 <= bitpos < 8*sizeof(SRC_T) + * 0 <= bufpos < bufsize, if bufsize != 0, 0 otherwise + * bitpos is the bit-index (from left) of the bit starting the fibo-encoding of the next value. + * bufpos is the index in buf starting the fibo-encoding of the next value. + + * @tparam SRC_T the type of the read buffer. + * @tparam DEST_T the type of the decoded number. + * @param buf read buffer. Contains Fibonacci-encoded numbers. Has at least bufsize elements. + * @param bufsize The number of elements last read from `in`. + * @param bufpos The position in buf where we start reading from. 0 <= bufpos < bufsize. + * @param bitpos The bit position where the next fibonacci encoding starts within a single SRC_T word. + * @param in the stream to decode. + * @return DEST_T the next fibonacci decoded number if decoding terminates before eof, 0 otherwise. + */ +template +DEST_T decodeFibonacciStream( + SRC_T *buf, + size_t &bufsize, + size_t &bitpos, + size_t &bufpos, + std::istream &in) +{ + DEST_T num = 0; + uint32_t i_fibo{0}; + + constexpr size_t wordsize = sizeof(SRC_T) * 8; + size_t buf_offset = bufpos; + size_t bit_offset = wordsize - bitpos%wordsize -1; + + SRC_T bit = read_bit(bitpos, buf[bufpos]), + last_bit = 0; + + bitpos = (bitpos + 1) % wordsize; + bufpos += bitpos == 0; + if (bufpos == bufsize) + { + in.read((char *)buf, sizeof(SRC_T) * bufsize); + bufsize = in.gcount() / sizeof(SRC_T); + bufpos = 0; + } + + while (!(last_bit && bit) && bufpos <= bufsize) + { + // 0 <= bufpos < bufsize if EOF is not reached + // 0 <= bitpos < wordsize + + num += bit * fibo64[i_fibo++]; + last_bit = bit; + bit = read_bit(bitpos, buf[bufpos]); + bitpos = (bitpos + 1) % wordsize; + bufpos += bitpos == 0; + + if (bufpos >= bufsize) + { + in.read((char *)buf, sizeof(SRC_T) * bufsize); + bufsize = in.gcount() / sizeof(SRC_T); + bufpos = 0; + } + } + + return num * (last_bit && bit); +} + +template +uint64_t eliasDeltaDecode(T const *const buf, const size_t bufsize, uint32_t &bitpos, uint32_t &bufpos) +{ + size_t wordsize = sizeof(T) * 8; + size_t max_pos = wordsize - 1; + uint32_t L = 0, N = 0; + uint64_t bit{0}; + size_t bit_offset = max_pos - (bitpos % wordsize); + // TODO: make buffer cyclical and write to obuf, just like fibonacci. + + // EliasDelta(X): unary(L) + headless_binary(N+1) + headless_binary(num) + // N = len(headless_binary(num)) + // L = len(headless_binary(N+1)) + + // Decode unary encoding of L + while (bufpos < bufsize && !(buf[bufpos] >> (max_pos - bitpos % wordsize) & 1)) + { + ++L; + ++bitpos; + bufpos += (bitpos % wordsize == 0); + bit_offset = max_pos - (bitpos % wordsize); + } + + // Decode binary encoding of N+1 + for (int i = 0; i < L + 1 && bufpos < bufsize; ++i) + { + bit = buf[bufpos] >> (max_pos - bitpos % wordsize) & 1; + N += bit << (L - i); + ++bitpos; + bufpos += (bitpos % wordsize == 0); + } + --N; + + uint64_t num = 1ULL << N; + + // Decode headless binary encoding of num + // we can take this in chunks, as we are dealing with "normal" bits + for (int i = 0; i < N && bufpos < bufsize; ++i) + { + bit = buf[bufpos] >> (max_pos - bitpos % wordsize) & 1; + num += bit << (N - i - 1); + ++bitpos; + bufpos += (bitpos % wordsize == 0); + } + + if (bufpos >= bufsize) + { + throw std::out_of_range("Encoding buffer out of bounds."); + } + + bitpos %= wordsize; + return num; +} + +/** + * @brief Finish NewPFD encoding after parsing to account for exceptions. + * + * @param primary A pointer to a block of packed fixed-width integers each of size `b_bits`. + * @param index_gaps Delta encoded indices of exceptions in `primary`. + * @param exceptions The most significant bits of exceptions, each one is always > 0. + * @param b_bits The number of bits used for packing numbers into `primary`. + */ +template +void updatePFD(const size_t block_size, T *primary, std::vector &index_gaps, std::vector &exceptions, uint32_t b_bits, const T min_element) +{ + int i_exception = 0; + int exception_pos = 0; + assert(index_gaps.size() == exceptions.size()); + uint32_t index = 0; + int n = index_gaps.size(); + for (int i = 0; i < n; ++i) + { + auto index_diff = index_gaps[i]; + uint32_t ex = exceptions[i]; + index += index_diff; + primary[index] |= (exceptions[i] << b_bits); + } + for (int i = 0; i < block_size; ++i) + { + primary[i] += min_element; + } +} + +/** + * @brief + * + * @param buf + * @param max_elem + * @param n_ints + * @param b_bits + * @param primary + * @param bit_pos + * @param buf_offset + * @return size_t + */ + +template +size_t PfdParsePrimaryBlock( + SRC_T *buf, + size_t bufsize, + const int n_ints, + const size_t b_bits, + DEST_T *primary, + size_t &bit_pos, + size_t &bufpos) +{ + int i = 0; + constexpr size_t wordsize = sizeof(SRC_T) * 8; + constexpr SRC_T ONE{1}; + + while (i < n_ints && bufpos < bufsize) + { + // I: 0 <= bit_pos < wordsize + + // The max number of bit we can read from current element, no more than b_bits + uint32_t n_partial = std::min(b_bits, wordsize - bit_pos); + uint32_t bits_rem = b_bits - n_partial; + + uint32_t shift = (wordsize - n_partial - bit_pos); + SRC_T mask = (ONE << n_partial) - 1; + + // right shift extract from buf + // left shift num if the current number covers two elements in buf + + DEST_T num = ((buf[bufpos] >> shift) & mask) << bits_rem; + + bit_pos = (bit_pos + n_partial) % wordsize; + + // increment bufpos if bit_pos == 0, i.e. the next element in `buf` is reached. + bufpos += (!bit_pos); + + if (bits_rem) + { + shift = wordsize - bits_rem; + num |= buf[bufpos] >> shift; + bit_pos += bits_rem; + } + primary[i++] = num; + } + + return bufpos; +} + +/** + * @brief Decompress barcodes using fibonacci-runlength(0)-delta decoding. + * + * @param BUF The char array occupied by at least `row_count` encoded numbers, encoded using delta-runlenght(0)-fibonacci. + * @param rows The output BUSData array. + * @param row_count The number of BUSData elements in `rows`. + * @param buf_size The size of `BUF` in bytes. + */ +template +void decompress_barcode(char *BUF, BUSData *rows, const size_t &row_count, const size_t &buf_size, size_t &bufpos) +{ + T *fibonacci_buf = (T *)(BUF + bufpos); + + size_t fibonacci_bufsize = (buf_size - 1) / (sizeof(T)) + 1, + bitpos{0}, + buf_offset{0}, + row_index = 0; + uint64_t diff = 0, + barcode = 0, + runlen = 0; + + const uint64_t RLE_VAL{0ULL}; + + while (row_index < row_count) + { + diff = fiboDecodeSingle(fibonacci_buf, fibonacci_bufsize, bitpos, buf_offset) - 1; + + if (diff == RLE_VAL) + { + // Runlength decoding + runlen = fiboDecodeSingle(fibonacci_buf, fibonacci_bufsize, bitpos, buf_offset); + for (int i = 0; i < runlen; ++i) + { + rows[row_index].barcode = barcode; + ++row_index; + } + } + else + { + // Delta decoding + barcode += diff; + rows[row_index].barcode = barcode; + ++row_index; + } + } + + buf_offset += bitpos > 0; + bufpos += buf_offset * sizeof(T); +} + +/** + * @brief Decompress UMIS using fibonacci-runlength-periodic_delta decoding. + * @pre rows[0]..rows[row_count - 1] have its barcodes set from `decompress_barcodes`. + * @pre rows have at least `row_count` elements. + * + * @param BUF The encoded UMIs to decode. + * @param rows The output BUSData array whose UMI members are set in this function (up to `row_count`-1). + * @param row_count The number of elements in `rows` to decode. + * @param buf_size The size of the input array `BUF`. + */ +template +void decompress_lossless_umi(char *BUF, BUSData *rows, const size_t &row_count, const size_t &buf_size, size_t &bufpos) +{ + T *fibonacci_buf = (T *)(BUF + bufpos); + size_t fibonacci_bufsize = (buf_size - 1) / (sizeof(T)) + 1, + row_index = 0; + + size_t bitpos{0}, + buf_offset{0}; + + uint64_t diff = 0, + // guarantee that last_barcode != rows[0].barcode + last_barcode = rows[0].barcode + 1, + umi = 0, + barcode, + runlen = 0; + + const uint64_t RLE_VAL{0ULL}; + + while(row_index < row_count){ + diff = fiboDecodeSingle(fibonacci_buf, fibonacci_bufsize, bitpos, buf_offset) - 1; + barcode = rows[row_index].barcode; + if (barcode != last_barcode) + { + umi = 0; + } + + if (diff == RLE_VAL) + { + // diff is runlen encoded, next values are identical. + runlen = fiboDecodeSingle(fibonacci_buf, fibonacci_bufsize, bitpos, buf_offset); + for (int i = 0; i < runlen; ++i) + { + // we must decrement umi since we incremented it during compression + rows[row_index].UMI = umi - 1; + ++row_index; + } + } + else{ + // Current element is a delta + umi += diff; + // we must decrement umi since we incremented it during compression + rows[row_index].UMI = umi - 1; + ++row_index; + } + last_barcode = barcode; + } + + buf_offset += bitpos > 0; + bufpos += buf_offset * sizeof(T); +} + +/** + * @brief Decompress UMIs which have been compressed in a lossy manner. + * @note Not yet implemented. + * @pre rows[0]..rows[row_count - 1] have its barcodes set from `decompress_barcodes`. + * @pre rows have at least `row_count` elements. + * + * @param BUF The encoded UMIs to decode + * @param rows The output BUSData array whose UMI members are set in this function (up to `row_count`-1). + * @param row_count The number of elements in `rows` to decode. + * @param buf_size The size of the input array `BUF`. + */ +template +void decompress_lossy_umi(char *BUF, BUSData *rows, const size_t &row_count, const size_t &buf_size, size_t &bufpos) +{ +} + +/** + * @brief Decompress ECs which have been compressed using NewPFD and fibonacci encoding. + * @param BUF The encoded ECs to decode + * @param rows The output BUSData array whose EC members are set in this function (up to `row_count`-1). + * @param row_count The number of elements in `rows` to decode. + * @param buf_size The size of the input array `BUF`. + */ +template +void decompress_ec(char *BUF, BUSData *rows, const size_t &row_count, const size_t &buf_size, size_t &bufpos) +{ + T *pfd_buf = (T *)(BUF + bufpos); + size_t pfd_bufsize = (buf_size - bufpos) / sizeof(T); + size_t buf_offset{0}; + size_t bitpos{0}; + + const size_t block_size = d_pfd_blocksize; + + int32_t primary[block_size]; + uint32_t b_bits = 1; + int32_t min_element{0}; + uint64_t n_exceptions{0}; + + std::vector exceptions; + std::vector index_gaps; + + size_t row_index = 0; + while (row_index < row_count) + { + index_gaps.clear(); + exceptions.clear(); + + b_bits = fiboDecodeSingle(pfd_buf, pfd_bufsize, bitpos, buf_offset) - 1; + min_element = fiboDecodeSingle(pfd_buf, pfd_bufsize, bitpos, buf_offset) - 1; + n_exceptions = fiboDecodeSingle(pfd_buf, pfd_bufsize, bitpos, buf_offset) - 1; + + for (int i = 0; i < n_exceptions; ++i) + { + index_gaps.push_back(fiboDecodeSingle(pfd_buf, pfd_bufsize, bitpos, buf_offset) - 1); + } + for (int i = 0; i < n_exceptions; ++i) + { + exceptions.push_back(fiboDecodeSingle(pfd_buf, pfd_bufsize, bitpos, buf_offset)); + } + + buf_offset += (bitpos > 0); + bitpos = 0; + + buf_offset = PfdParsePrimaryBlock(pfd_buf, pfd_bufsize, block_size, b_bits, primary, bitpos, buf_offset); + + updatePFD(block_size, primary, index_gaps, exceptions, b_bits, min_element); + + for (int i = 0; i < block_size && row_index < row_count; ++i) + { + rows[row_index].ec = primary[i]; + ++row_index; + } + } + + buf_offset += bitpos > 0; + bufpos += buf_offset * sizeof(T); +} + +/** + * @brief Decompress counts which have been compressed using runlen(1) and fibonacci encoding. + * @param BUF The encoded counts to decode + * @param rows The output BUSData array whose count members are set in this function (up to `row_count`-1). + * @param row_count The number of elements in `rows` to decode. + * @param buf_size The size of the input array `BUF`. + */ +template +void decompress_counts(char *BUF, BUSData *rows, const size_t &row_count, const size_t &buf_size, size_t &bufpos) +{ + T *fibonacci_buf = (T *)(BUF + bufpos); + size_t fibonacci_bufsize{(buf_size - 1) / (sizeof(T)) + 1}, + bitpos{0}, + buf_offset{0}, + row_index{0}; + + uint32_t curr_el{0}, + runlen{0}; + + const uint32_t RLE_val{1U}; + + while(row_index < row_count) + { + curr_el = fiboDecodeSingle(fibonacci_buf, fibonacci_bufsize, bitpos, buf_offset); + runlen = curr_el == RLE_val ? fiboDecodeSingle(fibonacci_buf, fibonacci_bufsize, bitpos, buf_offset) : 1; + + for (int i = 0; i < runlen; ++i){ + rows[row_index].count = curr_el; + ++row_index; + } + } + + buf_offset += (bitpos > 0); + bufpos += buf_offset * sizeof(T); +} + +/** + * @brief Decompress counts which have been compressed using runlen(0) and fibonacci encoding. + * @param BUF The encoded counts to decode + * @param rows The output BUSData array whose count members are set in this function (up to `row_count`-1). + * @param row_count The number of elements in `rows` to decode. + * @param buf_size The size of the input array `BUF`. + */ +template +void decompress_flags(char *BUF, BUSData *rows, const size_t &row_count, const size_t &buf_size, size_t &bufpos) +{ + T *fibonacci_buf = (T *)(BUF + bufpos); + size_t fibonacci_bufsize{(buf_size - 1) / (sizeof(T)) + 1}, + bitpos{0}, + buf_offset{0}, + row_index{0}; + + uint32_t curr_el{0}, + runlen{0}; + + const uint32_t RLE_val{0U}; + + while(row_index < row_count) + { + curr_el = fiboDecodeSingle(fibonacci_buf, fibonacci_bufsize, bitpos, buf_offset) - 1; + runlen = curr_el == RLE_val ? fiboDecodeSingle(fibonacci_buf, fibonacci_bufsize, bitpos, buf_offset) : 1; + for (int i = 0; i < runlen; ++i) + { + rows[row_index].flags = curr_el; + ++row_index; + } + } + buf_offset += bitpos > 0; + bufpos += buf_offset * sizeof(T); +} + +typedef void (*decompress_ptr)(char *, BUSData *, const size_t &, const size_t &, size_t &); + +void decompress_block( + size_t row_count, + const size_t bufsize, + const decompress_ptr decompressors[5], + char *const BUF, + size_t &bufpos, + BUSData *busdata) +{ + for (int i = 0; i < 5; ++i) + { + size_t srclen = bufsize; + decompressors[i](BUF, busdata, row_count, srclen, bufpos); + } +} + +template +int32_t decompress_ec_row( + std::vector &vec, + size_t bufsize, + T *buf, + std::istream &in, + size_t &bitpos, + size_t &bufpos +) +{ + constexpr uint32_t RL_VAL{1}; + uint32_t n_elems = fiboDecodeSingle(buf, bufsize, bitpos, bufpos); + + int i_elem{0}; + uint32_t ec{0}; + uint32_t runlen{0}; + vec.clear(); + + vec.resize(n_elems); + while (i_elem < n_elems) + { + uint32_t diff = fiboDecodeSingle(buf, bufsize, bitpos, bufpos) - 1; + if (diff == RL_VAL) + { + runlen = fiboDecodeSingle(buf, bufsize, bitpos, bufpos); + + for (int j = 0; j < runlen; ++j) + { + vec[i_elem++] = ++ec; + } + } + else + { + ec += diff; + vec[i_elem++] = ec; + } + } + + return n_elems; +} + +/** + * @brief Decompress a compress matrix file + * pre: inf.tellg() == 4 + * the first 4 bytes of the stream were BEC\0 + * @tparam T + * @param inf + * @param header + * @param bufsize + * @return bool + */ +template +bool decompress_matrix(std::istream &inf, BUSHeader &header, size_t bufsize=100000) +{ + char magic[4]; + inf.read(magic, 4); + if(std::strcmp(magic, "BEC\0")) { + return false; + } + + // Number of rows mapping to themselves. + uint32_t num_identities{0}; + // Number of rows not mapping to themselves. + uint32_t num_rows{0}; + + inf.read((char *)&num_identities, sizeof(num_identities)); + inf.read((char *)&num_rows, sizeof(num_rows)); + + std::vector> &ecs = header.ecs; + ecs.resize(num_identities); + + int32_t ec_idx = 0; + for (; ec_idx < num_identities; ++ec_idx) + { + ecs[ec_idx].push_back(ec_idx); + } + + uint64_t block_header = 1; + bufsize = 600000; + uint64_t block_size_bytes = 0; + uint64_t block_count = 0; + uint64_t row_count_mask = (1 << 30) - 1; + try + { + T *buffer = new T[bufsize]; + std::vector vec; + vec.reserve(10000); + + size_t bitpos = 0; + size_t bufpos = 0; + int block_counter = 0; + + inf.read((char *)&block_header, sizeof(block_header)); + while (block_header) + { + block_size_bytes = block_header >> 30; + block_count = block_header & row_count_mask; + + if (block_size_bytes > bufsize * sizeof(T)){ + bufsize = block_size_bytes / sizeof(T); + delete[] buffer; + buffer = new T[bufsize]; + } + + inf.read((char *)buffer, block_size_bytes); + for (int i = 0; i < block_count; ++i) + { + vec.clear(); + int32_t n_elems = decompress_ec_row(vec, bufsize, buffer, inf, bitpos, bufpos); + ecs.push_back(std::move(vec)); + } + + inf.read((char *)&block_header, sizeof(block_header)); + bitpos = 0; + bufpos = 0; + block_counter++; + } + + delete[] buffer; + } + catch (const std::bad_alloc &ex) + { + std::cerr << "Error: Unable to allocate buffer.\n\t" + << ex.what() << std::endl; + return false; + } + catch (const std::exception &exception) + { + std::cerr << "Error: Unable to decode EC matrix:\n\t" + << exception.what() << std::endl; + return false; + } + + writeECs("output/matrix_test.ec", header); + + return true; +} + +void decompress_buszfile(std::istream &in, compressed_BUSHeader &comp_h, std::ostream &outf) +{ + writeHeader(outf, comp_h.bus_header); + BUSData *busdata = new BUSData[comp_h.chunk_size]; + + decompress_ptr decompressors[5]{ + &decompress_barcode, + comp_h.lossy_umi ? &decompress_lossy_umi : &decompress_lossless_umi, + &decompress_ec, + &decompress_counts, + &decompress_flags, + }; + d_pfd_blocksize = comp_h.pfd_blocksize; + + try + { + uint64_t max_block_size = 6 * comp_h.chunk_size; + uint32_t i_chunk = 0; + + char *BUF = new char[max_block_size]; + size_t bufpos = 0; + + uint64_t const row_count_mask = (1ULL << 30) - 1; + uint64_t block_header = 0, + row_count = 0, + block_size = 0; + + in.read((char *)&block_header, sizeof(uint64_t)); + while (block_header && in.good()) + { + block_size = block_header >> 30; + row_count = block_header & row_count_mask; + + if (block_size > max_block_size) + { + delete[] BUF; + max_block_size += block_size; + BUF = new char[max_block_size]; + } + + in.read((char *)BUF, block_size); + for (int i = 0; i < 5; ++i){ + size_t srclen = block_size; + decompressors[i](BUF, busdata, row_count, srclen, bufpos); + } + // decompress_block(row_count, block_size, decompressors, BUF, bufpos, busdata); + outf.write((char *)busdata, row_count * sizeof(BUSData)); + bufpos = 0; + + in.read((char *)&block_header, sizeof(uint64_t)); + ++i_chunk; + } + + delete[] BUF; + } + catch (const std::bad_alloc &ex) + { + std::cerr << "Unable to allocate buffer" << std::endl; + } + + delete[] busdata; +} + +void bustools_decompress(const Bustools_opt &opt) +{ + compressed_BUSHeader comp_header; + BUSHeader header; + std::ofstream of; + std::streambuf *buf = nullptr; + + if (opt.stream_out) + { + buf = std::cout.rdbuf(); + } + else + { + of.open(opt.output); + buf = of.rdbuf(); + } + + std::ostream outf(buf); + for (const auto &infn : opt.files) + { + std::streambuf *inbuf; + std::ifstream inf; + + if (opt.stream_in) + { + inbuf = std::cin.rdbuf(); + } + else + { + inf.open(infn.c_str(), std::ios::binary); + inbuf = inf.rdbuf(); + } + std::istream in(inbuf); + + int target_file_type = identifyParseHeader(in, header, comp_header); + switch (target_file_type) + { + case 1: + std::cerr << "Warning: The file " << infn << " is an uncompressed BUS file. Skipping\n" ; + continue; + case 2: + // compressed bus file + decompress_buszfile(in, comp_header, outf); + continue; + case 3: + std::cerr << "Warning: The file " << infn << " is an uncompressed EC matrix file. Skipping\n"; + continue; + case 4: + decompress_matrix(inf, comp_header.bus_header); + continue; + case 0: + std::cerr << "Error: Unable to parse or open file " << infn << '\n'; + continue; + default: + std::cerr << "Warning: Unknown file type. Skipping.\n"; + continue; + } + } +} \ No newline at end of file diff --git a/src/bustools_decompress.h b/src/bustools_decompress.h new file mode 100644 index 0000000..8dbc596 --- /dev/null +++ b/src/bustools_decompress.h @@ -0,0 +1,6 @@ +#ifndef BUSTOOLS_DECOMPRESS_H +#define BUSTOOLS_DECOMPRESS_H +#include "Common.hpp" + +void bustools_decompress(const Bustools_opt &opt); +#endif \ No newline at end of file diff --git a/src/bustools_main.cpp b/src/bustools_main.cpp index 1c702f4..ab954a8 100644 --- a/src/bustools_main.cpp +++ b/src/bustools_main.cpp @@ -37,6 +37,8 @@ #include "bustools_predict.h" #include "bustools_collapse.h" #include "bustools_clusterhist.h" +#include "bustools_compress.h" +#include "bustools_decompress.h" int my_mkdir(const char *path, mode_t mode) { @@ -960,6 +962,133 @@ void parse_ProgramOptions_extract(int argc, char **argv, Bustools_opt &opt) } } +/** + * @brief Parse command line arguments for bustools inflate. + * + * @param argc + * @param argv + * @param opt + * @return bool true iff requested from the command line. + */ +bool parse_ProgramOptions_inflate(int argc, char **argv, Bustools_opt &opt) +{ + const char *opt_string = "o:ph"; + bool print_usage = false; + static struct option long_options[] = { + {"output", required_argument, 0, 'o'}, + {"pipe", no_argument, 0, 'p'}, + {"help", no_argument, 0, 'h'}, + {0, 0, 0, 0}, + }; + int option_index = 0, c; + while ((c = getopt_long(argc, argv, opt_string, long_options, &option_index)) != -1) + { + switch (c) + { + case 'o': + opt.output = optarg; + break; + case 'p': + opt.stream_out = true; + break; + case 'h': + print_usage = true; + break; + default: + break; + } + } + + while (optind < argc) + opt.files.push_back(argv[optind++]); + + if (opt.files.size() == 1 && opt.files[0] == "-") + { + opt.stream_in = true; + } + return print_usage; +} + +/** + * @brief Parse command line arguments for bustools compress. + * + * @param argc + * @param argv + * @param opt + * @return bool true iff requested from the command line. + */ +bool parse_ProgramOptions_compress(int argc, char **argv, Bustools_opt &opt) +{ + const char *opt_string = "N:Lo:pP:h:i::"; + const bool lossy_umi_enabled = false; + + static struct option long_options[] = { + {"chunk-size", required_argument, 0, 'N'}, + {"lossy-umi", no_argument, 0, 'L'}, + {"output", required_argument, 0, 'o'}, + {"pipe", no_argument, 0, 'p'}, + {"pfd-size", required_argument, 0, 'P'}, + {"index", optional_argument, 0, 'i'}, + {"help", no_argument, 0, 'h'}, + {0, 0, 0, 0}}; + int option_index = 0, c; + bool print_usage = false; + + while ((c = getopt_long(argc, argv, opt_string, long_options, &option_index)) != -1) + { + switch (c) + { + case 'i': + { + if (optarg == NULL && optind < argc - 1 && argv[optind][0] != '-') + { + optarg = argv[optind++]; + } + if (optarg == NULL) + { + opt.busz_index.assign("output.busz.idx"); + } + else + { + opt.busz_index.assign(optarg); + } + break; + } + case 'N': + opt.chunk_size = atoi(optarg); + break; + case 'L': + if (!lossy_umi_enabled) + std::cerr << "Lossy UMI not yet implemented. Using lossless instead." << std::endl; + opt.lossy_umi = lossy_umi_enabled; + break; + case 'o': + opt.output = optarg; + break; + case 'p': + opt.stream_out = true; + break; + case 'P': + opt.pfd_blocksize = atoi(optarg); + break; + case 'h': + print_usage = true; + break; + default: + break; + } + } + // all other arguments are fast[a/q] files to be read + while (optind < argc) + opt.files.push_back(argv[optind++]); + + if (opt.files.size() == 1 && opt.files[0] == "-") + { + opt.stream_in = true; + } + return print_usage; +} + bool check_ProgramOptions_sort(Bustools_opt &opt) { @@ -2303,6 +2432,91 @@ bool check_ProgramOptions_extract(Bustools_opt &opt) return ret; } +bool check_ProgramOptions_inflate(Bustools_opt &opt) +{ + + bool ret = true; + + if (!opt.stream_out) + { + if (opt.output.empty()) + { + std::cerr << "Error: missing output file" << std::endl; + ret = false; + } + else if (!checkOutputFileValid(opt.output)) + { + std::cerr << "Error: unable to open output file" << std::endl; + ret = false; + } + } + + if (opt.files.size() != 1) + { + ret = false; + if (opt.files.size() == 0) + { + std::cerr << "Error: Missing BUSZ input file" << std::endl; + } + else + { + std::cerr << "Error: Multiple files not yet supported" << std::endl; + } + } + else if (!opt.stream_in) + { + if (!checkFileExists(opt.files[0])) + { + std::cerr << "Error: File not found, " << opt.files[0] << std::endl; + ret = false; + } + } + + return ret; +} + +bool check_ProgramOptions_compress(Bustools_opt &opt) +{ + bool ret = true; + + if (!opt.stream_out) + { + if (opt.output.empty()) + { + std::cerr << "Error: missing output file" << std::endl; + ret = false; + } + else if (!checkOutputFileValid(opt.output)) + { + std::cerr << "Error: unable to open output file" << std::endl; + ret = false; + } + } + + if (opt.files.size() != 1) + { + ret = false; + if (opt.files.size() == 0) + { + std::cerr << "Error: Missing BUS input file" << std::endl; + } + else + { + std::cerr << "Error: Multiple files not yet supported" << std::endl; + } + } + else if (!opt.stream_in) + { + if (!checkFileExists(opt.files[0])) + { + std::cerr << "Error: File not found, " << opt.files[0] << std::endl; + ret = false; + } + } + + return ret; +} + void Bustools_Usage() { std::cout << "bustools " << BUSTOOLS_VERSION << std::endl @@ -2326,6 +2540,8 @@ void Bustools_Usage() << "collapse Turn BUS files into a BUG file" << std::endl << "clusterhist Create UMI histograms per cluster" << std::endl << "linker Remove section of barcodes in BUS files" << std::endl + << "compress Compress a BUS file" << std::endl + << "inflate Decompress a BUSZ (compressed BUS) file" << std::endl << "version Prints version number" << std::endl << "cite Prints citation information" << std::endl << std::endl @@ -2557,6 +2773,31 @@ void Bustools_extract_Usage() << std::endl; } +void Bustools_compress_Usage() +{ + std::cout << "Usage: bustools compress [options] sorted-bus-file" << std::endl + << "Note: BUS file should be sorted by barcode-umi-ec" << std::endl + << std::endl + << "Options: " << std::endl + << "-N, --chunk-size CHUNK_SIZE Number of rows to compress as a single block." << std::endl + // << "-L, --lossy-umi Allow lossy compression over UMIs. Each UMI will be renamed for minimal compression." << std::endl + << "-o, --output OUTPUT Write compressed file to OUTPUT." << std::endl + << "-p, --pipe Write to standard output." << std::endl + << "-h, --help Print this message and exit." << std::endl + << std::endl; +} + +void Bustools_inflate_Usage() +{ + std::cout << "Usage: bustools {inflate | decompress} [options] compressed-bus-file" << std::endl + << std::endl + << "Options: " << std::endl + << "-p, --pipe Write to standard output." << std::endl + << "-o, --output OUTPUT File for inflated output." << std::endl + << "-h, --help Print this message and exit." << std::endl + << std::endl; +} + void print_citation() { std::cout << "When using this program in your research, please cite" << std::endl @@ -2915,6 +3156,52 @@ int main(int argc, char **argv) exit(1); } } + else if(cmd == "compress") + { + if (disp_help) + { + Bustools_compress_Usage(); + exit(0); + } + if (parse_ProgramOptions_compress(argc - 1, argv + 1, opt)) + { + Bustools_compress_Usage(); + exit(0); + } + else if (check_ProgramOptions_compress(opt)) + { + bustools_compress(opt); + exit(0); + } + else + { + Bustools_compress_Usage(); + exit(1); + } + } + else if(cmd == "inflate" || cmd == "decompress") + { + if (disp_help) + { + Bustools_inflate_Usage(); + exit(0); + } + if (parse_ProgramOptions_inflate(argc - 1, argv + 1, opt)) + { + Bustools_inflate_Usage(); + exit(0); + } + else if (check_ProgramOptions_inflate(opt)) + { + bustools_decompress(opt); + exit(0); + } + else + { + Bustools_inflate_Usage(); + exit(1); + } + } else { std::cerr << "Error: invalid command " << cmd << std::endl;