From 152af986d2569d2f535e51dc893d968fdd7f8d57 Mon Sep 17 00:00:00 2001 From: Jordan Orelli Date: Thu, 19 Jun 2014 16:10:26 +0000 Subject: [PATCH 1/3] fmt d'oh, should have done that before. --- lumberjack.go | 10 ++++---- prospector.go | 66 +++++++++++++++++++++++++-------------------------- publisher1.go | 12 +++++----- 3 files changed, 44 insertions(+), 44 deletions(-) diff --git a/lumberjack.go b/lumberjack.go index 7c00411c..d0861623 100644 --- a/lumberjack.go +++ b/lumberjack.go @@ -4,7 +4,7 @@ import ( "flag" "log" "os" - "runtime" + "runtime" "runtime/pprof" "time" ) @@ -58,10 +58,10 @@ func main() { // Harvesters dump events into the spooler. go Spool(event_chan, publisher_chan, *spool_size, *idle_timeout) - for i := 0; i < runtime.NumCPU() * 2; i++ { - log.Printf("adding publish worker") - go Publishv1(publisher_chan, registrar_chan, &config.Network) - } + for i := 0; i < runtime.NumCPU()*2; i++ { + log.Printf("adding publish worker") + go Publishv1(publisher_chan, registrar_chan, &config.Network) + } // registrar records last acknowledged positions in all files. Registrar(registrar_chan) diff --git a/prospector.go b/prospector.go index b302dd93..072bc2a2 100644 --- a/prospector.go +++ b/prospector.go @@ -39,39 +39,39 @@ func resume_tracking(fileconfig FileConfig, fileinfo map[string]os.FileInfo, out // Start up with any registrar data. history, err := os.Open(".lumberjack") - if err != nil { - log.Printf("unable to open lumberjack history file: %v", err.Error()) - return - } - - historical_state := make(map[string]*FileState) - log.Printf("Loading registrar data\n") - decoder := json.NewDecoder(history) - decoder.Decode(&historical_state) - history.Close() - - for path, state := range historical_state { - // if the file is the same inode/device as we last saw, - // start a harvester on it at the last known position - info, err := os.Stat(path) - if err != nil { - continue - } - - if is_file_same(path, info, state) { - // same file, seek to last known position - fileinfo[path] = info - - for _, pathglob := range fileconfig.Paths { - match, _ := filepath.Match(pathglob, path) - if match { - harvester := Harvester{Path: path, Fields: fileconfig.Fields, Offset: state.Offset} - go harvester.Harvest(output) - break - } - } - } - } + if err != nil { + log.Printf("unable to open lumberjack history file: %v", err.Error()) + return + } + + historical_state := make(map[string]*FileState) + log.Printf("Loading registrar data\n") + decoder := json.NewDecoder(history) + decoder.Decode(&historical_state) + history.Close() + + for path, state := range historical_state { + // if the file is the same inode/device as we last saw, + // start a harvester on it at the last known position + info, err := os.Stat(path) + if err != nil { + continue + } + + if is_file_same(path, info, state) { + // same file, seek to last known position + fileinfo[path] = info + + for _, pathglob := range fileconfig.Paths { + match, _ := filepath.Match(pathglob, path) + if match { + harvester := Harvester{Path: path, Fields: fileconfig.Fields, Offset: state.Offset} + go harvester.Harvest(output) + break + } + } + } + } } func prospector_scan(path string, fields map[string]string, diff --git a/publisher1.go b/publisher1.go index 1c4fbc9e..f96aa9e4 100644 --- a/publisher1.go +++ b/publisher1.go @@ -34,14 +34,14 @@ func Publishv1(input chan []*FileEvent, var socket *tls.Conn var sequence uint32 var err error - id := publisherId - publisherId++ + id := publisherId + publisherId++ socket = connect(config, id) defer func() { - log.Printf("publisher %v done", id) - socket.Close() - }() + log.Printf("publisher %v done", id) + socket.Close() + }() for events := range input { buffer.Truncate(0) @@ -193,7 +193,7 @@ func connect(config *NetworkConfig, id int) (socket *tls.Conn) { // connected, let's rock and roll. return } - panic("not reached") + panic("not reached") } func writeDataFrame(event *FileEvent, sequence uint32, output io.Writer) { From 7659eae16933e690925f3f5266364492959d72f6 Mon Sep 17 00:00:00 2001 From: Jordan Orelli Date: Thu, 19 Jun 2014 18:07:40 +0000 Subject: [PATCH 2/3] fix EOF error --- harvester.go | 60 +++++++++++++++++++++++----------------------------- 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/harvester.go b/harvester.go index 55e86965..4794f65e 100644 --- a/harvester.go +++ b/harvester.go @@ -2,10 +2,10 @@ package main import ( "bufio" - "bytes" "io" "log" "os" // for File and friends + "strings" "time" ) @@ -69,16 +69,22 @@ func (h *Harvester) Harvest(output chan *FileEvent) { } last_read_time = time.Now() + offset += int64(len(text)) + text = strings.TrimSpace(text) line++ + + if text == "" { + continue + } + event := &FileEvent{ Source: &h.Path, Offset: offset, Line: line, - Text: text, + Text: &text, Fields: &h.Fields, fileinfo: &info, } - offset += int64(len(*event.Text)) + 1 // +1 because of the line terminator output <- event // ship the new event downstream } /* forever */ @@ -116,38 +122,26 @@ func (h *Harvester) open() *os.File { return h.file } -func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (*string, error) { - var buffer bytes.Buffer +func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (string, error) { start_time := time.Now() +ReadLines: for { - segment, is_partial, err := reader.ReadLine() - - if err != nil { - if err == io.EOF { - time.Sleep(1 * time.Second) // TODO(sissel): Implement backoff - - // Give up waiting for data after a certain amount of time. - // If we time out, return the error (eof) - if time.Since(start_time) > eof_timeout { - return nil, err - } - continue - } else { - log.Println(err) - return nil, err // TODO(sissel): don't do this? + line, err := reader.ReadString('\n') + switch err { + case io.EOF: + if line != "" { + return line, nil } + time.Sleep(1 * time.Second) + if time.Since(start_time) > eof_timeout { + return "", err + } + continue ReadLines + case nil: + default: + log.Println(err) + return "", err } - - // TODO(sissel): if buffer exceeds a certain length, maybe report an error condition? chop it? - buffer.Write(segment) - - if !is_partial { - // If we got a full line, return the whole line. - str := new(string) - *str = buffer.String() - return str, nil - } - } /* forever read chunks */ - - return nil, nil + return line, nil + } } From 40a6968bf51066600fab7ac3114932f3f33b1e26 Mon Sep 17 00:00:00 2001 From: Jordan Orelli Date: Thu, 19 Jun 2014 21:56:48 +0000 Subject: [PATCH 3/3] fix registrar state clobbering bug --- harvester.go | 25 +++++++++++++------------ registrar.go | 6 +++--- registrar_other.go | 33 ++++++++++++++++++++++++++++++--- 3 files changed, 46 insertions(+), 18 deletions(-) diff --git a/harvester.go b/harvester.go index 4794f65e..c3e8b4c2 100644 --- a/harvester.go +++ b/harvester.go @@ -32,9 +32,9 @@ func (h *Harvester) Harvest(output chan *FileEvent) { var line uint64 = 0 // Ask registrar about the line number // get current offset in file - offset, _ := h.file.Seek(0, os.SEEK_CUR) + h.Offset, _ = h.file.Seek(0, os.SEEK_CUR) - log.Printf("Current file offset: %d\n", offset) + log.Printf("Current file offset: %d\n", h.Offset) // TODO(sissel): Make the buffer size tunable at start-time reader := bufio.NewReaderSize(h.file, 16<<10) // 16kb buffer by default @@ -49,10 +49,10 @@ func (h *Harvester) Harvest(output chan *FileEvent) { // timed out waiting for data, got eof. // Check to see if the file was truncated info, _ := h.file.Stat() - if info.Size() < offset { - log.Printf("File truncated, seeking to beginning: %s\n", h.Path) + if info.Size() < h.Offset { + log.Printf("Current offset: %d file size: %d. Seeking to beginning because we believe the file to be truncated: %s", h.Offset, info.Size(), h.Path) h.file.Seek(0, os.SEEK_SET) - offset = 0 + h.Offset = 0 } else if age := time.Since(last_read_time); age > (24 * time.Hour) { // if last_read_time was more than 24 hours ago, this file is probably // dead. Stop watching it. @@ -68,24 +68,25 @@ func (h *Harvester) Harvest(output chan *FileEvent) { } } last_read_time = time.Now() - - offset += int64(len(text)) + rawTextWidth := int64(len(text)) text = strings.TrimSpace(text) line++ - if text == "" { - continue - } - event := &FileEvent{ Source: &h.Path, - Offset: offset, + Offset: h.Offset, Line: line, Text: &text, Fields: &h.Fields, fileinfo: &info, } + h.Offset += rawTextWidth + + if text == "" { + continue + } + output <- event // ship the new event downstream } /* forever */ } diff --git a/registrar.go b/registrar.go index d8420cc0..cddee301 100644 --- a/registrar.go +++ b/registrar.go @@ -7,11 +7,11 @@ import ( ) func Registrar(input chan []*FileEvent) { - for events := range input { + for page := range input { state := make(map[string]*FileState) counts := make(map[string]int) // Take the last event found for each file source - for _, event := range events { + for _, event := range page { // skip stdin if *event.Source == "-" { continue @@ -41,7 +41,7 @@ func Registrar(input chan []*FileEvent) { fmt.Fprintf(&buf, "%s: %d ", name, count) } - log.Printf("Registrar received %d events. %s\n", len(events), buf.String()) + log.Printf("Registrar received %d events. %s\n", len(page), buf.String()) if len(state) > 0 { WriteRegistry(state, ".lumberjack") diff --git a/registrar_other.go b/registrar_other.go index 154a9842..4c514add 100644 --- a/registrar_other.go +++ b/registrar_other.go @@ -8,6 +8,20 @@ import ( "os" ) +func loadRegistry(fname string) (map[string]*FileState, error) { + f, err := os.Open(fname) + if err != nil { + return nil, err + } + defer f.Close() + + var existingState map[string]*FileState + if err := json.NewDecoder(f).Decode(&existingState); err != nil { + return nil, err + } + return existingState, nil +} + func WriteRegistry(state map[string]*FileState, path string) { // Open tmp file, write, flush, rename file, err := os.Create(".lumberjack.new") @@ -17,8 +31,21 @@ func WriteRegistry(state map[string]*FileState, path string) { } defer file.Close() - encoder := json.NewEncoder(file) - encoder.Encode(state) + existingState, err := loadRegistry(path) + if err != nil { + log.Printf("Failed to read existing state at path %s: %s", path, err.Error()) + existingState = make(map[string]*FileState) + } + + for name, fs := range state { + existingState[name] = fs + } - os.Rename(".lumberjack.new", path) + if err := json.NewEncoder(file).Encode(existingState); err != nil { + log.Printf("Failed to write log state to file: %v", err) + return + } + if err := os.Rename(".lumberjack.new", path); err != nil { + log.Printf("Failed to move log state file: %v", err) + } }