From 7a2130a4c5bb8932e94e90dde9acfd064bf84bae Mon Sep 17 00:00:00 2001
From: Igor Partola
Date: Fri, 29 Mar 2013 18:29:35 -0400
Subject: [PATCH] Added streaming compression and schedule/size based rotation

* Added a compress_on_write option which enables writing compressed logs
  directly.
* Made file rotation respect both a schedule and size based reason to
  rotate the files.
---
 README.md                    | 14 +++++++---
 conf/facilities.conf         |  2 +-
 conf/loghogd.conf            |  4 +++
 debian/changelog             | 10 +++++++
 example-conf/facilities.conf |  3 +--
 example-conf/loghogd.conf    |  4 +++
 loghogd/compressor.py        | 52 ++++++++++++++++++++++++++++++++++--
 loghogd/writer.py            | 52 +++++++++++++++++++++---------------
 8 files changed, 112 insertions(+), 29 deletions(-)

diff --git a/README.md b/README.md
index b1bd721..fb1ed1a 100644
--- a/README.md
+++ b/README.md
@@ -215,9 +215,17 @@ LogHog deleted the logs it thought were too old, or crashed your server by not
 rotating them frequently enough.
 
 The *rotate* defines when the log is rotated. This can be done based on a schedule
-(e.g.: hourly) or size (e.g.: when it reaches 10 MB). If it is based on size,
-you must also specify the *max\_size* (in bytes). See Specifying File Rotation Frequency
-section for more details on how to set the schedule.
+(e.g.: hourly) and/or size (e.g.: when it reaches 10 MB). If it is based on size,
+you must also specify the *max\_size* (in bytes). Otherwise, *max\_size* is optional,
+but it is still taken into account when deciding whether the file should be rotated.
+In other words, if you want the log file rotated daily or whenever it reaches
+100 megabytes, LogHog will rotate it as soon as the maximum size is reached
+(say, at 8am), as well as at midnight.
+
+Combining *max\_size* with schedule-based file rotation helps reduce the chance
+of your server running out of disk space.
+
+See the Specifying File Rotation Frequency section for more details on how to set the schedule.
 
 The backup\_count option simply says how many backups of the given file to keep.
 
diff --git a/conf/facilities.conf b/conf/facilities.conf
index f5722a9..5e73f0d 100644
--- a/conf/facilities.conf
+++ b/conf/facilities.conf
@@ -14,7 +14,7 @@
 ; [kitchen-sink-app]
 ; rotate = size ; available values are size, hourly, daily, weekly, monthly
 ; backup_count = 14 ; How many backups to keep
-; max_size = 16777216 ; (required if using rotate = size) max size of the file
+; max_size = 16777216 ; (required if using rotate = size, optional otherwise) max size of the file in bytes
 ; flush_every = 1 ; (optional) defaults to 1. flush/fsync log files after this many writes
 ; file_per_host = yes ; (optional) whether to combine hosts or use separate files
 ; secret = my-big-secret ; (optional) if set, the client must sign messages with this secret
diff --git a/conf/loghogd.conf b/conf/loghogd.conf
index c725c0f..2a39f57 100644
--- a/conf/loghogd.conf
+++ b/conf/loghogd.conf
@@ -17,6 +17,10 @@ level = INFO
 format = xz ; Options are xz, bzip2, gzip. gzip is the fallback.
 level = 6 ; Compression level from 0 to 9, from least to most
 
+; If true/yes/1, logs will be written to disk compressed via gzip, instead of
+; being compressed after file rotation using the "format" setting above
+compress_on_write = no
+
 [server]
 ; Comma separated list of addresses. e.g.: [::1], [::17] or 127.0.0.1, 192.168.1.10
 ; You may also specify a port with each address: [::0]:5588 or localhost:15566, 192.168.1.10:16655
diff --git a/debian/changelog b/debian/changelog
index 9acdced..258d731 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,13 @@
+loghogd (0.10) quantal; urgency=high
+
+  * Critical bug fix: handling malformed client data and buffer underrun error.
+  * Added a compress_on_write option which enables writing compressed logs
+    directly.
+  * Made file rotation respect both a schedule and size based reason to
+    rotate the files.
+
+ -- Igor Partola  Fri, 29 Mar 2013 18:28:57 -0400
+
 loghogd (0.9) quantal; urgency=high
 
   * Bug fix: init script did not trigger proper facilities.conf reload
diff --git a/example-conf/facilities.conf b/example-conf/facilities.conf
index 89d81ed..af8280b 100644
--- a/example-conf/facilities.conf
+++ b/example-conf/facilities.conf
@@ -5,13 +5,12 @@ backup_count = 5
 [app-with-secret:]
 rotate = * * * * *
 backup_count = 5
-max_size = 4096
 secret = my-big-secret
 
 [kitchen-sink-app:]
 rotate = size ; available values are size, hourly, daily, weekly, monthly
 backup_count = 4 ; How many backups to keep
-max_size = 1024 ; (optional) max size of the file, if using rotate = size
+max_size = 1024 ; (optional) max size of the file, in bytes, before it should be rotated
 flush_every = 1 ; (optional) defaults to 1. flush/fsync log files after this many writes
 file_per_host = yes ; (optional) whether to combine hosts or use separate files
 secret = my-big-secret ; (optional) if set, the client must sign messages with this secret
diff --git a/example-conf/loghogd.conf b/example-conf/loghogd.conf
index 5369866..c4200df 100644
--- a/example-conf/loghogd.conf
+++ b/example-conf/loghogd.conf
@@ -18,6 +18,10 @@ level = DEBUG
 format = xz ; Options are xz, bzip2, gzip. gzip is the fallback.
 level = 6 ; Compression level from 0 to 9, from least to most
 
+; If true/yes/1, logs will be written to disk compressed via gzip, instead of
+; being compressed after file rotation using the "format" setting above
+compress_on_write = no
+
 [server]
 ; Comma separated list of addresses. e.g.: [::1], [::17] or 127.0.0.1, 192.168.1.10
 ; You may also specify a port with each address: [::0]:5588 or localhost:15566, 192.168.1.10:16655
diff --git a/loghogd/compressor.py b/loghogd/compressor.py
index 5cff6a8..43ec9ec 100644
--- a/loghogd/compressor.py
+++ b/loghogd/compressor.py
@@ -1,5 +1,5 @@
-import threading, logging, os, errno, re
+import threading, logging, os, errno, re, gzip
 from subprocess import Popen, PIPE
 from collections import deque
 
 try:
@@ -10,10 +10,12 @@
 from ext.groper import define_opt, options
 
 FALLBACK_COMPRESSOR = 'gzip'
+STREAM_COMPRESSOR = 'gzip'
 
 # Try to use xz by default. It provides the best compression/speed ratio
 define_opt('compressor', 'format', default='xz')
 define_opt('compressor', 'level', type=int, default=6)
+define_opt('compressor', 'compress_on_write', type=bool)
 
 class CompressorStartupError(Exception):
     '''Raised by Compressor instances if a misconfigruation is detected.'''
@@ -56,6 +58,12 @@ def __init__(self, compress_cmd=None, level=None):
         if not (0 <= self.compress_level <= 9):
             raise CompressorStartupError('The compression level must be between 0 and 9 incluse. It is set to {0}.'.format(self.compress_level))
 
+        if options.compressor.compress_on_write:
+            # Note: this command will not actually be used. Instead
+            # we will wrap the file object in the compressor of the appropriate type
+            self.compress_cmd = STREAM_COMPRESSOR
+            self.log.info('Streaming compression enabled using {0}'.format(STREAM_COMPRESSOR))
+
         self.extension = self.COMPRESS_EXTS[self.compress_cmd]
 
     def start(self):
@@ -81,7 +89,8 @@ def compress(self, filename):
 
         # Putting None on the queue means "shut down now"
         if filename:
-            self.queue.put(filename)
+            if not options.compressor.compress_on_write:
+                self.queue.put(filename)
 
     def call(self, cmd, stdout=PIPE, stderr=PIPE):
         '''A wrapper around Popen. Returns (status, stdout, stderr).'''
@@ -173,3 +182,42 @@ def check_exec_exists(self, exe):
         finally:
             devnull.close()
 
+    def wrap_fileobj(self, f, filename):
+        '''If compress_on_write is enabled, wrap the file object into a GzipFile.
+
+        If compress_on_write is disabled, return the original file object.
+
+        param f : file object
+            the file object to wrap
+        param filename : string
+            basename of the file. This goes into the gzip metadata
+        '''
+
+        if options.compressor.compress_on_write:
+            return gzip.GzipFile(mode='ab', fileobj=f, compresslevel=options.compressor.level, filename=filename)
+
+        return f
+
+    def wrap_filename(self, filename):
+        '''If compress_on_write is enabled, return a filename + .gz extension.
+
+        If compress_on_write is disabled, return the original filename.
+        '''
+
+        if options.compressor.compress_on_write:
+            return filename + self.extension
+
+        return filename
+
+    def unwrap_filename(self, filename):
+        '''If compress_on_write is enabled, remove the .gz extension.
+
+        If compress_on_write is disabled, return the original filename.
+        '''
+
+        if options.compressor.compress_on_write:
+            if filename.endswith(self.extension):
+                return filename[:-len(self.extension)]
+
+        return filename
+
diff --git a/loghogd/writer.py b/loghogd/writer.py
index 7e8933f..21535a5 100644
--- a/loghogd/writer.py
+++ b/loghogd/writer.py
@@ -1,5 +1,5 @@
-from __future__ import print_function
+from __future__ import print_function, unicode_literals
 
 import os, datetime, time, logging, errno
 from scheduler import Scheduler
 
@@ -10,14 +10,14 @@ class LogFile(object):
     This class is able to write to the corresponding log file and rotate it.
     '''
 
-    def __init__(self, filename, scheduler, compressed_extension, backup_count, max_size, rotate, flush_every):
+    def __init__(self, filename, scheduler, compressor, backup_count, max_size, rotate, flush_every):
         '''Initializes and opens a LogFile instance.'''
 
         self.log = logging.getLogger('writer.log_file') # internal logger
 
         self.filename = filename
         self.scheduler = scheduler
-        self.compressed_extension = compressed_extension
+        self.compressor = compressor
         self.backup_count = backup_count
         self.max_size = max_size
         self.rotate = rotate
@@ -44,13 +44,15 @@ def open(self):
 
             # File does not exist
             self.scheduler.record_execution(self.filename, time.time())
-            self.file = os.fdopen(fd, 'a', 0o644)
+            f = os.fdopen(fd, 'ab', 0o644)
         except OSError as e:
             if e.errno == errno.EEXIST:
                 # File exists
-                self.file = open(self.filename, 'a', 0o644)
+                f = open(self.filename, 'ab', 0o644)
             else:
                 raise
+        self.file = self.compressor.wrap_fileobj(f, os.path.basename(self.filename))
+
         self.size = os.stat(self.filename).st_size
 
     def close(self):
@@ -63,44 +65,52 @@ def write(self, data):
 
         self.file.write(data)
         self.dirty_writes += 1
-        self.size += len(data)
 
         if self.dirty_writes >= self.flush_every:
             self.file.flush()
             self.dirty_writes = 0
 
+            # Optimization: only check file size when flushing
+            self.size = os.stat(self.filename).st_size
+
     def should_rotate(self):
         '''Figures out if the given file should be rotated.
 
         :param f: file dict, as generated in Writer.open()
-        :return: bool
+        :return: None or str with reason for rotation
         '''
 
-        if self.rotate == 'size':
-            return self.size >= self.max_size
+        if self.max_size:
+            if self.size >= self.max_size:
+                return 'max_size'
 
         now = time.time()
         next_rotation_at = self.scheduler.get_next_execution(self.filename, self.rotate, now)
 
-        return next_rotation_at < now
+        if next_rotation_at < now:
+            return self.rotate
 
     def do_rotate(self):
         '''Performs the file rotation.'''
 
-        self.log.info('Rotating {0} based on "{1}"'.format(self.filename, self.rotate))
+        reason = self.should_rotate()
+        if not reason:
+            return
+
+        self.log.info('Rotating {0} based on "{1}"'.format(self.filename, reason))
 
         try:
             # Close the file before renaming it
             self.close()
 
             last_rotation_dt = datetime.datetime.fromtimestamp(self.scheduler.get_last_execution(self.filename))
-            new_name = '{0}.{1}'.format(self.filename, last_rotation_dt.strftime('%Y-%m-%d-%H-%M-%S-%f'))
-            self._rename(self.filename, new_name)
+            new_name = '{0}.{1}'.format(self.compressor.unwrap_filename(self.filename), last_rotation_dt.strftime('%Y-%m-%d-%H-%M-%S-%f'))
+            self._rename(self.filename, self.compressor.wrap_filename(new_name))
 
             self.remove_old_backups()
 
-            return new_name
+            self.compressor.compress(new_name)
 
         finally:
             # Make sure that no matter what we try to open the file
             self.open()
@@ -143,7 +153,7 @@ class Writer(object):
     the appropriate LogFile instances.
     '''
 
-    LOG_LINE_PROTO = '{0!s} - {1!s} - {2!s}\n'.decode('utf-8')
+    LOG_LINE_PROTO = '{0!s} - {1!s} - {2!s}\n'
 
     def __init__(self, facility_db, compressor, log_dir):
         '''Initializes a Writer instance.
@@ -168,9 +178,7 @@ def write(self, app_id, mod_id, msg):
 
         log_file = self.get_file(msg['hostname'], facility)
 
-        if log_file.should_rotate():
-            rotated_filename = log_file.do_rotate()
-            self.compressor.compress(rotated_filename)
+        log_file.do_rotate()
 
         s = self.LOG_LINE_PROTO.format(datetime.datetime.now(), msg['hostname'], msg['body']).encode('utf8')
 
@@ -180,9 +188,11 @@ def get_filename(self, hostname, facility):
         '''Returns the log filename given a hostname.'''
 
         if facility.file_per_host:
-            return os.path.join(self.log_dir, facility.app_id, '{0}-{1}.log'.format(hostname, facility.mod_str))
+            filename = os.path.join(self.log_dir, facility.app_id, '{0}-{1}.log'.format(hostname, facility.mod_str))
         else:
-            return os.path.join(self.log_dir, facility.app_id, '{0}.log'.format(facility.mod_str))
+            filename = os.path.join(self.log_dir, facility.app_id, '{0}.log'.format(facility.mod_str))
+
+        return self.compressor.wrap_filename(filename)
 
     def get_file(self, hostname, facility):
         '''Returns a LogFile instance that should be used.
@@ -198,7 +208,7 @@ def get_file(self, hostname, facility):
             self.files[filename] = LogFile(
                 filename=filename,
                 scheduler=self.scheduler,
-                compressed_extension=self.compressor.extension,
+                compressor=self.compressor,
                 backup_count=facility.backup_count,
                 max_size=facility.max_size,
                 rotate=facility.rotate,
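
With this change a facility's rotation is no longer an either/or decision: should_rotate() now reports a reason whenever the file has grown past max_size or the schedule says a rotation is due, whichever comes first. In facilities.conf terms, a facility might set rotate = daily together with max_size = 104857600 (both values picked here purely for illustration). The following standalone Python sketch, which is not part of the patch, mirrors that decision logic with a hypothetical next_rotation_at timestamp standing in for the scheduler:

    import time

    def should_rotate(size, max_size, rotate, next_rotation_at, now=None):
        '''Mirror of the patched LogFile.should_rotate(): return the reason for
        rotation ('max_size' or the schedule name), or None to leave the file alone.'''
        now = time.time() if now is None else now

        # Size check runs whenever max_size is set, even for facilities that
        # are otherwise rotated on a schedule.
        if max_size and size >= max_size:
            return 'max_size'

        # Schedule check: rotate if the next planned rotation time has passed.
        if next_rotation_at < now:
            return rotate

        return None

    # A 120 MB file with rotate = daily and max_size = 100 MB is rotated for its
    # size even though the daily deadline is still an hour away.
    print(should_rotate(size=120 * 2**20, max_size=100 * 2**20,
                        rotate='daily', next_rotation_at=time.time() + 3600))
    # -> 'max_size'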
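
The compress_on_write path rests on a single idea: instead of handing the rotated file to an external compressor, the writer's plain file object is wrapped in gzip.GzipFile, so bytes reach the disk already compressed. Below is a minimal standard-library sketch of the same technique; the file name and log line are made up for illustration and nothing in it is LogHog code:

    import gzip, os

    LOG_PATH = 'example-app.log.gz'  # hypothetical path for this sketch

    # Open the underlying file in binary append mode, as LogFile.open() does,
    # then wrap it the way Compressor.wrap_fileobj() does when
    # compress_on_write is enabled.
    raw = open(LOG_PATH, 'ab')
    log = gzip.GzipFile(mode='ab', fileobj=raw, compresslevel=6,
                        filename=os.path.basename(LOG_PATH))

    # Writes pass through the gzip wrapper, so the on-disk bytes are
    # compressed as they are written, not after rotation.
    log.write(b'2013-03-29 18:29:35 - host1 - hello, compressed world\n')
    log.close()
    raw.close()

    # Reading the log back requires gzip as well.
    with gzip.open(LOG_PATH, 'rb') as f:
        print(f.read())

Note that each reopen in append mode starts a new gzip member; that is still a valid gzip file, and both gunzip and Python's gzip module read the concatenated members back as one stream.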
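
Because the live file already carries the compressed extension when compress_on_write is enabled (get_filename() wraps it), do_rotate() has to strip that extension before appending the timestamp and then put it back, which is what unwrap_filename() and wrap_filename() are for. A small sketch of the resulting naming, assuming the extension is '.gz' as the wrap_filename() docstring suggests, with a made-up directory and timestamp:

    EXTENSION = '.gz'  # assumed value of Compressor.extension for the gzip stream compressor

    def wrap_filename(filename, compress_on_write=True):
        return filename + EXTENSION if compress_on_write else filename

    def unwrap_filename(filename, compress_on_write=True):
        if compress_on_write and filename.endswith(EXTENSION):
            return filename[:-len(EXTENSION)]
        return filename

    live = 'logs/myapp/host1-web.log.gz'   # as returned by Writer.get_filename()
    stamp = '2013-03-29-18-29-35-000000'   # last rotation time, %Y-%m-%d-%H-%M-%S-%f
    rotated = wrap_filename('{0}.{1}'.format(unwrap_filename(live), stamp))
    print(rotated)  # logs/myapp/host1-web.log.2013-03-29-18-29-35-000000.gz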