From 1b94da77a33ad6691d830c4db1e43be2ccb123d7 Mon Sep 17 00:00:00 2001 From: Ehtesh Choudhury Date: Thu, 24 Jul 2014 19:48:25 +0000 Subject: [PATCH 01/33] Have `make selfsigned` now call `lc-tlscert` `make selfsigned` was pretty nifty. Even though the process is different, maybe we should keep the target in the Makefile? I have `make selfsigned` depend and run `lc-tlscert`. --- Makefile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Makefile b/Makefile index a5c30725..bcbb40d7 100644 --- a/Makefile +++ b/Makefile @@ -34,6 +34,10 @@ gem: test: all vendor/bundle/.GemfileModT bundle exec rspec $(TESTS) +selfsigned: | bin/lc-tlscert + bin/lc-tlscert + + doc: @npm --version >/dev/null || (echo "'npm' not found. You need to install node.js.") @npm install doctoc >/dev/null || (echo "Failed to perform local install of doctoc.") From bf26040d2ddc6ad05310dada0fb217092007f0c9 Mon Sep 17 00:00:00 2001 From: Driskell Date: Mon, 29 Sep 2014 19:07:49 +0100 Subject: [PATCH 02/33] Fix #51 - empty lines missing and following line starting with a new line --- src/lc-lib/harvester/linereader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lc-lib/harvester/linereader.go b/src/lc-lib/harvester/linereader.go index 0f28d557..3098be91 100644 --- a/src/lc-lib/harvester/linereader.go +++ b/src/lc-lib/harvester/linereader.go @@ -56,7 +56,7 @@ func (lr *LineReader) ReadSlice() ([]byte, error) { } for { - if n := bytes.IndexByte(lr.buf[lr.start:lr.end], '\n'); n > 0 { + if n := bytes.IndexByte(lr.buf[lr.start:lr.end], '\n'); n >= 0 { line := lr.buf[lr.start:lr.start+n+1] lr.start += n + 1 return line, nil From 7c51debf52950128ed113e5ff78742b0b6b5a488 Mon Sep 17 00:00:00 2001 From: Driskell Date: Mon, 29 Sep 2014 19:51:24 +0100 Subject: [PATCH 03/33] Add Go line reader tests --- Makefile | 1 + src/lc-lib/harvester/linereader_test.go | 101 ++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 src/lc-lib/harvester/linereader_test.go diff 
--git a/Makefile b/Makefile index a34334dc..6d272e34 100644 --- a/Makefile +++ b/Makefile @@ -34,6 +34,7 @@ gem: gem build log-courier.gemspec test: all vendor/bundle/.GemfileModT + go test -tags "$(TAGS)" lc-admin lc-curvekey lc-lib/... lc-tlscert log-courier bundle exec rspec $(TESTS) doc: diff --git a/src/lc-lib/harvester/linereader_test.go b/src/lc-lib/harvester/linereader_test.go new file mode 100644 index 00000000..aae1b030 --- /dev/null +++ b/src/lc-lib/harvester/linereader_test.go @@ -0,0 +1,101 @@ +/* +* Copyright 2014 Jason Woods. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package harvester + +import ( + "bytes" + "testing" +) + +func checkLine(t *testing.T, reader *LineReader, expected []byte) { + line, err := reader.ReadSlice() + if line == nil { + t.Log("No line returned") + t.FailNow() + } + if !bytes.Equal(line, expected) { + t.Logf("Line data incorrect: [% X]", line) + t.FailNow() + } + if err != nil { + t.Logf("Unexpected error: %s", err) + t.FailNow() + } +} + +func checkLineFull(t *testing.T, reader *LineReader, expected []byte) { + line, err := reader.ReadSlice() + if line == nil { + t.Log("No line returned") + t.FailNow() + } + if !bytes.Equal(line, expected) { + t.Logf("Line data incorrect: [% X]", line) + t.FailNow() + } + if err != ErrBufferFull { + t.Logf("Unexpected error: %s", err) + t.FailNow() + } +} + +func checkEnd(t *testing.T, reader *LineReader) { + line, err := reader.ReadSlice() + if line != nil { + t.Log("Unexpected line returned") + t.FailNow() + } + if err == nil { + t.Log("Expected error") + t.FailNow() + } +} + +func TestLineRead(t *testing.T) { + data := bytes.NewBufferString("12345678901234567890\n12345678901234567890\n") + + // New line read with 100 bytes, enough for the above + reader := NewLineReader(data, 100) + + checkLine(t, reader, []byte("12345678901234567890\n")) + checkLine(t, reader, []byte("12345678901234567890\n")) + checkEnd(t, reader) +} + +func TestLineReadEmpty(t *testing.T) { + data := bytes.NewBufferString("\n12345678901234567890\n") + + // New line read with 100 bytes, enough for the above + reader := NewLineReader(data, 100) + + checkLine(t, reader, []byte("\n")) + checkLine(t, reader, []byte("12345678901234567890\n")) + checkEnd(t, reader) +} + +func TestLineReadFull(t *testing.T) { + data := bytes.NewBufferString("12345678901234567890\n123456789012345678901234567890\n12345678901234567890\n") + + // New line read with 21 bytes buffer to experience full buffer error + reader := NewLineReader(data, 21) + + checkLine(t, reader, []byte("12345678901234567890\n")) + 
checkLineFull(t, reader, []byte("123456789012345678901")) + checkLine(t, reader, []byte("234567890\n")) + checkLine(t, reader, []byte("12345678901234567890\n")) + checkEnd(t, reader) +} From c24196188018478d9eaa3993807604c8a9405477 Mon Sep 17 00:00:00 2001 From: Driskell Date: Mon, 29 Sep 2014 19:58:23 +0100 Subject: [PATCH 04/33] Fix #50 - don't require a connection to show lc-admin help --- src/lc-admin/lc-admin.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/lc-admin/lc-admin.go b/src/lc-admin/lc-admin.go index 185ead41..cbcb494f 100644 --- a/src/lc-admin/lc-admin.go +++ b/src/lc-admin/lc-admin.go @@ -107,10 +107,7 @@ func (a *Admin) ProcessCommand(command string) bool { a.renderSnap(" ", snap) } case "help": - fmt.Printf("Available commands:\n") - fmt.Printf(" reload Reload configuration\n") - fmt.Printf(" status Display the current shipping status\n") - fmt.Printf(" exit Exit\n") + PrintHelp() default: fmt.Printf("Unknown command: %s\n", command) } @@ -231,6 +228,13 @@ WatchLoop: return true } +func PrintHelp() { + fmt.Printf("Available commands:\n") + fmt.Printf(" reload Reload configuration\n") + fmt.Printf(" status Display the current shipping status\n") + fmt.Printf(" exit Exit\n") +} + func main() { var version bool var quiet bool @@ -253,10 +257,16 @@ func main() { fmt.Printf("Log Courier version %s client\n\n", core.Log_Courier_Version) } - admin := NewAdmin(quiet, admin_connect) - args := flag.Args() + if len(args) != 0 { + // Don't require a connection to display the help message + if args[0] == "help" { + PrintHelp() + os.Exit(0) + } + + admin := NewAdmin(quiet, admin_connect) if admin.argsCommand(args, watch) { os.Exit(0) } @@ -273,6 +283,7 @@ func main() { os.Exit(1) } + admin := NewAdmin(quiet, admin_connect) if err := admin.connect(); err != nil { return } From e4335356e06ff9b715f709dfc9c19d1bc81a0e70 Mon Sep 17 00:00:00 2001 From: Driskell Date: Mon, 29 Sep 2014 20:01:36 +0100 Subject: [PATCH 
05/33] Fix #49 and enter TODO regarding implementing automatic ping --- src/lc-admin/lc-admin.go | 7 ------- src/lc-lib/admin/client.go | 3 +++ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/lc-admin/lc-admin.go b/src/lc-admin/lc-admin.go index cbcb494f..edd7d613 100644 --- a/src/lc-admin/lc-admin.go +++ b/src/lc-admin/lc-admin.go @@ -80,13 +80,6 @@ func (a *Admin) ProcessCommand(command string) bool { var err error switch command { - case "ping": - err = a.client.Ping() - if err != nil { - break - } - - fmt.Printf("Pong\n") case "reload": err = a.client.Reload() if err != nil { diff --git a/src/lc-lib/admin/client.go b/src/lc-lib/admin/client.go index 428dd2e0..3822fe1a 100644 --- a/src/lc-lib/admin/client.go +++ b/src/lc-lib/admin/client.go @@ -36,6 +36,9 @@ func NewClient(admin_connect string) (*Client, error) { ret := &Client{} + // TODO: handle the connection in a goroutine that can PING + // on idle, and implement a close member to shut it + // it down. For now we'll rely on the auto-reconnect if ret.conn, err = ret.connect(admin_connect); err != nil { return nil, err } From 05e216508883ef3161bd667230907131db2a0e0b Mon Sep 17 00:00:00 2001 From: Driskell Date: Mon, 29 Sep 2014 20:17:34 +0100 Subject: [PATCH 06/33] Fix tests --- Makefile | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 6d272e34..934e45b3 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,7 @@ export GOPATH := $(GOPATH) TAGS := BINS := bin/log-courier bin/lc-tlscert bin/lc-admin +GOTESTS := log-courier lc-tlscert lc-admin lc-lib/... 
TESTS := spec/courier_spec.rb spec/tcp_spec.rb spec/gem_spec.rb spec/multiline_spec.rb ifeq ($(with),zmq3) @@ -15,6 +16,7 @@ endif ifeq ($(with),zmq4) TAGS := $(TAGS) zmq zmq_4_x BINS := $(BINS) bin/lc-curvekey +GOTESTS := $(GOTESTS) lc-curvekey TESTS := $(TESTS) spec/plainzmq_spec.rb spec/zmq_spec.rb endif @@ -34,7 +36,8 @@ gem: gem build log-courier.gemspec test: all vendor/bundle/.GemfileModT - go test -tags "$(TAGS)" lc-admin lc-curvekey lc-lib/... lc-tlscert log-courier + go get -d -tags "$(TAGS)" $(GOTESTS) + go test -tags "$(TAGS)" $(GOTESTS) bundle exec rspec $(TESTS) doc: From eafeacbc4e5ea1c63c693d002f51e6ff3be37532 Mon Sep 17 00:00:00 2001 From: Driskell Date: Sun, 12 Oct 2014 13:52:23 +0100 Subject: [PATCH 07/33] Fix go version check --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 73f8131a..09db038b 100644 --- a/Makefile +++ b/Makefile @@ -85,8 +85,8 @@ ifneq ($(implyclean),yes) endif go-check: - @go version >/dev/null || (echo "Go not found. You need to install Go: http://golang.org/doc/install"; false) - @go version | grep -q 'go version go1.[123]' || (echo "Go version 1.2 or 1.3 required, you have a version of Go that is not supported."; false) + @go version >/dev/null || (echo "Go not found. 
You need to install Go version 1.2 or 1.3: http://golang.org/doc/install"; false) + @go version | grep -q 'go version go1.[23]' || (echo "Go version 1.2 or 1.3 required, you have a version of Go that is not supported."; false) @echo "GOPATH: $${GOPATH}" bin/%: FORCE | go-check From 56a7e919193de05b7e6dfe10e753c1959d88f842 Mon Sep 17 00:00:00 2001 From: Driskell Date: Sun, 12 Oct 2014 13:53:42 +0100 Subject: [PATCH 08/33] Documentation update --- README.md | 66 ++++++++++++++++-------------------- docs/ChangeLog.md | 13 ++++++- docs/CommandLineArguments.md | 65 +++++++++++++++++++++++++++++++++++ 3 files changed, 107 insertions(+), 37 deletions(-) create mode 100644 docs/CommandLineArguments.md diff --git a/README.md b/README.md index 6ec6326e..e6b4d521 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,10 @@ # Log Courier [![Build Status](https://travis-ci.org/driskell/log-courier.svg?branch=develop)](https://travis-ci.org/driskell/log-courier) -Log Courier is a tool created to transmit log files speedily and securely to +Log Courier is a tool created to ship log files speedily and securely to remote [Logstash](http://logstash.net) instances for processing whilst using small amounts of local resources. The project is an enhanced fork of [Logstash Forwarder](https://github.com/elasticsearch/logstash-forwarder) 0.3.1 -with many enhancements and behavioural improvements. +with many fixes and behavioural improvements. @@ -16,7 +16,6 @@ with many enhancements and behavioural improvements. - [Building](#building) - [Logstash Integration](#logstash-integration) - [Generating Certificates and Keys](#generating-certificates-and-keys) -- [Command Line Options](#command-line-options) - [Documentation](#documentation) @@ -25,19 +24,23 @@ with many enhancements and behavioural improvements. 
Log Courier implements the following features: -* Tail log files, following rotations and resuming at the last offset on -restart -* Read from standard input for lightweight shipping of a program's output -* Extra event fields, arrays and hashes on a per file basis -* Fast and secure transmission of logs using TLS with both server and client -certificate verification -* Secure transmission of logs via CurveZMQ to multiple receivers simultaneously -(optional, requires ZeroMQ 4+) -* Plaintext transmission over plain ZMQ and TCP when security is not required -* Multiline codec to combine multiple lines into single events prior to shipping -* Load multiple configuration files from a directory for ease of use with -configuration management -* Reload the configuration without restarting +* Tail active log files +* Follow rotations +* Suspend tailing on no more updates +* Tail STDIN stream +* Set extra fields to values (host=name), arrays (tags=[one,two]) or hashes +(origin={host=name,IP=address}) +* Secure TLS shipping transport with server certificate verification +* TLS client certificate verification +* Secure CurveZMQ shipping transport to load balance across multiple Logstash +instances (optional, requires ZeroMQ 4+) +* Plaintext TCP shipping transport for configuration simplicity in local networks +* Plaintext ZMQ shipping transport +* Reload configuration without restarting +* [Administration utility](docs/AdministrationUtility.md) to monitor the +shipping speed and status +* [Multiline](docs/codecs/Multiline.md) codec +* [Filter](docs/codecs/Filter.md) codec Log Courier integrates with Logstash using an event receiver ruby gem. An event sender ruby gem is also available to allow fast and secure transmission between @@ -47,10 +50,10 @@ two Logstash instances. ### Build Requirements -1. The [go](http://golang.org/doc/install) compiler tools (1.2 or 1.3) +1. The [golang](http://golang.org/doc/install) compiler tools (1.2 or 1.3) 1. 
[Logstash](http://logstash.net) 1.4.x -1. (Optional) [ZeroMQ](http://zeromq.org/intro:get-the-software) (3.2 or 4.0 for -CurveZMQ) +1. (Optional) [ZeroMQ](http://zeromq.org/intro:get-the-software) (>=3.2 for +plaintext ZMQ, >=4.0 for secure CurveZMQ) ### Building @@ -85,28 +88,19 @@ found on the [Logstash Integration](docs/LogstashIntegration.md) page. ### Generating Certificates and Keys -After Log Courier is built you will find a utility named lc-tlscert inside the -'bin' folder alongside the main log-courier program. This will generate a -self-signed certificate to get you started quickly with the TLS transport, and -the necessary Log Courier and Logstash configuration snippets to make it work. +Running `make selfsigned` will automatically build and run the `lc-tlscert` +utility that can quickly and easily generate a self-signed certificate for the +TLS shipping transport. -Likewise, a utility called lc-curvekey is produced when ZMQ support is enabled. -This utility will generate CurveZMQ key pairs as well as the necessary -configuration snippets. +Likewise, running `make curvekey` will automatically build and run the +`lc-curvekey` utility that can quickly and easily generate CurveZMQ key pairs +for the CurveZMQ shipping transport. -## Command Line Options - -The `log-courier` command accepts the following command line options. - - -config="": The config file to load - -config-test=false: Test the configuration specified by -config and exit - -cpuprofile="": write cpu profile to file - -from-beginning=false: On first run, read new files from the beginning instead of the end - -list-supported=false: List supported transports and codecs - -version=false: show version information +Both tools also generate the required configuration file snippets. 
## Documentation * [Administration Utility](docs/AdministrationUtility.md) +* [Command Line Arguments](docs/CommandLineArguments.md) * [Configuration](docs/Configuration.md) * [Change Log](docs/ChangeLog.md) diff --git a/docs/ChangeLog.md b/docs/ChangeLog.md index 36c906e9..ffa86d70 100644 --- a/docs/ChangeLog.md +++ b/docs/ChangeLog.md @@ -4,7 +4,7 @@ **Table of Contents** *generated with [DocToc](http://doctoc.herokuapp.com/)* -- [Latest Development](#latest-development) +- [0.15](#015) - [0.14](#014) - [0.13](#013) - [0.12](#012) @@ -15,6 +15,17 @@ +## Latest Development + +*TBC* + +* Remove `ping` command from `lc-admin` (#49) +* Empty lines in a log file are incorrectly merged with the following line (#51) +* Don't require a connection to Log Courier when running `lc-admin help` (#50) +* Bring back `make selfsigned` to quickly generate self-signed TLS certificates +(#25) +* Implement `make curvekey` to quickly generate curve key pairs (#25) + ## 0.15 *23rd September 2014* diff --git a/docs/CommandLineArguments.md b/docs/CommandLineArguments.md new file mode 100644 index 00000000..3283e7da --- /dev/null +++ b/docs/CommandLineArguments.md @@ -0,0 +1,65 @@ +# Command Line Arguments + + + +**Table of Contents** *generated with [DocToc](http://doctoc.herokuapp.com/)* + +- [Overview](#overview) +- [`-config=`](#-config=path) +- [`-config-test`](#-config-test) +- [`-cpuprofile=`](#-cpuprofile=path) +- [`-from-beginning`](#-from-beginning) +- [`-list-supported`](#-list-supported) +- [`-version`](#-version) + + + +## Overview + +The `log-courier` command accepts various command line arguments. + +## `-config=` + +The path to the JSON configuration file to load. + +``` +log-courier -config=/etc/log-courier/log-courier.json +``` + +## `-config-test` + +Load the configuration and test it for validity, then exit. + +Will exit with code 0 if the configuration is valid and would not prevent +log-courier from starting up. 
Will exit with code 1 if an error occurred, +printing the error to standard output. + +## `-cpuprofile=` + +The path to file to write CPU profiling information to, when investigating +performance problems. Log Courier will run for a small period of time and then +quit, writing the profiling information to this file. + +This flag should generally only be used when requested by a developer. + +## `-from-beginning` + +The `.log-courier` file stores the current shipping status as logs are shipped +so that in the event of a service restart, not a single log entry is missed. + +In the event that the `.log-courier` file does not exist, Log Courier will by +default start the initial shipping of log files from the end of the file. +Setting this flag in the initial startup of Log Courier will trigger files to +start shipping from the beginning of the file instead of the end. + +After the first `.log-courier` status file is written, all subsequent newly +discovered log files will start from the begining, regardless of this flag. + +## `-list-supported` + +Print a list of available transports and codecs provided by this build of Log +Courier, then exit. + +## `-version` + +Print the version of this build of Log Courier, then exit. From 5dd79658dd7188fcaf5f8711905a90a7197b4df6 Mon Sep 17 00:00:00 2001 From: Driskell Date: Sun, 12 Oct 2014 14:14:26 +0100 Subject: [PATCH 09/33] Further readme updates --- README.md | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index e6b4d521..aa87ca3c 100644 --- a/README.md +++ b/README.md @@ -24,19 +24,19 @@ with many fixes and behavioural improvements. 
Log Courier implements the following features: -* Tail active log files +* Follow active log files * Follow rotations -* Suspend tailing on no more updates -* Tail STDIN stream -* Set extra fields to values (host=name), arrays (tags=[one,two]) or hashes -(origin={host=name,IP=address}) +* Follow standard input stream +* Suspend tailing after periods of inactivity +* Set [extra fields](docs/Configuration.md#fields), supporting hashes and arrays +(`tags: ['one','two']`) +* [Reload configuration](docs/Configuration.md#reloading) without restarting * Secure TLS shipping transport with server certificate verification * TLS client certificate verification * Secure CurveZMQ shipping transport to load balance across multiple Logstash instances (optional, requires ZeroMQ 4+) * Plaintext TCP shipping transport for configuration simplicity in local networks * Plaintext ZMQ shipping transport -* Reload configuration without restarting * [Administration utility](docs/AdministrationUtility.md) to monitor the shipping speed and status * [Multiline](docs/codecs/Multiline.md) codec @@ -50,11 +50,23 @@ two Logstash instances. ### Build Requirements +1. \*nix, OS X or Windows 1. The [golang](http://golang.org/doc/install) compiler tools (1.2 or 1.3) -1. [Logstash](http://logstash.net) 1.4.x +1. [git](http://git-scm.com) +1. GNU make 1. (Optional) [ZeroMQ](http://zeromq.org/intro:get-the-software) (>=3.2 for plaintext ZMQ, >=4.0 for secure CurveZMQ) +*\*nix: Most requirements can usually be installed by your favourite package +manager.* + +*OS X: Git and GNU make are provided automatically by XCode. ZeroMQ can be +installed via [Homebrew](http://brew.sh).* + +*Windows: GNU make for Windows can be found +[here](http://gnuwin32.sourceforge.net/packages/make.htm). 
ZeroMQ may need to be +built and installed manually.* + ### Building To build without the optional ZMQ support, simply run `make` as From cca5c9b8cf12029503304eed635e5ed6d47a0be0 Mon Sep 17 00:00:00 2001 From: Driskell Date: Sun, 12 Oct 2014 14:23:12 +0100 Subject: [PATCH 10/33] Update doctoc --- docs/ChangeLog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/ChangeLog.md b/docs/ChangeLog.md index ffa86d70..aa3e129b 100644 --- a/docs/ChangeLog.md +++ b/docs/ChangeLog.md @@ -4,6 +4,7 @@ **Table of Contents** *generated with [DocToc](http://doctoc.herokuapp.com/)* +- [Latest Development](#latest-development) - [0.15](#015) - [0.14](#014) - [0.13](#013) From ed96bfcb5f02a106179886a7968777bf56f8e5a8 Mon Sep 17 00:00:00 2001 From: Driskell Date: Sun, 12 Oct 2014 15:09:09 +0100 Subject: [PATCH 11/33] Log the transport in use to logstash logs --- lib/log-courier/server_tcp.rb | 2 +- lib/log-courier/server_zmq.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/log-courier/server_tcp.rb b/lib/log-courier/server_tcp.rb index 83c50ef1..f209cc66 100644 --- a/lib/log-courier/server_tcp.rb +++ b/lib/log-courier/server_tcp.rb @@ -100,7 +100,7 @@ def initialize(options = {}) end if @options[:port] == 0 - @logger.warn '[LogCourierServer] Transport is listening on ephemeral port ' + @port.to_s + @logger.warn '[LogCourierServer] Transport ' + @options[:transport] + ' is listening on ephemeral port ' + @port.to_s end rescue => e raise "[LogCourierServer] Failed to initialise: #{e}" diff --git a/lib/log-courier/server_zmq.rb b/lib/log-courier/server_zmq.rb index 328cf505..04192cd7 100644 --- a/lib/log-courier/server_zmq.rb +++ b/lib/log-courier/server_zmq.rb @@ -67,7 +67,7 @@ def initialize(options = {}) @poller = ZMQ::Poller.new if @options[:port] == 0 - @logger.warn '[LogCourierServer] Transport is listening on ephemeral port ' + @port.to_s + @logger.warn '[LogCourierServer] Transport ' + @options[:transport] + ' is listening on ephemeral port ' + 
@port.to_s end rescue => e raise "[LogCourierServer] Failed to initialise: #{e}" From 64ff3ae9efc3591f6217d3da52614566d4c9e045 Mon Sep 17 00:00:00 2001 From: Driskell Date: Sun, 12 Oct 2014 15:09:18 +0100 Subject: [PATCH 12/33] Improve test resiliency --- spec/lib/helpers/common.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/lib/helpers/common.rb b/spec/lib/helpers/common.rb index f23e2f0e..f39f62c9 100644 --- a/spec/lib/helpers/common.rb +++ b/spec/lib/helpers/common.rb @@ -173,7 +173,7 @@ def receive_and_check(args = {}, &block) check = args[:check] waited = 0 - while total > 0 && waited < EVENT_WAIT_COUNT + while total > 0 && waited <= EVENT_WAIT_COUNT if @event_queue.length == 0 sleep(EVENT_WAIT_TIME) waited += 1 From 67f02626575f9842885d9a0059eee00df389c7a0 Mon Sep 17 00:00:00 2001 From: Driskell Date: Sun, 12 Oct 2014 15:09:29 +0100 Subject: [PATCH 13/33] Add shutdown log for tests --- spec/lib/helpers/log-courier.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/spec/lib/helpers/log-courier.rb b/spec/lib/helpers/log-courier.rb index c9bc60ba..d7087844 100644 --- a/spec/lib/helpers/log-courier.rb +++ b/spec/lib/helpers/log-courier.rb @@ -114,6 +114,7 @@ def _write_config(config) end def shutdown + puts 'Shutting down Log Courier' return if @log_courier.nil? 
terminated = false # Send SIGTERM From 5c1d7e9b44a7c9bde5258c72f33f5cbe8980291a Mon Sep 17 00:00:00 2001 From: Driskell Date: Sun, 12 Oct 2014 15:10:37 +0100 Subject: [PATCH 14/33] Fix hanging ZMQ transport on timeout and restart --- docs/ChangeLog.md | 1 + src/lc-lib/transports/zmq.go | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/docs/ChangeLog.md b/docs/ChangeLog.md index aa3e129b..c9b2c0ba 100644 --- a/docs/ChangeLog.md +++ b/docs/ChangeLog.md @@ -26,6 +26,7 @@ * Bring back `make selfsigned` to quickly generate self-signed TLS certificates (#25) * Implement `make curvekey` to quickly generate curve key pairs (#25) +* Fix hanging ZMQ transport on transport error ## 0.15 diff --git a/src/lc-lib/transports/zmq.go b/src/lc-lib/transports/zmq.go index 92c5be30..27c53756 100644 --- a/src/lc-lib/transports/zmq.go +++ b/src/lc-lib/transports/zmq.go @@ -165,6 +165,8 @@ func (t *TransportZmq) ReloadConfig(new_net_config *core.NetworkConfig) int { func (t *TransportZmq) Init() (err error) { // Initialise once for ZMQ if t.ready { + // If already initialised, ask if we can send again + t.bridge_chan <- []byte(zmq_signal_output) return nil } @@ -360,10 +362,16 @@ func (t *TransportZmq) poller(bridge_out *zmq.Socket) { runtime.LockOSThread() t.poll_items = make([]zmq.PollItem, 3) + + // Listen always on bridge t.poll_items[0].Socket = bridge_out t.poll_items[0].Events = zmq.POLLIN | zmq.POLLOUT + + // Always check for input on dealer - but also initially check for OUT so we can flag send is ready t.poll_items[1].Socket = t.dealer t.poll_items[1].Events = zmq.POLLIN | zmq.POLLOUT + + // Always listen for input on monitor t.poll_items[2].Socket = t.monitor t.poll_items[2].Events = zmq.POLLIN From 17ea7f7364acf1549452ca464cdbbf05971e4629 Mon Sep 17 00:00:00 2001 From: Driskell Date: Mon, 13 Oct 2014 11:57:30 +0100 Subject: [PATCH 15/33] Fix random timeouts when logstash busy due to non-threadsafe timeout timer --- docs/ChangeLog.md | 2 ++ lib/log-courier/server.rb | 
11 +++-------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/docs/ChangeLog.md b/docs/ChangeLog.md index c9b2c0ba..e10cea2d 100644 --- a/docs/ChangeLog.md +++ b/docs/ChangeLog.md @@ -27,6 +27,8 @@ (#25) * Implement `make curvekey` to quickly generate curve key pairs (#25) * Fix hanging ZMQ transport on transport error +* Fix timeout on log-courier side when Logstash busy due to non-thread safe +timeout timer in the log-courier gem ## 0.15 diff --git a/lib/log-courier/server.rb b/lib/log-courier/server.rb index d007fcda..5be65bb8 100644 --- a/lib/log-courier/server.rb +++ b/lib/log-courier/server.rb @@ -106,7 +106,7 @@ def process_ping(message, comm) def process_jdat(message, comm, event_queue) # Now we have the data, aim to respond within 5 seconds - reset_ack_timeout + ack_timeout = Time.now.to_i + 5 # OK - first is a nonce - we send this back with sequence acks # This allows the client to know what is being acknowledged @@ -165,12 +165,12 @@ def process_jdat(message, comm, event_queue) # Queue the event begin - event_queue.push event, @ack_timeout - Time.now.to_i + event_queue.push event, [0, ack_timeout - Time.now.to_i].max rescue TimeoutError # Full pipeline, partial ack # NOTE: comm.send can raise a Timeout::Error of its own comm.send 'ACKN', [nonce, sequence].pack('A*N') - reset_ack_timeout + ack_timeout = Time.now.to_i + 5 retry end @@ -181,10 +181,5 @@ def process_jdat(message, comm, event_queue) # NOTE: comm.send can raise a Timeout::Error comm.send 'ACKN', [nonce, sequence].pack('A*N') end - - def reset_ack_timeout - # TODO: Make a constant or configurable - @ack_timeout = Time.now.to_i + 5 - end end end From 7de0ebbf95e82dd7d0414f1e3f49f37ca4902f23 Mon Sep 17 00:00:00 2001 From: Driskell Date: Tue, 14 Oct 2014 22:27:39 +0100 Subject: [PATCH 16/33] Remove redundant comment --- src/lc-lib/harvester/harvester.go | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lc-lib/harvester/harvester.go b/src/lc-lib/harvester/harvester.go index 
8b78737b..8299829a 100644 --- a/src/lc-lib/harvester/harvester.go +++ b/src/lc-lib/harvester/harvester.go @@ -288,7 +288,6 @@ func (h *Harvester) eventCallback(start_offset int64, end_offset int64, text str desc := &core.EventDescriptor{ Stream: h.stream, Offset: end_offset, - // NOTE: Make this include the fileconfig fields? Event: encoded, } From 09896d7d6768b71447153825418445a9b47840da Mon Sep 17 00:00:00 2001 From: Driskell Date: Tue, 14 Oct 2014 22:28:59 +0100 Subject: [PATCH 17/33] Mark TODO on truncation related logic --- src/lc-lib/harvester/harvester.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lc-lib/harvester/harvester.go b/src/lc-lib/harvester/harvester.go index 8299829a..ebc5b2e6 100644 --- a/src/lc-lib/harvester/harvester.go +++ b/src/lc-lib/harvester/harvester.go @@ -237,6 +237,8 @@ ReadLoop: log.Warning("Unexpected file truncation, seeking to beginning: %s", h.path) h.file.Seek(0, os.SEEK_SET) h.offset = 0 + // TODO: How does this impact a partial line reader buffer? + // TODO: How does this impact multiline? 
continue } From 259b8a3c1c3367dcd0d639b0053ff9a477a8d7d3 Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 16 Oct 2014 22:31:55 +0100 Subject: [PATCH 18/33] Auto version numbering from git repository tag and status --- Makefile | 7 ++++--- build/fix_version | 20 ++++++++++++++++++++ log-courier.gemspec | 2 +- log-courier.gemspec.template | 23 +++++++++++++++++++++++ src/lc-lib/core/version.go | 2 +- src/lc-lib/core/version.go.template | 19 +++++++++++++++++++ 6 files changed, 68 insertions(+), 5 deletions(-) create mode 100755 build/fix_version create mode 100644 log-courier.gemspec.template create mode 100644 src/lc-lib/core/version.go.template diff --git a/Makefile b/Makefile index 09db038b..1c5629b8 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: go-check all log-courier gem test doc clean +.PHONY: prepare all log-courier gem test doc clean MAKEFILE := $(word $(words $(MAKEFILE_LIST)),$(MAKEFILE_LIST)) GOPATH := $(patsubst %/,%,$(dir $(abspath $(MAKEFILE)))) @@ -84,12 +84,13 @@ ifneq ($(implyclean),yes) rm -f log-courier-*.gem endif -go-check: +prepare: @go version >/dev/null || (echo "Go not found. You need to install Go version 1.2 or 1.3: http://golang.org/doc/install"; false) @go version | grep -q 'go version go1.[23]' || (echo "Go version 1.2 or 1.3 required, you have a version of Go that is not supported."; false) @echo "GOPATH: $${GOPATH}" + build/fix_version -bin/%: FORCE | go-check +bin/%: FORCE | prepare go get -d -tags "$(TAGS)" $* go install -tags "$(TAGS)" $* diff --git a/build/fix_version b/build/fix_version new file mode 100755 index 00000000..56811e73 --- /dev/null +++ b/build/fix_version @@ -0,0 +1,20 @@ +#!/bin/bash + +# If this is not a git repository, use the existing version +if [ ! 
-d '.git' ]; then + exit +fi + +if [ "$1" == 'current' ]; then + VERSION=$(git describe --abbrev=0) +else + VERSION="$(git describe)" + if [ $(git status -s 2>/dev/null | wc -l) -ne 0 ]; then + VERSION="${VERSION}-dirty" + fi +fi + +sed "s//${VERSION}/g" src/lc-lib/core/version.go.template > src/lc-lib/core/version.go +sed "s//${VERSION}/g" log-courier.gemspec.template > log-courier.gemspec + +echo "Setting Log Courier Version ${VERSION}" diff --git a/log-courier.gemspec b/log-courier.gemspec index dbeb3dd1..ec29192a 100644 --- a/log-courier.gemspec +++ b/log-courier.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |gem| gem.name = 'log-courier' - gem.version = '0.15' + gem.version = 'v0.13' gem.description = 'Log Courier library' gem.summary = 'Receive events from Log Courier and transmit between LogStash instances' gem.homepage = 'https://github.com/driskell/log-courier' diff --git a/log-courier.gemspec.template b/log-courier.gemspec.template new file mode 100644 index 00000000..543b4637 --- /dev/null +++ b/log-courier.gemspec.template @@ -0,0 +1,23 @@ +Gem::Specification.new do |gem| + gem.name = 'log-courier' + gem.version = '' + gem.description = 'Log Courier library' + gem.summary = 'Receive events from Log Courier and transmit between LogStash instances' + gem.homepage = 'https://github.com/driskell/log-courier' + gem.authors = ['Jason Woods'] + gem.email = ['devel@jasonwoods.me.uk'] + gem.licenses = ['Apache'] + gem.rubyforge_project = 'nowarning' + gem.require_paths = ['lib'] + gem.files = %w( + lib/log-courier/client.rb + lib/log-courier/client_tls.rb + lib/log-courier/event_queue.rb + lib/log-courier/server.rb + lib/log-courier/server_tcp.rb + lib/log-courier/server_zmq.rb + ) + + gem.add_runtime_dependency 'ffi-rzmq', '>= 2.0' + gem.add_runtime_dependency 'multi_json' +end diff --git a/src/lc-lib/core/version.go b/src/lc-lib/core/version.go index 0f61cf39..f638f5d6 100644 --- a/src/lc-lib/core/version.go +++ b/src/lc-lib/core/version.go @@ -16,4 +16,4 
@@ package core -const Log_Courier_Version string = "0.15" +const Log_Courier_Version string = "v0.13" diff --git a/src/lc-lib/core/version.go.template b/src/lc-lib/core/version.go.template new file mode 100644 index 00000000..0b7ece04 --- /dev/null +++ b/src/lc-lib/core/version.go.template @@ -0,0 +1,19 @@ +/* +* Copyright 2014 Jason Woods. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package core + +const Log_Courier_Version string = "" From 4166e840d985ac9acb2397d0268ad9b0fb3a9237 Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 09:39:56 +0100 Subject: [PATCH 19/33] Fix missing .PHONY in Makefile --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 09db038b..0b7e8c0c 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: go-check all log-courier gem test doc clean +.PHONY: go-check all log-courier gem test doc profile benchmark jrprofile jrbenchmark clean MAKEFILE := $(word $(words $(MAKEFILE_LIST)),$(MAKEFILE_LIST)) GOPATH := $(patsubst %/,%,$(dir $(abspath $(MAKEFILE)))) From b8b5f0552650481d8a13037b9199ee9fb8e23160 Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 09:40:13 +0100 Subject: [PATCH 20/33] Strip unnecessary comments from gem --- lib/logstash/inputs/courier.rb | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/logstash/inputs/courier.rb b/lib/logstash/inputs/courier.rb index 70da59e2..0a5a505f 100644 --- a/lib/logstash/inputs/courier.rb +++ 
b/lib/logstash/inputs/courier.rb @@ -17,10 +17,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# TODO: Were these needed? Output doesn't seem to need them -#require "logstash/inputs/base" -#require "logstash/namespace" - module LogStash module Inputs # Receive events over the Log Courier protocol From 95d8447116cd07984ee335fae0dad6c76570766c Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 09:40:54 +0100 Subject: [PATCH 21/33] Add constraints to some settings --- src/lc-lib/core/config.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/lc-lib/core/config.go b/src/lc-lib/core/config.go index 9b97ad8a..0ad04c13 100644 --- a/src/lc-lib/core/config.go +++ b/src/lc-lib/core/config.go @@ -272,19 +272,27 @@ func (c *Config) Load(path string) (err error) { c.General.SpoolSize = default_GeneralConfig_SpoolSize } - // TODO: If max line bytes plus fields size is too big, this could be exceeded + // Enforce maximum of 2 GB since event transmit length is uint32 if c.General.SpoolMaxBytes == 0 { c.General.SpoolMaxBytes = default_GeneralConfig_SpoolMaxBytes } + if c.General.SpoolMaxBytes >= 2*1024*1024*1024 { + err = fmt.Errorf("/general/spool max bytes can not be greater than 2 GiB") + return + } if c.General.SpoolTimeout == time.Duration(0) { c.General.SpoolTimeout = default_GeneralConfig_SpoolTimeout } - // TODO: Event transmit length is uint32 - if this is bigger a rediculously large line will fail + // Max line bytes can not be larger than spool max bytes if c.General.MaxLineBytes == 0 { c.General.MaxLineBytes = default_GeneralConfig_MaxLineBytes } + if c.General.MaxLineBytes > c.General.SpoolMaxBytes { + err = fmt.Errorf("/general/max line bytes can not be greater than /general/spool max bytes") + return + } if c.Network.Transport == "" { c.Network.Transport = default_NetworkConfig_Transport From 4b2c71cfd632faeb1db65d6e47323219fdbbaed5 Mon Sep 17 00:00:00 2001 
From: Driskell Date: Thu, 23 Oct 2014 09:41:20 +0100 Subject: [PATCH 22/33] Implement multiline max bytes - TODO: tests --- src/lc-lib/codecs/multiline.go | 43 ++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/src/lc-lib/codecs/multiline.go b/src/lc-lib/codecs/multiline.go index aa7f0adf..94ccac3d 100644 --- a/src/lc-lib/codecs/multiline.go +++ b/src/lc-lib/codecs/multiline.go @@ -32,10 +32,11 @@ const ( ) type CodecMultilineFactory struct { - Pattern string `config:"pattern"` - What string `config:"what"` - Negate bool `config:"negate"` - PreviousTimeout time.Duration `config:"previous timeout"` + Pattern string `config:"pattern"` + What string `config:"what"` + Negate bool `config:"negate"` + PreviousTimeout time.Duration `config:"previous timeout"` + MaxMultilineBytes int64 `config:"max multiline bytes"` matcher *regexp.Regexp what int @@ -50,6 +51,7 @@ type CodecMultiline struct { start_offset int64 buffer []string buffer_lines uint64 + buffer_len int timer_lock sync.Mutex timer_stop chan interface{} timer_wait sync.WaitGroup @@ -82,6 +84,14 @@ func NewMultilineCodecFactory(config *core.Config, config_path string, unused ma result.what = codecMultiline_What_Next } + if result.MaxMultilineBytes == 0 { + result.MaxMultilineBytes = config.General.MaxLineBytes + } + + if result.MaxMultilineBytes > config.General.SpoolMaxBytes { + return nil, fmt.Errorf("max multiline bytes cannot be greater than /general/spool max bytes") + } + return result, nil } @@ -133,12 +143,36 @@ func (c *CodecMultiline) Event(start_offset int64, end_offset int64, text string c.flush() } } + + text_len := len(text) + + // Check we don't exceed the max multiline bytes + if check_len := c.buffer_len + text_len + c.buffer_lines; check_len > c.config.MaxMultilineBytes { + // Store partial and flush + overflow := check_len - c.config.MaxMultilineBytes + cut := text_len - overflow + c.end_offset = end_offset - overflow + + c.buffer = append(c.buffer, 
text[:cut]) + c.buffer_lines++ + c.buffer_len += cut + + c.flush() + + // Append the remaining data to the buffer + start_offset += cut + text = text[cut:] + } + if len(c.buffer) == 0 { c.start_offset = start_offset } c.end_offset = end_offset + c.buffer = append(c.buffer, text) c.buffer_lines++ + c.buffer_len += text_len + if c.config.what == codecMultiline_What_Previous { if c.config.PreviousTimeout != 0 { // Reset the timer and unlock @@ -148,6 +182,7 @@ func (c *CodecMultiline) Event(start_offset int64, end_offset int64, text string } else if c.config.what == codecMultiline_What_Next && match_failed { c.flush() } + // TODO: Split the line if its too big } func (c *CodecMultiline) flush() { From 9e85230ca42e0e887e0acd346683d68656ebf5d3 Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 09:43:15 +0100 Subject: [PATCH 23/33] Strip the prefix v from version numbering --- build/fix_version | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build/fix_version b/build/fix_version index 56811e73..87732e1c 100755 --- a/build/fix_version +++ b/build/fix_version @@ -14,7 +14,7 @@ else fi fi -sed "s//${VERSION}/g" src/lc-lib/core/version.go.template > src/lc-lib/core/version.go -sed "s//${VERSION}/g" log-courier.gemspec.template > log-courier.gemspec +sed "s//${VERSION#v}/g" src/lc-lib/core/version.go.template > src/lc-lib/core/version.go +sed "s//${VERSION#v}/g" log-courier.gemspec.template > log-courier.gemspec echo "Setting Log Courier Version ${VERSION}" From c309925191b7b73d2817f29135333f6d5888df06 Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 19:48:20 +0100 Subject: [PATCH 24/33] Fix version takes a parameter to fix the version to a specific one --- build/fix_version | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build/fix_version b/build/fix_version index 87732e1c..1cb16f4a 100755 --- a/build/fix_version +++ b/build/fix_version @@ -5,8 +5,8 @@ if [ ! 
-d '.git' ]; then exit fi -if [ "$1" == 'current' ]; then - VERSION=$(git describe --abbrev=0) +if [ -n "$1" ]; then + VERSION="$1" else VERSION="$(git describe)" if [ $(git status -s 2>/dev/null | wc -l) -ne 0 ]; then From a3a819366a76efe6255b6e0f0e20be772ff8bf4d Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 19:50:10 +0100 Subject: [PATCH 25/33] Set version 1.0 --- log-courier.gemspec | 2 +- src/lc-lib/core/version.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/log-courier.gemspec b/log-courier.gemspec index ec29192a..8beb7d26 100644 --- a/log-courier.gemspec +++ b/log-courier.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |gem| gem.name = 'log-courier' - gem.version = 'v0.13' + gem.version = '1.0' gem.description = 'Log Courier library' gem.summary = 'Receive events from Log Courier and transmit between LogStash instances' gem.homepage = 'https://github.com/driskell/log-courier' diff --git a/src/lc-lib/core/version.go b/src/lc-lib/core/version.go index f638f5d6..9663477b 100644 --- a/src/lc-lib/core/version.go +++ b/src/lc-lib/core/version.go @@ -16,4 +16,4 @@ package core -const Log_Courier_Version string = "v0.13" +const Log_Courier_Version string = "1.0" From b8932006a9cd79819abc18f72b9cdd36280b390b Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 19:54:34 +0100 Subject: [PATCH 26/33] Add v prefix to version in Go binaries. 
--- build/fix_version | 2 +- src/lc-lib/core/version.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build/fix_version b/build/fix_version index 1cb16f4a..36ae1f76 100755 --- a/build/fix_version +++ b/build/fix_version @@ -14,7 +14,7 @@ else fi fi -sed "s//${VERSION#v}/g" src/lc-lib/core/version.go.template > src/lc-lib/core/version.go +sed "s//${VERSION}/g" src/lc-lib/core/version.go.template > src/lc-lib/core/version.go sed "s//${VERSION#v}/g" log-courier.gemspec.template > log-courier.gemspec echo "Setting Log Courier Version ${VERSION}" diff --git a/src/lc-lib/core/version.go b/src/lc-lib/core/version.go index 9663477b..00d92f59 100644 --- a/src/lc-lib/core/version.go +++ b/src/lc-lib/core/version.go @@ -16,4 +16,4 @@ package core -const Log_Courier_Version string = "1.0" +const Log_Courier_Version string = "v1.0" From bbe22b7c4ca930672bb33d3d0e7c0ae3aa2188c3 Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 20:30:41 +0100 Subject: [PATCH 27/33] Don't worry about payload header size --- src/lc-lib/spooler/spooler.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/lc-lib/spooler/spooler.go b/src/lc-lib/spooler/spooler.go index e9f50ee3..20a1d816 100644 --- a/src/lc-lib/spooler/spooler.go +++ b/src/lc-lib/spooler/spooler.go @@ -28,8 +28,6 @@ import ( const ( // Event header is just uint32 at the moment event_header_size = 4 - // Payload header is the nonce plus the ZLIB overheads (http://www.zlib.net/zlib_tech.html) - payload_header_size = 16 + 11 ) type Spooler struct { @@ -74,7 +72,7 @@ SpoolerLoop: for { select { case event := <-s.input: - if len(s.spool) > 0 && int64(s.spool_size) + int64(len(event.Event)) + event_header_size >= s.config.SpoolMaxBytes - payload_header_size { + if len(s.spool) > 0 && int64(s.spool_size) + int64(len(event.Event)) + event_header_size >= s.config.SpoolMaxBytes { log.Debug("Spooler flushing %d events due to spool max bytes (%d/%d - next is %d)", len(s.spool), 
s.spool_size, s.config.SpoolMaxBytes, len(event.Event) + 4) // Can't fit this event in the spool - flush and then queue From d50cef92a9f3afda7521f0365cbef046956ec8c3 Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 20:30:54 +0100 Subject: [PATCH 28/33] Fix compilation and panic --- src/lc-lib/codecs/multiline.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/lc-lib/codecs/multiline.go b/src/lc-lib/codecs/multiline.go index 94ccac3d..94199395 100644 --- a/src/lc-lib/codecs/multiline.go +++ b/src/lc-lib/codecs/multiline.go @@ -50,14 +50,14 @@ type CodecMultiline struct { end_offset int64 start_offset int64 buffer []string - buffer_lines uint64 - buffer_len int + buffer_lines int64 + buffer_len int64 timer_lock sync.Mutex timer_stop chan interface{} timer_wait sync.WaitGroup timer_deadline time.Time - meter_lines uint64 + meter_lines int64 meter_bytes int64 } @@ -88,6 +88,8 @@ func NewMultilineCodecFactory(config *core.Config, config_path string, unused ma result.MaxMultilineBytes = config.General.MaxLineBytes } + // We conciously allow a line 4 bytes longer what we would normally have as the limit + // This 4 bytes is the event header size. 
It's not worth considering though if result.MaxMultilineBytes > config.General.SpoolMaxBytes { return nil, fmt.Errorf("max multiline bytes cannot be greater than /general/spool max bytes") } @@ -144,7 +146,7 @@ func (c *CodecMultiline) Event(start_offset int64, end_offset int64, text string } } - text_len := len(text) + var text_len int64 = int64(len(text)) // Check we don't exceed the max multiline bytes if check_len := c.buffer_len + text_len + c.buffer_lines; check_len > c.config.MaxMultilineBytes { @@ -195,6 +197,7 @@ func (c *CodecMultiline) flush() { // Set last offset - this is returned in Teardown so if we're mid multiline and crash, we start this multiline again c.last_offset = c.end_offset c.buffer = nil + c.buffer_len = 0 c.buffer_lines = 0 c.callback_func(c.start_offset, c.end_offset, text) From 0e7935d7cfae0e0e5c4fec65fa44f81922fb961a Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 20:31:04 +0100 Subject: [PATCH 29/33] Implement some preliminary tests --- src/lc-lib/codecs/multiline_test.go | 97 +++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 src/lc-lib/codecs/multiline_test.go diff --git a/src/lc-lib/codecs/multiline_test.go b/src/lc-lib/codecs/multiline_test.go new file mode 100644 index 00000000..00119dd5 --- /dev/null +++ b/src/lc-lib/codecs/multiline_test.go @@ -0,0 +1,97 @@ +package codecs + +import ( + "lc-lib/core" + "testing" +) + +var gt *testing.T +var lines int + +func createCodec(unused map[string]interface{}, callback core.CodecCallbackFunc, t *testing.T) core.Codec { + config := core.NewConfig() + config.General.MaxLineBytes = 1048576 + config.General.SpoolMaxBytes = 10485760 + + factory, err := NewMultilineCodecFactory(config, "", unused, "multiline") + if err != nil { + t.Logf("Failed to create multiline codec: %s", err) + t.FailNow() + } + + return factory.NewCodec(callback, 0) +} + +func checkMultiline(start_offset int64, end_offset int64, text string) { + lines++ + + if text != "DEBUG 
First line\nsecond line\nthird line" { + gt.Logf("Event data incorrect [% X]", text) + gt.FailNow() + } + + if end_offset != 5 { + gt.Logf("Event end offset is incorrect [%d]", end_offset) + gt.FailNow() + } +} + +func TestMultiline(t *testing.T) { + gt = t + lines = 0 + + codec := createCodec(map[string]interface{}{ + "pattern": "^DEBUG ", + "negate": true, + }, checkMultiline, t) + + // Send some data + codec.Event(0, 1, "DEBUG First line") + codec.Event(2, 3, "second line") + codec.Event(4, 5, "third line") + codec.Event(6, 7, "DEBUG Next line") + + if lines != 1 { + gt.Logf("Wrong line count received") + gt.FailNow() + } +} + +func checkMultilineMaxBytes(start_offset int64, end_offset int64, text string) { + lines++ + + if lines == 1 { + if text != "DEBUG First line\nsecond line\nthi" { + gt.Logf("Event data incorrect [% X]", text) + gt.FailNow() + } + return + } + + if text != "rd line" { + gt.Logf("Second event data incorrect [% X]", text) + gt.FailNow() + } +} + +func TestMultilineMaxBytes(t *testing.T) { + gt = t + lines = 0 + + codec := createCodec(map[string]interface{}{ + "max multiline bytes": int64(32), + "pattern": "^DEBUG ", + "negate": true, + }, checkMultilineMaxBytes, t) + + // Send some data + codec.Event(0, 1, "DEBUG First line") + codec.Event(2, 3, "second line") + codec.Event(4, 5, "third line") + codec.Event(6, 7, "DEBUG Next line") + + if lines != 2 { + gt.Logf("Wrong line count received") + gt.FailNow() + } +} From 7486cff03b601686ff060e3aa7051c82937968bb Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 20:59:34 +0100 Subject: [PATCH 30/33] Tweak some defaults and constraints --- src/lc-lib/codecs/multiline.go | 2 +- src/lc-lib/core/config.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lc-lib/codecs/multiline.go b/src/lc-lib/codecs/multiline.go index 94199395..88bc5595 100644 --- a/src/lc-lib/codecs/multiline.go +++ b/src/lc-lib/codecs/multiline.go @@ -85,7 +85,7 @@ func 
NewMultilineCodecFactory(config *core.Config, config_path string, unused ma } if result.MaxMultilineBytes == 0 { - result.MaxMultilineBytes = config.General.MaxLineBytes + result.MaxMultilineBytes = config.General.SpoolMaxBytes } // We conciously allow a line 4 bytes longer what we would normally have as the limit diff --git a/src/lc-lib/core/config.go b/src/lc-lib/core/config.go index 0ad04c13..f72b91de 100644 --- a/src/lc-lib/core/config.go +++ b/src/lc-lib/core/config.go @@ -276,7 +276,7 @@ func (c *Config) Load(path string) (err error) { if c.General.SpoolMaxBytes == 0 { c.General.SpoolMaxBytes = default_GeneralConfig_SpoolMaxBytes } - if c.General.SpoolMaxBytes >= 2*1024*1024*1024 { + if c.General.SpoolMaxBytes > 2*1024*1024*1024 { err = fmt.Errorf("/general/spool max bytes can not be greater than 2 GiB") return } From 6aedeedd00af360bdfd6878b0d6857d62c798761 Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 20:59:44 +0100 Subject: [PATCH 31/33] Update documentation --- docs/Configuration.md | 8 ++++++-- docs/codecs/Multiline.md | 10 ++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/docs/Configuration.md b/docs/Configuration.md index 7dbec630..8657cc2a 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -211,8 +211,8 @@ give around a 25% boost of events per second, at the expense of more memory usage. *For most installations you should leave this at the default as it can -easily cope with over 10,000 events a second on most machines and uses little -memory. It is useful only in very specific circumstances.* +easily cope with over 10,000 events a second and uses little memory. It is +useful only in very specific circumstances.* ### `"spool max bytes"` @@ -224,6 +224,8 @@ does not have enough room for the next event, it will be flushed immediately. If this value is modified, the receiving end should also be configured with the new limit. For the Logstash plugin, this is the `max_packet_size` setting. 
+The maximum value for this setting is 2147483648 (2 GiB). + ### `"spool timeout"` *Duration. Optional. Default: 5* @@ -243,6 +245,8 @@ field added. If the `fields` configuration already contained a "tags" entry, and it is an array, it will be appended to. Otherwise, the "tag" field will be left as is. +This setting can not be greater than the `spool max bytes` setting. + ### `"log level"` *String. Optional. Default: "info". diff --git a/docs/codecs/Multiline.md b/docs/codecs/Multiline.md index f8319618..8306bd56 100644 --- a/docs/codecs/Multiline.md +++ b/docs/codecs/Multiline.md @@ -16,6 +16,7 @@ option. - [`"negate"`](#negate) - [`"what"`](#what) - [`"previous timeout"`](#previous-timeout) + - [`"max multiline bytes"`](#max-multiline-bytes) @@ -72,3 +73,12 @@ offers a solution to this. When using `"previous"`, if `"previous timeout"` is not 0 any buffered lines will be flushed as a single event if no more lines are received within the specified time period. + +### `"max multiline bytes"` + +*Number. Optional. Default: `spool max bytes`* + +The maximum multiline length to process. If a multiline block exceeds this +length, it will be split across multiple events. + +This setting can not be greater than the `spool max bytes` setting. From e992e111b89c158c8bd4a32b721729f69e6996ea Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 21:02:19 +0100 Subject: [PATCH 32/33] Tweak to make more clear ZMQ optional --- README.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index aa87ca3c..d66bf0ca 100644 --- a/README.md +++ b/README.md @@ -58,14 +58,15 @@ two Logstash instances. plaintext ZMQ, >=4.0 for secure CurveZMQ) *\*nix: Most requirements can usually be installed by your favourite package -manager.* +manager. The optional ZeroMQ >=3.2 is usually also available via the package +manager. ZeroMQ >=4.0 may need to be built and installed manually.* -*OS X: Git and GNU make are provided automatically by XCode.
ZeroMQ can be -installed via [Homebrew](http://brew.sh).* +*OS X: Git and GNU make are provided automatically by XCode. The optional ZeroMQ +can be installed via [Homebrew](http://brew.sh).* *Windows: GNU make for Windows can be found -[here](http://gnuwin32.sourceforge.net/packages/make.htm). ZeroMQ may need to be -built and installed manually.* +[here](http://gnuwin32.sourceforge.net/packages/make.htm). The optional ZeroMQ +may need to be built and installed manually.* ### Building From afb9ad8d5f7f1fb6cbefd8d2a256b59bf7cab5fe Mon Sep 17 00:00:00 2001 From: Driskell Date: Thu, 23 Oct 2014 21:03:54 +0100 Subject: [PATCH 33/33] Change log for 1.0 --- docs/ChangeLog.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/ChangeLog.md b/docs/ChangeLog.md index e10cea2d..4be8d638 100644 --- a/docs/ChangeLog.md +++ b/docs/ChangeLog.md @@ -4,7 +4,7 @@ **Table of Contents** *generated with [DocToc](http://doctoc.herokuapp.com/)* -- [Latest Development](#latest-development) +- [1.0](#10) - [0.15](#015) - [0.14](#014) - [0.13](#013) @@ -16,9 +16,9 @@ -## Latest Development +## 1.0 -*TBC* +*23rd October 2014* * Remove `ping` command from `lc-admin` (#49) * Empty lines in a log file are incorrectly merged with the following line (#51) @@ -29,6 +29,8 @@ * Fix hanging ZMQ transport on transport error * Fix timeout on log-courier side when Logstash busy due to non-thread safe timeout timer in the log-courier gem +* Gracefully handle multiline events greater than 10 MiB in size by splitting +events ## 0.15