Skip to content
This repository has been archived by the owner on Dec 18, 2019. It is now read-only.

Commit

Permalink
Merge pull request #2 from jorelli/master
Browse files Browse the repository at this point in the history
fix EOF and state handling bugs
  • Loading branch information
avig committed Jun 20, 2014
2 parents 456bfe3 + 40a6968 commit 78c77cb
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 90 deletions.
75 changes: 35 additions & 40 deletions harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package main

import (
"bufio"
"bytes"
"io"
"log"
"os" // for File and friends
"strings"
"time"
)

Expand All @@ -32,9 +32,9 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
var line uint64 = 0 // Ask registrar about the line number

// get current offset in file
offset, _ := h.file.Seek(0, os.SEEK_CUR)
h.Offset, _ = h.file.Seek(0, os.SEEK_CUR)

log.Printf("Current file offset: %d\n", offset)
log.Printf("Current file offset: %d\n", h.Offset)

// TODO(sissel): Make the buffer size tunable at start-time
reader := bufio.NewReaderSize(h.file, 16<<10) // 16kb buffer by default
Expand All @@ -49,10 +49,10 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
// timed out waiting for data, got eof.
// Check to see if the file was truncated
info, _ := h.file.Stat()
if info.Size() < offset {
log.Printf("File truncated, seeking to beginning: %s\n", h.Path)
if info.Size() < h.Offset {
log.Printf("Current offset: %d file size: %d. Seeking to beginning because we believe the file to be truncated: %s", h.Offset, info.Size(), h.Path)
h.file.Seek(0, os.SEEK_SET)
offset = 0
h.Offset = 0
} else if age := time.Since(last_read_time); age > (24 * time.Hour) {
// if last_read_time was more than 24 hours ago, this file is probably
// dead. Stop watching it.
Expand All @@ -68,17 +68,24 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
}
}
last_read_time = time.Now()

rawTextWidth := int64(len(text))
text = strings.TrimSpace(text)
line++

event := &FileEvent{
Source: &h.Path,
Offset: offset,
Offset: h.Offset,
Line: line,
Text: text,
Text: &text,
Fields: &h.Fields,
fileinfo: &info,
}
offset += int64(len(*event.Text)) + 1 // +1 because of the line terminator

h.Offset += rawTextWidth

if text == "" {
continue
}

output <- event // ship the new event downstream
} /* forever */
Expand Down Expand Up @@ -116,38 +123,26 @@ func (h *Harvester) open() *os.File {
return h.file
}

func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (*string, error) {
var buffer bytes.Buffer
func (h *Harvester) readline(reader *bufio.Reader, eof_timeout time.Duration) (string, error) {
start_time := time.Now()
ReadLines:
for {
segment, is_partial, err := reader.ReadLine()

if err != nil {
if err == io.EOF {
time.Sleep(1 * time.Second) // TODO(sissel): Implement backoff

// Give up waiting for data after a certain amount of time.
// If we time out, return the error (eof)
if time.Since(start_time) > eof_timeout {
return nil, err
}
continue
} else {
log.Println(err)
return nil, err // TODO(sissel): don't do this?
line, err := reader.ReadString('\n')
switch err {
case io.EOF:
if line != "" {
return line, nil
}
time.Sleep(1 * time.Second)
if time.Since(start_time) > eof_timeout {
return "", err
}
continue ReadLines
case nil:
default:
log.Println(err)
return "", err
}

// TODO(sissel): if buffer exceeds a certain length, maybe report an error condition? chop it?
buffer.Write(segment)

if !is_partial {
// If we got a full line, return the whole line.
str := new(string)
*str = buffer.String()
return str, nil
}
} /* forever read chunks */

return nil, nil
return line, nil
}
}
10 changes: 5 additions & 5 deletions lumberjack.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"flag"
"log"
"os"
"runtime"
"runtime"
"runtime/pprof"
"time"
)
Expand Down Expand Up @@ -58,10 +58,10 @@ func main() {
// Harvesters dump events into the spooler.
go Spool(event_chan, publisher_chan, *spool_size, *idle_timeout)

for i := 0; i < runtime.NumCPU() * 2; i++ {
log.Printf("adding publish worker")
go Publishv1(publisher_chan, registrar_chan, &config.Network)
}
for i := 0; i < runtime.NumCPU()*2; i++ {
log.Printf("adding publish worker")
go Publishv1(publisher_chan, registrar_chan, &config.Network)
}

// registrar records last acknowledged positions in all files.
Registrar(registrar_chan)
Expand Down
66 changes: 33 additions & 33 deletions prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,39 +39,39 @@ func resume_tracking(fileconfig FileConfig, fileinfo map[string]os.FileInfo, out
// Start up with any registrar data.

history, err := os.Open(".lumberjack")
if err != nil {
log.Printf("unable to open lumberjack history file: %v", err.Error())
return
}

historical_state := make(map[string]*FileState)
log.Printf("Loading registrar data\n")
decoder := json.NewDecoder(history)
decoder.Decode(&historical_state)
history.Close()

for path, state := range historical_state {
// if the file is the same inode/device as we last saw,
// start a harvester on it at the last known position
info, err := os.Stat(path)
if err != nil {
continue
}

if is_file_same(path, info, state) {
// same file, seek to last known position
fileinfo[path] = info

for _, pathglob := range fileconfig.Paths {
match, _ := filepath.Match(pathglob, path)
if match {
harvester := Harvester{Path: path, Fields: fileconfig.Fields, Offset: state.Offset}
go harvester.Harvest(output)
break
}
}
}
}
if err != nil {
log.Printf("unable to open lumberjack history file: %v", err.Error())
return
}

historical_state := make(map[string]*FileState)
log.Printf("Loading registrar data\n")
decoder := json.NewDecoder(history)
decoder.Decode(&historical_state)
history.Close()

for path, state := range historical_state {
// if the file is the same inode/device as we last saw,
// start a harvester on it at the last known position
info, err := os.Stat(path)
if err != nil {
continue
}

if is_file_same(path, info, state) {
// same file, seek to last known position
fileinfo[path] = info

for _, pathglob := range fileconfig.Paths {
match, _ := filepath.Match(pathglob, path)
if match {
harvester := Harvester{Path: path, Fields: fileconfig.Fields, Offset: state.Offset}
go harvester.Harvest(output)
break
}
}
}
}
}

func prospector_scan(path string, fields map[string]string,
Expand Down
12 changes: 6 additions & 6 deletions publisher1.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ func Publishv1(input chan []*FileEvent,
var socket *tls.Conn
var sequence uint32
var err error
id := publisherId
publisherId++
id := publisherId
publisherId++

socket = connect(config, id)
defer func() {
log.Printf("publisher %v done", id)
socket.Close()
}()
log.Printf("publisher %v done", id)
socket.Close()
}()

for events := range input {
buffer.Truncate(0)
Expand Down Expand Up @@ -193,7 +193,7 @@ func connect(config *NetworkConfig, id int) (socket *tls.Conn) {
// connected, let's rock and roll.
return
}
panic("not reached")
panic("not reached")
}

func writeDataFrame(event *FileEvent, sequence uint32, output io.Writer) {
Expand Down
6 changes: 3 additions & 3 deletions registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
)

func Registrar(input chan []*FileEvent) {
for events := range input {
for page := range input {
state := make(map[string]*FileState)
counts := make(map[string]int)
// Take the last event found for each file source
for _, event := range events {
for _, event := range page {
// skip stdin
if *event.Source == "-" {
continue
Expand Down Expand Up @@ -41,7 +41,7 @@ func Registrar(input chan []*FileEvent) {
fmt.Fprintf(&buf, "%s: %d ", name, count)
}

log.Printf("Registrar received %d events. %s\n", len(events), buf.String())
log.Printf("Registrar received %d events. %s\n", len(page), buf.String())

if len(state) > 0 {
WriteRegistry(state, ".lumberjack")
Expand Down
33 changes: 30 additions & 3 deletions registrar_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,20 @@ import (
"os"
)

// loadRegistry reads the JSON-encoded registrar file at fname and returns
// the per-path file state recorded in it. The caller gets a nil map (not an
// empty one) when the file's JSON payload is literally "null"; any open or
// decode failure is returned unchanged.
func loadRegistry(fname string) (map[string]*FileState, error) {
	handle, openErr := os.Open(fname)
	if openErr != nil {
		return nil, openErr
	}
	defer handle.Close()

	// Decode into a zero-value map so that whatever shape the decoder
	// produces (including nil for "null") is passed through untouched.
	var states map[string]*FileState
	if decodeErr := json.NewDecoder(handle).Decode(&states); decodeErr != nil {
		return nil, decodeErr
	}
	return states, nil
}

func WriteRegistry(state map[string]*FileState, path string) {
// Open tmp file, write, flush, rename
file, err := os.Create(".lumberjack.new")
Expand All @@ -17,8 +31,21 @@ func WriteRegistry(state map[string]*FileState, path string) {
}
defer file.Close()

encoder := json.NewEncoder(file)
encoder.Encode(state)
existingState, err := loadRegistry(path)
if err != nil {
log.Printf("Failed to read existing state at path %s: %s", path, err.Error())
existingState = make(map[string]*FileState)
}

for name, fs := range state {
existingState[name] = fs
}

os.Rename(".lumberjack.new", path)
if err := json.NewEncoder(file).Encode(existingState); err != nil {
log.Printf("Failed to write log state to file: %v", err)
return
}
if err := os.Rename(".lumberjack.new", path); err != nil {
log.Printf("Failed to move log state file: %v", err)
}
}

0 comments on commit 78c77cb

Please sign in to comment.