From 8b387e73eeda3041926a09d9598a99cb1b1ed139 Mon Sep 17 00:00:00 2001 From: Anbraten <6918444+anbraten@users.noreply.github.com> Date: Thu, 13 Jun 2024 17:18:32 +0200 Subject: [PATCH] Improve step logging (#3722) --- agent/logger.go | 22 +- cli/exec/exec.go | 7 +- cli/exec/line.go | 44 +-- pipeline/log/line_writer.go | 79 ++++++ pipeline/log/line_writer_test.go | 58 ++++ pipeline/log/utils.go | 62 +++++ pipeline/log/utils_test.go | 146 ++++++++++ pipeline/rpc/log_entry.go | 63 +---- pipeline/rpc/log_entry_test.go | 2 +- pipeline/rpc/mocks/peer.go | 261 ++++++++++++++++++ pipeline/rpc/peer.go | 2 + pipeline/rpc/proto/version.go | 2 +- pipeline/rpc/proto/woodpecker.pb.go | 12 +- pipeline/rpc/proto/woodpecker.proto | 2 +- pipeline/rpc/proto/woodpecker_grpc.pb.go | 65 +++-- server/api/stream.go | 9 +- server/grpc/rpc.go | 2 +- .../components/repo/pipeline/PipelineLog.vue | 28 +- 18 files changed, 716 insertions(+), 150 deletions(-) create mode 100644 pipeline/log/line_writer.go create mode 100644 pipeline/log/line_writer_test.go create mode 100644 pipeline/log/utils.go create mode 100644 pipeline/log/utils_test.go create mode 100644 pipeline/rpc/mocks/peer.go diff --git a/agent/logger.go b/agent/logger.go index 894f044e82..89ccaf3a55 100644 --- a/agent/logger.go +++ b/agent/logger.go @@ -19,16 +19,22 @@ import ( "sync" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "go.woodpecker-ci.org/woodpecker/v2/pipeline" backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types" + "go.woodpecker-ci.org/woodpecker/v2/pipeline/log" "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" ) -func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) pipeline.Logger { +const ( + // Store not more than 1mb in a log-line as 4mb is the limit of a grpc message + // and log-lines needs to be parsed by the browsers later on. + maxLogLineLength = 1024 * 1024 // 1mb +) + +func (r *Runner) createLogger(_logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) pipeline.Logger { return func(step *backend.Step, rc io.Reader) error { - logLogger := logger.With(). + logger := _logger.With(). Str("image", step.Image). Str("workflowID", workflow.ID). Logger() @@ -40,14 +46,14 @@ func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, wo secrets = append(secrets, secret.Value) } - logLogger.Debug().Msg("log stream opened") + logger.Debug().Msg("log stream opened") - logStream := rpc.NewLineWriter(r.client, step.UUID, secrets...) - if _, err := io.Copy(logStream, rc); err != nil { - log.Error().Err(err).Msg("copy limited logStream part") + logStream := log.NewLineWriter(r.client, step.UUID, secrets...) + if err := log.CopyLineByLine(logStream, rc, maxLogLineLength); err != nil { + logger.Error().Err(err).Msg("copy limited logStream part") } - logLogger.Debug().Msg("log stream copied, close ...") + logger.Debug().Msg("log stream copied, close ...") uploads.Done() return nil diff --git a/cli/exec/exec.go b/cli/exec/exec.go index 3b9e4d5639..68db9aabef 100644 --- a/cli/exec/exec.go +++ b/cli/exec/exec.go @@ -39,6 +39,7 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/yaml/compiler" "go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/yaml/linter" "go.woodpecker-ci.org/woodpecker/v2/pipeline/frontend/yaml/matrix" + pipelineLog "go.woodpecker-ci.org/woodpecker/v2/pipeline/log" "go.woodpecker-ci.org/woodpecker/v2/shared/utils" ) @@ -273,8 +274,8 @@ func convertPathForWindows(path string) string { return filepath.ToSlash(path) } +const maxLogLineLength = 1024 * 1024 // 1mb var defaultLogger = pipeline.Logger(func(step *backendTypes.Step, rc io.Reader) error { - logStream := NewLineWriter(step.Name, step.UUID) - _, err := io.Copy(logStream, rc) - return err + logWriter := NewLineWriter(step.Name, step.UUID) + return pipelineLog.CopyLineByLine(logWriter, rc, maxLogLineLength) }) diff --git a/cli/exec/line.go b/cli/exec/line.go index 1197209812..d1ddc6d178 100644 --- a/cli/exec/line.go +++ b/cli/exec/line.go @@ -16,50 +16,34 @@ package exec import ( "fmt" + "io" "os" - "strings" "time" - - "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" ) // LineWriter sends logs to the client. type LineWriter struct { - stepName string - stepUUID string - num int - now time.Time - rep *strings.Replacer - lines []*rpc.LogEntry + stepName string + stepUUID string + num int + startTime time.Time } // NewLineWriter returns a new line reader. -func NewLineWriter(stepName, stepUUID string) *LineWriter { +func NewLineWriter(stepName, stepUUID string) io.WriteCloser { return &LineWriter{ - stepName: stepName, - stepUUID: stepUUID, - now: time.Now().UTC(), + stepName: stepName, + stepUUID: stepUUID, + startTime: time.Now().UTC(), } } func (w *LineWriter) Write(p []byte) (n int, err error) { - data := string(p) - if w.rep != nil { - data = w.rep.Replace(data) - } - - line := &rpc.LogEntry{ - Data: data, - StepUUID: w.stepUUID, - Line: w.num, - Time: int64(time.Since(w.now).Seconds()), - Type: rpc.LogEntryStdout, - } - - fmt.Fprintf(os.Stderr, "[%s:L%d:%ds] %s", w.stepName, w.num, int64(time.Since(w.now).Seconds()), data) - + fmt.Fprintf(os.Stderr, "[%s:L%d:%ds] %s", w.stepName, w.num, int64(time.Since(w.startTime).Seconds()), p) w.num++ - - w.lines = append(w.lines, line) return len(p), nil } + +func (w *LineWriter) Close() error { + return nil +} diff --git a/pipeline/log/line_writer.go b/pipeline/log/line_writer.go new file mode 100644 index 0000000000..e2a6fd2e48 --- /dev/null +++ b/pipeline/log/line_writer.go @@ -0,0 +1,79 @@ +// Copyright 2022 Woodpecker Authors +// Copyright 2011 Drone.IO Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "context" + "io" + "strings" + "sync" + "time" + + "github.com/rs/zerolog/log" + + "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" + "go.woodpecker-ci.org/woodpecker/v2/pipeline/shared" +) + +// LineWriter sends logs to the client. +type LineWriter struct { + sync.Mutex + + peer rpc.Peer + stepUUID string + num int + startTime time.Time + replacer *strings.Replacer +} + +// NewLineWriter returns a new line reader. +func NewLineWriter(peer rpc.Peer, stepUUID string, secret ...string) io.WriteCloser { + lw := &LineWriter{ + peer: peer, + stepUUID: stepUUID, + startTime: time.Now().UTC(), + replacer: shared.NewSecretsReplacer(secret), + } + return lw +} + +func (w *LineWriter) Write(p []byte) (n int, err error) { + data := string(p) + if w.replacer != nil { + data = w.replacer.Replace(data) + } + log.Trace().Str("step-uuid", w.stepUUID).Msgf("grpc write line: %s", data) + + line := &rpc.LogEntry{ + Data: []byte(strings.TrimSuffix(data, "\n")), // remove trailing newline + StepUUID: w.stepUUID, + Time: int64(time.Since(w.startTime).Seconds()), + Type: rpc.LogEntryStdout, + Line: w.num, + } + + w.num++ + + if err := w.peer.Log(context.Background(), line); err != nil { + return 0, err + } + + return len(data), nil +} + +func (w *LineWriter) Close() error { + return nil +} diff --git a/pipeline/log/line_writer_test.go b/pipeline/log/line_writer_test.go new file mode 100644 index 0000000000..669f759733 --- /dev/null +++ b/pipeline/log/line_writer_test.go @@ -0,0 +1,58 @@ +// Copyright 2019 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "go.woodpecker-ci.org/woodpecker/v2/pipeline/log" + "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" + "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/mocks" +) + +func TestLineWriter(t *testing.T) { + peer := mocks.NewPeer(t) + peer.On("Log", mock.Anything, mock.Anything).Return(nil) + + secrets := []string{"world"} + lw := log.NewLineWriter(peer, "e9ea76a5-44a1-4059-9c4a-6956c478b26d", secrets...) + defer lw.Close() + + _, err := lw.Write([]byte("hello world\n")) + assert.NoError(t, err) + _, err = lw.Write([]byte("the previous line had no newline at the end")) + assert.NoError(t, err) + + peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{ + StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d", + Time: 0, + Type: rpc.LogEntryStdout, + Line: 0, + Data: []byte("hello ********"), + }) + + peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{ + StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d", + Time: 0, + Type: rpc.LogEntryStdout, + Line: 1, + Data: []byte("the previous line had no newline at the end"), + }) + + peer.AssertExpectations(t) +} diff --git a/pipeline/log/utils.go b/pipeline/log/utils.go new file mode 100644 index 0000000000..ade772c82d --- /dev/null +++ b/pipeline/log/utils.go @@ -0,0 +1,62 @@ +// Copyright 2024 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "bufio" + "errors" + "io" +) + +func writeChunks(dst io.WriteCloser, data []byte, size int) error { + if len(data) <= size { + _, err := dst.Write(data) + return err + } + + for len(data) > size { + if _, err := dst.Write(data[:size]); err != nil { + return err + } + data = data[size:] + } + + if len(data) > 0 { + _, err := dst.Write(data) + return err + } + + return nil +} + +func CopyLineByLine(dst io.WriteCloser, src io.Reader, maxSize int) error { + r := bufio.NewReader(src) + defer dst.Close() + + for { + // TODO: read til newline or maxSize directly + line, err := r.ReadBytes('\n') + if errors.Is(err, io.EOF) { + return writeChunks(dst, line, maxSize) + } else if err != nil { + return err + } + + err = writeChunks(dst, line, maxSize) + if err != nil { + return err + } + } +} diff --git a/pipeline/log/utils_test.go b/pipeline/log/utils_test.go new file mode 100644 index 0000000000..2e21b01ef2 --- /dev/null +++ b/pipeline/log/utils_test.go @@ -0,0 +1,146 @@ +// Copyright 2024 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log_test + +import ( + "io" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "go.woodpecker-ci.org/woodpecker/v2/pipeline/log" +) + +type testWriter struct { + *sync.Mutex + writes []string +} + +func (b *testWriter) Write(p []byte) (n int, err error) { + b.Lock() + defer b.Unlock() + b.writes = append(b.writes, string(p)) + return len(p), nil +} + +func (b *testWriter) Close() error { + return nil +} + +func (b *testWriter) GetWrites() []string { + b.Lock() + defer b.Unlock() + w := make([]string, len(b.writes)) + copy(w, b.writes) + return w +} + +func TestCopyLineByLine(t *testing.T) { + r, w := io.Pipe() + + testWriter := &testWriter{ + Mutex: &sync.Mutex{}, + writes: make([]string, 0), + } + + go func() { + err := log.CopyLineByLine(testWriter, r, 1024) + assert.NoError(t, err) + }() + + // wait for the goroutine to start + time.Sleep(time.Millisecond) + + // write 4 bytes without newline + if _, err := w.Write([]byte("1234")); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + writes := testWriter.GetWrites() + assert.Lenf(t, writes, 0, "expected 0 writes, got: %v", writes) + + // write more bytes with newlines + if _, err := w.Write([]byte("5\n678\n90")); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + writes = testWriter.GetWrites() + assert.Lenf(t, writes, 2, "expected 2 writes, got: %v", writes) + + // wait for the goroutine to write the data + time.Sleep(time.Millisecond) + + writtenData := strings.Join(writes, "-") + assert.Equal(t, "12345\n-678\n", writtenData, "unexpected writtenData: %s", writtenData) + + // closing the writer should flush the remaining data + w.Close() + + // wait for the goroutine to finish + time.Sleep(10 * time.Millisecond) + + // the written data contains all the data we wrote + writtenData = strings.Join(testWriter.GetWrites(), "-") + assert.Equal(t, "12345\n-678\n-90", writtenData, "unexpected writtenData: %s", writtenData) +} + +func TestCopyLineByLineSizeLimit(t *testing.T) { + r, w := io.Pipe() + + testWriter := &testWriter{ + Mutex: &sync.Mutex{}, + writes: make([]string, 0), + } + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + err := log.CopyLineByLine(testWriter, r, 4) + assert.NoError(t, err) + }() + + // wait for the goroutine to start + time.Sleep(time.Millisecond) + + // write 4 bytes without newline + if _, err := w.Write([]byte("12345")); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + writes := testWriter.GetWrites() + assert.Lenf(t, testWriter.GetWrites(), 0, "expected 0 writes, got: %v", writes) + + // write more bytes + if _, err := w.Write([]byte("67\n89")); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + writes = testWriter.GetWrites() + assert.Lenf(t, testWriter.GetWrites(), 2, "expected 2 writes, got: %v", writes) + + writes = testWriter.GetWrites() + writtenData := strings.Join(writes, "-") + assert.Equal(t, "1234-567\n", writtenData, "unexpected writtenData: %s", writtenData) + + // closing the writer should flush the remaining data + w.Close() + + wg.Wait() +} diff --git a/pipeline/rpc/log_entry.go b/pipeline/rpc/log_entry.go index c489c7640f..23257faaca 100644 --- a/pipeline/rpc/log_entry.go +++ b/pipeline/rpc/log_entry.go @@ -16,14 +16,7 @@ package rpc import ( - "context" "fmt" - "strings" - "time" - - "github.com/rs/zerolog/log" - - "go.woodpecker-ci.org/woodpecker/v2/pipeline/shared" ) // Identifies the type of line in the logs. @@ -41,7 +34,7 @@ type LogEntry struct { Time int64 `json:"time,omitempty"` Type int `json:"type,omitempty"` Line int `json:"line,omitempty"` - Data string `json:"data,omitempty"` + Data []byte `json:"data,omitempty"` } func (l *LogEntry) String() string { @@ -52,57 +45,3 @@ func (l *LogEntry) String() string { return fmt.Sprintf("[%s:L%v:%vs] %s", l.StepUUID, l.Line, l.Time, l.Data) } } - -// LineWriter sends logs to the client. -type LineWriter struct { - peer Peer - stepUUID string - num int - now time.Time - rep *strings.Replacer - lines []*LogEntry -} - -// NewLineWriter returns a new line reader. -func NewLineWriter(peer Peer, stepUUID string, secret ...string) *LineWriter { - return &LineWriter{ - peer: peer, - stepUUID: stepUUID, - now: time.Now().UTC(), - rep: shared.NewSecretsReplacer(secret), - lines: nil, - } -} - -func (w *LineWriter) Write(p []byte) (n int, err error) { - data := string(p) - if w.rep != nil { - data = w.rep.Replace(data) - } - log.Trace().Str("step-uuid", w.stepUUID).Msgf("grpc write line: %s", data) - - line := &LogEntry{ - Data: data, - StepUUID: w.stepUUID, - Time: int64(time.Since(w.now).Seconds()), - Type: LogEntryStdout, - Line: w.num, - } - if err := w.peer.Log(context.Background(), line); err != nil { - log.Error().Err(err).Str("step-uuid", w.stepUUID).Msg("fail to write pipeline log to peer") - } - w.num++ - - w.lines = append(w.lines, line) - return len(p), nil -} - -// Lines returns the line history. -func (w *LineWriter) Lines() []*LogEntry { - return w.lines -} - -// Clear clears the line history. -func (w *LineWriter) Clear() { - w.lines = w.lines[:0] -} diff --git a/pipeline/rpc/log_entry_test.go b/pipeline/rpc/log_entry_test.go index a76fc62446..a5f1d364d0 100644 --- a/pipeline/rpc/log_entry_test.go +++ b/pipeline/rpc/log_entry_test.go @@ -25,7 +25,7 @@ func TestLogEntry(t *testing.T) { StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d", Time: 60, Line: 1, - Data: "starting redis server", + Data: []byte("starting redis server"), } assert.Equal(t, "[e9ea76a5-44a1-4059-9c4a-6956c478b26d:L1:60s] starting redis server", line.String()) } diff --git a/pipeline/rpc/mocks/peer.go b/pipeline/rpc/mocks/peer.go new file mode 100644 index 0000000000..3409ca35d5 --- /dev/null +++ b/pipeline/rpc/mocks/peer.go @@ -0,0 +1,261 @@ +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + rpc "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" +) + +// Peer is an autogenerated mock type for the Peer type +type Peer struct { + mock.Mock +} + +// Done provides a mock function with given fields: c, id, state +func (_m *Peer) Done(c context.Context, id string, state rpc.State) error { + ret := _m.Called(c, id, state) + + if len(ret) == 0 { + panic("no return value specified for Done") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, rpc.State) error); ok { + r0 = rf(c, id, state) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Extend provides a mock function with given fields: c, id +func (_m *Peer) Extend(c context.Context, id string) error { + ret := _m.Called(c, id) + + if len(ret) == 0 { + panic("no return value specified for Extend") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(c, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Init provides a mock function with given fields: c, id, state +func (_m *Peer) Init(c context.Context, id string, state rpc.State) error { + ret := _m.Called(c, id, state) + + if len(ret) == 0 { + panic("no return value specified for Init") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, rpc.State) error); ok { + r0 = rf(c, id, state) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Log provides a mock function with given fields: c, logEntry +func (_m *Peer) Log(c context.Context, logEntry *rpc.LogEntry) error { + ret := _m.Called(c, logEntry) + + if len(ret) == 0 { + panic("no return value specified for Log") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *rpc.LogEntry) error); ok { + r0 = rf(c, logEntry) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Next provides a mock function with given fields: c, f +func (_m *Peer) Next(c context.Context, f rpc.Filter) (*rpc.Workflow, error) { + ret := _m.Called(c, f) + + if len(ret) == 0 { + panic("no return value specified for Next") + } + + var r0 *rpc.Workflow + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, rpc.Filter) (*rpc.Workflow, error)); ok { + return rf(c, f) + } + if rf, ok := ret.Get(0).(func(context.Context, rpc.Filter) *rpc.Workflow); ok { + r0 = rf(c, f) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*rpc.Workflow) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, rpc.Filter) error); ok { + r1 = rf(c, f) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RegisterAgent provides a mock function with given fields: ctx, platform, backend, version, capacity +func (_m *Peer) RegisterAgent(ctx context.Context, platform string, backend string, version string, capacity int) (int64, error) { + ret := _m.Called(ctx, platform, backend, version, capacity) + + if len(ret) == 0 { + panic("no return value specified for RegisterAgent") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, int) (int64, error)); ok { + return rf(ctx, platform, backend, version, capacity) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, int) int64); ok { + r0 = rf(ctx, platform, backend, version, capacity) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string, string, int) error); ok { + r1 = rf(ctx, platform, backend, version, capacity) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ReportHealth provides a mock function with given fields: c +func (_m *Peer) ReportHealth(c context.Context) error { + ret := _m.Called(c) + + if len(ret) == 0 { + panic("no return value specified for ReportHealth") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(c) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// UnregisterAgent provides a mock function with given fields: ctx +func (_m *Peer) UnregisterAgent(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for UnregisterAgent") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Update provides a mock function with given fields: c, id, state +func (_m *Peer) Update(c context.Context, id string, state rpc.State) error { + ret := _m.Called(c, id, state) + + if len(ret) == 0 { + panic("no return value specified for Update") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, rpc.State) error); ok { + r0 = rf(c, id, state) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Version provides a mock function with given fields: c +func (_m *Peer) Version(c context.Context) (*rpc.Version, error) { + ret := _m.Called(c) + + if len(ret) == 0 { + panic("no return value specified for Version") + } + + var r0 *rpc.Version + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*rpc.Version, error)); ok { + return rf(c) + } + if rf, ok := ret.Get(0).(func(context.Context) *rpc.Version); ok { + r0 = rf(c) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*rpc.Version) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(c) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Wait provides a mock function with given fields: c, id +func (_m *Peer) Wait(c context.Context, id string) error { + ret := _m.Called(c, id) + + if len(ret) == 0 { + panic("no return value specified for Wait") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(c, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewPeer creates a new instance of Peer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewPeer(t interface { + mock.TestingT + Cleanup(func()) +}) *Peer { + mock := &Peer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pipeline/rpc/peer.go b/pipeline/rpc/peer.go index 8c2a5aef42..8025080b55 100644 --- a/pipeline/rpc/peer.go +++ b/pipeline/rpc/peer.go @@ -50,6 +50,8 @@ type ( } ) +//go:generate mockery --name Peer --output mocks --case underscore + // Peer defines a peer-to-peer connection. type Peer interface { // Version returns the server- & grpc-version diff --git a/pipeline/rpc/proto/version.go b/pipeline/rpc/proto/version.go index 9c4bfc91c0..b009af0745 100644 --- a/pipeline/rpc/proto/version.go +++ b/pipeline/rpc/proto/version.go @@ -16,4 +16,4 @@ package proto // Version is the version of the woodpecker.proto file, // IMPORTANT: increased by 1 each time it get changed. -const Version int32 = 7 +const Version int32 = 8 diff --git a/pipeline/rpc/proto/woodpecker.pb.go b/pipeline/rpc/proto/woodpecker.pb.go index c5cf5f47d9..2bda93f065 100644 --- a/pipeline/rpc/proto/woodpecker.pb.go +++ b/pipeline/rpc/proto/woodpecker.pb.go @@ -15,8 +15,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.33.0 -// protoc v4.24.4 +// protoc-gen-go v1.34.1 +// protoc v4.25.1 // source: woodpecker.proto package proto @@ -131,7 +131,7 @@ type LogEntry struct { Time int64 `protobuf:"varint,2,opt,name=time,proto3" json:"time,omitempty"` Line int32 `protobuf:"varint,3,opt,name=line,proto3" json:"line,omitempty"` Type int32 `protobuf:"varint,4,opt,name=type,proto3" json:"type,omitempty"` // 0 = stdout, 1 = stderr, 2 = exit-code, 3 = metadata, 4 = progress - Data string `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"` + Data []byte `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"` } func (x *LogEntry) Reset() { @@ -194,11 +194,11 @@ func (x *LogEntry) GetType() int32 { return 0 } -func (x *LogEntry) GetData() string { +func (x *LogEntry) GetData() []byte { if x != nil { return x.Data } - return "" + return nil } type Filter struct { @@ -1109,7 +1109,7 @@ var file_woodpecker_proto_rawDesc = []byte{ 0x0a, 0x04, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x76, 0x0a, 0x06, 0x46, 0x69, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x76, 0x0a, 0x06, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x31, 0x0a, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e, 0x4c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, diff --git a/pipeline/rpc/proto/woodpecker.proto b/pipeline/rpc/proto/woodpecker.proto index 8c5120a697..4d752c2436 100644 --- a/pipeline/rpc/proto/woodpecker.proto +++ b/pipeline/rpc/proto/woodpecker.proto @@ -55,7 +55,7 @@ message LogEntry { int64 time = 2; int32 line = 3; int32 type = 4; // 0 = stdout, 1 = stderr, 2 = exit-code, 3 = metadata, 4 = progress - string data = 5; + bytes data = 5; } message Filter { diff --git a/pipeline/rpc/proto/woodpecker_grpc.pb.go b/pipeline/rpc/proto/woodpecker_grpc.pb.go index f591f4dfef..837ce74e09 100644 --- a/pipeline/rpc/proto/woodpecker_grpc.pb.go +++ b/pipeline/rpc/proto/woodpecker_grpc.pb.go @@ -1,7 +1,22 @@ +// Copyright 2021 Woodpecker Authors +// Copyright 2011 Drone.IO Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.24.4 +// - protoc v4.25.1 // source: woodpecker.proto package proto @@ -59,7 +74,7 @@ func NewWoodpeckerClient(cc grpc.ClientConnInterface) WoodpeckerClient { func (c *woodpeckerClient) Version(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*VersionResponse, error) { out := new(VersionResponse) - err := c.cc.Invoke(ctx, "/proto.Woodpecker/Version", in, out, opts...) + err := c.cc.Invoke(ctx, Woodpecker_Version_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -68,7 +83,7 @@ func (c *woodpeckerClient) Version(ctx context.Context, in *Empty, opts ...grpc. func (c *woodpeckerClient) Next(ctx context.Context, in *NextRequest, opts ...grpc.CallOption) (*NextResponse, error) { out := new(NextResponse) - err := c.cc.Invoke(ctx, "/proto.Woodpecker/Next", in, out, opts...) + err := c.cc.Invoke(ctx, Woodpecker_Next_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -77,7 +92,7 @@ func (c *woodpeckerClient) Next(ctx context.Context, in *NextRequest, opts ...gr func (c *woodpeckerClient) Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*Empty, error) { out := new(Empty) - err := c.cc.Invoke(ctx, "/proto.Woodpecker/Init", in, out, opts...) + err := c.cc.Invoke(ctx, Woodpecker_Init_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -86,7 +101,7 @@ func (c *woodpeckerClient) Init(ctx context.Context, in *InitRequest, opts ...gr func (c *woodpeckerClient) Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*Empty, error) { out := new(Empty) - err := c.cc.Invoke(ctx, "/proto.Woodpecker/Wait", in, out, opts...) + err := c.cc.Invoke(ctx, Woodpecker_Wait_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -95,7 +110,7 @@ func (c *woodpeckerClient) Wait(ctx context.Context, in *WaitRequest, opts ...gr func (c *woodpeckerClient) Done(ctx context.Context, in *DoneRequest, opts ...grpc.CallOption) (*Empty, error) { out := new(Empty) - err := c.cc.Invoke(ctx, "/proto.Woodpecker/Done", in, out, opts...) + err := c.cc.Invoke(ctx, Woodpecker_Done_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -104,7 +119,7 @@ func (c *woodpeckerClient) Done(ctx context.Context, in *DoneRequest, opts ...gr func (c *woodpeckerClient) Extend(ctx context.Context, in *ExtendRequest, opts ...grpc.CallOption) (*Empty, error) { out := new(Empty) - err := c.cc.Invoke(ctx, "/proto.Woodpecker/Extend", in, out, opts...) + err := c.cc.Invoke(ctx, Woodpecker_Extend_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -113,7 +128,7 @@ func (c *woodpeckerClient) Extend(ctx context.Context, in *ExtendRequest, opts . func (c *woodpeckerClient) Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*Empty, error) { out := new(Empty) - err := c.cc.Invoke(ctx, "/proto.Woodpecker/Update", in, out, opts...) + err := c.cc.Invoke(ctx, Woodpecker_Update_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -122,7 +137,7 @@ func (c *woodpeckerClient) Update(ctx context.Context, in *UpdateRequest, opts . func (c *woodpeckerClient) Log(ctx context.Context, in *LogRequest, opts ...grpc.CallOption) (*Empty, error) { out := new(Empty) - err := c.cc.Invoke(ctx, "/proto.Woodpecker/Log", in, out, opts...) + err := c.cc.Invoke(ctx, Woodpecker_Log_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -131,7 +146,7 @@ func (c *woodpeckerClient) Log(ctx context.Context, in *LogRequest, opts ...grpc func (c *woodpeckerClient) RegisterAgent(ctx context.Context, in *RegisterAgentRequest, opts ...grpc.CallOption) (*RegisterAgentResponse, error) { out := new(RegisterAgentResponse) - err := c.cc.Invoke(ctx, "/proto.Woodpecker/RegisterAgent", in, out, opts...) + err := c.cc.Invoke(ctx, Woodpecker_RegisterAgent_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -149,7 +164,7 @@ func (c *woodpeckerClient) UnregisterAgent(ctx context.Context, in *Empty, opts func (c *woodpeckerClient) ReportHealth(ctx context.Context, in *ReportHealthRequest, opts ...grpc.CallOption) (*Empty, error) { out := new(Empty) - err := c.cc.Invoke(ctx, "/proto.Woodpecker/ReportHealth", in, out, opts...) + err := c.cc.Invoke(ctx, Woodpecker_ReportHealth_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -234,7 +249,7 @@ func _Woodpecker_Version_Handler(srv interface{}, ctx context.Context, dec func( } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.Woodpecker/Version", + FullMethod: Woodpecker_Version_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(WoodpeckerServer).Version(ctx, req.(*Empty)) @@ -252,7 +267,7 @@ func _Woodpecker_Next_Handler(srv interface{}, ctx context.Context, dec func(int } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.Woodpecker/Next", + FullMethod: Woodpecker_Next_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(WoodpeckerServer).Next(ctx, req.(*NextRequest)) @@ -270,7 +285,7 @@ func _Woodpecker_Init_Handler(srv interface{}, ctx context.Context, dec func(int } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.Woodpecker/Init", + FullMethod: Woodpecker_Init_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(WoodpeckerServer).Init(ctx, req.(*InitRequest)) @@ -288,7 +303,7 @@ func _Woodpecker_Wait_Handler(srv interface{}, ctx context.Context, dec func(int } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.Woodpecker/Wait", + FullMethod: Woodpecker_Wait_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(WoodpeckerServer).Wait(ctx, req.(*WaitRequest)) @@ -306,7 +321,7 @@ func _Woodpecker_Done_Handler(srv interface{}, ctx context.Context, dec func(int } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.Woodpecker/Done", + FullMethod: Woodpecker_Done_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(WoodpeckerServer).Done(ctx, req.(*DoneRequest)) @@ -324,7 +339,7 @@ func _Woodpecker_Extend_Handler(srv interface{}, ctx context.Context, dec func(i } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.Woodpecker/Extend", + FullMethod: Woodpecker_Extend_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(WoodpeckerServer).Extend(ctx, req.(*ExtendRequest)) @@ -342,7 +357,7 @@ func _Woodpecker_Update_Handler(srv interface{}, ctx context.Context, dec func(i } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.Woodpecker/Update", + FullMethod: Woodpecker_Update_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(WoodpeckerServer).Update(ctx, req.(*UpdateRequest)) @@ -360,7 +375,7 @@ func _Woodpecker_Log_Handler(srv interface{}, ctx context.Context, dec func(inte } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.Woodpecker/Log", + FullMethod: Woodpecker_Log_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(WoodpeckerServer).Log(ctx, req.(*LogRequest)) @@ -378,7 +393,7 @@ func _Woodpecker_RegisterAgent_Handler(srv interface{}, ctx context.Context, dec } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.Woodpecker/RegisterAgent", + FullMethod: Woodpecker_RegisterAgent_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(WoodpeckerServer).RegisterAgent(ctx, req.(*RegisterAgentRequest)) @@ -414,7 +429,7 @@ func _Woodpecker_ReportHealth_Handler(srv interface{}, ctx context.Context, dec } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.Woodpecker/ReportHealth", + FullMethod: Woodpecker_ReportHealth_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(WoodpeckerServer).ReportHealth(ctx, req.(*ReportHealthRequest)) @@ -478,6 +493,10 @@ var Woodpecker_ServiceDesc = grpc.ServiceDesc{ Metadata: "woodpecker.proto", } +const ( + WoodpeckerAuth_Auth_FullMethodName = "/proto.WoodpeckerAuth/Auth" +) + // WoodpeckerAuthClient is the client API for WoodpeckerAuth service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -495,7 +514,7 @@ func NewWoodpeckerAuthClient(cc grpc.ClientConnInterface) WoodpeckerAuthClient { func (c *woodpeckerAuthClient) Auth(ctx context.Context, in *AuthRequest, opts ...grpc.CallOption) (*AuthResponse, error) { out := new(AuthResponse) - err := c.cc.Invoke(ctx, "/proto.WoodpeckerAuth/Auth", in, out, opts...) + err := c.cc.Invoke(ctx, WoodpeckerAuth_Auth_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -540,7 +559,7 @@ func _WoodpeckerAuth_Auth_Handler(srv interface{}, ctx context.Context, dec func } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/proto.WoodpeckerAuth/Auth", + FullMethod: WoodpeckerAuth_Auth_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(WoodpeckerAuthServer).Auth(ctx, req.(*AuthRequest)) diff --git a/server/api/stream.go b/server/api/stream.go index cd187681cb..2d3c7ee863 100644 --- a/server/api/stream.go +++ b/server/api/stream.go @@ -186,7 +186,7 @@ func LogStreamSSE(c *gin.Context) { return } - if step.State != model.StatusRunning { + if step.State != model.StatusPending && step.State != model.StatusRunning { log.Debug().Msg("step not running (anymore).") logWriteStringErr(io.WriteString(rw, "event: error\ndata: step not running (anymore)\n\n")) return @@ -205,6 +205,13 @@ func LogStreamSSE(c *gin.Context) { log.Debug().Msg("log stream: connection closed") }() + err = server.Config.Services.Logs.Open(ctx, step.ID) + if err != nil { + log.Error().Err(err).Msg("log stream: open failed") + logWriteStringErr(io.WriteString(rw, "event: error\ndata: can't open stream\n\n")) + return + } + go func() { err := server.Config.Services.Logs.Tail(ctx, step.ID, func(entries ...*model.LogEntry) { for _, entry := range entries { diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index 9e585a56f1..94cda672ca 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -328,7 +328,7 @@ func (s *RPC) Log(c context.Context, _logEntry *rpc.LogEntry) error { StepID: step.ID, Time: _logEntry.Time, Line: _logEntry.Line, - Data: []byte(_logEntry.Data), + Data: _logEntry.Data, Type: model.LogEntryType(_logEntry.Type), } // make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9) diff --git a/web/src/components/repo/pipeline/PipelineLog.vue b/web/src/components/repo/pipeline/PipelineLog.vue index a403e55f44..6fa42fc79b 100644 --- a/web/src/components/repo/pipeline/PipelineLog.vue +++ b/web/src/components/repo/pipeline/PipelineLog.vue @@ -112,7 +112,7 @@ import { useStorage } from '@vueuse/core'; import { AnsiUp } from 'ansi_up'; import { decode } from 'js-base64'; import { debounce } from 'lodash'; -import { computed, inject, nextTick, onMounted, ref, toRef, watch, type Ref } from 'vue'; +import { computed, inject, nextTick, onBeforeUnmount, onMounted, ref, toRef, watch, type Ref } from 'vue'; import { useI18n } from 'vue-i18n'; import { useRoute } from 'vue-router'; @@ -126,7 +126,7 @@ import { findStep, isStepFinished, isStepRunning } from '~/utils/helpers'; interface LogLine { index: number; number: number; - text: string; + text?: string; time?: number; type: 'error' | 'warning' | null; } @@ -184,7 +184,7 @@ function writeLog(line: Partial) { logBuffer.value.push({ index: line.index ?? 0, number: (line.index ?? 0) + 1, - text: ansiUp.value.ansi_to_html(line.text ?? ''), + text: ansiUp.value.ansi_to_html(decode(line.text ?? '')), time: line.time ?? 0, type: null, // TODO: implement way to detect errors and warnings }); @@ -277,18 +277,16 @@ async function loadLogs() { return; } + if (!repo) { + throw new Error('Unexpected: "repo" should be provided at this place'); + } + log.value = undefined; logBuffer.value = []; ansiUp.value = new AnsiUp(); ansiUp.value.use_classes = true; - if (!repo) { - throw new Error('Unexpected: "repo" should be provided at this place'); - } - - if (stream.value) { - stream.value.close(); - } + stream.value?.close(); if (!hasLogs.value || !step.value) { return; @@ -297,12 +295,12 @@ async function loadLogs() { if (isStepFinished(step.value)) { loadedStepSlug.value = stepSlug.value; const logs = await apiClient.getLogs(repo.value.id, pipeline.value.number, step.value.id); - logs?.forEach((line) => writeLog({ index: line.line, text: decode(line.data), time: line.time })); + logs?.forEach((line) => writeLog({ index: line.line, text: line.data, time: line.time })); flushLogs(false); - } else if (isStepRunning(step.value)) { + } else if (step.value.state === 'pending' || isStepRunning(step.value)) { loadedStepSlug.value = stepSlug.value; stream.value = apiClient.streamLogs(repo.value.id, pipeline.value.number, step.value.id, (line) => { - writeLog({ index: line.line, text: decode(line.data), time: line.time }); + writeLog({ index: line.line, text: line.data, time: line.time }); flushLogs(true); }); } @@ -331,6 +329,10 @@ onMounted(async () => { await loadLogs(); }); +onBeforeUnmount(() => { + stream.value?.close(); +}); + watch(stepSlug, async () => { await loadLogs(); });