Asyncio Compression: CR fixes
yoniko committed Jan 26, 2022
1 parent 494d81f commit bed2a44
Showing 4 changed files with 63 additions and 62 deletions.
2 changes: 1 addition & 1 deletion programs/fileio.c
@@ -1143,7 +1143,7 @@ FIO_compressLz4Frame(cRess_t* ress,
LZ4F_preferences_t prefs;
LZ4F_compressionContext_t ctx;

IOJob_t *writeJob =AIO_WritePool_acquireJob(ress->writeCtx);
IOJob_t* writeJob = AIO_WritePool_acquireJob(ress->writeCtx);

LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
if (LZ4F_isError(errorCode))
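
For context, this hunk sits inside the LZ4 write loop that drives the write pool. Below is a minimal sketch of that acquire/fill/enqueue-and-reacquire cycle, using only the AIO_WritePool_* API shown in this commit; writeAllChunks and produceChunk are illustrative names, not part of the patch.

/* Placeholder producer, not part of this patch: fills dst and returns the number of bytes written. */
size_t produceChunk(void* dst, size_t dstCapacity);

static void writeAllChunks(WritePoolCtx_t* const writeCtx)
{
    /* Blocks until a free job (and its buffer) is available. */
    IOJob_t* writeJob = AIO_WritePool_acquireJob(writeCtx);
    size_t chunkSize;
    while ((chunkSize = produceChunk(writeJob->buffer, writeJob->bufferSize)) > 0) {
        writeJob->usedBufferSize = chunkSize;                  /* how much of the buffer to write */
        AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);  /* hand off the job, get a fresh one */
    }
    AIO_WritePool_releaseIoJob(writeJob);                      /* return the unused job to the pool */
}
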
103 changes: 51 additions & 52 deletions programs/fileio_asyncio.c
@@ -133,11 +133,9 @@ AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedS
* General IoPool implementation
*************************************/

static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
void *buffer;
IOJob_t *job;
job = (IOJob_t*) malloc(sizeof(IOJob_t));
buffer = malloc(bufferSize);
static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t* const ctx, const size_t bufferSize) {
IOJob_t* const job = (IOJob_t*) malloc(sizeof(IOJob_t));
void* const buffer = malloc(bufferSize);
if(!job || !buffer)
EXM_THROW(101, "Allocation error : not enough memory");
job->buffer = buffer;
@@ -153,7 +151,7 @@ static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
/* AIO_IOPool_createThreadPool:
* Creates a thread pool and a mutex for threaded IO pool.
* Displays warning if asyncio is requested but MT isn't available. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t *ctx, const FIO_prefs_t *prefs) {
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* const ctx, const FIO_prefs_t* const prefs) {
ctx->threadPool = NULL;
if(prefs->asyncIO) {
if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
@@ -169,7 +167,8 @@ static void AIO_IOPool_createThreadPool(IOPoolCtx_t *ctx, const FIO_prefs_t *pre

/* AIO_IOPool_init:
* Allocates and sets and a new write pool including its included availableJobs. */
static void AIO_IOPool_init(IOPoolCtx_t *ctx, FIO_prefs_t* const prefs, POOL_function poolFunction, size_t bufferSize) {
static void AIO_IOPool_init(IOPoolCtx_t* const ctx, const FIO_prefs_t* const prefs,
POOL_function poolFunction, const size_t bufferSize) {
int i;
AIO_IOPool_createThreadPool(ctx, prefs);
ctx->prefs = prefs;
@@ -186,8 +185,8 @@ static void AIO_IOPool_init(IOPoolCtx_t *ctx, FIO_prefs_t* const prefs, POOL_fun

/* AIO_IOPool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t *job) {
IOPoolCtx_t *ctx = (IOPoolCtx_t *) job->ctx;
static void AIO_IOPool_releaseIoJob(IOJob_t* const job) {
IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
if(ctx->threadPool)
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
assert(ctx->availableJobsCount < ctx->totalIoJobs);
@@ -198,14 +197,14 @@ static void AIO_IOPool_releaseIoJob(IOJob_t *job) {

/* AIO_IOPool_join:
* Waits for all tasks in the pool to finish executing. */
static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
static void AIO_IOPool_join(IOPoolCtx_t* const ctx) {
if(ctx->threadPool)
POOL_joinJobs(ctx->threadPool);
}

/* AIO_IOPool_free:
* Release a previously allocated write thread pool. Makes sure all tasks are done and released. */
static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
static void AIO_IOPool_destroy(IOPoolCtx_t* const ctx) {
int i;
if(ctx->threadPool) {
/* Make sure we finish all tasks and then free the resources */
@@ -225,7 +224,7 @@ static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {

/* AIO_IOPool_acquireJob:
* Returns an available io job to be used for a future io. */
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* const ctx) {
IOJob_t *job;
assert(ctx->file != NULL || ctx->prefs->testMode);
if(ctx->threadPool)
@@ -245,22 +244,22 @@ static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
* Sets the destination file for future files in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
static void AIO_IOPool_setFile(IOPoolCtx_t *ctx, FILE* file) {
static void AIO_IOPool_setFile(IOPoolCtx_t* const ctx, FILE* const file) {
assert(ctx!=NULL);
AIO_IOPool_join(ctx);
assert(ctx->availableJobsCount == ctx->totalIoJobs);
ctx->file = file;
}

static FILE* AIO_IOPool_getFile(IOPoolCtx_t *ctx) {
static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* const ctx) {
return ctx->file;
}

/* AIO_IOPool_enqueueJob:
* Enqueues an io job for execution.
* The queued job shouldn't be used directly after queueing it. */
static void AIO_IOPool_enqueueJob(IOJob_t *job) {
IOPoolCtx_t* ctx = (IOPoolCtx_t *)job->ctx;
static void AIO_IOPool_enqueueJob(IOJob_t* const job) {
IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
if(ctx->threadPool)
POOL_add(ctx->threadPool, ctx->poolFunction, job);
else
@@ -273,7 +272,7 @@ static void AIO_IOPool_enqueueJob(IOJob_t *job) {

/* AIO_WritePool_acquireJob:
* Returns an available write job to be used for a future write. */
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx) {
IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t* const ctx) {
return AIO_IOPool_acquireJob(&ctx->base);
}

@@ -290,7 +289,7 @@ void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
/* AIO_WritePool_sparseWriteEnd:
* Ends sparse writes to the current file.
* Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* const ctx) {
assert(ctx != NULL);
if(ctx->base.threadPool)
POOL_joinJobs(ctx->base.threadPool);
@@ -302,28 +301,28 @@ void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
* Sets the destination file for future writes in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file) {
void AIO_WritePool_setFile(WritePoolCtx_t* const ctx, FILE* const file) {
AIO_IOPool_setFile(&ctx->base, file);
assert(ctx->storedSkips == 0);
}

/* AIO_WritePool_getFile:
* Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx) {
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* const ctx) {
return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_WritePool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
void AIO_WritePool_releaseIoJob(IOJob_t *job) {
void AIO_WritePool_releaseIoJob(IOJob_t* const job) {
AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_closeFile:
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {
FILE *dstFile = ctx->base.file;
int AIO_WritePool_closeFile(WritePoolCtx_t* const ctx) {
FILE* const dstFile = ctx->base.file;
assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
AIO_WritePool_sparseWriteEnd(ctx);
AIO_IOPool_setFile(&ctx->base, NULL);
@@ -332,17 +331,17 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {

/* AIO_WritePool_executeWriteJob:
* Executes a write job synchronously. Can be used as a function for a thread pool. */
static void AIO_WritePool_executeWriteJob(void* opaque){
IOJob_t* job = (IOJob_t*) opaque;
WritePoolCtx_t* ctx = (WritePoolCtx_t*) job->ctx;
static void AIO_WritePool_executeWriteJob(void* const opaque){
IOJob_t* const job = (IOJob_t*) opaque;
WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
AIO_IOPool_releaseIoJob(job);
}

/* AIO_WritePool_create:
* Allocates and sets and a new write pool including its included jobs. */
WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize) {
WritePoolCtx_t* ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, const size_t bufferSize) {
WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
ctx->storedSkips = 0;
@@ -351,7 +350,7 @@ WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize

/* AIO_WritePool_free:
* Frees and releases a writePool and its resources. Closes destination file if needs to. */
void AIO_WritePool_free(WritePoolCtx_t* ctx) {
void AIO_WritePool_free(WritePoolCtx_t* const ctx) {
/* Make sure we finish all tasks and then free the resources */
if(AIO_WritePool_getFile(ctx))
AIO_WritePool_closeFile(ctx);
@@ -364,7 +363,7 @@ void AIO_WritePool_free(WritePoolCtx_t* ctx) {
/* ***********************************
* ReadPool implementation
*************************************/
static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* const ctx) {
int i;
for(i=0; i<ctx->completedJobsCount; i++) {
IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
@@ -373,8 +372,8 @@ static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
ctx->completedJobsCount = 0;
}

static void AIO_ReadPool_addJobToCompleted(IOJob_t *job) {
ReadPoolCtx_t *ctx = (ReadPoolCtx_t *)job->ctx;
static void AIO_ReadPool_addJobToCompleted(IOJob_t* const job) {
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
if(ctx->base.threadPool)
ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
assert(ctx->completedJobsCount < MAX_IO_JOBS);
@@ -389,7 +388,7 @@ static void AIO_ReadPool_addJobToCompleted(IOJob_t *job) {
* Looks through the completed jobs for a job matching the waitingOnOffset and returns it,
* if job wasn't found returns NULL.
* IMPORTANT: assumes ioJobsMutex is locked. */
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t *ctx) {
static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* const ctx) {
IOJob_t *job = NULL;
int i;
/* This implementation goes through all completed jobs and looks for the one matching the next offset.
@@ -408,15 +407,15 @@ static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCt

/* AIO_ReadPool_numReadsInFlight:
* Returns the number of IO read jobs currently in flight. */
static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t *ctx) {
static size_t AIO_ReadPool_numReadsInFlight(const ReadPoolCtx_t* const ctx) {
const size_t jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
return ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld);
}

/* AIO_ReadPool_getNextCompletedJob:
* Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
* Would block. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t *ctx) {
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* const ctx) {
IOJob_t *job = NULL;
if (ctx->base.threadPool)
ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
@@ -443,9 +442,9 @@ static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t *ctx) {

/* AIO_ReadPool_executeReadJob:
* Executes a read job synchronously. Can be used as a function for a thread pool. */
static void AIO_ReadPool_executeReadJob(void* opaque){
IOJob_t* job = (IOJob_t*) opaque;
ReadPoolCtx_t* ctx = (ReadPoolCtx_t *)job->ctx;
static void AIO_ReadPool_executeReadJob(void* const opaque){
IOJob_t* const job = (IOJob_t*) opaque;
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
if(ctx->reachedEof) {
job->usedBufferSize = 0;
AIO_ReadPool_addJobToCompleted(job);
@@ -464,14 +463,14 @@ static void AIO_ReadPool_executeReadJob(void* opaque){
AIO_ReadPool_addJobToCompleted(job);
}

static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t *ctx) {
IOJob_t *job = AIO_IOPool_acquireJob(&ctx->base);
static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* const ctx) {
IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
job->offset = ctx->nextReadOffset;
ctx->nextReadOffset += job->bufferSize;
AIO_IOPool_enqueueJob(job);
}

static void AIO_ReadPool_startReading(ReadPoolCtx_t *ctx) {
static void AIO_ReadPool_startReading(ReadPoolCtx_t* const ctx) {
int i;
for (i = 0; i < ctx->base.availableJobsCount; i++) {
AIO_ReadPool_enqueueRead(ctx);
@@ -481,7 +480,7 @@ static void AIO_ReadPool_startReading(ReadPoolCtx_t *ctx) {
/* AIO_ReadPool_setFile:
* Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
* Waits for all current enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file) {
void AIO_ReadPool_setFile(ReadPoolCtx_t* const ctx, FILE* file) {
assert(ctx!=NULL);
AIO_IOPool_join(&ctx->base);
AIO_ReadPool_releaseAllCompletedJobs(ctx);
@@ -503,8 +502,8 @@ void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file) {
* Allocates and sets and a new readPool including its included jobs.
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
* as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(FIO_prefs_t* const prefs, size_t bufferSize) {
ReadPoolCtx_t* ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* const prefs, const size_t bufferSize) {
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);

@@ -523,7 +522,7 @@ ReadPoolCtx_t* AIO_ReadPool_create(FIO_prefs_t* const prefs, size_t bufferSize)

/* AIO_ReadPool_free:
* Frees and releases a readPool and its resources. Closes source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
void AIO_ReadPool_free(ReadPoolCtx_t* const ctx) {
if(AIO_ReadPool_getFile(ctx))
AIO_ReadPool_closeFile(ctx);
if(ctx->base.threadPool)
@@ -535,15 +534,15 @@ void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {

/* AIO_ReadPool_consumeBytes:
* Consumes bytes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n) {
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* const ctx, const size_t n) {
assert(n <= ctx->srcBufferLoaded);
ctx->srcBufferLoaded -= n;
ctx->srcBuffer += n;
}

/* AIO_ReadPool_releaseCurrentlyHeldAndGetNext:
* Release the current held job and get the next one, returns NULL if no next job available. */
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t *ctx) {
static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* const ctx) {
if (ctx->currentJobHeld) {
AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
ctx->currentJobHeld = NULL;
@@ -558,7 +557,7 @@ static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t *ctx) {
* Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file.
* Return value is the number of bytes added to the buffer.
* Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n) {
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* const ctx, const size_t n) {
IOJob_t *job;
int useCoalesce = 0;
assert(n <= ctx->base.jobBufferSize);
@@ -589,21 +588,21 @@ size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n) {

/* AIO_ReadPool_consumeAndRefill:
* Consumes the current buffer and refills it with bufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx) {
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* const ctx) {
AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
}

/* AIO_ReadPool_getFile:
* Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(ReadPoolCtx_t *ctx) {
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* const ctx) {
return AIO_IOPool_getFile(&ctx->base);
}

/* AIO_ReadPool_closeFile:
* Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx) {
FILE* file = AIO_ReadPool_getFile(ctx);
int AIO_ReadPool_closeFile(ReadPoolCtx_t* const ctx) {
FILE* const file = AIO_ReadPool_getFile(ctx);
AIO_ReadPool_setFile(ctx, NULL);
return fclose(file);
}
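
Taken together, the read-pool functions changed above are driven roughly as follows. This is a minimal sketch assuming a ReadPoolCtx_t created elsewhere with AIO_ReadPool_create(); drainFile and consumeInput are illustrative names, not part of the patch.

/* Placeholder consumer, not part of this patch: processes up to `loaded` bytes and returns how many it used. */
size_t consumeInput(const void* src, size_t loaded);

static void drainFile(ReadPoolCtx_t* const readCtx, FILE* const srcFile)
{
    AIO_ReadPool_setFile(readCtx, srcFile);             /* starts prefetching immediately */
    for (;;) {
        AIO_ReadPool_fillBuffer(readCtx, 1);            /* ensure some data is loaded, unless EOF */
        if (readCtx->srcBufferLoaded == 0) break;       /* EOF and nothing left to consume */
        {   size_t const used = consumeInput(readCtx->srcBuffer, readCtx->srcBufferLoaded);
            AIO_ReadPool_consumeBytes(readCtx, used);   /* advance past the bytes we used */
        }
    }
    AIO_ReadPool_closeFile(readCtx);                    /* joins pending reads and fcloses the file */
}
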
8 changes: 4 additions & 4 deletions programs/fileio_asyncio.h
@@ -28,7 +28,7 @@ typedef struct {
/* These struct fields should be set only on creation and not changed afterwards */
POOL_ctx* threadPool;
int totalIoJobs;
FIO_prefs_t* prefs;
const FIO_prefs_t* prefs;
POOL_function poolFunction;

/* Controls the file we currently write to, make changes only by using provided utility functions */
@@ -116,7 +116,7 @@ void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);

/* AIO_WritePool_getFile:
* Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx);
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);

/* AIO_WritePool_closeFile:
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
@@ -136,7 +136,7 @@ void AIO_WritePool_free(WritePoolCtx_t* ctx);
* Allocates and sets and a new readPool including its included jobs.
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
* as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(FIO_prefs_t* const prefs, size_t bufferSize);
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);

/* AIO_ReadPool_free:
* Frees and releases a readPool and its resources. Closes source file. */
@@ -164,7 +164,7 @@ void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);

/* AIO_ReadPool_getFile:
* Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(ReadPoolCtx_t *ctx);
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);

/* AIO_ReadPool_closeFile:
* Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
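
As a usage note for the header as a whole, the write-pool lifecycle is create → setFile → acquire/enqueue → closeFile → free. The sketch below strings those calls together under assumptions: prefs, dstFile, data, and dataSize come from the caller, the 1<<20 job-buffer size is arbitrary, and writeThroughPool itself is not a function in this patch.

#include <string.h>              /* memcpy */
#include "fileio_asyncio.h"

static void writeThroughPool(FIO_prefs_t* const prefs, FILE* const dstFile,
                             const void* data, size_t dataSize)
{
    WritePoolCtx_t* const writeCtx = AIO_WritePool_create(prefs, 1 << 20);
    IOJob_t* writeJob;
    AIO_WritePool_setFile(writeCtx, dstFile);
    writeJob = AIO_WritePool_acquireJob(writeCtx);
    while (dataSize > 0) {
        size_t const chunk = (dataSize < writeJob->bufferSize) ? dataSize : writeJob->bufferSize;
        memcpy(writeJob->buffer, data, chunk);
        writeJob->usedBufferSize = chunk;
        AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);  /* async write, fresh job returned */
        data = (const char*)data + chunk;
        dataSize -= chunk;
    }
    AIO_WritePool_releaseIoJob(writeJob);   /* give back the last, unused job */
    AIO_WritePool_closeFile(writeCtx);      /* joins pending writes, ends sparse mode, fcloses */
    AIO_WritePool_free(writeCtx);
}

AIO_WritePool_closeFile() already calls AIO_WritePool_sparseWriteEnd() and clears the pool's file, so no extra teardown is needed before AIO_WritePool_free().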