Skip to content

Commit

Permalink
Merge pull request #2 from un000/change-api
Browse files Browse the repository at this point in the history
Change channel API
  • Loading branch information
un000 authored Jun 2, 2019
2 parents 35b69b2 + bef727b commit 12e020b
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 35 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ Tailor, the library for tailing nginx access logs
-----
[![Go Doc](https://godoc.org/github.com/un000/tailor?status.svg)](https://godoc.org/github.com/un000/tailor)

Tailor provides the functionality of tailing nginx access log under logrotate.
Tailor provides the functionality of tailing for e. g. nginx logs under logrotate.
Tailor will follow a selected log file and reopen it if it's been rotated. Now, tailor doesn't require inotify, because it polls logs
with a tiny delay. So the library can achieve cross-platform.

Expand Down Expand Up @@ -40,21 +40,21 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
lines, errs, err := t.Run(ctx)
err := t.Run(ctx)
if err != nil {
panic(err)
}
fmt.Println("Tailing file:", t.FileName())
for {
select {
case line, ok := <-lines:
case line, ok := <-t.Lines():
if !ok {
return
}
fmt.Println(line.StringTrimmed())
case err, ok := <-errs:
case err, ok := <-t.Errors():
if !ok {
return
}
Expand Down
6 changes: 3 additions & 3 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

lines, errs, err := t.Run(ctx)
err := t.Run(ctx)
if err != nil {
panic(err)
}

fmt.Println("Tailing file:", t.FileName())
for {
select {
case line, ok := <-lines:
case line, ok := <-t.Lines():
if !ok {
return
}

fmt.Println(line.StringTrimmed())
case err, ok := <-errs:
case err, ok := <-t.Errors():
if !ok {
return
}
Expand Down
61 changes: 36 additions & 25 deletions tailor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Tailor struct {
lastSize int64
lag int64

lines chan Line
errs chan error

working int32
}

Expand All @@ -53,9 +56,9 @@ func New(filename string, opts ...Option) *Tailor {
// If the file has been logrotated, Tailor will follow the first file to the end and after reopen it.
// If error happens file will be closed.
// Tailor makes an exponential sleep to reduce stat syscalls.
func (t *Tailor) Run(ctx context.Context) (<-chan Line, <-chan error, error) {
func (t *Tailor) Run(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&t.working, 0, 1) {
return nil, nil, errors.New("already working")
return errors.New("already working")
}

finalizer := func() {
Expand All @@ -69,47 +72,47 @@ func (t *Tailor) Run(ctx context.Context) (<-chan Line, <-chan error, error) {
t.file, err = os.Open(t.fileName)
if err != nil {
finalizer()
return nil, nil, errors.Wrap(err, "can't open file for tailing")
return errors.Wrap(err, "can't open file for tailing")
}

_, err = t.file.Seek(t.opts.runOffset, t.opts.runWhence)
if err != nil {
finalizer()
return nil, nil, errors.Wrapf(err, "error seeking file %s", t.fileName)
return errors.Wrapf(err, "error seeking file %s", t.fileName)
}

err = t.seekToLineStart()
if err != nil {
finalizer()
return nil, nil, errors.Wrapf(err, "error seeking to the line beginning %s", t.fileName)
return errors.Wrapf(err, "error seeking to the line beginning %s", t.fileName)
}

err = t.updateFileStatus()
if err != nil {
finalizer()
return nil, nil, errors.Wrapf(err, "error getting file size %s", t.fileName)
return errors.Wrapf(err, "error getting file size %s", t.fileName)
}

lines, errs := t.readLoop(ctx)
t.readLoop(ctx)

return lines, errs, nil
return nil
}

// readLoop starts goroutine, which reads the given file and send to the line chan tailed strings.
func (t *Tailor) readLoop(ctx context.Context) (chan Line, chan error) {
lines := make(chan Line)
errs := make(chan error)
func (t *Tailor) readLoop(ctx context.Context) {
t.lines = make(chan Line)
t.errs = make(chan error)

go func() {
defer func() {
if t.file != nil {
err := t.file.Close()
if err != nil {
errs <- errors.Wrap(err, "error closing file")
t.errs <- errors.Wrap(err, "error closing file")
}
}
close(lines)
close(errs)
close(t.lines)
close(t.errs)
atomic.StoreInt32(&t.working, 0)
}()

Expand All @@ -125,7 +128,7 @@ func (t *Tailor) readLoop(ctx context.Context) (chan Line, chan error) {
case <-lagReporter.C:
err := t.updateFileStatus()
if err != nil {
errs <- errors.Wrap(err, "error getting file status")
t.errs <- errors.Wrap(err, "error getting file status")
break
}
default:
Expand All @@ -150,7 +153,7 @@ func (t *Tailor) readLoop(ctx context.Context) (chan Line, chan error) {
break
}
} else {
errs <- errors.Wrap(err, "error reading line")
t.errs <- errors.Wrap(err, "error reading line")
return
}

Expand All @@ -171,34 +174,34 @@ func (t *Tailor) readLoop(ctx context.Context) (chan Line, chan error) {
if len(line) == 0 && err == io.EOF {
isSameFile, err := t.isFileStillTheSame()
if err != nil {
errs <- errors.Wrap(err, "error checking that file is the same")
t.errs <- errors.Wrap(err, "error checking that file is the same")
return
}

if !isSameFile {
err := t.file.Close()
if err != nil {
errs <- errors.Wrap(err, "error closing current file")
t.errs <- errors.Wrap(err, "error closing current file")
}

t.file, err = os.Open(t.fileName)
if err != nil {
if os.IsNotExist(err) {
errs <- ErrFileNotExists
t.errs <- ErrFileNotExists
return
}
errs <- errors.Wrap(err, "error reopening file")
t.errs <- errors.Wrap(err, "error reopening file")
return
}

err = t.updateFileStatus()
if err != nil {
errs <- errors.Wrap(err, "error getting file status")
t.errs <- errors.Wrap(err, "error getting file status")
return
}
err = t.seekToLineStart()
if err != nil {
errs <- errors.Wrap(err, "error seeking to the line beginning")
t.errs <- errors.Wrap(err, "error seeking to the line beginning")
return
}

Expand All @@ -212,20 +215,28 @@ func (t *Tailor) readLoop(ctx context.Context) (chan Line, chan error) {
continue
}
if err != nil && err != io.EOF {
errs <- errors.Wrap(err, "error reading line")
t.errs <- errors.Wrap(err, "error reading line")
return
}

pollerTimeout = t.opts.pollerTimeout

lines <- Line{
t.lines <- Line{
line: line,
fileName: t.fileName,
}
}
}()
}

// Lines returns chanel of read lines.
func (t *Tailor) Lines() chan Line {
return t.lines
}

return lines, errs
// Errors returns chanel of errors, associated with reading files.
func (t *Tailor) Errors() chan error {
return t.errs
}

// exponentialSleep sleeps for pollerTimeout and returns new exponential grown timeout <= maxWait.
Expand Down
6 changes: 3 additions & 3 deletions tailor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestTailFileFromStart(t *testing.T) {
}

ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
lines, errs, err := f.Run(ctx)
err = f.Run(ctx)
if err != nil {
t.Error(err)
}
Expand All @@ -48,7 +48,7 @@ func TestTailFileFromStart(t *testing.T) {

for ; i <= 3; i++ {
select {
case line, ok := <-lines:
case line, ok := <-f.Lines():
if !ok {
return
}
Expand All @@ -57,7 +57,7 @@ func TestTailFileFromStart(t *testing.T) {
t.Error(err)
}
t.Log(line.StringTrimmed())
case err, ok := <-errs:
case err, ok := <-f.Errors():
if !ok {
return
}
Expand Down

0 comments on commit 12e020b

Please sign in to comment.