Skip to content

Commit

Permalink
out_s3: Handle creating nested directories for processing parquet obj…
Browse files Browse the repository at this point in the history
…ects

Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Jun 4, 2024
1 parent cbac11c commit 339692e
Showing 1 changed file with 97 additions and 1 deletion.
98 changes: 97 additions & 1 deletion plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,94 @@ static int build_columnify_command(struct flb_s3 *ctx,
return ret;
}

static int s3_is_dir(const char *dir)
{
int ret;
struct stat st;

if (!dir) {
errno = EINVAL;
return -1;
}

if (strlen(dir) == 0) {
errno = EINVAL;
return -1;
}

ret = stat(dir, &st);
if (ret == -1) {
return -1;
}

if (st.st_mode & S_IFDIR) {
return 0;
}

errno = EINVAL;

return -1;
}

static int s3_mkdir(struct flb_s3 *ctx, const char *dir, mode_t mode)
{
struct stat st;
char *dup_dir = NULL;
#ifdef FLB_SYSTEM_MACOS
char *parent_dir = NULL;
#endif

int ret;

if (!stat(dir, &st)) {
return 0;
}

#if FLB_SYSTEM_MACOS
dup_dir = strdup(dir);
if (!dup_dir) {
return -1;
}

/* macOS's dirname(3) should return current directory when slash
* charachter is not included in passed string.
* And note that macOS's dirname(3) does not modify passed string.
*/
parent_dir = dirname(dup_dir);
if (stat(parent_dir, &st) == 0 && strncmp(parent_dir, ".", 1)) {
if (S_ISDIR (st.st_mode)) {
flb_plg_debug(ctx->ins, "creating directory %s", dup_dir);
ret = mkdir(dup_dir, mode);
free(dup_dir);
return ret;
}
}

ret = s3_mkdir(ctx, dirname(dup_dir), mode);
if (ret != 0) {
free(dup_dir);
return ret;
}
flb_plg_debug(ctx->ins, "creating directory %s", dup_dir);
ret = mkdir(dup_dir, mode);
free(dup_dir);
return ret;
#else
dup_dir = strdup(dir);
if (!dup_dir) {
return -1;
}
ret = s3_mkdir(ctx, dirname(dup_dir), mode);
free(dup_dir);
if (ret != 0) {
return ret;
}
flb_plg_debug(ctx->ins, "creating directory %s", dir);
return mkdir(dir, mode);
#endif
}


#if defined(FLB_SYSTEM_WINDOWS)
static int s3_compress_parquet(struct flb_s3 *ctx,
char *body, size_t body_size,
Expand Down Expand Up @@ -1345,7 +1433,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx,
TEXT(template_in_prefix),
0, /* create unique name only */
in_temp_file);
if (bytes == 0) {
if (bytes > MAX_PATH || bytes == 0) {
flb_plg_error(ctx->ins, "GetFileName failed");
ret = GetLastError();
goto error;
Expand Down Expand Up @@ -1470,6 +1558,14 @@ static int create_tmpfile(struct flb_s3 *ctx, char *file_path, char *template, s
goto error;
}

ret = s3_is_dir(ctx->parquet_process_dir);
if (ret == -1) {
flb_plg_debug(ctx->ins, "creating process dir %s.", ctx->parquet_process_dir);
if (s3_mkdir(ctx, ctx->parquet_process_dir, 0755) == -1) {
flb_plg_error(ctx->ins, "ensuring existence of process dir %s is failed.", ctx->parquet_process_dir);
}
}

process_dir = ctx->parquet_process_dir;
process_dir_len = flb_sds_len(ctx->parquet_process_dir);

Expand Down

0 comments on commit 339692e

Please sign in to comment.