From d3e2aa3f38189f6de04bbd4e17e29ea0eb11c183 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 4 Jun 2024 16:51:32 +0900 Subject: [PATCH] out_s3: windows: Handle nested directories for processing parquet objects Signed-off-by: Hiroshi Hatake --- plugins/out_s3/s3.c | 76 ++++++++++++++++++++++++++++++++++++++++----- plugins/out_s3/s3.h | 5 +++ 2 files changed, 74 insertions(+), 7 deletions(-) diff --git a/plugins/out_s3/s3.c b/plugins/out_s3/s3.c index 87e93431a08..bfabc64a6cd 100644 --- a/plugins/out_s3/s3.c +++ b/plugins/out_s3/s3.c @@ -1395,6 +1395,64 @@ static int s3_mkdir(struct flb_s3 *ctx, const char *dir, mode_t mode) #if defined(FLB_SYSTEM_WINDOWS) +static flb_sds_t create_parquest_processing_dir(struct flb_s3 *ctx) +{ + int ret = 0; + DWORD bytes; + BOOL result = FALSE; + flb_sds_t path_buf = NULL; + TCHAR work_dir[MAX_PATH]; + TCHAR tmp_path[MAX_PATH]; + + path_buf = flb_sds_create_size(PATH_MAX); + if (path_buf == NULL) { + goto error; + } + + bytes = GetTempPathA(MAX_PATH, + tmp_path); + if (bytes > MAX_PATH || bytes == 0) { + flb_plg_error(ctx->ins, "GetTempPath failed"); + ret = GetLastError(); + goto error; + } + + result = flb_sds_cat_safe(&path_buf, tmp_path, strlen(tmp_path)); + if (result < 0) { + ret = -1; + goto error; + } + + result = flb_sds_cat_safe(&path_buf, ctx->parquet_process_dir, + flb_sds_len(ctx->parquet_process_dir)); + if (result < 0) { + ret = -1; + goto error; + } + + ret = s3_is_dir(path_buf); + if (ret == -1) { + if (_fullpath(work_dir, path_buf, MAX_PATH) == NULL) { + ret = -1; + goto error; + } + + if (SHCreateDirectoryExA(NULL, work_dir, NULL) != ERROR_SUCCESS) { + ret = -1; + goto error; + } + } + + return path_buf; + +error: + if (path_buf != NULL) { + flb_sds_destroy(path_buf); + } + + return NULL; +} + static int s3_compress_parquet(struct flb_s3 *ctx, char *body, size_t body_size, void **payload_buf, size_t *payload_size) @@ -1413,6 +1471,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, int fdout = -1; flb_sds_t parquet_buf; TCHAR tmp_path[MAX_PATH]; + flb_sds_t path_buf = NULL; TCHAR in_temp_file[MAX_PATH]; TCHAR out_temp_file[MAX_PATH]; @@ -1421,25 +1480,24 @@ static int s3_compress_parquet(struct flb_s3 *ctx, goto error; } - bytes = GetTempPathA(MAX_PATH, - tmp_path); - if (bytes > MAX_PATH || bytes == 0) { - flb_plg_error(ctx->ins, "GetTempPath failed"); + path_buf = create_parquest_processing_dir(ctx); + if (path_buf == NULL) { + flb_plg_error(ctx->ins, "create processing parquet directory failed"); ret = GetLastError(); goto error; } - bytes = GetTempFileNameA(tmp_path, + bytes = GetTempFileNameA(path_buf, TEXT(template_in_prefix), 0, /* create unique name only */ in_temp_file); - if (bytes > MAX_PATH || bytes == 0) { + if (bytes == 0) { flb_plg_error(ctx->ins, "GetFileName failed"); ret = GetLastError(); goto error; } - bytes = GetTempFileNameA(tmp_path, + bytes = GetTempFileNameA(path_buf, TEXT(template_out_prefix), 0, /* create unique name only */ out_temp_file); @@ -1527,6 +1585,7 @@ static int s3_compress_parquet(struct flb_s3 *ctx, *payload_size = parquet_size; flb_sds_destroy(parquet_cmd); + flb_sds_destroy(path_buf); return 0; @@ -1540,6 +1599,9 @@ static int s3_compress_parquet(struct flb_s3 *ctx, if (parquet_cmd != NULL) { flb_sds_destroy(parquet_cmd); } + if (path_buf != NULL) { + flb_sds_destroy(path_buf); + } return ret; } diff --git a/plugins/out_s3/s3.h b/plugins/out_s3/s3.h index 8c4e3536cf2..465426b0c81 100644 --- a/plugins/out_s3/s3.h +++ b/plugins/out_s3/s3.h @@ -61,7 +61,12 @@ #ifdef __ANDROID__ #define DEFAULT_PARQUET_PROCESS_DIR "/data/local/tmp/parquet/s3" #else +#if defined(FLB_SYSTEM_WINDOWS) +/* The prefix of process dir will be obtained by GetTempPathA */ +#define DEFAULT_PARQUET_PROCESS_DIR "parquet\\s3" +#else #define DEFAULT_PARQUET_PROCESS_DIR "/tmp/parquet/s3" +#endif /* FLB_SYSTEM_WINDOWS */ #endif /*