_parallel.sh
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Requires SEMAPHORE_NAME to be set by the caller.
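
# Creates the temporary directory used to collect and monitor parallel job output and sets how
# many stdout lines of each job are shown per monitoring pass (PARALLEL_TAIL_LENGTH, default 2).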
function parallel::initialize_monitoring() {
    PARALLEL_MONITORED_DIR="$(mktemp -d)"
    export PARALLEL_MONITORED_DIR
    PARALLEL_TAIL_LENGTH=${PARALLEL_TAIL_LENGTH:=2}
    export PARALLEL_TAIL_LENGTH
}
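
# Ensures GNU Parallel is available, installing it via apt (optionally with sudo) or brew if missing.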
function parallel::make_sure_gnu_parallel_is_installed() {
    start_end::group_start "Making sure GNU Parallel is installed"
    echo
    echo "Making sure you have GNU Parallel installed"
    echo
    echo "You might need to provide root password if you do not have it"
    echo
    (command -v parallel || apt install parallel || sudo apt install parallel || brew install parallel) >/dev/null
    start_end::group_end "Making sure GNU Parallel is installed"
}
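
# Kills process groups that still hold GNU Parallel semaphore locks from previous runs and removes
# the stale lock files.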
function parallel::kill_stale_semaphore_locks() {
    local pid
    echo
    echo "${COLOR_BLUE}Killing stale semaphore locks${COLOR_RESET}"
    echo
    for s in "${HOME}/.parallel/semaphores/id-${SEMAPHORE_NAME}/"*@*
    do
        pid="${s%%@*}"
        if [[ ${pid} != "-*" ]]; then
            kill -15 -- -"$(basename "${s%%@*}")" 2>/dev/null || true
            rm -f "${s}" 2>/dev/null
        fi
    done
    rm -rf "${HOME}/.parallel"
}

# Periodic loop that prints a summary of all the processes run by parallel.
function parallel::monitor_loop() {
    trap 'exit 0' TERM
    echo
    echo "Starting monitoring of parallel execution in the ${PARALLEL_MONITORED_DIR} directory."
    echo
    local progress_report_number=1
    local start_time
    local end_time
    # To continue supporting Bash v3 we can't use associative arrays - so use a
    # normal array and just check if the value is in it -- it will only ever be
    # a few items long so it won't be too expensive
    declare -a finished_jobs=()
    start_time=${SECONDS}
    while true
    do
        echo
        echo "${COLOR_YELLOW}########## Monitoring progress start: ${progress_report_number} ##########${COLOR_RESET}"
        echo
        if [[ ${PR_LABELS} == *debug-ci-resources* || ${GITHUB_EVENT_NAME} == "push" ]]; then
            # Only print stats in `main` or when "debug-ci-resources" label is set on PR.
            echo "${COLOR_BLUE}########### STATISTICS #################"
            docker_engine_resources::print_overall_stats
            echo "########### STATISTICS #################${COLOR_RESET}"
        fi
        for directory in "${PARALLEL_MONITORED_DIR}"/*/*
        do
            parallel_process=$(basename "${directory}")
            if ( IFS=$'\x1F'; [[ "$IFS${finished_jobs[*]}$IFS" == *"$IFS${parallel_process}$IFS"* ]] ) ; then
                # Already finished, so don't print anything
                continue
            fi
            echo "${COLOR_BLUE}### The last ${PARALLEL_TAIL_LENGTH} lines for ${parallel_process} process: ${directory}/stdout ###${COLOR_RESET}"
            tail "-${PARALLEL_TAIL_LENGTH}" "${directory}/stdout" || true
            echo
            if [[ -s "${directory}/status" ]]; then
                finished_jobs+=("$parallel_process")
                # The last line of output (which we've already shown) will be a line about the
                # success/failure of this job
            fi
            echo
        done
        end_time=${SECONDS}
        echo "${COLOR_YELLOW}########## $((end_time - start_time)) seconds passed since start ##########${COLOR_RESET}"
        sleep 15
        progress_report_number=$((progress_report_number + 1))
    done
}

# Monitors progress of parallel execution and periodically summarizes stdout entries created by
# the parallel execution. Uses PARALLEL_MONITORED_DIR which should be passed as the --results
# parameter to GNU parallel execution.
function parallel::monitor_progress() {
    echo "Parallel results are stored in: ${PARALLEL_MONITORED_DIR}"
    parallel::monitor_loop 2>/dev/null &
    # shellcheck disable=SC2034
    PARALLEL_MONITORING_PID=$!
    # shellcheck disable=SC2016
    traps::add_trap 'parallel::kill_monitor' EXIT
}
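
# Stops the background monitoring loop started by parallel::monitor_progress.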
function parallel::kill_monitor() {
    kill ${PARALLEL_MONITORING_PID} >/dev/null 2>&1 || true
}

# Outputs the log of a successful job.
# $1 - job name
function parallel::output_log_for_successful_job(){
    local job=$1
    local log_dir="${PARALLEL_MONITORED_DIR}/${SEMAPHORE_NAME}/${job}"
    start_end::group_start "${COLOR_GREEN}Output for successful ${job}${COLOR_RESET}"
    echo "${COLOR_GREEN}##### The ${job} succeeded ##### ${COLOR_RESET}"
    echo
    cat "${log_dir}"/stdout
    echo
    echo "${COLOR_GREEN}##### The ${job} succeeded ##### ${COLOR_RESET}"
    echo
    start_end::group_end
}

# Outputs the log of a failed job.
# $1 - job name
function parallel::output_log_for_failed_job(){
    local job=$1
    local log_dir="${PARALLEL_MONITORED_DIR}/${SEMAPHORE_NAME}/${job}"
    start_end::group_start "${COLOR_RED}Output for failed ${job}${COLOR_RESET}"
    echo "${COLOR_RED}##### The ${job} failed ##### ${COLOR_RESET}"
    echo
    cat "${log_dir}"/stdout
    echo
    echo
    echo "${COLOR_RED}##### The ${job} failed ##### ${COLOR_RESET}"
    echo
    start_end::group_end
}

# Prints a summary of all jobs and returns a status code:
# 0 - all jobs succeeded
# >0 - number of failed jobs (a failure of the SKIPPED_FAILED_JOB, e.g. Quarantined, is not counted)
function parallel::print_job_summary_and_return_status_code() {
    local return_code="0"
    local job
    local status_file
    for job_path in "${PARALLEL_MONITORED_DIR}/${SEMAPHORE_NAME}/"*
    do
        job="$(basename "${job_path}")"
        status_file="${PARALLEL_MONITORED_DIR}/${SEMAPHORE_NAME}/${job}/status"
        if [[ -s "${status_file}" ]]; then
            status=$(cat "${status_file}")
        else
            echo "${COLOR_RED}Missing ${status_file} file${COLOR_RESET}"
            status="1"
        fi
        if [[ ${status} == "0" ]]; then
            parallel::output_log_for_successful_job "${job}"
        else
            parallel::output_log_for_failed_job "${job}"
            # SKIPPED_FAILED_JOB failure does not trigger whole test failure
            if [[ ${SKIPPED_FAILED_JOB=} != "${job}" ]]; then
                return_code=$((return_code + 1))
            fi
        fi
    done
    return "${return_code}"
}
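
# Force-kills all running docker containers (ignoring errors when none are running).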
function parallel::kill_all_running_docker_containers() {
    echo
    echo "${COLOR_BLUE}Kill all running docker containers${COLOR_RESET}"
    echo
    # shellcheck disable=SC2046
    docker kill $(docker ps -q) || true
}
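
# Prunes unused docker data, including volumes, to free space on the runner.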
function parallel::system_prune_docker() {
    echo
    echo "${COLOR_BLUE}System-prune docker${COLOR_RESET}"
    echo
    docker_v system prune --force --volumes
    echo
}

# Cleans up the runner before test execution:
# * Kills all running docker containers
# * System-prunes docker to remove temporary/unnamed images and left-over volumes
# * Prints information about available disk space and memory
# * Kills stale semaphore locks
function parallel::cleanup_runner() {
    start_end::group_start "Cleanup runner"
    parallel::kill_all_running_docker_containers
    parallel::system_prune_docker
    docker_engine_resources::check_all_resources
    docker_engine_resources::print_overall_stats
    parallel::kill_stale_semaphore_locks
    start_end::group_end
}
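
# Verifies that CURRENT_PYTHON_MAJOR_MINOR_VERSIONS_AS_STRING is set and prints the Python
# versions that will be used.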
function parallel::make_sure_python_versions_are_specified() {
    if [[ -z "${CURRENT_PYTHON_MAJOR_MINOR_VERSIONS_AS_STRING=}" ]]; then
        echo
        echo "${COLOR_RED}The CURRENT_PYTHON_MAJOR_MINOR_VERSIONS_AS_STRING variable must be set and list python versions to use!${COLOR_RESET}"
        echo
        exit 1
    fi
    echo
    echo "${COLOR_BLUE}Running parallel builds for these Python versions: ${CURRENT_PYTHON_MAJOR_MINOR_VERSIONS_AS_STRING}${COLOR_RESET}"
    echo
}
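
# Verifies that CURRENT_KUBERNETES_VERSIONS_AS_STRING is set and prints the Kubernetes versions
# that will be used.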
function parallel::make_sure_kubernetes_versions_are_specified() {
    if [[ -z "${CURRENT_KUBERNETES_VERSIONS_AS_STRING=}" ]]; then
        echo
        echo "${COLOR_RED}The CURRENT_KUBERNETES_VERSIONS_AS_STRING variable must be set and list K8S versions to use!${COLOR_RESET}"
        echo
        exit 1
    fi
    echo
    echo "${COLOR_BLUE}Running parallel builds for these Kubernetes versions: ${CURRENT_KUBERNETES_VERSIONS_AS_STRING}${COLOR_RESET}"
    echo
}
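
# Determines MAX_PARALLEL_K8S_JOBS: forced to 1 on public GitHub runners, kept as-is when already
# set, otherwise one job per 4 CPUs available to Docker.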
function parallel::get_maximum_parallel_k8s_jobs() {
    docker_engine_resources::get_available_cpus_in_docker
    if [[ -n ${RUNS_ON=} && ${RUNS_ON} != *"self-hosted"* ]]; then
        echo
        echo "${COLOR_YELLOW}This is a GitHub Public runner - for now we are forcing max parallel K8S test jobs to 1 for those${COLOR_RESET}"
        echo
        export MAX_PARALLEL_K8S_JOBS="1"
    else
        if [[ ${MAX_PARALLEL_K8S_JOBS=} != "" ]]; then
            echo
            echo "${COLOR_YELLOW}Maximum parallel k8s jobs forced via MAX_PARALLEL_K8S_JOBS = ${MAX_PARALLEL_K8S_JOBS}${COLOR_RESET}"
            echo
        else
            MAX_PARALLEL_K8S_JOBS=$((CPUS_AVAILABLE_FOR_DOCKER / 4))
            echo
            echo "${COLOR_YELLOW}Maximum parallel k8s jobs set to number of CPUs available for Docker / 4 = ${MAX_PARALLEL_K8S_JOBS}${COLOR_RESET}"
            echo
        fi
    fi
    export MAX_PARALLEL_K8S_JOBS
}

# Runs a single helm test job via a GNU Parallel semaphore, redirecting its output to the job's
# log file in the monitored directory.
# $1 - kubernetes version, $2 - python version, $3 - executor, $4 - script to run for the job
function parallel::run_single_helm_test() {
    local kubernetes_version=$1
    local python_version=$2
    local executor=$3
    local single_job_filename=$4
    local job="Cluster-${kubernetes_version}-python-${python_version}"
    mkdir -p "${PARALLEL_MONITORED_DIR}/${SEMAPHORE_NAME}/${job}"
    export JOB_LOG="${PARALLEL_MONITORED_DIR}/${SEMAPHORE_NAME}/${job}/stdout"
    export PARALLEL_JOB_STATUS="${PARALLEL_MONITORED_DIR}/${SEMAPHORE_NAME}/${job}/status"
    echo "Starting helm tests for kubernetes version ${kubernetes_version}, python version: ${python_version}"
    parallel --ungroup --bg --semaphore --semaphorename "${SEMAPHORE_NAME}" \
        --jobs "${MAX_PARALLEL_K8S_JOBS}" "${single_job_filename}" \
        "${kubernetes_version}" "${python_version}" "${executor}" >"${JOB_LOG}" 2>&1
}
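
# Cleans up the runner, starts monitoring, and launches one helm test job per Kubernetes version,
# pairing each with a Python version and dedicated forwarded/API-server ports, then waits for all
# semaphore jobs to finish.
# $1 - script to run for each single job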
function parallel::run_helm_tests_in_parallel() {
    parallel::cleanup_runner
    start_end::group_start "Monitoring helm tests"
    parallel::initialize_monitoring
    parallel::monitor_progress
    local single_job_filename=$1
    # In case there are more kubernetes versions than python versions, we can reuse the python
    # versions, so we add the list twice here
    local repeated_python_versions
    # shellcheck disable=SC2206
    repeated_python_versions=(${CURRENT_PYTHON_MAJOR_MINOR_VERSIONS_AS_STRING} ${CURRENT_PYTHON_MAJOR_MINOR_VERSIONS_AS_STRING})
    local index=0
    for kubernetes_version in ${CURRENT_KUBERNETES_VERSIONS_AS_STRING}
    do
        index=$((index + 1))
        python_version=${repeated_python_versions[${index}]}
        FORWARDED_PORT_NUMBER=$((38080 + index))
        export FORWARDED_PORT_NUMBER
        API_SERVER_PORT=$((19090 + index))
        export API_SERVER_PORT
        # shellcheck disable=SC2153
        parallel::run_single_helm_test "${kubernetes_version}" "${python_version}" "${EXECUTOR}" "${single_job_filename}" "${@}"
    done
    set +e
    parallel --semaphore --semaphorename "${SEMAPHORE_NAME}" --wait
    parallel::kill_monitor
    set -e
    start_end::group_end
}
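
# A minimal usage sketch (not part of the library): a hypothetical CI entrypoint could drive these
# helpers roughly as follows, assuming SEMAPHORE_NAME, EXECUTOR and the Python/Kubernetes version
# variables are exported by the surrounding CI scripts:
#
#   export SEMAPHORE_NAME="kubernetes-tests"
#   parallel::make_sure_gnu_parallel_is_installed
#   parallel::make_sure_python_versions_are_specified
#   parallel::make_sure_kubernetes_versions_are_specified
#   parallel::get_maximum_parallel_k8s_jobs
#   parallel::run_helm_tests_in_parallel "path/to/single_helm_test_job.sh"
#   parallel::print_job_summary_and_return_status_code
#
# The "path/to/single_helm_test_job.sh" name is a placeholder - the real job script is whatever
# the caller passes as the first argument to parallel::run_helm_tests_in_parallel.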