Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: adjust data stream from pouch pull api #1586

Merged
merged 1 commit into from
Jul 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apis/server/image_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *Server) pullImage(ctx context.Context, rw http.ResponseWriter, req *htt
}
}
// Error information has be sent to client, so no need call resp.Write
if err := s.ImageMgr.PullImage(ctx, image, &authConfig, rw); err != nil {
if err := s.ImageMgr.PullImage(ctx, image, &authConfig, newWriteFlusher(rw)); err != nil {
logrus.Errorf("failed to pull image %s: %v", image, err)
return nil
}
Expand Down
112 changes: 45 additions & 67 deletions cli/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/client"
"github.com/alibaba/pouch/credential"
"github.com/alibaba/pouch/ctrd"
"github.com/alibaba/pouch/pkg/jsonstream"
"github.com/alibaba/pouch/pkg/reference"

"github.com/containerd/containerd/progress"
Expand Down Expand Up @@ -93,62 +93,72 @@ func showProgress(body io.ReadCloser) error {
output = progress.NewWriter(os.Stdout)
}

dec := json.NewDecoder(body)
if _, err := dec.Token(); err != nil {
return fmt.Errorf("failed to read the opening token: %v", err)
}
pos := make(map[string]int)
status := []jsonstream.JSONMessage{}

refStatus := make(map[string]string)
for dec.More() {
var infos []ctrd.ProgressInfo
dec := json.NewDecoder(body)
for {
var (
msg jsonstream.JSONMessage
msgs []jsonstream.JSONMessage
)

if err := dec.Decode(&msg); err != nil {
if err == io.EOF {
break
}
return err
}

if err := dec.Decode(&infos); err != nil {
return fmt.Errorf("failed to decode: %v", err)
change := true
if _, ok := pos[msg.ID]; !ok {
status = append(status, msg)
pos[msg.ID] = len(status) - 1
} else {
change = (status[pos[msg.ID]].Status != msg.Status)
status[pos[msg.ID]] = msg
}

// only display the new status if the stdout is not terminal
if !isTerminal {
newInfos := make([]ctrd.ProgressInfo, 0)
for i, info := range infos {
old, ok := refStatus[info.Ref]
if !ok || info.Status != old {
refStatus[info.Ref] = info.Status
newInfos = append(newInfos, infos[i])
}
// if the status doesn't change, skip to avoid duplicate status
if !change {
continue
}

infos = newInfos
msgs = []jsonstream.JSONMessage{msg}
} else {
msgs = status
}

if err := displayProgressInfos(output, isTerminal, infos, start); err != nil {
if err := displayImageReferenceProgress(output, isTerminal, msgs, start); err != nil {
return fmt.Errorf("failed to display progress: %v", err)
}

if err := output.Flush(); err != nil {
return fmt.Errorf("failed to display progress: %v", err)
}
}

if _, err := dec.Token(); err != nil {
return fmt.Errorf("failed to read the closing token: %v", err)
}
return nil
}

// displayProgressInfos uses tabwriter to show current progress info.
func displayProgressInfos(output io.Writer, isTerminal bool, infos []ctrd.ProgressInfo, start time.Time) error {
// displayImageReferenceProgress uses tabwriter to show current progress status.
func displayImageReferenceProgress(output io.Writer, isTerminal bool, msgs []jsonstream.JSONMessage, start time.Time) error {
var (
tw = tabwriter.NewWriter(output, 1, 8, 1, ' ', 0)
total = int64(0)
tw = tabwriter.NewWriter(output, 1, 8, 1, ' ', 0)
current = int64(0)
)

for _, info := range infos {
if info.ErrorMessage != "" {
return fmt.Errorf(info.ErrorMessage)
for _, msg := range msgs {
if msg.Error != nil {
return fmt.Errorf(msg.Error.Message)
}

total += info.Offset
if _, err := fmt.Fprint(tw, formatProgressInfo(info, isTerminal)); err != nil {
if msg.Detail != nil {
current += msg.Detail.Current
}

status := jsonstream.PullReferenceStatus(!isTerminal, msg)
if _, err := fmt.Fprint(tw, status); err != nil {
return err
}
}
Expand All @@ -157,47 +167,15 @@ func displayProgressInfos(output io.Writer, isTerminal bool, infos []ctrd.Progre
if isTerminal {
_, err := fmt.Fprintf(tw, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n",
time.Since(start).Seconds(),
progress.Bytes(total),
progress.NewBytesPerSecond(total, time.Since(start)))
progress.Bytes(current),
progress.NewBytesPerSecond(current, time.Since(start)))
if err != nil {
return err
}
}
return tw.Flush()
}

// formatProgressInfo formats ProgressInfo into string.
func formatProgressInfo(info ctrd.ProgressInfo, isTerminal bool) string {
if !isTerminal {
return fmt.Sprintf("%s:\t%s\n", info.Ref, info.Status)
}

switch info.Status {
case "downloading", "uploading":
var bar progress.Bar
if info.Total > 0.0 {
bar = progress.Bar(float64(info.Offset) / float64(info.Total))
}
return fmt.Sprintf("%s:\t%s\t%40r\t%8.8s/%s\t\n",
info.Ref,
info.Status,
bar,
progress.Bytes(info.Offset), progress.Bytes(info.Total))

case "resolving", "waiting":
return fmt.Sprintf("%s:\t%s\t%40r\t\n",
info.Ref,
info.Status,
progress.Bar(0.0))

default:
return fmt.Sprintf("%s:\t%s\t%40r\t\n",
info.Ref,
info.Status,
progress.Bar(1.0))
}
}

// pullExample shows examples in pull command, and is used in auto-generated cli docs.
func pullExample() string {
return `$ pouch images
Expand Down
96 changes: 45 additions & 51 deletions ctrd/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,13 @@ func (c *Client) PullImage(ctx context.Context, ref string, authConfig *types.Au

if err != nil {
// Send Error information to client through stream
messages := []ProgressInfo{
{Code: http.StatusInternalServerError, ErrorMessage: err.Error()},
message := jsonstream.JSONMessage{
Error: &jsonstream.JSONError{
Code: http.StatusInternalServerError,
Message: err.Error(),
},
}
stream.WriteObject(messages)
stream.WriteObject(message)
return nil, err
}

Expand All @@ -165,26 +168,13 @@ func (c *Client) pullImage(ctx context.Context, wrapperCli *WrapperClient, ref s
return img, nil
}

// ProgressInfo represents the status of downloading image.
type ProgressInfo struct {
Ref string
Status string
Offset int64
Total int64
StartedAt time.Time
UpdatedAt time.Time

// For Error handling
Code int // http response code
ErrorMessage string // detail error information
}

// FIXME(fuwei): put the fetchProgress into jsonstream and make it readable.
func (c *Client) fetchProgress(ctx context.Context, wrapperCli *WrapperClient, ongoing *jobs, stream *jsonstream.JSONStream) error {
var (
ticker = time.NewTicker(100 * time.Millisecond)
ticker = time.NewTicker(300 * time.Millisecond)
cs = wrapperCli.client.ContentStore()
start = time.Now()
progresses = map[string]ProgressInfo{}
progresses = map[string]jsonstream.JSONMessage{}
done bool
)
defer ticker.Stop()
Expand All @@ -193,30 +183,33 @@ outer:
for {
select {
case <-ticker.C:
resolved := "resolved"
resolved := jsonstream.PullStatusResolved
if !ongoing.isResolved() {
resolved = "resolving"
resolved = jsonstream.PullStatusResolving
}
progresses[ongoing.name] = ProgressInfo{
Ref: ongoing.name,
progresses[ongoing.name] = jsonstream.JSONMessage{
ID: ongoing.name,
Status: resolved,
Detail: &jsonstream.ProgressDetail{},
}
keys := []string{ongoing.name}

activeSeen := map[string]struct{}{}
if !done {
active, err := cs.ListStatuses(context.TODO(), "")
actives, err := cs.ListStatuses(context.TODO(), "")
if err != nil {
logrus.Errorf("failed to list statuses: %v", err)
continue
}
// update status of active entries!
for _, active := range active {
progresses[active.Ref] = ProgressInfo{
Ref: active.Ref,
Status: "downloading",
Offset: active.Offset,
Total: active.Total,
for _, active := range actives {
progresses[active.Ref] = jsonstream.JSONMessage{
ID: active.Ref,
Status: jsonstream.PullStatusDownloading,
Detail: &jsonstream.ProgressDetail{
Current: active.Offset,
Total: active.Total,
},
StartedAt: active.StartedAt,
UpdatedAt: active.UpdatedAt,
}
Expand All @@ -233,54 +226,55 @@ outer:
}

status, ok := progresses[key]
if !done && (!ok || status.Status == "downloading") {
if !done && (!ok || status.Status == jsonstream.PullStatusDownloading) {
info, err := cs.Info(context.TODO(), j.Digest)
if err != nil {
if !errdefs.IsNotFound(err) {
logrus.Errorf("failed to get content info: %v", err)
continue outer
} else {
progresses[key] = ProgressInfo{
Ref: key,
Status: "waiting",
progresses[key] = jsonstream.JSONMessage{
ID: key,
Status: jsonstream.PullStatusWaiting,
}
}
} else if info.CreatedAt.After(start) {
progresses[key] = ProgressInfo{
Ref: key,
Status: "done",
Offset: info.Size,
Total: info.Size,
progresses[key] = jsonstream.JSONMessage{
ID: key,
Status: jsonstream.PullStatusDone,
Detail: &jsonstream.ProgressDetail{
Current: info.Size,
Total: info.Size,
},
UpdatedAt: info.CreatedAt,
}
} else {
progresses[key] = ProgressInfo{
Ref: key,
Status: "exists",
progresses[key] = jsonstream.JSONMessage{
ID: key,
Status: jsonstream.PullStatusExists,
}
}
} else if done {
if ok {
if status.Status != "done" && status.Status != "exists" {
status.Status = "done"
if status.Status != jsonstream.PullStatusDone &&
status.Status != jsonstream.PullStatusExists {

status.Status = jsonstream.PullStatusDone
progresses[key] = status
}
} else {
progresses[key] = ProgressInfo{
Ref: key,
Status: "done",
progresses[key] = jsonstream.JSONMessage{
ID: key,
Status: jsonstream.PullStatusDone,
}
}
}
}

var ordered []ProgressInfo
for _, key := range keys {
ordered = append(ordered, progresses[key])
stream.WriteObject(progresses[key])
}

stream.WriteObject(ordered)

if done {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion daemon/mgr/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (mgr *ImageManager) PullImage(ctx context.Context, ref string, authConfig *
}

pctx, cancel := context.WithCancel(ctx)
stream := jsonstream.New(out)
stream := jsonstream.New(out, nil)
wait := make(chan struct{})

go func() {
Expand Down
Loading