Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Seekable format read optimization #3581

Merged
merged 2 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions contrib/seekable_format/tests/seekable_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,56 @@
#include <stdlib.h> // malloc
#include <stdio.h>
#include <assert.h>
#include <string.h>

#include "../zstd_seekable.h"


/* ZSTD_seekable_customFile implementation that reads/seeks a buffer while keeping track of total bytes read.
 * An instance is passed as the `opaque` argument of readBuffWithTotal() and seekBuffWithTotal(). */
typedef struct {
const void *ptr;    /* start of the wrapped buffer; only ever read from */
size_t size;        /* number of bytes available at `ptr`; upper bound for `pos` */
size_t pos;         /* current offset into the buffer; advanced by reads, repositioned by seeks */
size_t totalRead;   /* running sum of bytes copied out by reads; seeks do not change it */
} buffWrapperWithTotal_t;

/* Read callback for a ZSTD_seekable_customFile backed by a buffWrapperWithTotal_t.
 *
 * Copies `n` bytes from the wrapper's current position into `buffer`, then
 * advances the position and adds `n` to the running total of bytes read.
 *
 * opaque : pointer to the buffWrapperWithTotal_t to read from (must not be NULL)
 * buffer : destination; must have room for at least `n` bytes
 * n      : number of bytes requested
 *
 * Returns 0 on success, or -1 when fewer than `n` bytes remain; on failure
 * nothing is copied and the wrapper is left unchanged. */
static int readBuffWithTotal(void* opaque, void* buffer, size_t n)
{
    buffWrapperWithTotal_t* const buff = (buffWrapperWithTotal_t*)opaque;
    assert(buff != NULL);

    /* Compare `n` with the remaining byte count rather than computing
     * `pos + n`: that sum can wrap around for a large `n`, slip past the
     * bounds check and let memcpy read far outside the buffer. */
    if (buff->pos > buff->size) return -1;
    if (n > buff->size - buff->pos) return -1;

    /* A zero-length read is a successful no-op; skipping memcpy keeps the call
     * well-defined even if a pointer happens to be NULL. */
    if (n == 0) return 0;

    memcpy(buffer, (const char*)buff->ptr + buff->pos, n);
    buff->pos += n;
    buff->totalRead += n;
    return 0;
}

/* Seek callback for a ZSTD_seekable_customFile backed by a buffWrapperWithTotal_t.
 *
 * Moves the wrapper's position to `offset` bytes relative to `origin`:
 *   SEEK_SET - relative to the start of the buffer (`offset` must be >= 0)
 *   SEEK_CUR - relative to the current position
 *   SEEK_END - relative to the end of the buffer
 *
 * opaque : pointer to the buffWrapperWithTotal_t to reposition (must not be NULL)
 * offset : signed displacement from `origin`
 * origin : one of SEEK_SET, SEEK_CUR, SEEK_END
 *
 * Returns 0 on success, or -1 when the target lies outside [0, size] or
 * `origin` is not recognised (the latter also trips an assert in debug builds).
 * On failure the position is left unchanged. The running read total is never
 * touched by a seek. */
static int seekBuffWithTotal(void* opaque, long long offset, int origin)
{
    buffWrapperWithTotal_t* const buff = (buffWrapperWithTotal_t*) opaque;
    size_t base;
    size_t target;
    assert(buff != NULL);

    switch (origin) {
    case SEEK_SET:
        assert(offset >= 0);
        base = 0;
        break;
    case SEEK_CUR:
        base = buff->pos;
        break;
    case SEEK_END:
        base = buff->size;
        break;
    default:
        assert(0); /* not possible */
        /* With assertions compiled out, fail cleanly instead of continuing
         * with an uninitialized target offset. */
        return -1;
    }

    /* Apply the displacement with unsigned, range-checked arithmetic so that
     * extreme offsets cannot trigger signed overflow. */
    if (offset < 0) {
        /* Magnitude of a negative offset; well-defined even for LLONG_MIN. */
        unsigned long long const back = 0ULL - (unsigned long long)offset;
        if (back > base) return -1;              /* would land before the start */
        target = base - (size_t)back;
        if (target > buff->size) return -1;
    } else {
        if (base > buff->size) return -1;
        if ((unsigned long long)offset > buff->size - base) return -1;  /* past the end */
        target = base + (size_t)offset;
    }

    buff->pos = target;
    return 0;
}

/* Basic unit tests for zstd seekable format */
int main(int argc, const char** argv)
{
Expand Down Expand Up @@ -220,6 +267,92 @@ int main(int argc, const char** argv)
}
printf("Success!\n");


printf("Test %u - multiple decompress calls: ", testNb++);
{ char const inBuffer[] = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt";
size_t const inSize = sizeof(inBuffer);

size_t const seekCapacity = 5000;
void* const seekBuffer = malloc(seekCapacity);
assert(seekBuffer != NULL);
size_t seekSize;

size_t const outCapacity = inSize;
char* const outBuffer = malloc(outCapacity);
assert(outBuffer != NULL);

ZSTD_seekable_CStream* const zscs = ZSTD_seekable_createCStream();
assert(zscs != NULL);

/* compress test data with a small frame size to ensure multiple frames in the output */
unsigned const maxFrameSize = 40;
{ size_t const initStatus = ZSTD_seekable_initCStream(zscs, 9, 0 /* checksumFlag */, maxFrameSize);
assert(!ZSTD_isError(initStatus));
}

{ ZSTD_outBuffer outb = { .dst=seekBuffer, .pos=0, .size=seekCapacity };
ZSTD_inBuffer inb = { .src=inBuffer, .pos=0, .size=inSize };

while (inb.pos < inb.size) {
size_t const cStatus = ZSTD_seekable_compressStream(zscs, &outb, &inb);
assert(!ZSTD_isError(cStatus));
}

size_t const endStatus = ZSTD_seekable_endStream(zscs, &outb);
assert(!ZSTD_isError(endStatus));
seekSize = outb.pos;
}

ZSTD_seekable* const stream = ZSTD_seekable_create();
assert(stream != NULL);
buffWrapperWithTotal_t buffWrapper = {seekBuffer, seekSize, 0, 0};
{ ZSTD_seekable_customFile srcFile = {&buffWrapper, &readBuffWithTotal, &seekBuffWithTotal};
size_t const initStatus = ZSTD_seekable_initAdvanced(stream, srcFile);
assert(!ZSTD_isError(initStatus)); }

/* Perform a series of small reads and seeks (repeatedly read 1 byte and skip 1 byte)
and check that we didn't reread input data unnecessarily */
size_t pos;
for (pos = 0; pos < inSize; pos += 2) {
size_t const decStatus = ZSTD_seekable_decompress(stream, outBuffer, 1, pos);
if (decStatus != 1 || outBuffer[0] != inBuffer[pos]) {
goto _test_error;
}
}
if (buffWrapper.totalRead > seekSize) {
/* We read more than the compressed size, meaning there were some rereads.
This is unneeded because we only seeked forward. */
printf("Too much data read: %zu read, with compressed size %zu\n", buffWrapper.totalRead, seekSize);
goto _test_error;
}

/* Perform some reads and seeks to ensure correctness */
struct {
size_t offset;
size_t size;
} const tests[] = { /* Assume the frame size is 40 */
{20, 40}, /* read partial data from two frames */
{60, 10}, /* continue reading from the same offset */
{50, 20}, /* seek backward within the same frame */
{10, 10}, /* seek backward to a different frame */
{25, 10}, /* seek forward within the same frame */
{60, 10}, /* seek forward to a different frame */
};
size_t idx;
for (idx = 0; idx < sizeof(tests) / sizeof(tests[0]); idx++) {
size_t const decStatus = ZSTD_seekable_decompress(stream, outBuffer, tests[idx].size, tests[idx].offset);
if (decStatus != tests[idx].size || memcmp(outBuffer, inBuffer + tests[idx].offset, tests[idx].size) != 0) {
goto _test_error;
}
}

free(seekBuffer);
free(outBuffer);
ZSTD_seekable_freeCStream(zscs);
ZSTD_seekable_free(stream);
}
printf("Success!\n");

/* TODO: Add more tests */
printf("Finished tests\n");
return 0;
Expand Down
2 changes: 1 addition & 1 deletion contrib/seekable_format/zstdseek_decompress.c
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ size_t ZSTD_seekable_decompress(ZSTD_seekable* zs, void* dst, size_t len, unsign
size_t srcBytesRead = 0;
do {
/* check if we can continue from a previous decompress job */
if (targetFrame != zs->curFrame || offset != zs->decompressedOffset) {
if (targetFrame != zs->curFrame || offset < zs->decompressedOffset) {
zs->decompressedOffset = zs->seekTable.entries[targetFrame].dOffset;
zs->curFrame = targetFrame;

Expand Down