diff --git a/cmd/oras/internal/display/status/progress/manager.go b/cmd/oras/internal/display/status/progress/manager.go index 09929156a..1b6204e51 100644 --- a/cmd/oras/internal/display/status/progress/manager.go +++ b/cmd/oras/internal/display/status/progress/manager.go @@ -34,12 +34,9 @@ const ( var errManagerStopped = errors.New("progress output manager has already been stopped") -// Status is print message channel -type Status chan *status - // Manager is progress view master type Manager interface { - Add() (Status, error) + Add() (*Messenger, error) SendAndStop(desc ocispec.Descriptor, prompt string) error Close() error } @@ -107,7 +104,7 @@ func (m *manager) render() { } // Add appends a new status with 2-line space for rendering. -func (m *manager) Add() (Status, error) { +func (m *manager) Add() (*Messenger, error) { if m.closed() { return nil, errManagerStopped } @@ -124,26 +121,25 @@ func (m *manager) Add() (Status, error) { // SendAndStop send message for descriptor and stop timing. func (m *manager) SendAndStop(desc ocispec.Descriptor, prompt string) error { - status, err := m.Add() + messenger, err := m.Add() if err != nil { return err } - defer close(status) - status <- NewStatusMessage(prompt, desc, desc.Size) - status <- EndTiming() + messenger.Send(prompt, desc, desc.Size) + messenger.Stop() return nil } -func (m *manager) statusChan(s *status) Status { +func (m *manager) statusChan(s *status) *Messenger { ch := make(chan *status, BufferSize) m.updating.Add(1) go func() { defer m.updating.Done() for newStatus := range ch { - s.Update(newStatus) + s.update(newStatus) } }() - return ch + return &Messenger{ch: ch} } // Close stops all status and waits for updating and rendering. diff --git a/cmd/oras/internal/display/status/progress/messenger.go b/cmd/oras/internal/display/status/progress/messenger.go new file mode 100644 index 000000000..150b120c0 --- /dev/null +++ b/cmd/oras/internal/display/status/progress/messenger.go @@ -0,0 +1,55 @@ +/* +Copyright The ORAS Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import ( + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +// Messenger is progress message channel +type Messenger struct { + ch chan *status +} + +func (sm *Messenger) Start() { + if sm.ch == nil { + return + } + sm.ch <- startTiming() +} + +func (sm *Messenger) Send(prompt string, descriptor ocispec.Descriptor, offset int64) { + for { + if sm.ch == nil { + return + } + select { + case sm.ch <- newStatusMessage(prompt, descriptor, offset): + return + case <-sm.ch: + // purge the channel until successfully pushed + } + } +} + +func (sm *Messenger) Stop() { + if sm.ch == nil { + return + } + sm.ch <- endTiming() + close(sm.ch) + sm.ch = nil +} diff --git a/cmd/oras/internal/display/status/progress/messenger_test.go b/cmd/oras/internal/display/status/progress/messenger_test.go new file mode 100644 index 000000000..81b308ddf --- /dev/null +++ b/cmd/oras/internal/display/status/progress/messenger_test.go @@ -0,0 +1,113 @@ +/* +Copyright The ORAS Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package progress + +import ( + v1 "github.com/opencontainers/image-spec/specs-go/v1" + "testing" +) + +func Test_Messenger(t *testing.T) { + var msg *status + ch := make(chan *status, BufferSize) + messenger := &Messenger{ch: ch} + + messenger.Start() + select { + case msg = <-ch: + if msg.offset != -1 { + t.Errorf("Expected start message with offset -1, got %d", msg.offset) + } + default: + t.Error("Expected start message") + } + + desc := v1.Descriptor{ + Digest: "mouse", + Size: 100, + } + expected := int64(50) + messenger.Send("Reading", desc, expected) + select { + case msg = <-ch: + if msg.offset != expected { + t.Errorf("Expected status message with offset %d, got %d", expected, msg.offset) + } + if msg.prompt != "Reading" { + t.Errorf("Expected status message prompt Reading, got %s", msg.prompt) + } + default: + t.Error("Expected status message") + } + + messenger.Send("Reading", desc, expected) + messenger.Send("Read", desc, desc.Size) + select { + case msg = <-ch: + if msg.offset != desc.Size { + t.Errorf("Expected status message with offset %d, got %d", expected, msg.offset) + } + if msg.prompt != "Read" { + t.Errorf("Expected status message prompt Read, got %s", msg.prompt) + } + default: + t.Error("Expected status message") + } + select { + case msg = <-ch: + t.Errorf("Unexpected status message %v", msg) + default: + } + + expected = int64(-1) + messenger.Stop() + select { + case msg = <-ch: + if msg.offset != expected { + t.Errorf("Expected END status message with offset %d, got %d", expected, msg.offset) + } + default: + t.Error("Expected END status message") + } + + messenger.Send("Skipping", desc, desc.Size) + select { + case msg = <-ch: + if msg != nil { + t.Errorf("Unexpected status message %v", msg) + } + default: + } + + messenger.Stop() + select { + case msg = <-ch: + if msg != nil { + t.Errorf("Unexpected status message %v", msg) + } + default: + } + + messenger.Start() + messenger.Send("Skipping", desc, desc.Size) + select { + case msg = <-ch: + if msg != nil { + t.Errorf("Unexpected status message %v", msg) + } + default: + } +} diff --git a/cmd/oras/internal/display/status/progress/status.go b/cmd/oras/internal/display/status/progress/status.go index b2abb6ee4..469f845bf 100644 --- a/cmd/oras/internal/display/status/progress/status.go +++ b/cmd/oras/internal/display/status/progress/status.go @@ -65,8 +65,8 @@ func newStatus() *status { } } -// NewStatusMessage generates a status for messaging. -func NewStatusMessage(prompt string, descriptor ocispec.Descriptor, offset int64) *status { +// newStatusMessage generates a status for messaging. +func newStatusMessage(prompt string, descriptor ocispec.Descriptor, offset int64) *status { return &status{ prompt: prompt, descriptor: descriptor, @@ -74,16 +74,16 @@ func NewStatusMessage(prompt string, descriptor ocispec.Descriptor, offset int64 } } -// StartTiming starts timing. -func StartTiming() *status { +// startTiming starts timing. +func startTiming() *status { return &status{ offset: -1, startTime: time.Now(), } } -// EndTiming ends timing and set status to done. -func EndTiming() *status { +// endTiming ends timing and set status to done. +func endTiming() *status { return &status{ offset: -1, endTime: time.Now(), @@ -121,7 +121,7 @@ func (s *status) String(width int) (string, string) { percent = 1 default: // 0% ~ 99%, show 2-digit precision if total != 0 && s.offset >= 0 { - // percentage calculatable + // calculate percentage percent = float64(s.offset) / float64(total) } offset = fmt.Sprintf("%.2f", humanize.RoundTo(s.total.Size*percent)) @@ -190,8 +190,7 @@ func (s *status) durationString() string { return d.String() } -// Update updates a status. -func (s *status) Update(n *status) { +func (s *status) update(n *status) { s.lock.Lock() defer s.lock.Unlock() diff --git a/cmd/oras/internal/display/status/progress/status_test.go b/cmd/oras/internal/display/status/progress/status_test.go index 417ffb45e..bbcfbf8c0 100644 --- a/cmd/oras/internal/display/status/progress/status_test.go +++ b/cmd/oras/internal/display/status/progress/status_test.go @@ -35,7 +35,7 @@ func Test_status_String(t *testing.T) { } // not done - s.Update(&status{ + s.update(&status{ prompt: "test", descriptor: ocispec.Descriptor{ MediaType: "application/vnd.oci.empty.oras.test.v1+json", @@ -57,7 +57,7 @@ func Test_status_String(t *testing.T) { t.Error(err) } // done - s.Update(&status{ + s.update(&status{ endTime: time.Now(), offset: s.descriptor.Size, descriptor: s.descriptor, @@ -76,7 +76,7 @@ func Test_status_String_zeroWitdth(t *testing.T) { } // not done - s.Update(&status{ + s.update(&status{ prompt: "test", descriptor: ocispec.Descriptor{ MediaType: "application/vnd.oci.empty.oras.test.v1+json", @@ -93,7 +93,7 @@ func Test_status_String_zeroWitdth(t *testing.T) { t.Error(err) } // done - s.Update(&status{ + s.update(&status{ endTime: time.Now(), offset: s.descriptor.Size, descriptor: s.descriptor, diff --git a/cmd/oras/internal/display/status/track/reader.go b/cmd/oras/internal/display/status/track/reader.go index 28f647d36..93919381f 100644 --- a/cmd/oras/internal/display/status/track/reader.go +++ b/cmd/oras/internal/display/status/track/reader.go @@ -30,7 +30,7 @@ type reader struct { donePrompt string descriptor ocispec.Descriptor manager progress.Manager - status progress.Status + messenger *progress.Messenger } // NewReader returns a new reader with tracked progress. @@ -43,7 +43,7 @@ func NewReader(r io.Reader, descriptor ocispec.Descriptor, actionPrompt string, } func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager progress.Manager, actionPrompt string, donePrompt string) (*reader, error) { - ch, err := manager.Add() + messenger, err := manager.Add() if err != nil { return nil, err } @@ -54,11 +54,11 @@ func managedReader(r io.Reader, descriptor ocispec.Descriptor, manager progress. actionPrompt: actionPrompt, donePrompt: donePrompt, manager: manager, - status: ch, + messenger: messenger, }, nil } -// StopManager stops the status channel and related manager. +// StopManager stops the messenger channel and related manager. func (r *reader) StopManager() { r.Close() _ = r.manager.Close() @@ -66,18 +66,18 @@ func (r *reader) StopManager() { // Done sends message to mark the tracked progress as complete. func (r *reader) Done() { - r.status <- progress.NewStatusMessage(r.donePrompt, r.descriptor, r.descriptor.Size) - r.status <- progress.EndTiming() + r.messenger.Send(r.donePrompt, r.descriptor, r.descriptor.Size) + r.messenger.Stop() } // Close closes the update channel. func (r *reader) Close() { - close(r.status) + r.messenger.Stop() } -// Start sends the start timing to the status channel. +// Start sends the start timing to the messenger channel. func (r *reader) Start() { - r.status <- progress.StartTiming() + r.messenger.Start() } // Read reads from the underlying reader and updates the progress. @@ -93,12 +93,6 @@ func (r *reader) Read(p []byte) (int, error) { return n, io.ErrUnexpectedEOF } } - for { - select { - case r.status <- progress.NewStatusMessage(r.actionPrompt, r.descriptor, r.offset): - // purge the channel until successfully pushed - return n, err - case <-r.status: - } - } + r.messenger.Send(r.actionPrompt, r.descriptor, r.offset) + return n, err }