-
Notifications
You must be signed in to change notification settings - Fork 55
/
Copy pathagent_main.py
executable file
·2525 lines (2143 loc) · 107 KB
/
agent_main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# Copyright 2014 Scalyr Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ------------------------------------------------------------------------
#
# flake8: noqa: E266
###
# chkconfig: 2345 98 02
# description: Manages the Scalyr Agent 2, which provides log copying
# and basic system metric collection.
###
### BEGIN INIT INFO
# Provides: scalyr-agent-2
# Required-Start: $network
# Required-Stop: $network
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Description: Manages the Scalyr Agent 2, which provides log copying
# and basic system metric collection.
### END INIT INFO
#
# author: Steven Czerwinski <[email protected]>
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
__author__ = "[email protected]"
import collections
import datetime
import json
import platform
import subprocess
import tempfile
import traceback
import errno
import gc
import os
import sys
import time
import re
import argparse
import ssl
from io import open
if False:
from typing import Optional
from typing import Dict
# Work around a strptime race we see every now and then with the docker monitor run() method.
# That race would occur very rarely, since it depends on the order threads are started and when
# strptime is first called.
# See:
# 1. https://github.com/scalyr/scalyr-agent-2/pull/700#issuecomment-761676613
# 2. https://bugs.python.org/issue7980
import _strptime # NOQA
try:
import psutil
except ImportError:
psutil = None
try:
from __scalyr__ import SCALYR_VERSION
from __scalyr__ import scalyr_init
from __scalyr__ import INSTALL_TYPE
from __scalyr__ import DEV_INSTALL
except ImportError:
from scalyr_agent.__scalyr__ import SCALYR_VERSION
from scalyr_agent.__scalyr__ import scalyr_init
from scalyr_agent.__scalyr__ import INSTALL_TYPE
from scalyr_agent.__scalyr__ import DEV_INSTALL
# We must invoke this since we are an executable script.
scalyr_init()
import six
try:
import glob
except ImportError:
import glob2 as glob # type: ignore
import scalyr_agent.scalyr_logging as scalyr_logging
import scalyr_agent.util as scalyr_util
import scalyr_agent.remote_shell as remote_shell
# We have to be careful to set this logger class very early in processing, even before other
# imports to ensure that any loggers created are AgentLoggers.
from scalyr_agent.monitors_manager import MonitorsManager
from scalyr_agent.scalyr_monitor import UnsupportedSystem
# Set up the main logger. We set it up initially to log to standard out,
# but once we fork off the daemon, we will use a rotating log file.
log = scalyr_logging.getLogger("scalyr_agent")
scalyr_logging.set_log_destination(use_stdout=True)
from optparse import OptionParser
from scalyr_agent.profiler import ScalyrProfiler
from scalyr_agent.scalyr_client import ScalyrClientSession
from scalyr_agent.scalyr_client import create_client, verify_server_certificate
from scalyr_agent.copying_manager import CopyingManager
from scalyr_agent.monitors_manager import set_monitors_manager
from scalyr_agent.configuration import Configuration
from scalyr_agent.util import RunState, ScriptEscalator
from scalyr_agent.util import warn_on_old_or_unsupported_python_version
from scalyr_agent.agent_status import AgentStatus
from scalyr_agent.agent_status import ConfigStatus
from scalyr_agent.agent_status import OverallStats
from scalyr_agent.agent_status import GCStatus
from scalyr_agent.agent_status import report_status
from scalyr_agent.platform_controller import (
PlatformController,
AgentAlreadyRunning,
CannotExecuteAsUser,
)
from scalyr_agent.platform_controller import AgentNotRunning
from scalyr_agent.build_info import get_build_revision
from scalyr_agent.metrics.base import clear_internal_cache
from scalyr_agent import config_main
from scalyr_agent import compat
import scalyr_agent.monitors_manager
# Name of the file, inside the agent data directory, that the running agent is told to dump its
# detailed status into.
STATUS_FILE = "last_status"
# Companion file in the data directory; presumably carries the requested status format
# (see VALID_STATUS_FORMATS) to the running agent -- TODO confirm against the writer side.
STATUS_FORMAT_FILE = "status_format"
# Formats accepted by the status command.
VALID_STATUS_FORMATS = ["text", "json"]
# File name of the agent's own log inside the configured agent log directory.
AGENT_LOG_FILENAME = "agent.log"
AGENT_NOT_RUNNING_MESSAGE = "The agent does not appear to be running."

# Messages which are logged when the locale used for the scalyr agent process is not UTF-8.
# Each one is collapsed onto a single line and expects the detected locale to be substituted
# for its "%s" placeholder.
NON_UTF8_LOCALE_WARNING_LINUX_MESSAGE = """
Detected a non UTF-8 locale (%s) being used. You are strongly encouraged to set the locale /
coding for the agent process to UTF-8. Otherwise things won't work when trying to monitor files
with non-ascii content or non-ascii characters in the log file names. On Linux you can do that by
setting LANG and LC_ALL environment variables: e.g. export LC_ALL=en_US.UTF-8.
""".strip().replace(
    "\n", " "
)

NON_UTF8_LOCALE_WARNING_WINDOWS_MESSAGE = """
Detected a non UTF-8 locale (%s) being used. You are strongly encouraged to set the locale /
coding for the agent process to UTF-8. Otherwise things won't work when trying to monitor files
with non-ascii content or non-ascii characters in the log file names. On Windows you can do that by
setting the PYTHONUTF8=1 environment variable.
""".strip().replace(
    "\n", " "
)

# Work around for a potential race which may happen when threads try to resolve a unicode hostname
# or similar: force the "idna" codec to be loaded once, at import time, before any worker threads
# need it.
# See:
# - https://bugs.python.org/issue29288
# - https://github.com/aws/aws-cli/pull/4383/files#diff-45cb40c448cb3c90162a08a8d5c86559afb843a7678339500c3fb15933b5dcceR55
try:
    "".encode("idna")
except Exception as e:
    # Best effort only: report the failure and keep importing.
    print("Failed to force idna encoding: %s" % (str(e)))
def _update_disabled_until(config_value, current_time):
    """Compute the point in time until which something stays disabled.

    :param config_value: Optional offset added to ``current_time``; ``None`` means no offset.
    :param current_time: The reference time.
    :return: ``config_value + current_time`` when ``config_value`` is set, otherwise
        ``current_time`` itself.
    """
    if config_value is None:
        return current_time
    return config_value + current_time
def _check_disabled(current_time, other_time, message):
    """Report whether ``current_time`` is still before ``other_time``.

    When it is, a DEBUG_LEVEL_0 message "<message> disabled for <N> more seconds" is logged,
    where N is ``other_time - current_time``.

    :return: True while still disabled, False otherwise.
    """
    if not current_time < other_time:
        return False

    remaining = other_time - current_time
    log.log(
        scalyr_logging.DEBUG_LEVEL_0,
        "%s disabled for %d more seconds" % (message, remaining),
    )
    return True
class ScalyrAgent(object):
"""Encapsulates the entire Scalyr Agent 2 application."""
def __init__(self, platform_controller):
    """Create the agent application object; this only initializes bookkeeping attributes.

    @param platform_controller: The controller for this platform.
    @type platform_controller: PlatformController
    """
    # NOTE: This abstraction is intentionally not thread safe. Even the calls that create the status
    # file are issued on the main thread, since that is where signals are handled.

    # --- Platform / process control --------------------------------------------------------
    # Platform-specific controller (forking daemon processes, sleeping, etc.).
    self.__controller = platform_controller
    # Platform default paths (log directory, config file, ...).
    self.__default_paths = platform_controller.default_paths
    # Helper used to re-run this script as another user when required.
    self.__escalator = None
    self.__no_fork = False

    # --- Configuration -----------------------------------------------------------------------
    # The configuration currently in use. It may lag behind the file on disk if the newest
    # version of the file had parsing errors.
    self.__config = None
    self.__config_file_path = None
    # Extra directory holding config snippets.
    self.__extra_config_dir = None
    # Config object produced from the file contents when those contents have errors.
    self.__current_bad_config = None
    # When the configuration file was last checked for changes.
    self.__last_config_check_time = None
    # Small cache of the most recently verified config plus the managers created for it.
    self.__last_verify_config = None

    # --- Runtime state ------------------------------------------------------------------------
    self.__start_time = None
    # Where the agent log file is being written.
    self.__log_file_path = None
    self.__copying_manager = None
    self.__monitors_manager = None
    # Whether the agent should keep running. A terminate signal stops it; threads are expected to
    # notice and finish as quickly as possible.
    self.__run_state = None

    # --- Unsafe debugging -----------------------------------------------------------------------
    # True while the RemoteShell accepts local connections and the memory profiler is on. Very
    # unsafe: any local user can run arbitrary python as the agent's user.
    self.__unsafe_debugging_running = False
    # The remote shell debug server, if any.
    self.__debug_server = None

    # --- Status reporting / statistics ----------------------------------------------------------
    # Last OverallStats value.
    self.__overall_stats = OverallStats()
    # Generation durations of the most recent (up to 100) status reports ...
    self.__agent_status_report_durations = collections.deque(maxlen=100)
    # ... and their average.
    self.__agent_avg_status_report_duration = None
    # Previously observed running totals.
    self.__last_total_bytes_skipped = self.__last_total_bytes_copied = 0
    self.__last_total_bytes_pending = self.__last_total_rate_limited_time = 0
@staticmethod
def agent_run_method(controller, config_file_path, perform_config_check=False):
    """Run the agent service on the calling thread until it is asked to terminate.

    Entry point for PlatformControllers that cannot invoke the ``agent_run_method`` argument passed to
    ``start_agent_service``; calling it immediately begins executing the service.

    @param controller: The controller to use to run the service.
    @param config_file_path: The path to the configuration file to use.
    @param perform_config_check: If true, check for common configuration errors (such as a missing api
        token) first and raise an exception if any check fails.

    @type controller: PlatformController
    @type config_file_path: six.text_type
    @type perform_config_check: bool

    @return: The return code when the agent exits.
    @rtype: int
    """
    # Fixed option set: quiet, foreground, no user change, remote check enabled.
    my_options = argparse.Namespace(
        quiet=True,
        verbose=False,
        health_check=False,
        status_format="text",
        no_fork=True,
        no_change_user=True,
        no_check_remote=False,
        extra_config_dir=None,
        debug=False,
        stats_capture_interval=1,
    )
    command = "inner_run_with_checks" if perform_config_check else "inner_run"

    try:
        return ScalyrAgent(controller).main(config_file_path, command, my_options)
    except Exception:
        # Make sure the failure reaches the agent log before propagating.
        log.exception("Agent failed while running. Will be shutting down.")
        raise
def main(self, config_file_path, command, command_options):
    """Run the Scalyr Agent.

    'version' is answered without reading any configuration. For every other command the configuration
    is read and verified first. A broken configuration is tolerated only for 'stop' and 'status'; in that
    case the config is None. Next the platform controller is handed the config, the script escalator is
    created, and the command is dispatched.

    @param config_file_path: The path to the configuration file. If None, the platform default path is used.
    @param command: The command passed in at the commandline for the agent to execute, such as 'start', 'stop', etc.
    @param command_options: The options from the commandline. These will include 'quiet', 'verbose', etc.

    @type config_file_path: six.text_type
    @type command: six.text_type

    @return: The exit status code to exit with, such as 0 for success. 1 for an unknown command.
    @rtype: int
    @raise Exception: If the configuration cannot be read for a command other than 'stop'/'status', or if
        executing the command fails. The message embeds the original error and traceback, except for
        'inner_run_with_checks', where the original exception is re-raised as-is.
    """
    quiet = command_options.quiet
    verbose = command_options.verbose
    health_check = command_options.health_check
    status_format = command_options.status_format
    extra_config_dir = command_options.extra_config_dir
    self.__no_fork = command_options.no_fork
    # NOTE(review): this initial value is always overwritten below, after the 'version' early return.
    no_check_remote = False
    self.__extra_config_dir = Configuration.get_extra_config_dir(extra_config_dir)
    # We process for the 'version' command early since we do not need the configuration file for it.
    if command == "version":
        print("The Scalyr Agent 2 version is %s" % SCALYR_VERSION)
        return 0
    # Read the configuration file. Fail if we can't read it, unless the command is stop or status.
    if config_file_path is None:
        config_file_path = self.__default_paths.config_file_path
    self.__config_file_path = config_file_path
    no_check_remote = command_options.no_check_remote
    try:
        # Parse warnings (and the python version warning below) are suppressed for 'status' and 'stop'.
        log_warnings = command not in ["status", "stop"]
        self.__config = self.__read_and_verify_config(
            config_file_path, log_warnings=log_warnings
        )
        # NOTE: isatty won't be available on Redirector object on Windows when doing permission
        # escalation so we need to handle this scenario as well
        isatty_func = getattr(getattr(sys, "stdout", None), "isatty", None)
        if not isatty_func or (isatty_func and not isatty_func()):
            # If stdout.isatty is not available, or it is and returns False, the
            # "check_remote_if_no_tty" config option overrides the command line no_check_remote value.
            no_check_remote = not self.__config.check_remote_if_no_tty
    except Exception as e:
        # We ignore a bad configuration file for 'stop' and 'status' because sometimes you do just accidentally
        # screw up the config and you want to let the rest of the system work enough to do the stop or get the
        # status.
        if command != "stop" and command != "status":
            # NOTE(review): redundant with the module-level traceback import.
            import traceback
            raise Exception(
                "Error reading configuration file: %s\n"
                "Terminating agent, please fix the configuration file and restart agent.\n%s"
                % (six.text_type(e), traceback.format_exc())
            )
        else:
            # Continue without a config; later steps must cope with self.__config being None.
            self.__config = None
            print(
                "Could not parse configuration file at '%s'" % config_file_path,
                file=sys.stderr,
            )
    if log_warnings:
        warn_on_old_or_unsupported_python_version()
    # The controller receives the config even when it is None (stop/status with a bad config).
    self.__controller.consume_config(self.__config, config_file_path)
    self.__escalator = ScriptEscalator(
        self.__controller,
        config_file_path,
        os.getcwd(),
        command_options.no_change_user,
    )
    # noinspection PyBroadException
    try:
        # Execute the command.
        if command == "start":
            return self.__start(quiet, no_check_remote)
        elif command == "inner_run_with_checks":
            self.__perform_config_checks(no_check_remote)
            return self.__run(self.__controller)
        elif command == "inner_run":
            return self.__run(self.__controller)
        elif command == "stop":
            return self.__stop(quiet)
        elif command == "status" and not (verbose or health_check):
            return self.__status()
        elif command == "status" and (verbose or health_check):
            # Detailed status needs the data directory; fall back to the platform default without a config.
            if self.__config is not None:
                agent_data_path = self.__config.agent_data_path
            else:
                agent_data_path = self.__default_paths.agent_data_path
                print(
                    "Assuming agent data path is '%s'" % agent_data_path,
                    file=sys.stderr,
                )
            return self.__detailed_status(
                agent_data_path,
                command_options=command_options,
                status_format=status_format,
                health_check=health_check,
            )
        elif command == "restart":
            return self.__restart(quiet, no_check_remote)
        elif command == "condrestart":
            return self.__condrestart(quiet, no_check_remote)
        else:
            print('Unknown command given: "%s".' % command, file=sys.stderr)
            return 1
    except SystemExit:
        # NOTE(review): any SystemExit is reported as success, discarding non-zero exit codes -- confirm intended.
        return 0
    except Exception as e:
        # We special case the inner_run_with checks since we know that exception is human-readable.
        if command == "inner_run_with_checks":
            raise e
        else:
            # NOTE(review): redundant with the module-level traceback import.
            import traceback
            raise Exception(
                "Caught exception when attempt to execute command %s. Exception was %s. "
                "Traceback:\n%s"
                % (command, six.text_type(e), traceback.format_exc())
            )
def __read_and_verify_config(self, config_file_path, log_warnings=True):
    """Build the configuration for ``config_file_path`` and fully verify it.

    Verification includes checking that the referenced monitors exist and have valid configurations.
    Any exception raised while parsing or verifying propagates to the caller.

    @param config_file_path: The path to read the configuration from.
    @type config_file_path: six.text_type
    @param log_warnings: True if config.parse() should log any warnings which may come up during
        parsing.
    @type log_warnings: bool

    @return: The verified configuration object.
    @rtype: scalyr_agent.Configuration
    """
    verified_config = self.__make_config(config_file_path, log_warnings=log_warnings)
    # With default arguments this also caches the managers it creates for __create_worker_thread.
    self.__verify_config(verified_config)
    return verified_config
def __make_config(self, config_file_path, log_warnings=True):
    """Construct a Configuration object without reading or verifying it.

    Call ``__verify_config`` on the result to read and fully verify the configuration.

    @param config_file_path: The path to read the configuration from.
    @type config_file_path: six.text_type
    @param log_warnings: True if config.parse() should log any warnings which may come up during
        parsing.
    @type log_warnings: bool

    @return: The configuration object.
    @rtype: scalyr_agent.Configuration
    """
    optional_settings = {
        "extra_config_dir": self.__extra_config_dir,
        "log_warnings": log_warnings,
    }
    return Configuration(
        config_file_path, self.__default_paths, log, **optional_settings
    )
def __verify_config(
    self,
    config,
    disable_create_monitors_manager=False,
    disable_create_copying_manager=False,
    disable_cache_config=False,
):
    """Parse ``config`` and verify that the monitors it references exist and have valid configuration.

    Full verification creates a MonitorsManager and a CopyingManager. Unless caching is disabled, both are
    stored in ``self.__last_verify_config`` together with the config. A following
    ``__create_worker_thread`` call can then reuse them instead of instantiating the monitors again.

    @param config: The configuration object.
    @type config: scalyr_agent.Configuration
    @param disable_create_monitors_manager: If True, stop right after parsing, create no managers and
        return False. The cache is left untouched.
    @param disable_create_copying_manager: If True, stop after creating the monitors manager and return
        False. The cache is left untouched.
    @param disable_cache_config: If True, clear the cache instead of storing the new managers. Still
        returns True.

    @return: True if the configuration was fully verified, False if verification stopped early.
    @raise Exception: If the configuration uses a monitor that is not supported on this system.
    """
    try:
        config.parse()
        if disable_create_monitors_manager:
            log.info("verify_config - creation of monitors manager disabled")
            return False
        monitors_manager = MonitorsManager(config, self.__controller)
        if disable_create_copying_manager:
            log.info("verify_config - creation of copying manager disabled")
            return False
        copying_manager = CopyingManager(config, monitors_manager.monitors)
        # The managers are only needed for verification here, but the caller will very likely invoke
        # ``__create_worker_thread`` next, so save them to avoid instantiating the monitors twice.
        if disable_cache_config:
            log.info("verify_config - not caching verify_config results")
            self.__last_verify_config = None
            # The config is verified, just not cached. It will be verified again when the worker
            # thread is created.
            return True
        self.__last_verify_config = {
            "config": config,
            "monitors_manager": monitors_manager,
            "copying_manager": copying_manager,
        }
    except UnsupportedSystem as e:
        # Emit a more helpful message than the raw UnsupportedSystem error.
        raise Exception(
            "Configuration file uses a monitor that is not supported on this system. Monitor '%s' "
            "cannot be used due to: %s. If you require support for this monitor for your system, "
            "please e-mail [email protected]" % (e.monitor_name, six.text_type(e))
        )
    return True
def __create_worker_thread(self, config):
    """Build the worker thread that runs the copying and monitors managers for ``config``.

    Managers cached by a previous ``__verify_config`` call for this exact config object are reused.
    Otherwise the config is verified again, which creates and caches them. The cached config's global
    options are applied before the thread is constructed.

    @param config: The configuration object.
    @type config: scalyr_agent.Configuration

    @return: The worker thread object to use. The caller must start it.
    @rtype: WorkerThread
    """
    cached = self.__last_verify_config
    if cached is None or cached["config"] is not config:
        # Nothing cached for this config object (identity check): verify now to populate the cache.
        self.__verify_config(config)
        cached = self.__last_verify_config

    # Apply any global config options.
    if cached and cached.get("config", None):
        cached["config"].apply_config()

    return WorkerThread(
        cached["config"],
        cached["copying_manager"],
        cached["monitors_manager"],
    )
def __perform_config_checks(self, no_check_remote):
    """Check the current configuration for common mistakes, raising an exception with a readable message.

    The checks run in this order:
    (1) an api_key has actually been entered;
    (2) the agent can write to its logs and data directories, after which agent logging is switched to disk;
    (3) unless no_check_remote is set and new ingestion is off, the configured server answers a ping.
    An invalid API key reported by the server raises. Clock skew and other ping failures are only
    logged and printed to stderr.

    @param no_check_remote: True to skip contacting the server. Ignored when use_new_ingestion is set.
    @type no_check_remote: bool
    """
    config = self.__config

    # Easily forgotten step: the configuration template ships with "REPLACE_THIS" as the api_key value.
    if config.api_key in ("REPLACE_THIS", ""):
        raise Exception(
            "Error, you have not set a valid api key in the configuration file.\n"
            'Edit the file %s and replace the value for "api_key" with a valid logs '
            "write key for your account.\n"
            "You can see your write logs keys at https://www.scalyr.com/keys"
            % config.file_path
        )

    self.__verify_can_write_to_logs_and_data(config)

    # The log directory is known to be writable now, so write the agent log to disk. That way any
    # connection errors below are recorded there.
    scalyr_logging.set_log_destination(
        use_disk=True,
        no_fork=self.__no_fork,
        stdout_severity=config.stdout_severity,
        max_bytes=config.log_rotation_max_bytes,
        backup_count=config.log_rotation_backup_count,
        logs_directory=config.agent_log_path,
        agent_log_file_path=AGENT_LOG_FILENAME,
    )

    if no_check_remote and not config.use_new_ingestion:
        return

    # Each of these is both logged and echoed to stderr.
    clock_skew_message = (
        "Sending request to the server failed due to bad clock skew. The system "
        "clock on this host is too far off from actual time. The agent will keep "
        "trying to connect in the background."
    )
    unreachable_message = (
        "Failed to send request to the server. The server address could be "
        "wrong, there could be a network connectivity issue, or the provided "
        "token could be incorrect. The agent will keep trying to connect in the "
        "background. You can disable this check with --no-check-remote-server."
    )

    # Send a test message to the server to make sure everything works.
    client = create_client(config, quiet=True)
    try:
        ping_result = client.ping()
        if ping_result == "success":
            return
        if "badClientClockSkew" in ping_result:
            # TODO: The server does not yet send this error message, but it will in the future.
            log.error(clock_skew_message)
            print(clock_skew_message, file=sys.stderr)
        elif "invalidApiKey" in ping_result:
            # TODO: The server does not yet send this error message, but it will in the future.
            raise Exception(
                "Sending request to the server failed due to an invalid API key. This probably "
                "means the 'api_key' field in configuration file '%s' is not correct. "
                "Please visit https://www.scalyr.com/keys and copy a Write Logs key into the "
                "'api_key' field in the configuration file"
                % config.file_path
            )
        else:
            log.error(unreachable_message)
            print(unreachable_message, file=sys.stderr)
    finally:
        client.close()
def __start(self, quiet, no_check_remote):
    """Execute the 'start' command.

    Steps, in order:
    - If the script must run as the config file's owner, re-run it as that user and return that result.
    - Call ``__fail_if_already_running``, which presumably aborts when an agent is already running.
    - Run ``__perform_config_checks``. If it raises, print and log the error and return 1. Server
      connection problems other than an invalid API key are only reported and do not stop startup.
    - Start the service. When ``self.__no_fork`` is False it is started in a background process (with one
      more already-running check first); otherwise it is started with fork=False.

    @param quiet: True if output should be kept to a minimum and only errors reported.
    @param no_check_remote: True to skip contacting the Scalyr servers during the config checks.

    @type quiet: bool
    @type no_check_remote: bool

    @return: The exit status code for the process. It is 1 if the config checks failed and 0 after the
        service was started.
    @rtype: int
    """
    escalator = self.__escalator
    if escalator.is_user_change_required():
        return escalator.change_user_and_rerun_script("start the scalyr agent")

    # Never start a second copy.
    self.__fail_if_already_running()

    # noinspection PyBroadException
    try:
        self.__perform_config_checks(no_check_remote)
    except Exception as err:
        print(file=sys.stderr)
        traceback.print_exc(file=sys.stderr)
        print(
            "Terminating agent, please fix the error and restart the agent.",
            file=sys.stderr,
        )
        log.error("%s" % six.text_type(err))
        log.error("Terminating agent, please fix the error and restart the agent.")
        return 1

    fork = not self.__no_fork
    if fork:
        # One last check to narrow the window for race conditions.
        self.__fail_if_already_running()
        if not quiet:
            print(
                "Configuration verified, starting agent in background."
                if no_check_remote
                else "Configuration and server connection verified, starting agent in background."
            )

    self.__controller.start_agent_service(self.__run, quiet, fork=fork)
    return 0
def __handle_terminate(self):
    """Request a clean shutdown; invoked when the process is asked to stop, e.g. by a signal.

    If the run state is still running, this logs the request and stops the run state. Threads are
    expected to notice that and finish. Otherwise it does nothing.
    """
    run_state = self.__run_state
    if not run_state.is_running():
        return
    log.info("Received signal to shutdown, attempt to shutdown cleanly.")
    run_state.stop()
def _get_system_and_agent_stats(
    self, data_directory, status_file_path
):  # type: (six.text_type, six.text_type) -> Optional[Dict]
    """
    Return debugging stats about the current machine and the agent process.

    Every collector reports its failures through an "error" key or an error string instead of raising,
    so a partial result is always returned.

    :param data_directory: Agent data directory; its ``ls -la`` listing is included.
    :param status_file_path: Path of the status file; its raw content is included.
    :return: Dict holding:
        - the machine stats, merged in at the top level;
        - "timestamp", in nanoseconds since the epoch;
        - "agent_process";
        - "processes", on non-Windows systems only;
        - "status_file_info".
    """

    def get_agent_process_stats():
        """Return pid, CPU percent and RSS of the process named in the agent pidfile, or {"error": ...}."""
        if psutil is None:
            return {"error": "No psutil"}

        if self.__config is None:
            # The status command may run with an unparsable config, in which case the config is None
            # and the log directory holding the pidfile is unknown.
            return {"error": "No configuration, can't locate PID file."}

        pidfile = os.path.join(self.__config.agent_log_path, "agent.pid")
        if not os.path.exists(pidfile):
            return {"error": "No PID file."}

        try:
            with open(pidfile, "r") as fp:
                pidfile_content = fp.read().strip()
        except (IOError, OSError) as e:
            # On Python 2, io.open raises IOError, which is not a subclass of OSError.
            return {"error": "Error during PID file read. Error: {}".format(e)}

        # Only the leading token is treated as the PID. Anything after whitespace is ignored instead
        # of making int() fail.
        pid_tokens = pidfile_content.split()
        try:
            pid = int(pid_tokens[0])
        except (IndexError, ValueError):
            return {"error": "Invalid PID file content: {!r}".format(pidfile_content)}

        try:
            psutil_process = psutil.Process(pid=pid)
        except psutil.Error as e:
            return {"error": "Can't create psutil process. Error: {}".format(e)}

        agent_process_stats = {}
        try:
            with psutil_process.oneshot():
                agent_process_stats["pid"] = psutil_process.pid
                agent_process_stats["cpu_percent"] = psutil_process.cpu_percent()
                agent_process_stats["memory_rss"] = psutil_process.memory_info().rss
        except Exception as e:
            agent_process_stats["error"] = str(e)
        return agent_process_stats

    def get_machine_stats():
        """Return host-wide CPU, load and memory stats; failures are reported under "error"."""
        machine_stats = {}
        if psutil is None:
            machine_stats["error"] = "No psutil"
            return machine_stats
        try:
            machine_stats["cpu_percent"] = psutil.cpu_percent()
            machine_stats["cpu_count"] = psutil.cpu_count()
            machine_stats["getloadavg"] = psutil.getloadavg()
            machine_stats["virtual_memory_percent"] = psutil.virtual_memory().percent
        except Exception as e:
            machine_stats["error"] = str(e)
        return machine_stats

    def find_agent_processes():
        """Return ``ps aux`` lines that look agent related, or {"error": ...} if ps fails."""
        try:
            ps_output = subprocess.check_output(["ps", "aux"]).decode("utf-8", "replace")
        except Exception as e:
            return {"error": "Can't get list of processes. Error: {}".format(e)}
        markers = ("scalyr-agent", "python", "agent_main")
        return [
            line
            for line in ps_output.splitlines()
            if any(marker in line for marker in markers)
        ]

    def get_status_files_info():
        """Return the data directory listing and the raw status file content (or error strings)."""
        info = {}
        try:
            ls_output = subprocess.check_output(["ls", "-la", data_directory]).decode(
                "utf-8", "replace"
            )
            info["data_root_file_stats"] = ls_output.splitlines()
        except Exception as e:
            info["data_root_file_stats"] = six.text_type(e)
        try:
            with open(status_file_path, "rb") as fp:
                info["content"] = fp.read().decode("utf-8", "ignore")
        except Exception as e:
            info["content"] = six.text_type(e)
        return info

    result = {}
    result.update(get_machine_stats())
    # A real epoch timestamp in nanoseconds. datetime.now().microsecond is only the sub-second
    # fraction of the current time, so it cannot be used as a timestamp.
    result["timestamp"] = int(time.time() * 1e9)
    result["agent_process"] = get_agent_process_stats()
    if not platform.system().lower().startswith("windows"):
        result["processes"] = find_agent_processes()
    result["status_file_info"] = get_status_files_info()
    return result
def __detailed_status(
self,
data_directory,
command_options,
status_format="text",
health_check=False,
zero_status_file=True,
):
"""Execute the status -v or -H command.
This will request the current agent to dump its detailed status to a file in the data directory, which
this process will then read.
@param data_directory: The path to the data directory.
@type data_directory: str
:param zero_status_file: True to zero the status file content so we can detect when agent writes
a new status file This is primary meant to be used in testing where we can
set it to False which means we can avoid a lot of nasty mocking if
open() and related functiond.
@return: An exit status code for the status command indicating success or failure.
@rtype: int
"""
# Health check ignores format but uses `json` under the hood
if health_check:
status_format = "json"
# List of all debug stats that are captured during status report.
debug_stats = []
status_file = os.path.join(data_directory, STATUS_FILE)
status_format_file = os.path.join(data_directory, STATUS_FORMAT_FILE)
# Capture debug stats at the beginning.
if command_options.debug:
stats = self._get_system_and_agent_stats(
data_directory=data_directory, status_file_path=status_file
)
debug_stats.append(stats)
if status_format not in VALID_STATUS_FORMATS:
print(
"Invalid status format: %s. Valid formats are: %s"
% (status_format, ", ".join(VALID_STATUS_FORMATS))
)
return 1
# First, see if we have to change the user that is executing this script to match the owner of the config.
if self.__escalator.is_user_change_required():
try:
return self.__escalator.change_user_and_rerun_script(
"retrieved detailed status", handle_error=False
)
except CannotExecuteAsUser:
# For now, we just ignore the error and try to get the status anyway. This might work on Linux
# platforms depending on permissions. This is legacy behavior.
pass
try:
self.__controller.is_agent_running(fail_if_not_running=True)
except AgentNotRunning as e:
print(AGENT_NOT_RUNNING_MESSAGE)
print("%s" % six.text_type(e))
return 1
# The status works by sending telling the running agent to dump the status into a well known file and
# then we read it from there, echoing it to stdout.
if not os.path.isdir(data_directory):
print(
'Cannot get status due to bad config. The data path "%s" is not a directory'
% data_directory,
file=sys.stderr,
)
return 1
# This users needs to zero out the current status file (if it exists), so they need write access to it.
# When we do create the status file, we give everyone read/write access, so it should not be an issue.
if os.path.isfile(status_file) and not os.access(status_file, os.W_OK):
print(
"Cannot get status due to insufficient permissions. The current user does not "
'have write access to "%s" as required.' % status_file,
file=sys.stderr,
)
return 1
# Zero out the current file so that we can detect once the agent process has updated it.
if os.path.isfile(status_file) and zero_status_file:
f = open(status_file, "w")
f.truncate(0)
f.close()
# Write the file with the format we need to use
with open(status_format_file, "w") as fp:
status_format = six.text_type(status_format)
fp.write(status_format)
# Signal to the running process. This should cause that process to write to the status file
result = self.__controller.request_agent_status()
if result is not None:
if result == errno.ESRCH:
print(AGENT_NOT_RUNNING_MESSAGE, file=sys.stderr)
return 1
elif result == errno.EPERM:
# TODO: We probably should just get the name of the user running the agent and output it
# here, instead of hard coding it to root.
print(
"To view agent status, you must be running as the same user as the agent. "
"Try running this command as root or Administrator.",
file=sys.stderr,
)
return 2
# We wait for five seconds at most to get the status.
deadline = time.time() + self.__config.agent_status_timeout
last_debug_stat_time = 0
# Now loop until we see it show up.
while True:
# Capture debug stats every second while waiting.
if command_options.debug and (
last_debug_stat_time + command_options.stats_capture_interval
< time.time()
):
stats = self._get_system_and_agent_stats(
data_directory=data_directory, status_file_path=status_file
)
debug_stats.append(stats)
last_debug_stat_time = time.time()
try:
if os.path.isfile(status_file) and os.path.getsize(status_file) > 0:
break
except OSError as e:
if e.errno == 2:
# File doesn't exist - it could mean isfile() returned true, but getsize()
# raised an exception since the file was deleted after isfile() call, but
# before getsize()
pass
else:
raise e
if time.time() > deadline:
if self.__config is not None:
agent_log = os.path.join(
self.__config.agent_log_path, AGENT_LOG_FILENAME
)
else:
agent_log = os.path.join(
self.__default_paths.agent_log_path, AGENT_LOG_FILENAME
)
# Capture debug stats on timeout.
if command_options.debug:
debug_stats.append(
self._get_system_and_agent_stats(
data_directory=data_directory, status_file_path=status_file
)
)
if command_options.debug:
debug_stats_str = "Debug stats: {}".format(
json.dumps(debug_stats, sort_keys=True, indent=4)
)
else:
debug_stats_str = ""
print(
"Failed to get status within %d seconds. Giving up. The agent process is "
"possibly stuck. See %s for more details.\n%s"
% (self.__config.agent_status_timeout, agent_log, debug_stats_str),
file=sys.stderr,
)
return 1
time.sleep(0.03)
if not os.access(status_file, os.R_OK):
print(
"Cannot get status due to insufficient permissions. The current user does not "
'have read access to "%s" as required.' % status_file,
file=sys.stderr,
)
return 1
return_code = 0