Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add file_offset functionality #409

Merged
merged 9 commits into from
Jul 5, 2022
2 changes: 1 addition & 1 deletion blosc/blosc2.c
Original file line number Diff line number Diff line change
Expand Up @@ -1530,7 +1530,7 @@ static int blosc_d(
fp = io_cb->open(urlpath, "rb", context->schunk->storage->io->params);
BLOSC_ERROR_NULL(fp, BLOSC2_ERROR_FILE_OPEN);
// The offset of the block is src_offset
io_cb->seek(fp, chunk_offset + src_offset, SEEK_SET);
io_cb->seek(fp, frame->file_offset + chunk_offset + src_offset, SEEK_SET);
}
// We can make use of tmp3 because it will be used after src is not needed anymore
int64_t rbytes = io_cb->read(tmp3, 1, block_csize, fp);
Expand Down
137 changes: 114 additions & 23 deletions blosc/frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ blosc2_frame_s* frame_new(const char* urlpath) {
if (urlpath != NULL) {
char* new_urlpath = malloc(strlen(urlpath) + 1); // + 1 for the trailing NULL
new_frame->urlpath = strcpy(new_urlpath, urlpath);
new_frame->file_offset = 0;
}
return new_frame;
}
Expand Down Expand Up @@ -383,6 +384,7 @@ int get_header_info(blosc2_frame_s *frame, int32_t *header_len, int64_t *frame_l
}
else {
fp = io_cb->open(frame->urlpath, "rb", io->params);
io_cb->seek(fp, frame->file_offset, SEEK_SET);
}
if (fp != NULL) {
rbytes = io_cb->read(header, 1, FRAME_HEADER_MINLEN, fp);
Expand Down Expand Up @@ -518,7 +520,7 @@ int update_frame_len(blosc2_frame_s* frame, int64_t len) {
else {
fp = io_cb->open(frame->urlpath, "rb+", frame->schunk->storage->io->params);
}
io_cb->seek(fp, FRAME_LEN, SEEK_SET);
io_cb->seek(fp, frame->file_offset + FRAME_LEN, SEEK_SET);
int64_t swap_len;
to_big(&swap_len, &len, sizeof(int64_t));
int64_t wbytes = io_cb->write(&swap_len, 1, sizeof(int64_t), fp);
Expand Down Expand Up @@ -726,7 +728,7 @@ int frame_update_trailer(blosc2_frame_s* frame, blosc2_schunk* schunk) {
BLOSC_TRACE_ERROR("Cannot open the frame for reading and writing.");
return BLOSC2_ERROR_FILE_OPEN;
}
io_cb->seek(fp, trailer_offset, SEEK_SET);
io_cb->seek(fp, frame->file_offset + trailer_offset, SEEK_SET);
int64_t wbytes = io_cb->write(trailer, 1, trailer_len, fp);
if (wbytes != trailer_len) {
BLOSC_TRACE_ERROR("Cannot write the trailer length in trailer.");
Expand Down Expand Up @@ -822,6 +824,7 @@ blosc2_frame_s* frame_from_file(const char* urlpath, const blosc2_io *io) {
frame->urlpath = urlpath_cpy;
frame->len = frame_len;
frame->sframe = sframe;
frame->file_offset = 0;

// Now, the trailer length
io_cb->seek(fp, frame_len - FRAME_TRAILER_MINLEN, SEEK_SET);
Expand All @@ -847,6 +850,88 @@ blosc2_frame_s* frame_from_file(const char* urlpath, const blosc2_io *io) {
}


/* Initialize a frame out of a file in which the frame starts `offset` bytes
 * from the beginning of the file.
 *
 * urlpath: path of the file (or of the directory, for a sparse frame).
 * io:      I/O configuration; io->id selects the callbacks and io->params is
 *          handed to the open callback.
 * offset:  byte position inside the file where the frame header begins.
 *          Must not be negative.
 *
 * Returns a newly allocated frame (with its own copy of the path and with
 * `len`, `sframe`, `file_offset` and `trailer_len` filled in), or NULL when
 * the path cannot be inspected or opened, an allocation fails, or the
 * header/trailer cannot be read or is not valid.
 */
blosc2_frame_s* frame_from_file_offset(const char* urlpath, const blosc2_io *io, int64_t offset) {
  uint8_t header[FRAME_HEADER_MINLEN];
  uint8_t trailer[FRAME_TRAILER_MINLEN];

  void* fp = NULL;
  bool sframe = false;
  struct stat path_stat;

  // A negative offset would make the seek fail and the header would then be
  // read from wherever the file position happens to be.
  if (offset < 0) {
    BLOSC_TRACE_ERROR("The frame offset must not be negative.");
    return NULL;
  }

  urlpath = normalize_urlpath(urlpath);

  if (stat(urlpath, &path_stat) < 0) {
    BLOSC_TRACE_ERROR("Cannot get information about the path %s.", urlpath);
    return NULL;
  }

  blosc2_io_cb *io_cb = blosc2_get_io_cb(io->id);
  if (io_cb == NULL) {
    BLOSC_TRACE_ERROR("Error getting the input/output API");
    return NULL;
  }

  // The frame keeps its own copy of the path
  size_t urlpath_len = strlen(urlpath);
  char* urlpath_cpy = malloc(urlpath_len + 1);  // + 1 for the trailing NUL
  if (urlpath_cpy == NULL) {
    BLOSC_TRACE_ERROR("Cannot allocate memory for the path '%s'.", urlpath);
    return NULL;
  }
  strcpy(urlpath_cpy, urlpath);

  if (path_stat.st_mode & S_IFDIR) {
    // Sparse frame: drop a trailing path separator before opening the index
    if (urlpath_len > 0) {
      char last_char = urlpath[urlpath_len - 1];
      if (last_char == '\\' || last_char == '/') {
        urlpath_cpy[urlpath_len - 1] = '\0';
      }
    }
    fp = sframe_open_index(urlpath_cpy, "rb", io);
    sframe = true;
  }
  else {
    fp = io_cb->open(urlpath, "rb", io->params);
  }
  if (fp == NULL) {
    BLOSC_TRACE_ERROR("Error opening file in: %s", urlpath);
    free(urlpath_cpy);
    return NULL;
  }

  // Get the length of the frame out of its header
  io_cb->seek(fp, offset, SEEK_SET);
  int64_t rbytes = io_cb->read(header, 1, FRAME_HEADER_MINLEN, fp);
  if (rbytes != FRAME_HEADER_MINLEN) {
    BLOSC_TRACE_ERROR("Cannot read from file '%s'.", urlpath);
    io_cb->close(fp);
    free(urlpath_cpy);
    return NULL;
  }
  int64_t frame_len;
  to_big(&frame_len, header + FRAME_LEN, sizeof(frame_len));
  if (frame_len < FRAME_TRAILER_MINLEN) {
    // Otherwise the trailer seek below would land before the frame start
    BLOSC_TRACE_ERROR("Invalid frame length in file '%s'.", urlpath);
    io_cb->close(fp);
    free(urlpath_cpy);
    return NULL;
  }

  // Now, the trailer length
  io_cb->seek(fp, offset + frame_len - FRAME_TRAILER_MINLEN, SEEK_SET);
  rbytes = io_cb->read(trailer, 1, FRAME_TRAILER_MINLEN, fp);
  io_cb->close(fp);
  if (rbytes != FRAME_TRAILER_MINLEN) {
    BLOSC_TRACE_ERROR("Cannot read from file '%s'.", urlpath);
    free(urlpath_cpy);
    return NULL;
  }
  int trailer_offset = FRAME_TRAILER_MINLEN - FRAME_TRAILER_LEN_OFFSET;
  // The byte right before the trailer length must be the 0xce marker
  if (trailer[trailer_offset - 1] != 0xce) {
    BLOSC_TRACE_ERROR("Invalid trailer in file '%s'.", urlpath);
    free(urlpath_cpy);
    return NULL;
  }
  uint32_t trailer_len;
  to_big(&trailer_len, trailer + trailer_offset, sizeof(trailer_len));

  blosc2_frame_s* frame = calloc(1, sizeof(blosc2_frame_s));
  if (frame == NULL) {
    BLOSC_TRACE_ERROR("Cannot allocate memory for the frame.");
    free(urlpath_cpy);
    return NULL;
  }
  frame->urlpath = urlpath_cpy;
  frame->len = frame_len;
  frame->sframe = sframe;
  frame->file_offset = offset;
  frame->trailer_len = trailer_len;

  return frame;
}


/* Initialize a frame out of a contiguous frame buffer */
blosc2_frame_s* frame_from_cframe(uint8_t *cframe, int64_t len, bool copy) {
// Get the length of the frame
Expand All @@ -863,6 +948,7 @@ blosc2_frame_s* frame_from_cframe(uint8_t *cframe, int64_t len, bool copy) {

blosc2_frame_s* frame = calloc(1, sizeof(blosc2_frame_s));
frame->len = frame_len;
frame->file_offset = 0;

// Now, the trailer length
const uint8_t* trailer = cframe + frame_len - FRAME_TRAILER_MINLEN;
Expand Down Expand Up @@ -890,6 +976,7 @@ blosc2_frame_s* frame_from_cframe(uint8_t *cframe, int64_t len, bool copy) {

/* Create a frame out of a super-chunk. */
int64_t frame_from_schunk(blosc2_schunk *schunk, blosc2_frame_s *frame) {
frame->file_offset = 0;
int64_t nchunks = schunk->nchunks;
int64_t cbytes = schunk->cbytes;
int32_t chunk_cbytes;
Expand Down Expand Up @@ -1111,7 +1198,7 @@ uint8_t* get_coffsets(blosc2_frame_s *frame, int32_t header_len, int64_t cbytes,
}
else {
fp = io_cb->open(frame->urlpath, "rb", frame->schunk->storage->io->params);
io_cb->seek(fp, header_len + cbytes, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + cbytes, SEEK_SET);
}
int64_t rbytes = io_cb->read(coffsets, 1, coffsets_cbytes, fp);
io_cb->close(fp);
Expand Down Expand Up @@ -1199,6 +1286,7 @@ int frame_update_header(blosc2_frame_s* frame, blosc2_schunk* schunk, bool new)
}
else {
fp = io_cb->open(frame->urlpath, "rb", frame->schunk->storage->io->params);
io_cb->seek(fp, frame->file_offset, SEEK_SET);
}
if (fp != NULL) {
rbytes = io_cb->read(header, 1, FRAME_HEADER_MINLEN, fp);
Expand Down Expand Up @@ -1241,6 +1329,7 @@ int frame_update_header(blosc2_frame_s* frame, blosc2_schunk* schunk, bool new)
fp = io_cb->open(frame->urlpath, "rb+", frame->schunk->storage->io->params);
}
if (fp != NULL) {
io_cb->seek(fp, frame->file_offset, SEEK_SET);
io_cb->write(h2, h2len, 1, fp);
io_cb->close(fp);
}
Expand Down Expand Up @@ -1404,6 +1493,7 @@ int frame_get_metalayers(blosc2_frame_s* frame, blosc2_schunk* schunk) {
}
else {
fp = io_cb->open(frame->urlpath, "rb", frame->schunk->storage->io->params);
io_cb->seek(fp, frame->file_offset, SEEK_SET);
}
if (fp != NULL) {
rbytes = io_cb->read(header, 1, header_len, fp);
Expand Down Expand Up @@ -1582,12 +1672,13 @@ int frame_get_vlmetalayers(blosc2_frame_s* frame, blosc2_schunk* schunk) {
sprintf(eframe_name, "%s/chunks.b2frame", frame->urlpath);
fp = io_cb->open(eframe_name, "rb", frame->schunk->storage->io->params);
free(eframe_name);
io_cb->seek(fp, trailer_offset, SEEK_SET);
}
else {
fp = io_cb->open(frame->urlpath, "rb", frame->schunk->storage->io->params);
io_cb->seek(fp, frame->file_offset + trailer_offset, SEEK_SET);
}
if (fp != NULL) {
io_cb->seek(fp, trailer_offset, SEEK_SET);
rbytes = io_cb->read(trailer, 1, trailer_len, fp);
io_cb->close(fp);
}
Expand Down Expand Up @@ -1774,7 +1865,7 @@ blosc2_schunk* frame_to_schunk(blosc2_frame_s* frame, bool copy, const blosc2_io
rbytes = frame_get_lazychunk(frame, offsets[i], &data_chunk, &needs_free);
}
else {
io_cb->seek(fp, header_len + offsets[i], SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + offsets[i], SEEK_SET);
rbytes = io_cb->read(data_chunk, 1, BLOSC_EXTENDED_HEADER_LENGTH, fp);
}
if (rbytes != BLOSC_EXTENDED_HEADER_LENGTH) {
Expand All @@ -1790,7 +1881,7 @@ blosc2_schunk* frame_to_schunk(blosc2_frame_s* frame, bool copy, const blosc2_io
prev_alloc = chunk_cbytes;
}
if (!frame->sframe) {
io_cb->seek(fp, header_len + offsets[i], SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + offsets[i], SEEK_SET);
rbytes = io_cb->read(data_chunk, 1, chunk_cbytes, fp);
if (rbytes != chunk_cbytes) {
rc = BLOSC2_ERROR_READ_BUFFER;
Expand Down Expand Up @@ -2014,7 +2105,7 @@ int frame_get_chunk(blosc2_frame_s *frame, int64_t nchunk, uint8_t **chunk, bool
if (frame->cframe == NULL) {
uint8_t header[BLOSC_EXTENDED_HEADER_LENGTH];
void* fp = io_cb->open(frame->urlpath, "rb", frame->schunk->storage->io->params);
io_cb->seek(fp, header_len + offset, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + offset, SEEK_SET);
int64_t rbytes = io_cb->read(header, 1, sizeof(header), fp);
if (rbytes != sizeof(header)) {
BLOSC_TRACE_ERROR("Cannot read the cbytes for chunk in the frame.");
Expand All @@ -2028,7 +2119,7 @@ int frame_get_chunk(blosc2_frame_s *frame, int64_t nchunk, uint8_t **chunk, bool
return rc;
}
*chunk = malloc(chunk_cbytes);
io_cb->seek(fp, header_len + offset, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + offset, SEEK_SET);
rbytes = io_cb->read(*chunk, 1, chunk_cbytes, fp);
io_cb->close(fp);
if (rbytes != chunk_cbytes) {
Expand Down Expand Up @@ -2131,7 +2222,7 @@ int frame_get_lazychunk(blosc2_frame_s *frame, int64_t nchunk, uint8_t **chunk,
}
else {
fp = io_cb->open(frame->urlpath, "rb", frame->schunk->storage->io->params);
io_cb->seek(fp, header_len + offset, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + offset, SEEK_SET);
}
int64_t rbytes = io_cb->read(header, 1, BLOSC_EXTENDED_HEADER_LENGTH, fp);
if (rbytes != BLOSC_EXTENDED_HEADER_LENGTH) {
Expand Down Expand Up @@ -2182,7 +2273,7 @@ int frame_get_lazychunk(blosc2_frame_s *frame, int64_t nchunk, uint8_t **chunk,
io_cb->seek(fp, 0, SEEK_SET);
}
else {
io_cb->seek(fp, header_len + offset, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + offset, SEEK_SET);
}

rbytes = io_cb->read(*chunk, 1, (int64_t)streams_offset, fp);
Expand Down Expand Up @@ -2398,12 +2489,12 @@ int64_t frame_fill_special(blosc2_frame_s* frame, int64_t nitems, int special_va
if (frame->sframe) {
// Update the offsets chunk in the chunks frame
fp = sframe_open_index(frame->urlpath, "rb+", frame->schunk->storage->io);
io_cb->seek(fp, header_len, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len, SEEK_SET);
}
else {
// Regular frame
fp = io_cb->open(frame->urlpath, "rb+", schunk->storage->io->params);
io_cb->seek(fp, header_len + cbytes, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + cbytes, SEEK_SET);
}
wbytes = io_cb->write(off_chunk, 1, new_off_cbytes, fp); // the new offsets
io_cb->close(fp);
Expand Down Expand Up @@ -2622,12 +2713,12 @@ void* frame_append_chunk(blosc2_frame_s* frame, void* chunk, blosc2_schunk* schu
}
fp = sframe_open_index(frame->urlpath, "rb+",
frame->schunk->storage->io);
io_cb->seek(fp, header_len, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len, SEEK_SET);
}
else {
// Regular frame
fp = io_cb->open(frame->urlpath, "rb+", frame->schunk->storage->io->params);
io_cb->seek(fp, header_len + cbytes, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + cbytes, SEEK_SET);
wbytes = io_cb->write(chunk, 1, chunk_cbytes, fp); // the new chunk
if (wbytes != chunk_cbytes) {
BLOSC_TRACE_ERROR("Cannot write the full chunk to frame.");
Expand Down Expand Up @@ -2832,12 +2923,12 @@ void* frame_insert_chunk(blosc2_frame_s* frame, int64_t nchunk, void* chunk, blo
// Update the offsets chunk in the chunks frame
fp = sframe_open_index(frame->urlpath, "rb+",
frame->schunk->storage->io);
io_cb->seek(fp, header_len + 0, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + 0, SEEK_SET);
}
else {
// Regular frame
fp = io_cb->open(frame->urlpath, "rb+", frame->schunk->storage->io->params);
io_cb->seek(fp, header_len + cbytes, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + cbytes, SEEK_SET);
wbytes = io_cb->write(chunk, 1, chunk_cbytes, fp); // the new chunk
if (wbytes != chunk_cbytes) {
BLOSC_TRACE_ERROR("Cannot write the full chunk to frame.");
Expand Down Expand Up @@ -3053,19 +3144,19 @@ void* frame_update_chunk(blosc2_frame_s* frame, int64_t nchunk, void* chunk, blo
// Update the offsets chunk in the chunks frame
fp = sframe_open_index(frame->urlpath, "rb+",
frame->schunk->storage->io);
io_cb->seek(fp, header_len + 0, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + 0, SEEK_SET);
}
else {
// Regular frame
fp = io_cb->open(frame->urlpath, "rb+", frame->schunk->storage->io->params);
io_cb->seek(fp, header_len + cbytes, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + cbytes, SEEK_SET);
wbytes = io_cb->write(chunk, 1, chunk_cbytes, fp); // the new chunk
if (wbytes != chunk_cbytes) {
BLOSC_TRACE_ERROR("Cannot write the full chunk to frame.");
io_cb->close(fp);
return NULL;
}
io_cb->seek(fp, header_len + new_cbytes, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + new_cbytes, SEEK_SET);
}
wbytes = io_cb->write(off_chunk, 1, new_off_cbytes, fp); // the new offsets
io_cb->close(fp);
Expand Down Expand Up @@ -3211,12 +3302,12 @@ void* frame_delete_chunk(blosc2_frame_s* frame, int64_t nchunk, blosc2_schunk* s
}
// Update the offsets chunk in the chunks frame
fp = sframe_open_index(frame->urlpath, "rb+", frame->schunk->storage->io);
io_cb->seek(fp, header_len + 0, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + 0, SEEK_SET);
}
else {
// Regular frame
fp = io_cb->open(frame->urlpath, "rb+", frame->schunk->storage->io);
io_cb->seek(fp, header_len + cbytes, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + cbytes, SEEK_SET);
}
wbytes = io_cb->write(off_chunk, 1, new_off_cbytes, fp); // the new offsets
io_cb->close(fp);
Expand Down Expand Up @@ -3350,12 +3441,12 @@ int frame_reorder_offsets(blosc2_frame_s* frame, const int64_t* offsets_order, b
// Update the offsets chunk in the chunks frame
fp = sframe_open_index(frame->urlpath, "rb+",
frame->schunk->storage->io);
io_cb->seek(fp, header_len + 0, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + 0, SEEK_SET);
}
else {
// Regular frame
fp = io_cb->open(frame->urlpath, "rb+", frame->schunk->storage->io->params);
io_cb->seek(fp, header_len + cbytes, SEEK_SET);
io_cb->seek(fp, frame->file_offset + header_len + cbytes, SEEK_SET);
}
int64_t wbytes = io_cb->write(off_chunk, 1, new_off_cbytes, fp); // the new offsets
io_cb->close(fp);
Expand Down
10 changes: 10 additions & 0 deletions blosc/frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ typedef struct {
uint32_t trailer_len; //!< The current length of the trailer in (compressed) bytes
bool sframe; //!< Whether the frame is sparse (true) or not
blosc2_schunk *schunk; //!< The schunk associated
int64_t file_offset; //!< The offset where the frame starts inside the file
} blosc2_frame_s;


Expand Down Expand Up @@ -110,6 +111,15 @@ int frame_free(blosc2_frame_s *frame);
*/
blosc2_frame_s* frame_from_file(const char *urlpath, const blosc2_io *io_cb);

/**
 * @brief Initialize a frame out of a file, where the frame starts at a given
 * byte offset inside the file.
 *
 * @param urlpath The file name.
 * @param io_cb The input/output configuration used to open and read the file.
 * @param offset The position, in bytes from the beginning of the file, where
 * the frame starts.
 *
 * @return The frame created from the file, or NULL if the file cannot be
 * inspected or read, or if its trailer is not valid.
 */
blosc2_frame_s* frame_from_file_offset(const char *urlpath, const blosc2_io *io_cb, int64_t offset);

/**
* @brief Initialize a frame out of a frame buffer.
*
Expand Down
Loading