From 4c7ecd3f86c5ce1f59f54f03a6dceec82d21b77e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20Serv=C3=A9n=20Mar=C3=ADn?= Date: Wed, 2 Oct 2019 10:07:54 +0200 Subject: [PATCH] cmd/thanos/receive: fix close chan panic (#1595) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * cmd/thanos/receive: fix close chan panic This commit fixes a panic caused by sending on the chan which is closed too early. This chan was closed when the setup func returned, rather than when the group's goroutine returned. Fixes: #1594 Signed-off-by: Lucas Servén Marín * cmd/thanos/receive: correctly clean up uploader This commit fixes how the uploader is cleaned up. Currently, it was cleaned up in a defer block that was run before the run groups. This ensures the cleanup occurs only when the run groups are returning. Signed-off-by: Lucas Servén Marín --- cmd/thanos/receive.go | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 3a37628ed3..84d035a713 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -263,9 +263,9 @@ func runReceive( cancel() }) } else { - defer close(updates) cancel := make(chan struct{}) g.Add(func() error { + defer close(updates) updates <- receive.SingleNodeHashring(endpoint) <-cancel return nil @@ -370,25 +370,12 @@ func runReceive( return err } - // Ensure we close up everything properly. - defer func() { - if err != nil { - runutil.CloseWithLogOnErr(logger, bkt, "bucket client") - } - }() - s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.ReceiveSource) // Before starting, ensure any old blocks are uploaded. if uploaded, err := s.Sync(context.Background()); err != nil { level.Warn(logger).Log("err", err, "failed to upload", uploaded) } - // Before quitting, ensure all blocks are uploaded. - defer func() { - if uploaded, err := s.Sync(context.Background()); err != nil { - level.Warn(logger).Log("err", err, "failed to upload", uploaded) - } - }() { // Run the uploader in a loop. @@ -410,7 +397,18 @@ func runReceive( // Upload on demand. ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - defer close(uploadC) + // Ensure we clean up everything properly. + defer func() { + runutil.CloseWithLogOnErr(logger, bkt, "bucket client") + }() + // Before quitting, ensure all blocks are uploaded. + defer func() { + <-uploadC + if uploaded, err := s.Sync(context.Background()); err != nil { + level.Warn(logger).Log("err", err, "failed to upload", uploaded) + } + }() + defer close(uploadDone) for { select { case <-ctx.Done(): @@ -419,7 +417,6 @@ func runReceive( if uploaded, err := s.Sync(ctx); err != nil { level.Warn(logger).Log("err", err, "failed to upload", uploaded) } - cancel() uploadDone <- struct{}{} } }