From 6ec0029ac5f6ce4f7f9793e166df01e615c59973 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Tue, 17 Jan 2017 12:46:32 +0200 Subject: [PATCH] Update zsh-async to 1.5.0 (#271) --- async.zsh | 333 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 209 insertions(+), 124 deletions(-) diff --git a/async.zsh b/async.zsh index 345eb800a5ba0..c1a4f6834e2e6 100644 --- a/async.zsh +++ b/async.zsh @@ -3,76 +3,91 @@ # # zsh-async # -# version: 1.3.1 +# version: 1.5.0 # author: Mathias Fredriksson # url: https://github.com/mafredri/zsh-async # +# Produce debug output from zsh-async when set to 1. +ASYNC_DEBUG=${ASYNC_DEBUG:-0} + # Wrapper for jobs executed by the async worker, gives output in parseable format with execution time _async_job() { + # Disable xtrace as it would mangle the output. + setopt localoptions noxtrace + # Store start time as double precision (+E disables scientific notation) float -F duration=$EPOCHREALTIME - # Run the command - # - # What is happening here is that we are assigning stdout, stderr and ret to - # variables, and then we are printing out the variable assignment through - # typeset -p. This way when we run eval we get something along the lines of: - # eval " - # typeset stdout=' M async.test.sh\n M async.zsh' - # typeset ret=0 - # typeset stderr='' - # " - unset stdout stderr ret - eval "$( - { - stdout=$(eval '$@') - ret=$? - typeset -p stdout ret - } 2> >(stderr=$(cat); typeset -p stderr) - )" - - # Calculate duration - duration=$(( EPOCHREALTIME - duration )) - - # stip all null-characters from stdout and stderr - stdout=${stdout//$'\0'/} - stderr=${stderr//$'\0'/} - - # if ret is missing for some unknown reason, set it to -1 to indicate we - # have run into a bug - ret=${ret:--1} - - # Grab mutex lock, stalls until token is available - read -ep >/dev/null - - # return output ( ) - print -r -N -n -- "$1" "$ret" "$stdout" "$duration" "$stderr"$'\0' - - # Unlock mutex by inserting a token - print -p "t" + # Run the command and capture both stdout (`eval`) and stderr (`cat`) in + # separate subshells. When the command is complete, we grab write lock + # (mutex token) and output everything except stderr inside the command + # block, after the command block has completed, the stdin for `cat` is + # closed, causing stderr to be appended with a $'\0' at the end to mark the + # end of output from this job. + local stdout stderr ret tok + { + stdout=$(eval "$@") + ret=$? + duration=$(( EPOCHREALTIME - duration )) # Calculate duration. + + # Grab mutex lock, stalls until token is available. + read -r -k 1 -p tok || exit 1 + + # Return output ( ). + print -r -n - ${(q)1} $ret ${(q)stdout} $duration + } 2> >(stderr=$(cat) && print -r -n - " "${(q)stderr}$'\0') + + # Unlock mutex by inserting a token. + print -n -p $tok } # The background worker manages all tasks and runs them without interfering with other processes _async_worker() { + # Reset all options to defaults inside async worker. + emulate -R zsh + + # Make sure monitor is unset to avoid printing the + # pids of child processes. + unsetopt monitor + + # Redirect stderr to `/dev/null` in case unforseen errors produced by the + # worker. For example: `fork failed: resource temporarily unavailable`. + # Some older versions of zsh might also print malloc errors (know to happen + # on at least zsh 5.0.2 and 5.0.8) likely due to kill signals. + exec 2>/dev/null + + # When a zpty is deleted (using -d) all the zpty instances created before + # the one being deleted receive a SIGHUP, unless we catch it, the async + # worker would simply exit (stop working) even though visible in the list + # of zpty's (zpty -L). + TRAPHUP() { + return 0 # Return 0, indicating signal was handled. + } + local -A storage local unique=0 local notify_parent=0 local parent_pid=0 local coproc_pid=0 + local processing=0 - # Deactivate all zsh hooks inside the worker. + local -a zsh_hooks zsh_hook_functions zsh_hooks=(chpwd periodic precmd preexec zshexit zshaddhistory) - unfunction $zsh_hooks &>/dev/null - # And hooks with registered functions. - zsh_hook_functions=( ${^zsh_hooks}_functions ) - unset $zsh_hook_functions + zsh_hook_functions=(${^zsh_hooks}_functions) + unfunction $zsh_hooks &>/dev/null # Deactivate all zsh hooks inside the worker. + unset $zsh_hook_functions # And hooks with registered functions. + unset zsh_hooks zsh_hook_functions # Cleanup. child_exit() { + local -a pids + pids=(${${(v)jobstates##*:*:}%\=*}) + # If coproc (cat) is the only child running, we close it to avoid # leaving it running indefinitely and cluttering the process tree. - if [[ ${#jobstates} = 1 ]] && [[ $coproc_pid = ${${(v)jobstates##*:*:}%\=*} ]]; then + if (( ! processing )) && [[ $#pids = 1 ]] && [[ $coproc_pid = $pids[1] ]]; then coproc : + coproc_pid=0 fi # On older version of zsh (pre 5.2) we notify the parent through a @@ -98,38 +113,50 @@ _async_worker() { esac done - local -a buffer - # Command arguments are separated with a null character. - while read -r -d $'\0' line; do - if [[ $line != ___ZSH_ASNYC_EOC___ ]]; then - # Read command arguments until we receive magic end-of-command string. - buffer+=($line) - continue - fi + killjobs() { + local tok + local -a pids + pids=(${${(v)jobstates##*:*:}%\=*}) - # Copy command buffer - cmd=("${(@)=buffer}") + # No need to send SIGHUP if no jobs are running. + (( $#pids == 0 )) && continue + (( $#pids == 1 )) && [[ $coproc_pid = $pids[1] ]] && continue - # Reset command buffer - buffer=() + # Grab lock to prevent half-written output in case a child + # process is in the middle of writing to stdin during kill. + (( coproc_pid )) && read -r -k 1 -p tok - local job=$cmd[1] + kill -HUP -$$ # Send to entire process group. + coproc : # Quit coproc. + coproc_pid=0 # Reset pid. + } + + local request + local -a cmd + while :; do + # Wait for jobs sent by async_job. + read -r -d $'\0' request || { + # Since we handle SIGHUP above (and thus do not know when `zpty -d`) + # occurs, a failure to read probably indicates that stdin has + # closed. This is why we propagate the signal to all children and + # exit manually. + kill -HUP -$$ # Send SIGHUP to all jobs. + exit 0 + } # Check for non-job commands sent to worker - case $job in - _unset_trap) - notify_parent=0; continue;; - _killjobs) - # Do nothing in the worker when receiving the TERM signal - trap '' TERM - # Send TERM to the entire process group (PID and all children) - kill -TERM -$$ &>/dev/null - # Reset trap - trap - TERM - continue - ;; + case $request in + _unset_trap) notify_parent=0; continue;; + _killjobs) killjobs; continue;; esac + # Parse the request using shell parsing (z) to allow commands + # to be parsed from single strings and multi-args alike. + cmd=("${(z)request}") + + # Name of the job (first argument). + local job=$cmd[1] + # If worker should perform unique jobs if (( unique )); then # Check if a previous job is still running, if yes, let it finnish @@ -140,20 +167,25 @@ _async_worker() { done fi + # Guard against closing coproc from trap before command has started. + processing=1 + # Because we close the coproc after the last job has completed, we must # recreate it when there are no other jobs running. - if (( !${#jobstates} )); then + if (( ! coproc_pid )); then # Use coproc as a mutex for synchronized output between children. coproc cat - coproc_pid=$! + coproc_pid="$!" # Insert token into coproc - print -p "t" + print -n -p "t" fi - # Run task in background + # Run job in background, completed jobs are printed to stdout. _async_job $cmd & # Store pid because zsh job manager is extremely unflexible (show jobname as non-unique '$job')... - storage[$job]=$! + storage[$job]="$!" + + processing=0 # Disable guard. done } @@ -174,46 +206,54 @@ _async_worker() { async_process_results() { setopt localoptions noshwordsplit - integer count=0 local worker=$1 local callback=$2 local caller=$3 local -a items - local line + local null=$'\0' data + integer -l len pos num_processed typeset -gA ASYNC_PROCESS_BUFFER - # Read output from zpty and parse it if available - while zpty -rt $worker line 2>/dev/null; do - # Remove unwanted \r from output - ASYNC_PROCESS_BUFFER[$worker]+=${line//$'\r'$'\n'/$'\n'} - # Split buffer on null characters, preserve empty elements - # (an anonymous function is used to avoid leaking modified IFS into the callback) - () { - local IFS=$'\0' - items=("${(@)=ASYNC_PROCESS_BUFFER[$worker]}") - } - # Remove last element since it's an artifact - # of the return string separator structure - items=("${(@)items[1,${#items}-1]}") - - # Continue until we receive all information - (( ${#items} % 5 )) && continue - - # Work through all results - while (( ${#items} > 0 )); do - $callback "${(@)items[1,5]}" - shift 5 items - count+=1 - done - # Empty the buffer - unset "ASYNC_PROCESS_BUFFER[$worker]" + # Read output from zpty and parse it if available. + while zpty -r -t $worker data 2>/dev/null; do + ASYNC_PROCESS_BUFFER[$worker]+=$data + len=${#ASYNC_PROCESS_BUFFER[$worker]} + pos=${ASYNC_PROCESS_BUFFER[$worker][(i)$null]} # Get index of NULL-character (delimiter). + + # Keep going until we find a NULL-character. + if (( ! len )) || (( pos > len )); then + continue + fi + + while (( pos <= len )); do + # Take the content from the beginning, until the NULL-character and + # perform shell parsing (z) and unquoting (Q) as an array (@). + items=("${(@Q)${(z)ASYNC_PROCESS_BUFFER[$worker][1,$pos-1]}}") + + # Remove the extracted items from the buffer. + ASYNC_PROCESS_BUFFER[$worker]=${ASYNC_PROCESS_BUFFER[$worker][$pos+1,$len]} + + if (( $#items == 5 )); then + $callback "${(@)items}" # Send all parsed items to the callback. + else + # In case of corrupt data, invoke callback with *async* as job + # name, non-zero exit status and an error message on stderr. + $callback "async" 1 "" 0 "$0:$LINENO: error: bad format, got ${#items} items (${(@q)items})" + fi + + (( num_processed++ )) + + len=${#ASYNC_PROCESS_BUFFER[$worker]} + if (( len > 1 )); then + pos=${ASYNC_PROCESS_BUFFER[$worker][(i)$null]} # Get index of NULL-character (delimiter). + fi + done done - # If we processed any results, return success - (( count )) && return 0 + (( num_processed )) && return 0 - # Avoid printing exit value from setopt printexitvalue + # Avoid printing exit value when `setopt printexitvalue` is active.` [[ $caller = trap || $caller = watcher ]] && return 0 # No results were processed @@ -243,13 +283,13 @@ async_job() { local worker=$1; shift - local cmd p - for p in "$@"; do - cmd+="$p"$'\0' - done - cmd+=___ZSH_ASNYC_EOC___$'\0' + local -a cmd + cmd=("$@") + if (( $#cmd > 1 )); then + cmd=(${(q)cmd}) # Quote special characters in multi argument commands. + fi - zpty -w $worker $cmd + zpty -w $worker $cmd$'\0' } # This function traps notification signals and calls all registered callbacks @@ -276,7 +316,9 @@ async_register_callback() { ASYNC_CALLBACKS[$worker]="$*" - if (( ! ASYNC_USE_ZLE_HANDLER )); then + # Enable trap when the ZLE watcher is unavailable, allows + # workers to notify (via -n) when a job is done. + if [[ ! -o interactive ]] || [[ ! -o zle ]]; then trap '_async_notify_trap' WINCH fi } @@ -311,10 +353,17 @@ async_flush_jobs() { # Send kill command to worker async_job $worker "_killjobs" - # Clear all output buffers - while zpty -r $worker line; do true; done + # Clear the zpty buffer. + local junk + if zpty -r -t $worker junk '*'; then + (( ASYNC_DEBUG )) && print -n "async_flush_jobs $worker: ${(V)junk}" + while zpty -r -t $worker junk '*'; do + (( ASYNC_DEBUG )) && print -n "${(V)junk}" + done + (( ASYNC_DEBUG )) && print + fi - # Clear any partial buffers + # Finally, clear the process buffer in case of partially parsed responses. typeset -gA ASYNC_PROCESS_BUFFER unset "ASYNC_PROCESS_BUFFER[$worker]" } @@ -339,16 +388,47 @@ async_start_worker() { typeset -gA ASYNC_PTYS typeset -h REPLY + typeset has_xtrace=0 + + # Make sure async worker is started without xtrace + # (the trace output interferes with the worker). + [[ -o xtrace ]] && { + has_xtrace=1 + unsetopt xtrace + } + + if (( ! ASYNC_ZPTY_RETURNS_FD )) && [[ -o interactive ]] && [[ -o zle ]]; then + # When zpty doesn't return a file descriptor (on older versions of zsh) + # we try to guess it anyway. + integer -l zptyfd + exec {zptyfd}>&1 # Open a new file descriptor (above 10). + exec {zptyfd}>&- # Close it so it's free to be used by zpty. + fi + zpty -b $worker _async_worker -p $$ $@ || { async_stop_worker $worker return 1 } - if (( ASYNC_USE_ZLE_HANDLER )); then - ASYNC_PTYS[$REPLY]=$worker - zle -F $REPLY _async_zle_watcher + # Re-enable it if it was enabled, for debugging. + (( has_xtrace )) && setopt xtrace + + if [[ $ZSH_VERSION < 5.0.8 ]]; then + # For ZSH versions older than 5.0.8 we delay a bit to give + # time for the worker to start before issuing commands, + # otherwise it will not be ready to receive them. + sleep 0.001 + fi - # If worker was called with -n, disable trap in favor of zle handler + if [[ -o interactive ]] && [[ -o zle ]]; then + if (( ! ASYNC_ZPTY_RETURNS_FD )); then + REPLY=$zptyfd # Use the guessed value for the file desciptor. + fi + + ASYNC_PTYS[$REPLY]=$worker # Map the file desciptor to the worker. + zle -F $REPLY _async_zle_watcher # Register the ZLE handler. + + # Disable trap in favor of ZLE handler when notify is enabled (-n). async_job $worker _unset_trap fi } @@ -373,6 +453,10 @@ async_stop_worker() { done async_unregister_callback $worker zpty -d $worker 2>/dev/null || ret=$? + + # Clear any partial buffers. + typeset -gA ASYNC_PROCESS_BUFFER + unset "ASYNC_PROCESS_BUFFER[$worker]" done return $ret @@ -391,12 +475,13 @@ async_init() { zmodload zsh/zpty zmodload zsh/datetime - # Check if zsh/zpty returns a file descriptor or not, shell must also be interactive - ASYNC_USE_ZLE_HANDLER=0 - [[ -o interactive ]] && { + # Check if zsh/zpty returns a file descriptor or not, + # shell must also be interactive with zle enabled. + ASYNC_ZPTY_RETURNS_FD=0 + [[ -o interactive ]] && [[ -o zle ]] && { typeset -h REPLY - zpty _async_test cat - (( REPLY )) && ASYNC_USE_ZLE_HANDLER=1 + zpty _async_test : + (( REPLY )) && ASYNC_ZPTY_RETURNS_FD=1 zpty -d _async_test } }