Skip to content

Commit

Permalink
Add StopNow() method to stop the client without retries.
Browse files Browse the repository at this point in the history
Use `StopNow()` from the docker-driver
  • Loading branch information
kavirajk committed Nov 23, 2020
1 parent 129ca28 commit 820ec24
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 12 deletions.
4 changes: 3 additions & 1 deletion cmd/docker-driver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ var (
MaxBackoff: client.MaxBackoff,
MaxRetries: client.MaxRetries,
},
Timeout: client.Timeout,
// Avoid blocking the docker-driver on the worst case
// https://github.com/grafana/loki/pull/2898#issuecomment-730218963
Timeout: 5 * time.Second,
}
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/docker-driver/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ func (l *loki) Name() string {

// Log implements `logger.Logger`
func (l *loki) Close() error {
l.client.Stop()
l.client.StopNow()
return nil
}
4 changes: 3 additions & 1 deletion cmd/docker-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ func main() {
}
logger := newLogger(logLevel)
level.Info(util.Logger).Log("msg", "Starting docker-plugin", "version", version.Info())

h := sdk.NewHandler(`{"Implements": ["LoggingDriver"]}`)

handlers(&h, newDriver(logger))

if err := h.ServeUnix(socketAddress, 0); err != nil {
panic(err)
}

}

func newLogger(lvl logging.Level) log.Logger {
Expand Down
6 changes: 6 additions & 0 deletions cmd/fluent-bit/dque.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ func (c *dqueClient) Stop() {
c.loki.Stop()
}

// Stop the client
func (c *dqueClient) StopNow() {
c.once.Do(func() { c.queue.Close() })
c.loki.StopNow()
}

// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *dqueClient) Handle(ls model.LabelSet, t time.Time, s string) error {
if err := c.queue.Enqueue(&dqueEntry{ls, t, s}); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/fluent-bit/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func (r *recorder) toEntry() *entry { return r.entry }

func (r *recorder) Stop() {}

func (r *recorder) StopNow() {}

var now = time.Now()

func Test_loki_sendRecord(t *testing.T) {
Expand Down
35 changes: 26 additions & 9 deletions pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,20 @@ type Client interface {
api.EntryHandler
// Stop goroutine sending batch of entries.
Stop()

// Stop goroutine sending batch of entries without retries.
StopNow()
}

// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
logger log.Logger
cfg Config
client *http.Client
quit chan struct{}
logger log.Logger
cfg Config
client *http.Client

// quit chan is depricated. Will be removed. Use `client.ctx` and `client.cancel` instead.
quit chan struct{}

once sync.Once
entries chan entry
wg sync.WaitGroup
Expand Down Expand Up @@ -256,9 +262,11 @@ func (c *client) sendBatch(tenantID string, batch *batch) {

backoff := util.NewBackoff(c.ctx, c.cfg.BackoffConfig)
var status int
for backoff.Ongoing() {
for {
start := time.Now()
status, err = c.send(c.ctx, tenantID, buf)
// send uses `timeout` internally, so `context.Background` is good enough.
status, err = c.send(context.Background(), tenantID, buf)

requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

if err == nil {
Expand Down Expand Up @@ -295,6 +303,11 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
backoff.Wait()

// Make sure it sends at least once before checking for retry.
if !backoff.Ongoing() {
break
}
}

if err != nil {
Expand Down Expand Up @@ -356,13 +369,17 @@ func (c *client) getTenantID(labels model.LabelSet) string {

// Stop the client.
func (c *client) Stop() {
// cancel any upstream calls made using client's `ctx`.
c.cancel()

c.once.Do(func() { close(c.quit) })
c.wg.Wait()
}

// StopNow stops the client without retries
func (c *client) StopNow() {
// cancel any upstream calls made using client's `ctx`.
c.cancel()
c.Stop()
}

// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
if len(c.externalLabels) > 0 {
Expand Down
5 changes: 5 additions & 0 deletions pkg/promtail/client/fake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ func (c *Client) Stop() {
c.OnStop()
}

// StopNow implements client.Client
func (c *Client) StopNow() {
c.OnStop()
}

// Handle implements client.Client
func (c *Client) Handle(labels model.LabelSet, time time.Time, entry string) error {
return c.OnHandleEntry.Handle(labels, time, entry)
Expand Down
2 changes: 2 additions & 0 deletions pkg/promtail/client/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func NewLogger(log log.Logger, externalLabels lokiflag.LabelSet, cfgs ...Config)

func (*logger) Stop() {}

func (*logger) StopNow() {}

func (l *logger) Handle(labels model.LabelSet, time time.Time, entry string) error {
l.Lock()
defer l.Unlock()
Expand Down
7 changes: 7 additions & 0 deletions pkg/promtail/client/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,10 @@ func (m MultiClient) Stop() {
c.Stop()
}
}

// StopNow implements Client
func (m MultiClient) StopNow() {
for _, c := range m {
c.StopNow()
}
}

0 comments on commit 820ec24

Please sign in to comment.