Skip to content

Commit

Permalink
aws_ssm - cleanup logging (#1660)
Browse files Browse the repository at this point in the history
aws_ssm - cleanup logging

SUMMARY

Minor tweaks to how we clean up S3 buckets in our integration tests
Make SSM logging more consistent

ISSUE TYPE

Feature Pull Request

COMPONENT NAME
aws_ssm
ADDITIONAL INFORMATION

Reviewed-by: Alina Buzachis <None>
  • Loading branch information
tremble authored Jan 19, 2023
1 parent efbe850 commit 9733031
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 110 deletions.
4 changes: 4 additions & 0 deletions changelogs/fragments/1660-aws_ssm-logging.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
minor_changes:
- aws_ssm - cleanup logging output (https://github.com/ansible-collections/community.aws/pull/1660).
- aws_ssm - avoid overloading ``subprocess`` (https://github.com/ansible-collections/community.aws/pull/1660).
- aws_ssm - minor refactoring (https://github.com/ansible-collections/community.aws/pull/1660).
196 changes: 102 additions & 94 deletions plugins/connection/aws_ssm.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,36 +250,34 @@ def _ssm_retry(func):
@wraps(func)
def wrapped(self, *args, **kwargs):
remaining_tries = int(self.get_option('reconnection_retries')) + 1
cmd_summary = "%s..." % args[0]
cmd_summary = f"{args[0]}..."
for attempt in range(remaining_tries):
cmd = args[0]

try:
return_tuple = func(self, *args, **kwargs)
display.vvv(return_tuple, host=self.host)
self._vvvv(f"ssm_retry: (success) {to_text(return_tuple)}")
break

except (AnsibleConnectionFailure, Exception) as e:
if attempt == remaining_tries - 1:
raise
else:
pause = 2 ** attempt - 1
if pause > 30:
pause = 30
pause = 2 ** attempt - 1
pause = min(pause, 30)

if isinstance(e, AnsibleConnectionFailure):
msg = "ssm_retry: attempt: %d, cmd (%s), pausing for %d seconds" % (attempt, cmd_summary, pause)
else:
msg = "ssm_retry: attempt: %d, caught exception(%s) from cmd (%s), pausing for %d seconds" % (attempt, e, cmd_summary, pause)
if isinstance(e, AnsibleConnectionFailure):
msg = f"ssm_retry: attempt: {attempt}, cmd ({cmd_summary}), pausing for {pause} seconds"
else:
msg = f"ssm_retry: attempt: {attempt}, caught exception({e}) from cmd ({cmd_summary}), pausing for {pause} seconds"

display.vv(msg, host=self.host)
self._vv(msg)

time.sleep(pause)
time.sleep(pause)

# Do not attempt to reuse the existing session on retries
self.close()
# Do not attempt to reuse the existing session on retries
# This will cause the SSM session to be completely restarted,
# as well as reinitializing the boto3 clients
self.close()

continue
continue

return return_tuple
return wrapped
Expand All @@ -306,11 +304,31 @@ class Connection(ConnectionBase):
_timeout = False
MARK_LENGTH = 26

def _display(self, f, message):
if self.host:
host_args = {"host": self.host}
else:
host_args = {}
f(to_text(message), **host_args)

def _v(self, message):
self._display(display.v, message)

def _vv(self, message):
self._display(display.vv, message)

def _vvv(self, message):
self._display(display.vvv, message)

def _vvvv(self, message):
self._display(display.vvvv, message)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

if not HAS_BOTO3:
raise AnsibleError('{0}'.format(missing_required_lib("boto3")))
raise AnsibleError(missing_required_lib("boto3"))

super(Connection, self).__init__(*args, **kwargs)
self.host = self._play_context.remote_addr

if getattr(self._shell, "SHELL_FAMILY", '') == 'powershell':
Expand All @@ -337,7 +355,7 @@ def _connect(self):

def reset(self):
''' start a fresh ssm session '''
display.vvvv('reset called on ssm connection')
self._vvvv('reset called on ssm connection')
return self.start_session()

def start_session(self):
Expand All @@ -348,18 +366,18 @@ def start_session(self):
else:
self.instance_id = self.get_option('instance_id')

display.vvv(u"ESTABLISH SSM CONNECTION TO: {0}".format(self.instance_id), host=self.host)
self._vvv(f"ESTABLISH SSM CONNECTION TO: {self.instance_id}")

executable = self.get_option('plugin')
if not os.path.exists(to_bytes(executable, errors='surrogate_or_strict')):
raise AnsibleError("failed to find the executable specified %s."
" Please verify if the executable exists and re-try." % executable)
raise AnsibleError(f"failed to find the executable specified {executable}.")

profile_name = self.get_option('profile') or ''
region_name = self.get_option('region')
ssm_parameters = dict()
client = self._get_boto_client('ssm', region_name=region_name, profile_name=profile_name)
self._client = client
self._vvvv(f"START SSM SESSION: {self.instance_id}")
start_session_args = dict(Target=self.instance_id, Parameters=ssm_parameters)
document_name = self.get_option('ssm_document')
if document_name is not None:
Expand All @@ -374,10 +392,10 @@ def start_session(self):
"StartSession",
profile_name,
json.dumps({"Target": self.instance_id}),
client.meta.endpoint_url
client.meta.endpoint_url,
]

display.vvvv(u"SSM COMMAND: {0}".format(to_text(cmd)), host=self.host)
self._vvvv(f"SSM COMMAND: {to_text(cmd)}")

stdout_r, stdout_w = pty.openpty()
session = subprocess.Popen(
Expand All @@ -398,17 +416,17 @@ def start_session(self):
# Disable command echo and prompt.
self._prepare_terminal()

display.vvv(u"SSM CONNECTION ID: {0}".format(self._session_id), host=self.host)
self._vvvv(f"SSM CONNECTION ID: {self._session_id}")

return session

@_ssm_retry
def exec_command(self, cmd, in_data=None, sudoable=True):
''' run a command on the ssm host '''

super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable)
super().exec_command(cmd, in_data=in_data, sudoable=sudoable)

display.vvv(u"EXEC {0}".format(to_text(cmd)), host=self.host)
self._vvv(f"EXEC: {to_text(cmd)}")

session = self._session

Expand Down Expand Up @@ -436,14 +454,14 @@ def exec_command(self, cmd, in_data=None, sudoable=True):
remaining = stop_time - int(round(time.time()))
if remaining < 1:
self._timeout = True
display.vvvv(u"EXEC timeout stdout: {0}".format(to_text(stdout)), host=self.host)
raise AnsibleConnectionFailure("SSM exec_command timeout on host: %s"
% self.instance_id)
self._vvvv(f"EXEC timeout stdout: \n{to_text(stdout)}")
raise AnsibleConnectionFailure(
f"SSM exec_command timeout on host: {self.instance_id}")
if self._poll_stdout.poll(1000):
line = self._filter_ansi(self._stdout.readline())
display.vvvv(u"EXEC stdout line: {0}".format(to_text(line)), host=self.host)
self._vvvv(f"EXEC stdout line: \n{to_text(line)}")
else:
display.vvvv(u"EXEC remaining: {0}".format(remaining), host=self.host)
self._vvvv(f"EXEC remaining: {remaining}")
continue

if not begin and self.is_windows:
Expand All @@ -457,11 +475,11 @@ def exec_command(self, cmd, in_data=None, sudoable=True):
continue
if begin:
if mark_end in line:
display.vvvv(u"POST_PROCESS: {0}".format(to_text(stdout)), host=self.host)
self._vvvv(f"POST_PROCESS: \n{to_text(stdout)}")
returncode, stdout = self._post_process(stdout, mark_begin)
self._vvvv(f"POST_PROCESSED: \n{to_text(stdout)}")
break
else:
stdout = stdout + line
stdout = stdout + line

stderr = self._flush_stderr(session)

Expand Down Expand Up @@ -501,32 +519,26 @@ def _prepare_terminal(self):
remaining = stop_time - int(round(time.time()))
if remaining < 1:
self._timeout = True
display.vvvv(
"PRE timeout stdout: {0}".format(to_bytes(stdout)), host=self.host
)
self._vvvv(f"PRE timeout stdout: \n{to_bytes(stdout)}")
raise AnsibleConnectionFailure(
"SSM start_session timeout on host: %s" % self.instance_id
f"SSM start_session timeout on host: {self.instance_id}"
)
if self._poll_stdout.poll(1000):
stdout += to_text(self._stdout.read(1024))
display.vvvv(
"PRE stdout line: {0}".format(to_bytes(stdout)), host=self.host
)
self._vvvv(f"PRE stdout line: \n{to_bytes(stdout)}")
else:
display.vvvv("PRE remaining: {0}".format(remaining), host=self.host)
self._vvvv(f"PRE remaining: {remaining}")

# wait til prompt is ready
if startup_complete is False:
match = str(stdout).find("Starting session with SessionId")
if match != -1:
display.vvvv("PRE startup output received", host=self.host)
self._vvvv("PRE startup output received")
startup_complete = True

# disable echo
if startup_complete and (disable_echo_complete is None):
display.vvvv(
"PRE Disabling Echo: {0}".format(disable_echo_cmd), host=self.host
)
self._vvvv(f"PRE Disabling Echo: {disable_echo_cmd}")
self._session.stdin.write(disable_echo_cmd)
disable_echo_complete = False

Expand All @@ -537,10 +549,7 @@ def _prepare_terminal(self):

# disable prompt
if disable_echo_complete and disable_prompt_complete is None:
display.vvvv(
"PRE Disabling Prompt: {0}".format(disable_prompt_cmd),
host=self.host,
)
self._vvvv(f"PRE Disabling Prompt: \n{disable_prompt_cmd}")
self._session.stdin.write(disable_prompt_cmd)
disable_prompt_complete = False

Expand All @@ -552,11 +561,9 @@ def _prepare_terminal(self):

if not disable_prompt_complete:
raise AnsibleConnectionFailure(
"SSM process closed during _prepare_terminal on host: %s"
% self.instance_id
f"SSM process closed during _prepare_terminal on host: {self.instance_id}"
)
else:
display.vvv("PRE Terminal configured", host=self.host)
self._vvvv("PRE Terminal configured")

def _wrap_command(self, cmd, sudoable, mark_start, mark_end):
''' wrap command so stdout and status can be extracted '''
Expand All @@ -574,38 +581,39 @@ def _wrap_command(self, cmd, sudoable, mark_start, mark_end):
f"printf '\\n%s\\n%s\\n' \"$?\" '{mark_end}';\n"
)

display.vvvv(u"_wrap_command: '{0}'".format(to_text(cmd)), host=self.host)
self._vvvv(f"_wrap_command: \n'{to_text(cmd)}'")
return cmd

def _post_process(self, stdout, mark_begin):
''' extract command status and strip unwanted lines '''

if self.is_windows:
# Value of $LASTEXITCODE will be the line after the mark
trailer = stdout[stdout.rfind(mark_begin):]
last_exit_code = trailer.splitlines()[1]
if last_exit_code.isdigit:
returncode = int(last_exit_code)
else:
returncode = -1
# output to keep will be before the mark
stdout = stdout[:stdout.rfind(mark_begin)]

# If it looks like JSON remove any newlines
if stdout.startswith('{'):
stdout = stdout.replace('\n', '')

return (returncode, stdout)
else:
if not self.is_windows:
# Get command return code
returncode = int(stdout.splitlines()[-2])

# Throw away ending lines
for x in range(0, 3):
# Throw away final lines
for _x in range(0, 3):
stdout = stdout[:stdout.rfind('\n')]

return (returncode, stdout)

# Windows is a little more complex
# Value of $LASTEXITCODE will be the line after the mark
trailer = stdout[stdout.rfind(mark_begin):]
last_exit_code = trailer.splitlines()[1]
if last_exit_code.isdigit:
returncode = int(last_exit_code)
else:
returncode = -1
# output to keep will be before the mark
stdout = stdout[:stdout.rfind(mark_begin)]

# If it looks like JSON remove any newlines
if stdout.startswith('{'):
stdout = stdout.replace('\n', '')

return (returncode, stdout)

def _filter_ansi(self, line):
''' remove any ANSI terminal control codes '''
line = to_text(line)
Expand All @@ -623,20 +631,19 @@ def _filter_ansi(self, line):

return line

def _flush_stderr(self, subprocess):
def _flush_stderr(self, session_process):
''' read and return stderr with minimal blocking '''

poll_stderr = select.poll()
poll_stderr.register(subprocess.stderr, select.POLLIN)
poll_stderr.register(session_process.stderr, select.POLLIN)
stderr = ''

while subprocess.poll() is None:
if poll_stderr.poll(1):
line = subprocess.stderr.readline()
display.vvvv(u"stderr line: {0}".format(to_text(line)), host=self.host)
stderr = stderr + line
else:
while session_process.poll() is None:
if not poll_stderr.poll(1):
break
line = session_process.stderr.readline()
self._vvvv(f"stderr line: {to_text(line)}")
stderr = stderr + line

return stderr

Expand Down Expand Up @@ -740,40 +747,41 @@ def _file_transport_command(self, in_path, out_path, ssm_action):
# Check the return code
if returncode == 0:
return (returncode, stdout, stderr)
else:
raise AnsibleError("failed to transfer file to %s %s:\n%s\n%s" %
(to_native(in_path), to_native(out_path), to_native(stdout), to_native(stderr)))

raise AnsibleError(
f"failed to transfer file to {in_path} {out_path}:\n"
f"{stdout}\n{stderr}")

def put_file(self, in_path, out_path):
''' transfer a file from local to remote '''

super(Connection, self).put_file(in_path, out_path)
super().put_file(in_path, out_path)

display.vvv(u"PUT {0} TO {1}".format(in_path, out_path), host=self.host)
self._vvv(f"PUT {in_path} TO {out_path}")
if not os.path.exists(to_bytes(in_path, errors='surrogate_or_strict')):
raise AnsibleFileNotFound("file or module does not exist: {0}".format(to_native(in_path)))
raise AnsibleFileNotFound(f"file or module does not exist: {in_path}")

return self._file_transport_command(in_path, out_path, 'put')

def fetch_file(self, in_path, out_path):
''' fetch a file from remote to local '''

super(Connection, self).fetch_file(in_path, out_path)
super().fetch_file(in_path, out_path)

display.vvv(u"FETCH {0} TO {1}".format(in_path, out_path), host=self.host)
self._vvv(f"FETCH {in_path} TO {out_path}")
return self._file_transport_command(in_path, out_path, 'get')

def close(self):
''' terminate the connection '''
if self._session_id:

display.vvv(u"CLOSING SSM CONNECTION TO: {0}".format(self.instance_id), host=self.host)
self._vvv(f"CLOSING SSM CONNECTION TO: {self.instance_id}")
if self._timeout:
self._session.terminate()
else:
cmd = b"\nexit\n"
self._session.communicate(cmd)

display.vvvv(u"TERMINATE SSM SESSION: {0}".format(self._session_id), host=self.host)
self._vvvv(f"TERMINATE SSM SESSION: {self._session_id}")
self._client.terminate_session(SessionId=self._session_id)
self._session_id = ''
Loading

0 comments on commit 9733031

Please sign in to comment.