diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index d84411e2b75..379d737e7ec 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -255,6 +255,9 @@ func (s *StreamFrame) IsCleared() bool { // StreamFramer is used to buffer and send frames as well as heartbeat. type StreamFramer struct { + // plainTxt determines whether we frame or just send plain text data. + plainTxt bool + out io.WriteCloser enc *codec.Encoder encLock sync.Mutex @@ -281,8 +284,11 @@ type StreamFramer struct { } // NewStreamFramer creates a new stream framer that will output StreamFrames to -// the passed output. -func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer { +// the passed output. If plainTxt is set we do not frame and just batch plain +// text data. +func NewStreamFramer(out io.WriteCloser, plainTxt bool, + heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer { + // Create a JSON encoder enc := codec.NewEncoder(out, jsonHandle) @@ -291,6 +297,7 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio flusher := time.NewTicker(batchWindow) return &StreamFramer{ + plainTxt: plainTxt, out: out, enc: enc, frameSize: frameSize, @@ -390,6 +397,10 @@ OUTER: func (s *StreamFramer) send(f *StreamFrame) error { s.encLock.Lock() defer s.encLock.Unlock() + if s.plainTxt { + _, err := io.Copy(s.out, bytes.NewReader(f.Data)) + return err + } return s.enc.Encode(f) } @@ -549,7 +560,7 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf output := ioutils.NewWriteFlusher(resp) // Create the framer - framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(output, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() @@ -697,7 +708,7 @@ OUTER: // applied. Defaults to "start". func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, task, logType string - var follow bool + var plain, follow bool var err error q := req.URL.Query() @@ -710,8 +721,16 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac return nil, taskNotPresentErr } - if follow, err = strconv.ParseBool(q.Get("follow")); err != nil { - return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err) + if followStr := q.Get("follow"); followStr != "" { + if follow, err = strconv.ParseBool(followStr); err != nil { + return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err) + } + } + + if plainStr := q.Get("plain"); plainStr != "" { + if plain, err = strconv.ParseBool(plainStr); err != nil { + return nil, fmt.Errorf("Failed to parse plain field to boolean: %v", err) + } } logType = q.Get("type") @@ -747,15 +766,15 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac // Create an output that gets flushed on every write output := ioutils.NewWriteFlusher(resp) - return nil, s.logs(follow, offset, origin, task, logType, fs, output) + return nil, s.logs(follow, plain, offset, origin, task, logType, fs, output) } -func (s *HTTPServer) logs(follow bool, offset int64, +func (s *HTTPServer) logs(follow, plain bool, offset int64, origin, task, logType string, fs allocdir.AllocDirFS, output io.WriteCloser) error { // Create the framer - framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(output, plain, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index abb9a974b33..0e9b732547c 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -14,6 +14,7 @@ import ( "reflect" "runtime" "strconv" + "strings" "testing" "time" @@ -118,7 +119,7 @@ func TestStreamFramer_Flush(t *testing.T) { r, w := io.Pipe() wrappedW := &WriteCloseChecker{WriteCloser: w} hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond - sf := NewStreamFramer(wrappedW, hRate, bWindow, 100) + sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100) sf.Run() // Create a decoder @@ -186,7 +187,7 @@ func TestStreamFramer_Batch(t *testing.T) { wrappedW := &WriteCloseChecker{WriteCloser: w} // Ensure the batch window doesn't get hit hRate, bWindow := 100*time.Millisecond, 500*time.Millisecond - sf := NewStreamFramer(wrappedW, hRate, bWindow, 3) + sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 3) sf.Run() // Create a decoder @@ -263,7 +264,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) { r, w := io.Pipe() wrappedW := &WriteCloseChecker{WriteCloser: w} hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond - sf := NewStreamFramer(wrappedW, hRate, bWindow, 100) + sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100) sf.Run() // Create a decoder @@ -315,7 +316,7 @@ func TestStreamFramer_Order(t *testing.T) { wrappedW := &WriteCloseChecker{WriteCloser: w} // Ensure the batch window doesn't get hit hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond - sf := NewStreamFramer(wrappedW, hRate, bWindow, 10) + sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 10) sf.Run() // Create a decoder @@ -401,6 +402,102 @@ func TestStreamFramer_Order(t *testing.T) { } } +// This test checks that frames are received in order +func TestStreamFramer_Order_PlainText(t *testing.T) { + // Create the stream framer + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + // Ensure the batch window doesn't get hit + hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond + sf := NewStreamFramer(wrappedW, true, hRate, bWindow, 10) + sf.Run() + + files := []string{"1", "2", "3", "4", "5"} + input := bytes.NewBuffer(make([]byte, 0, 100000)) + for i := 0; i <= 1000; i++ { + str := strconv.Itoa(i) + "," + input.WriteString(str) + } + + expected := bytes.NewBuffer(make([]byte, 0, 100000)) + for _, _ = range files { + expected.Write(input.Bytes()) + } + receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000)) + + // Start the reader + resultCh := make(chan struct{}) + go func() { + OUTER: + for { + if _, err := receivedBuf.ReadFrom(r); err != nil { + if strings.Contains(err.Error(), "closed pipe") { + resultCh <- struct{}{} + return + } + t.Fatalf("bad read: %v", err) + } + + if expected.Len() != receivedBuf.Len() { + continue + } + expectedBytes := expected.Bytes() + actualBytes := receivedBuf.Bytes() + for i, e := range expectedBytes { + if a := actualBytes[i]; a != e { + continue OUTER + } + } + resultCh <- struct{}{} + return + + } + }() + + // Send the data + b := input.Bytes() + shards := 10 + each := len(b) / shards + for _, f := range files { + for i := 0; i < shards; i++ { + l, r := each*i, each*(i+1) + if i == shards-1 { + r = len(b) + } + + if err := sf.Send(f, "", b[l:r], 0); err != nil { + t.Fatalf("Send() failed %v", err) + } + } + } + + // Ensure we get data + select { + case <-resultCh: + case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow): + if expected.Len() != receivedBuf.Len() { + t.Fatalf("Got %v; want %v", expected.Len(), receivedBuf.Len()) + } + expectedBytes := expected.Bytes() + actualBytes := receivedBuf.Bytes() + for i, e := range expectedBytes { + if a := actualBytes[i]; a != e { + t.Fatalf("Index %d; Got %q; want %q", i, a, e) + } + } + } + + // Close the reader and wait. This should cause the runner to exit + if err := r.Close(); err != nil { + t.Fatalf("failed to close reader") + } + + sf.Destroy() + if !wrappedW.Closed { + t.Fatalf("writer not closed") + } +} + func TestHTTP_Stream_MissingParams(t *testing.T) { httpTest(t, nil, func(s *TestServer) { req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil) @@ -467,7 +564,7 @@ func TestHTTP_Stream_NoFile(t *testing.T) { ad := tempAllocDir(t) defer os.RemoveAll(ad.AllocDir) - framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() @@ -526,7 +623,7 @@ func TestHTTP_Stream_Modify(t *testing.T) { t.Fatalf("write failed: %v", err) } - framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() @@ -607,7 +704,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) { t.Fatalf("write failed: %v", err) } - framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() defer framer.Destroy() @@ -710,7 +807,7 @@ func TestHTTP_Stream_Delete(t *testing.T) { t.Fatalf("write failed: %v", err) } - framer := NewStreamFramer(wrappedW, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer := NewStreamFramer(wrappedW, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() // Start streaming @@ -804,7 +901,7 @@ func TestHTTP_Logs_NoFollow(t *testing.T) { // Start streaming logs go func() { - if err := s.Server.logs(false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { + if err := s.Server.logs(false, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { t.Fatalf("logs() failed: %v", err) } }() @@ -893,7 +990,7 @@ func TestHTTP_Logs_Follow(t *testing.T) { // Start streaming logs go func() { - if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil { + if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { t.Fatalf("logs() failed: %v", err) } }() @@ -1006,7 +1103,7 @@ func BenchmarkHTTP_Logs_Follow(t *testing.B) { // Start streaming logs go func() { - if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil { + if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { t.Fatalf("logs() failed: %v", err) } }() diff --git a/website/source/docs/http/client-fs.html.md b/website/source/docs/http/client-fs.html.md index 2278c6d1a84..0c6c7140b1e 100644 --- a/website/source/docs/http/client-fs.html.md +++ b/website/source/docs/http/client-fs.html.md @@ -227,6 +227,11 @@ allocation was placed. Origin can be either "start" or "end" and applies the offset relative to either the start or end of the logs respectively. Defaults to "start". +