Skip to content

Commit

Permalink
add REDSET_ENCODE to select encoding method at runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
adammoody committed Dec 16, 2023
1 parent 1d054d7 commit fa621e0
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 32 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ LIST(APPEND REDSET_SERIAL_STATIC_LIBS ZLIB::ZLIB)
IF(ENABLE_OPENMP)
FIND_PACKAGE(OpenMP)
IF(OpenMP_FOUND)
SET(HAVE_OMP TRUE)
SET(HAVE_OPENMP TRUE)
LIST(APPEND REDSET_EXTERNAL_LIBS OpenMP::OpenMP_C)
LIST(APPEND REDSET_EXTERNAL_STATIC_LIBS OpenMP::OpenMP_C)
ENDIF(OpenMP_FOUND)
Expand Down
2 changes: 1 addition & 1 deletion cmake/config.h.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// System Specific
#cmakedefine HAVE_BYTESWAP_H
#cmakedefine HAVE_OMP
#cmakedefine HAVE_OPENMP
#cmakedefine HAVE_PTHREADS
#cmakedefine HAVE_CUDA
15 changes: 15 additions & 0 deletions src/redset.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
/** default set size for redset to use */
int redset_set_size = 8;

int redset_encode_method = REDSET_ENCODE_CPU;

int redset_init(void)
{
/* read our hostname */
Expand All @@ -42,6 +44,19 @@ int redset_init(void)
/* set MPI buffer size */
redset_mpi_buf_size = 1024 * 1024;

const char* val = getenv("REDSET_ENCODE");
if (val != NULL) {
if (strcmp(val, "CPU") == 0) {
redset_encode_method = REDSET_ENCODE_CPU;
} else if (strcmp(val, "OPENMP") == 0) {
redset_encode_method = REDSET_ENCODE_OPENMP;
} else if (strcmp(val, "PTHREADS") == 0) {
redset_encode_method = REDSET_ENCODE_PTHREADS;
} else if (strcmp(val, "CUDA") == 0) {
redset_encode_method = REDSET_ENCODE_CUDA;
}
}

/* set our global rank */
MPI_Comm_rank(MPI_COMM_WORLD, &redset_rank);

Expand Down
8 changes: 8 additions & 0 deletions src/redset_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ extern "C" {
#define REDSET_KEY_CONFIG_WORLD_SIZE "WRANKS"
#define REDSET_KEY_CONFIG_WORLD_RANK "WRANK"

#define REDSET_ENCODE_CPU (1)
#define REDSET_ENCODE_OPENMP (2)
#define REDSET_ENCODE_PTHREADS (3)
#define REDSET_ENCODE_CUDA (4)

typedef struct {
int enabled; /* flag indicating whether this descriptor is active */
int type; /* redundancy scheme to apply */
Expand Down Expand Up @@ -91,6 +96,9 @@ typedef struct {
/** default set size for redset to use */
extern int redset_set_size;

/* indicates whether to encode via serial CPU, OpenMP, pthreads, or CUDA */
extern int redset_encode_method;

#ifdef REDSET_ENABLE_MPI
int redset_set_partners(
MPI_Comm parent_comm, MPI_Comm comm, int dist,
Expand Down
49 changes: 40 additions & 9 deletions src/redset_reedsolomon.c
Original file line number Diff line number Diff line change
Expand Up @@ -519,15 +519,30 @@ int redset_apply_rs(
kvtree_write_fd(chunk_file, fd_chunk, header);
kvtree_delete(&header);

switch (redset_encode_method) {
#ifdef HAVE_CUDA
rc = redset_reedsolomon_encode_gpu(d, rsf, chunk_file, fd_chunk, chunk_size);
#else
case REDSET_ENCODE_CUDA:
rc = redset_reedsolomon_encode_gpu(d, rsf, chunk_file, fd_chunk, chunk_size);
break;
#endif /* HAVE_CUDA */

#ifdef HAVE_PTHREADS
rc = redset_reedsolomon_encode_pthreads(d, rsf, chunk_file, fd_chunk, chunk_size);
#else
rc = redset_reedsolomon_encode(d, rsf, chunk_file, fd_chunk, chunk_size);
case REDSET_ENCODE_PTHREADS:
rc = redset_reedsolomon_encode_pthreads(d, rsf, chunk_file, fd_chunk, chunk_size);
break;
#endif /* HAVE_PTHREADS */
#endif /* HAVE_CUDA */

#ifdef HAVE_OPENMP
case REDSET_ENCODE_OPENMP: /* OpenMP pragmas are in CPU code */
#endif /* HAVE_OPENMP */
case REDSET_ENCODE_CPU:
rc = redset_reedsolomon_encode(d, rsf, chunk_file, fd_chunk, chunk_size);
break;
default:
redset_abort(-1, "Unsupported encode method specified for RS %d %s @ %s:%d",
redset_encode_method, chunk_file, __FILE__, __LINE__
);
}

/* close my chunkfile, with fsync */
if (redset_close(chunk_file, fd_chunk) != REDSET_SUCCESS) {
Expand Down Expand Up @@ -978,12 +993,28 @@ int redset_recover_rs_rebuild(
);
}

switch (redset_encode_method) {
#ifdef HAVE_CUDA
rc = redset_reedsolomon_decode_gpu(d, missing, rebuild_ranks, need_rebuild, rsf, chunk_file, fd_chunk, chunk_size);
#else
rc = redset_reedsolomon_decode(d, missing, rebuild_ranks, need_rebuild, rsf, chunk_file, fd_chunk, chunk_size);
case REDSET_ENCODE_CUDA:
rc = redset_reedsolomon_decode_gpu(d, missing, rebuild_ranks, need_rebuild, rsf, chunk_file, fd_chunk, chunk_size);
break;
#endif /* HAVE_CUDA */

#ifdef HAVE_PTHREADS
case REDSET_ENCODE_PTHREADS: /* missing pthread decode, fall back to CPU */
#endif /* HAVE_PTHREADS */
#ifdef HAVE_OPENMP
case REDSET_ENCODE_OPENMP: /* OpenMP pragmas are in CPU code */
#endif /* HAVE_OPENMP */
case REDSET_ENCODE_CPU:
rc = redset_reedsolomon_decode(d, missing, rebuild_ranks, need_rebuild, rsf, chunk_file, fd_chunk, chunk_size);
break;
default:
redset_abort(-1, "Unsupported encode method specified for RS %d %s @ %s:%d",
redset_encode_method, chunk_file, __FILE__, __LINE__
);
}

/* close my chunkfile */
if (redset_close(chunk_file, fd_chunk) != REDSET_SUCCESS) {
rc = REDSET_FAILURE;
Expand Down
4 changes: 2 additions & 2 deletions src/redset_reedsolomon_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

#include "config.h"

#ifdef HAVE_OMP
#ifdef HAVE_OPENMP
#include <omp.h>
#endif /* HAVE_OMP */
#endif /* HAVE_OPENMP */

#ifdef REDSET_ENABLE_MPI
#include "mpi.h"
Expand Down
4 changes: 2 additions & 2 deletions src/redset_reedsolomon_pthreads.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ static int reduce_rs_pthread_setup(threadset_t* tset)
/* TODO: launch threads and attach to descriptor, activate via condition variable */

/* compute number of threads to start up */
int max_threads = 16;
int max_threads = 10;
int nthreads = redset_get_nprocs();
if (nthreads > max_threads) {
nthreads = max_threads;
Expand Down Expand Up @@ -392,7 +392,7 @@ static int reduce_rs_pthread_setup3(threadset_t* tset)
int ret = REDSET_SUCCESS;

/* compute number of threads to start up */
int max_threads = 32;
int max_threads = 10;
int nthreads = redset_get_nprocs();
if (nthreads > max_threads) {
nthreads = max_threads;
Expand Down
60 changes: 43 additions & 17 deletions src/redset_xor.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

#include "config.h"

#ifdef HAVE_OMP
#ifdef HAVE_OPENMP
#include <omp.h>
#endif /* HAVE_OMP */
#endif /* HAVE_OPENMP */

#include "mpi.h"

Expand Down Expand Up @@ -396,15 +396,28 @@ int redset_apply_xor(
kvtree_write_fd(chunk_file, fd_chunk, header);
kvtree_delete(&header);

switch (redset_encode_method) {
#ifdef HAVE_CUDA
rc = redset_xor_encode_gpu(d, rsf, chunk_file, fd_chunk, chunk_size);
#else
case REDSET_ENCODE_CUDA:
rc = redset_xor_encode_gpu(d, rsf, chunk_file, fd_chunk, chunk_size);
break;
#endif /* HAVE_CUDA */
#ifdef HAVE_PTHREADS
rc = redset_xor_encode_pthreads(d, rsf, chunk_file, fd_chunk, chunk_size);
#else
rc = redset_xor_encode(d, rsf, chunk_file, fd_chunk, chunk_size);
case REDSET_ENCODE_PTHREADS:
rc = redset_xor_encode_pthreads(d, rsf, chunk_file, fd_chunk, chunk_size);
break;
#endif /* HAVE_PTHREADS */
#endif /* HAVE_CUDA */
#ifdef HAVE_OPENMP
case REDSET_ENCODE_OPENMP: /* OpenMP pragmas are in CPU code */
#endif /* HAVE_OPENMP */
case REDSET_ENCODE_CPU:
rc = redset_xor_encode(d, rsf, chunk_file, fd_chunk, chunk_size);
break;
default:
redset_abort(-1, "Unsupported encode method specified for XOR %d %s @ %s:%d",
redset_encode_method, chunk_file, __FILE__, __LINE__
);
}

/* close my chunkfile, with fsync */
if (redset_close(chunk_file, fd_chunk) != REDSET_SUCCESS) {
Expand All @@ -429,7 +442,7 @@ static int redset_xor_decode(
const redset_base* d,
int root,
redset_lofi rsf,
const char* xor_file,
const char* chunk_file,
int fd_chunk,
size_t chunk_size)
{
Expand Down Expand Up @@ -470,7 +483,7 @@ static int redset_xor_decode(
offset += count;
} else {
/* for this chunk, read data from the XOR file */
if (redset_read_attempt(xor_file, fd_chunk, send_buf, count) != count) {
if (redset_read_attempt(chunk_file, fd_chunk, send_buf, count) != count) {
/* read failed, make sure we fail this rebuild */
rc = REDSET_FAILURE;
}
Expand Down Expand Up @@ -499,7 +512,7 @@ static int redset_xor_decode(
offset += count;
} else {
/* for this chunk, write data from the XOR file */
if (redset_write_attempt(xor_file, fd_chunk, recv_buf, count) != count) {
if (redset_write_attempt(chunk_file, fd_chunk, recv_buf, count) != count) {
/* write failed, make sure we fail this rebuild */
rc = REDSET_FAILURE;
}
Expand Down Expand Up @@ -634,15 +647,28 @@ int redset_recover_xor_rebuild(
);
}

switch (redset_encode_method) {
#ifdef HAVE_CUDA
rc = redset_xor_decode_gpu(d, root, rsf, xor_file, fd_chunk, chunk_size);
#else
case REDSET_ENCODE_CUDA:
rc = redset_xor_decode_gpu(d, root, rsf, xor_file, fd_chunk, chunk_size);
break;
#endif /* HAVE_CUDA */
#ifdef HAVE_PTHREADS
rc = redset_xor_decode_pthreads(d, root, rsf, xor_file, fd_chunk, chunk_size);
#else
rc = redset_xor_decode(d, root, rsf, xor_file, fd_chunk, chunk_size);
case REDSET_ENCODE_PTHREADS:
rc = redset_xor_decode_pthreads(d, root, rsf, xor_file, fd_chunk, chunk_size);
break;
#endif /* HAVE_PTHREADS */
#endif /* HAVE_CUDA */
#ifdef HAVE_OPENMP
case REDSET_ENCODE_OPENMP: /* OpenMP pragmas are in CPU code */
#endif /* HAVE_OPENMP */
case REDSET_ENCODE_CPU:
rc = redset_xor_decode(d, root, rsf, xor_file, fd_chunk, chunk_size);
break;
default:
redset_abort(-1, "Unsupported encode method specified for XOR %d %s @ %s:%d",
redset_encode_method, xor_file, __FILE__, __LINE__
);
}

/* close my chunkfile */
if (redset_close(xor_file, fd_chunk) != REDSET_SUCCESS) {
Expand Down

0 comments on commit fa621e0

Please sign in to comment.