[FLINK-34324][test] Makes all s3 related operations being declared and called in a single location
XComp committed Apr 2, 2024
1 parent 1b92184 commit 874ef9a
Showing 1 changed file with 62 additions and 55 deletions.
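The consolidation works because Bash lets a script redefine a function after its first declaration: the diff below first declares local-mode defaults for get_complete_result and get_total_number_of_valid_lines, then overwrites both inside the s3 branch. A minimal standalone sketch of that pattern (the names and paths here are illustrative, not taken from the Flink scripts):

    #!/usr/bin/env bash
    # Default implementation, sufficient for local runs.
    function fetch_results {
      cat ./out/part-* 2>/dev/null
    }

    if [ "${MODE:-local}" == "remote" ]; then
      # The later definition wins: remote runs download the data first.
      function fetch_results {
        echo "downloading part files from remote storage..."
        cat ./downloaded/part-* 2>/dev/null
      }
    fi

    fetch_results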
117 changes: 62 additions & 55 deletions flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -20,89 +20,96 @@
 OUT_TYPE="${1:-local}" # other type: s3
 SINK_TO_TEST="${2:-"StreamingFileSink"}"
 
-S3_PREFIX=temp/test_file_sink-$(uuidgen)
-OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
-S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
 source "$(dirname "$0")"/common.sh
 
-if [ "${OUT_TYPE}" == "s3" ]; then
-  source "$(dirname "$0")"/common_s3.sh
-else
-  echo "S3 environment is not loaded for non-s3 test runs (test run type: $OUT_TYPE)."
-fi
-
-# randomly set up openSSL with dynamically/statically linked libraries
-OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi)
-echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')"
-
-s3_setup hadoop
-set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
-set_config_key "metrics.fetcher.update-interval" "2000"
-# this test relies on global failovers
-set_config_key "jobmanager.execution.failover-strategy" "full"
-
-mkdir -p $OUTPUT_PATH
-
-if [ "${OUT_TYPE}" == "local" ]; then
-  echo "Use local output"
-  JOB_OUTPUT_PATH=${OUTPUT_PATH}
-elif [ "${OUT_TYPE}" == "s3" ]; then
-  echo "Use s3 output"
-  JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
-  set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk"
-  mkdir -p "$OUTPUT_PATH-chk"
-else
-  echo "Unknown output type: ${OUT_TYPE}"
-  exit 1
-fi
-
-# make sure we delete the file at the end
-function out_cleanup {
-  s3_delete_by_full_path_prefix "$S3_PREFIX"
-  s3_delete_by_full_path_prefix "${S3_PREFIX}-chk"
-  rollback_openssl_lib
-}
-if [ "${OUT_TYPE}" == "s3" ]; then
-  on_exit out_cleanup
-fi
+# LOCAL_JOB_OUTPUT_PATH is a local folder that can be used as a download folder for remote data
+# the helper functions will access this folder
+RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)"
+LOCAL_JOB_OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}"
+mkdir -p "${LOCAL_JOB_OUTPUT_PATH}"
 
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+# JOB_OUTPUT_PATH is the location where the job writes its data to
+JOB_OUTPUT_PATH="${LOCAL_JOB_OUTPUT_PATH}"
 
 ###################################
 # Get all lines in part files and sort them numerically.
 #
 # Globals:
-#   OUTPUT_PATH
+#   LOCAL_JOB_OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   sorted content of part files
 ###################################
 function get_complete_result {
-  if [ "${OUT_TYPE}" == "s3" ]; then
-    s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" "part-" true
-  fi
-  find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+  find "${LOCAL_JOB_OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
 }
 
 ###################################
 # Get total number of lines in part files.
 #
 # Globals:
-#   S3_PREFIX
+#   LOCAL_JOB_OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   line number in part files
 ###################################
 function get_total_number_of_valid_lines {
-  if [ "${OUT_TYPE}" == "local" ]; then
-    get_complete_result | wc -l | tr -d '[:space:]'
-  elif [ "${OUT_TYPE}" == "s3" ]; then
-    s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-"
-  fi
+  get_complete_result | wc -l | tr -d '[:space:]'
 }
 
+if [ "${OUT_TYPE}" == "local" ]; then
+  echo "[INFO] Test run in local environment: No S3 environment is loaded."
+elif [ "${OUT_TYPE}" == "s3" ]; then
+  # the s3 context requires additional
+  source "$(dirname "$0")"/common_s3.sh
+  s3_setup hadoop
+
+  # overwrites JOB_OUTPUT_PATH to point to S3
+  S3_DATA_PREFIX="${RANDOM_PREFIX}"
+  S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk"
+  JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}"
+  set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}"
+
+  # overwrites implementation for local runs
+  function get_complete_result {
+    # copies the data from S3 to the local LOCAL_JOB_OUTPUT_PATH
+    s3_get_by_full_path_and_filename_prefix "$LOCAL_JOB_OUTPUT_PATH" "$S3_DATA_PREFIX" "part-" true
+
+    # and prints the sorted output
+    find "${LOCAL_JOB_OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+  }
+
+  # overwrites implementation for local runs
+  function get_total_number_of_valid_lines {
+    s3_get_number_of_lines_by_prefix "${S3_DATA_PREFIX}" "part-"
+  }
+
+  # make sure we delete the file at the end
+  function out_cleanup {
+    s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}"
+    s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}"
+    rollback_openssl_lib
+  }
+
+  on_exit out_cleanup
+else
+  echo "[ERROR] Unknown out type: ${OUT_TYPE}"
+  exit 1
+fi
+
+# randomly set up openSSL with dynamically/statically linked libraries
+OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi)
+echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')"
+
+set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
+set_config_key "metrics.fetcher.update-interval" "2000"
+# this test relies on global failovers
+set_config_key "jobmanager.execution.failover-strategy" "full"
+
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+
 ###################################
 # Waits until a number of values have been written within a timeout.
 # If the timeout expires, exit with return code 1.
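A note on on_exit: it is a helper provided by the sourced common.sh, which queues a cleanup callback to run when the test process terminates. Its real implementation is not part of this diff; the idea can be approximated with a plain Bash EXIT trap (a sketch under that assumption, not the actual helper):

    # Registered callbacks run in order when the shell exits.
    CLEANUP_CALLBACKS=()

    function on_exit {
      CLEANUP_CALLBACKS+=("$1")
    }

    function run_cleanup_callbacks {
      local cb
      for cb in "${CLEANUP_CALLBACKS[@]}"; do
        "$cb"
      done
    }

    trap run_cleanup_callbacks EXIT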
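For reference, the script's two positional parameters (OUT_TYPE, defaulting to "local", and SINK_TO_TEST, defaulting to "StreamingFileSink") suggest invocations along these lines; the bucket name is a placeholder, and the s3 variant additionally needs the S3 credentials expected by common_s3.sh:

    # local run with the default StreamingFileSink
    ./test-scripts/test_file_sink.sh

    # s3 run (assuming FileSink is an accepted value for SINK_TO_TEST)
    IT_CASE_S3_BUCKET=my-test-bucket ./test-scripts/test_file_sink.sh s3 FileSink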
