Skip to content

Commit

Permalink
Merge pull request #282 from Gottox/add/multithread_read
Browse files Browse the repository at this point in the history
Add/multithread read
  • Loading branch information
Gottox authored Aug 15, 2024
2 parents a17cd88 + 278d341 commit 5b33483
Show file tree
Hide file tree
Showing 18 changed files with 672 additions and 206 deletions.
11 changes: 11 additions & 0 deletions include/sqsh_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,17 @@ sqsh_file_iterator_new(const struct SqshFile *file, int *err);
SQSH_NO_UNUSED bool sqsh_file_iterator_next(
struct SqshFileIterator *iterator, size_t desired_size, int *err);

/**
* @brief Checks if the current block is a zero block.
* @memberof SqshFileIterator
*
* @param[in] iterator The file iterator to check.
*
* @return true if the current block is a zero block, false otherwise.
*/
SQSH_NO_UNUSED bool
sqsh_file_iterator_is_zero_block(const struct SqshFileIterator *iterator);

/**
* @deprecated Since 1.5.0. Use sqsh_file_iterator_skip2() instead.
* @memberof SqshFileIterator
Expand Down
46 changes: 46 additions & 0 deletions include/sqsh_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,23 @@
#define SQSH_POSIX_H

#include "sqsh_common.h"
#include <stdint.h>
#include <stdio.h>

#ifdef __cplusplus
extern "C" {
#endif

struct SqshFile;
struct SqshFileIterator;

struct SqshThreadpool;

typedef void (*sqsh_file_iterator_mt_cb)(
const struct SqshFile *file, const struct SqshFileIterator *iterator,
uint64_t offset, void *data, int err);
typedef void (*sqsh_file_to_stream_mt_cb)(
const struct SqshFile *file, FILE *stream, void *data, int err);

/**
* @memberof SqshFile
Expand All @@ -54,6 +64,42 @@ struct SqshFile;
*/
int sqsh_file_to_stream(const struct SqshFile *file, FILE *stream);

/**
* @memberof SqshFile
* @brief writes data to a file descriptor.
*
* @param[in] file The file context.
* @param[in] threadpool The threadpool to use.
* @param[in] stream The descriptor to write the file contents to.
* @param[in] cb The callback to call when the operation is done.
* @param[in] data The data to pass to the callback.
*/
SQSH_NO_UNUSED int sqsh_file_to_stream_mt(
const struct SqshFile *file, struct SqshThreadpool *threadpool,
FILE *stream, sqsh_file_to_stream_mt_cb cb, void *data);

struct SqshThreadpool *sqsh_threadpool_new(size_t threads, int *err);

int sqsh_threadpool_wait(struct SqshThreadpool *pool);

/**
* @memberof SqshThreadpool
* @brief cleans up a threadpool.
*
* @param[in] pool The threadpool to uclean.
* @return The threadpool on success, NULL on error.
*/
int sqsh__threadpool_cleanup(struct SqshThreadpool *pool);

/**
* @memberof SqshThreadpool
* @brief creates a new threadpool.
*
* @param[in] pool The threadpool to free.
* @return 0 on success, less than 0 on error.
*/
int sqsh_threadpool_free(struct SqshThreadpool *pool);

#ifdef __cplusplus
}
#endif
Expand Down
10 changes: 5 additions & 5 deletions libsqsh/include/sqsh_extract_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ struct SqshExtractManager {
/**
* @privatesection
*/
struct CxRcHashMap hash_map;
struct CxRcRadixTree cache;
const struct SqshExtractorImpl *extractor_impl;
uint32_t block_size;
struct SqshMapManager *map_manager;
Expand All @@ -192,14 +192,13 @@ struct SqshExtractManager {
* @param[in] manager The manager to initialize.
* @param[in] archive The archive to use.
* @param[in] block_size The block size to use.
* @param[in] size The size of the manager.
* @param[in] lru_size The size of the lru cache.
*
* @return 0 on success, a negative value on error.
*/
SQSH_NO_EXPORT SQSH_NO_UNUSED int sqsh__extract_manager_init(
struct SqshExtractManager *manager, struct SqshArchive *archive,
uint32_t block_size, size_t size, size_t lru_size);
uint32_t block_size, size_t lru_size);

/**
* @internal
Expand All @@ -222,12 +221,12 @@ SQSH_NO_EXPORT int sqsh__extract_manager_uncompress(
* @brief releases a buffer retrieved by sqsh__extract_manager_uncompress.
*
* @param[in] manager The manager to use.
* @param[in] buffer The buffer to release.
* @param[in] address The address of the buffer to release.
*
* @return 0 on success, a negative value on error.
*/
SQSH_NO_EXPORT int sqsh__extract_manager_release(
struct SqshExtractManager *manager, const struct CxBuffer *buffer);
struct SqshExtractManager *manager, uint64_t address);

/**
* @internal
Expand All @@ -254,6 +253,7 @@ struct SqshExtractView {
*/
struct SqshExtractManager *manager;
const struct CxBuffer *buffer;
uint64_t address;
size_t size;
};

Expand Down
56 changes: 56 additions & 0 deletions libsqsh/include/sqsh_posix_private.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/******************************************************************************
* *
* Copyright (c) 2023-2024, Enno Boland <[email protected]> *
* *
* Redistribution and use in source and binary forms, with or without *
* modification, are permitted provided that the following conditions are *
* met: *
* *
* * Redistributions of source code must retain the above copyright notice, *
* this list of conditions and the following disclaimer. *
* * Redistributions in binary form must reproduce the above copyright *
* notice, this list of conditions and the following disclaimer in the *
* documentation and/or other materials provided with the distribution. *
* *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS *
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, *
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR *
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR *
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, *
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, *
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR *
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF *
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING *
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS *
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *
* *
******************************************************************************/

/**
* @author Enno Boland ([email protected])
* @file sqsh_file_private.h
*/

#ifndef SQSH_POSIX_PRIVATE_H
#define SQSH_POSIX_PRIVATE_H

#include <sqsh_posix.h>

#include <cextras/concurrency.h>

#ifdef __cplusplus
extern "C" {
#endif

struct SqshThreadpool {
struct CxThreadpool pool;
};

int sqsh__threadpool_init(struct SqshThreadpool *pool, size_t threads);

int sqsh__threadpool_cleanup(struct SqshThreadpool *pool);

#ifdef __cplusplus
}
#endif
#endif /* SQSH_POSIX_PRIVATE_H */
43 changes: 2 additions & 41 deletions libsqsh/src/archive/archive.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,6 @@ is_initialized(const struct SqshArchive *archive, enum InitializedBitmap mask) {
return archive->initialized & mask;
}

static uint64_t
get_data_segment_size(const struct SqshSuperblock *superblock) {
const uint64_t inode_table_start =
sqsh_superblock_inode_table_start(superblock);
uint64_t res;
/* BUG: This function does not return exact results. It may report values
* that are too large, as it does not take into account the size of the
* compression options. This is not a problem for the current implementation
* as this size is only used for finding upper limits for the extract
* manager. */
if (SQSH_SUB_OVERFLOW(
inode_table_start, sizeof(struct SqshDataSuperblock), &res)) {
return inode_table_start;
}
return res;
}

struct SqshArchive *
sqsh_archive_open(
const void *source, const struct SqshConfig *config, int *err) {
Expand Down Expand Up @@ -153,24 +136,9 @@ sqsh__archive_init(
goto out;
}

uint64_t range;
if (SQSH_SUB_OVERFLOW(
sqsh_superblock_bytes_used(&archive->superblock),
get_data_segment_size(&archive->superblock), &range)) {
rv = -SQSH_ERROR_INTEGER_OVERFLOW;
goto out;
}
const uint64_t metablock_capacity = SQSH_DIVIDE_CEIL(
range,
sizeof(struct SqshDataMetablock) + SQSH_METABLOCK_BLOCK_SIZE);
if (metablock_capacity > SIZE_MAX) {
rv = -SQSH_ERROR_INTEGER_OVERFLOW;
goto out;
}
rv = sqsh__extract_manager_init(
&archive->metablock_extract_manager, archive,
SQSH_METABLOCK_BLOCK_SIZE, (size_t)metablock_capacity,
metablock_lru_size);
SQSH_METABLOCK_BLOCK_SIZE, metablock_lru_size);
if (rv < 0) {
goto out;
}
Expand Down Expand Up @@ -214,19 +182,12 @@ sqsh__archive_data_extract_manager(
if (!is_initialized(archive, INITIALIZED_DATA_COMPRESSION_MANAGER)) {
const struct SqshSuperblock *superblock =
sqsh_archive_superblock(archive);
const uint64_t range = get_data_segment_size(superblock);
const uint64_t capacity =
SQSH_DIVIDE_CEIL(range, sqsh_superblock_block_size(superblock));
const uint32_t datablock_blocksize =
sqsh_superblock_block_size(superblock);
if (capacity > SIZE_MAX) {
rv = -SQSH_ERROR_INTEGER_OVERFLOW;
goto out;
}

rv = sqsh__extract_manager_init(
&archive->data_extract_manager, archive, datablock_blocksize,
(size_t)capacity, data_lru_size);
data_lru_size);
if (rv < 0) {
goto out;
}
Expand Down
10 changes: 5 additions & 5 deletions libsqsh/src/archive/inode_map.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ static uint64_t
dyn_map_get(const struct SqshInodeMap *map, uint32_t inode_number, int *err) {
int rv = 0;
uint64_t inode_ref = 0;
bool locked = false;

if (inode_number == 0 || inode_number - 1 >= map->inode_count) {
rv = -SQSH_ERROR_OUT_OF_BOUNDS;
Expand All @@ -131,6 +132,7 @@ dyn_map_get(const struct SqshInodeMap *map, uint32_t inode_number, int *err) {
if (rv < 0) {
goto out;
}
locked = true;

sqsh_index_t index = inode_number - 1;
sqsh_index_t inner_index = index & 0xff;
Expand All @@ -148,12 +150,10 @@ dyn_map_get(const struct SqshInodeMap *map, uint32_t inode_number, int *err) {
inode_ref = 0;
goto out;
}
rv = sqsh__mutex_unlock(map->mutex);
if (rv < 0) {
goto out;
}

out:
if (locked) {
sqsh__mutex_unlock(map->mutex);
}
if (err != NULL) {
*err = rv;
}
Expand Down
Loading

0 comments on commit 5b33483

Please sign in to comment.