123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- From 8d6f2e3fe8851b581309da25fc4c32f8be675932 Mon Sep 17 00:00:00 2001
- From: Brian Goff <cpuguy83@gmail.com>
- Date: Mon, 11 Jul 2016 16:31:42 -0400
- Subject: [PATCH] Fix issues with tailing rotated jsonlog file
- Fixes a race where the log reader would get events for both an actual
- rotation as we from fsnotify (`fsnotify.Rename`).
- This issue becomes extremely apparent when rotations are fast, for
- example:
- ```
- $ docker run -d --name test --log-opt max-size=1 --log-opt max-file=2
- busybox sh -c 'while true; do echo hello; usleep 100000; done'
- ```
- With this change the log reader for jsonlogs can handle rotations that
- happen as above.
- Instead of listening for both fs events AND rotation events
- simultaneously, potentially meaning we see 2 rotations for only a single
- rotation due to channel buffering, only listen for fs events (like
- `Rename`) and then wait to be notified about rotation by the logger.
- This makes sure that we don't see 2 rotations for 1, and that we don't
- start trying to read until the logger is actually ready for us to.
- Signed-off-by: Brian Goff <cpuguy83@gmail.com>
- This commit is pending upstream commit fixing broken log tailing. The
- original commit can be found in the PR here:
- - https://github.com/docker/docker/pull/24514
- Signed-off-by: Christian Stewart <christian@paral.in>
- ---
- daemon/logger/jsonfilelog/read.go | 180 +++++++++++++++++++++++++-------------
- 1 file changed, 119 insertions(+), 61 deletions(-)
- diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go
- index bea83dd..0cb44af 100644
- --- a/daemon/logger/jsonfilelog/read.go
- +++ b/daemon/logger/jsonfilelog/read.go
- @@ -3,11 +3,14 @@ package jsonfilelog
- import (
- "bytes"
- "encoding/json"
- + "errors"
- "fmt"
- "io"
- "os"
- "time"
-
- + "gopkg.in/fsnotify.v1"
- +
- "github.com/Sirupsen/logrus"
- "github.com/docker/docker/daemon/logger"
- "github.com/docker/docker/pkg/filenotify"
- @@ -44,6 +47,10 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
- func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
- defer close(logWatcher.Msg)
-
- + // lock so the read stream doesn't get corrupted do to rotations or other log data written while we read
- + // This will block writes!!!
- + l.mu.Lock()
- +
- pth := l.writer.LogPath()
- var files []io.ReadSeeker
- for i := l.writer.MaxFiles(); i > 1; i-- {
- @@ -61,6 +68,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
- latestFile, err := os.Open(pth)
- if err != nil {
- logWatcher.Err <- err
- + l.mu.Unlock()
- return
- }
-
- @@ -80,6 +88,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
- if err := latestFile.Close(); err != nil {
- logrus.Errorf("Error closing file: %v", err)
- }
- + l.mu.Unlock()
- return
- }
-
- @@ -87,7 +96,6 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.R
- latestFile.Seek(0, os.SEEK_END)
- }
-
- - l.mu.Lock()
- l.readers[logWatcher] = struct{}{}
- l.mu.Unlock()
-
- @@ -128,92 +136,142 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since ti
- }
- }
-
- +func watchFile(name string) (filenotify.FileWatcher, error) {
- + fileWatcher, err := filenotify.New()
- + if err != nil {
- + return nil, err
- + }
- +
- + if err := fileWatcher.Add(name); err != nil {
- + logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
- + fileWatcher.Close()
- + fileWatcher = filenotify.NewPollingWatcher()
- +
- + if err := fileWatcher.Add(name); err != nil {
- + fileWatcher.Close()
- + logrus.Debugf("error watching log file for modifications: %v", err)
- + return nil, err
- + }
- + }
- + return fileWatcher, nil
- +}
- +
- func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
- dec := json.NewDecoder(f)
- l := &jsonlog.JSONLog{}
-
- - fileWatcher, err := filenotify.New()
- + name := f.Name()
- + fileWatcher, err := watchFile(name)
- if err != nil {
- logWatcher.Err <- err
- + return
- }
- defer func() {
- f.Close()
- fileWatcher.Close()
- }()
- - name := f.Name()
-
- - if err := fileWatcher.Add(name); err != nil {
- - logrus.WithField("logger", "json-file").Warnf("falling back to file poller due to error: %v", err)
- - fileWatcher.Close()
- - fileWatcher = filenotify.NewPollingWatcher()
- + var retries int
- + handleRotate := func() error {
- + f.Close()
- + fileWatcher.Remove(name)
-
- + // retry when the file doesn't exist
- + for retries := 0; retries <= 5; retries++ {
- + f, err = os.Open(name)
- + if err == nil || !os.IsNotExist(err) {
- + break
- + }
- + }
- + if err != nil {
- + return err
- + }
- if err := fileWatcher.Add(name); err != nil {
- - logrus.Debugf("error watching log file for modifications: %v", err)
- - logWatcher.Err <- err
- - return
- + return err
- }
- + dec = json.NewDecoder(f)
- + return nil
- }
-
- - var retries int
- - for {
- - msg, err := decodeLogLine(dec, l)
- - if err != nil {
- - if err != io.EOF {
- - // try again because this shouldn't happen
- - if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
- - dec = json.NewDecoder(f)
- - retries++
- - continue
- + errRetry := errors.New("retry")
- + errDone := errors.New("done")
- + waitRead := func() error {
- + select {
- + case e := <-fileWatcher.Events():
- + switch e.Op {
- + case fsnotify.Write:
- + dec = json.NewDecoder(f)
- + return nil
- + case fsnotify.Rename, fsnotify.Remove:
- + <-notifyRotate
- + if err := handleRotate(); err != nil {
- + return err
- }
- -
- - // io.ErrUnexpectedEOF is returned from json.Decoder when there is
- - // remaining data in the parser's buffer while an io.EOF occurs.
- - // If the json logger writes a partial json log entry to the disk
- - // while at the same time the decoder tries to decode it, the race condition happens.
- - if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
- - reader := io.MultiReader(dec.Buffered(), f)
- - dec = json.NewDecoder(reader)
- - retries++
- - continue
- + return nil
- + }
- + return errRetry
- + case err := <-fileWatcher.Errors():
- + logrus.Debug("logger got error watching file: %v", err)
- + // Something happened, let's try and stay alive and create a new watcher
- + if retries <= 5 {
- + fileWatcher, err = watchFile(name)
- + if err != nil {
- + return err
- }
- -
- - return
- + retries++
- + return errRetry
- }
- + return err
- + case <-logWatcher.WatchClose():
- + fileWatcher.Remove(name)
- + return errDone
- + }
- + }
-
- - select {
- - case <-fileWatcher.Events():
- - dec = json.NewDecoder(f)
- - continue
- - case <-fileWatcher.Errors():
- - logWatcher.Err <- err
- - return
- - case <-logWatcher.WatchClose():
- - fileWatcher.Remove(name)
- - return
- - case <-notifyRotate:
- - f.Close()
- - fileWatcher.Remove(name)
- -
- - // retry when the file doesn't exist
- - for retries := 0; retries <= 5; retries++ {
- - f, err = os.Open(name)
- - if err == nil || !os.IsNotExist(err) {
- - break
- - }
- + handleDecodeErr := func(err error) error {
- + if err == io.EOF {
- + for err := waitRead(); err != nil; {
- + if err == errRetry {
- + // retry the waitRead
- + continue
- }
- + return err
- + }
- + return nil
- + }
- + // try again because this shouldn't happen
- + if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
- + dec = json.NewDecoder(f)
- + retries++
- + return nil
- + }
- + // io.ErrUnexpectedEOF is returned from json.Decoder when there is
- + // remaining data in the parser's buffer while an io.EOF occurs.
- + // If the json logger writes a partial json log entry to the disk
- + // while at the same time the decoder tries to decode it, the race condition happens.
- + if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
- + reader := io.MultiReader(dec.Buffered(), f)
- + dec = json.NewDecoder(reader)
- + retries++
- + return nil
- + }
- + return err
- + }
-
- - if err = fileWatcher.Add(name); err != nil {
- - logWatcher.Err <- err
- - return
- - }
- - if err != nil {
- - logWatcher.Err <- err
- + // main loop
- + for {
- + msg, err := decodeLogLine(dec, l)
- + if err != nil {
- + if err := handleDecodeErr(err); err != nil {
- + if err == errDone {
- return
- }
- -
- - dec = json.NewDecoder(f)
- - continue
- + // we got an unrecoverable error, so return
- + logWatcher.Err <- err
- + return
- }
- + // ready to try again
- + continue
- }
-
- retries = 0 // reset retries since we've succeeded
- --
- 2.7.3
|