Skip to content

Commit

Permalink
Merge pull request #327 from OlivierCazade/loki-retry
Browse files Browse the repository at this point in the history
Remove unnecessary goroutine of loki writer
  • Loading branch information
OlivierCazade authored Oct 24, 2022
2 parents 6809920 + 7efa8fa commit 15efbb8
Showing 1 changed file with 3 additions and 25 deletions.
28 changes: 3 additions & 25 deletions pkg/pipeline/write/write_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ type emitter interface {
Handle(labels model.LabelSet, timestamp time.Time, record string) error
}

const channelSize = 1000

// Loki record writer
type Loki struct {
lokiConfig loki.Config
Expand All @@ -57,7 +55,6 @@ type Loki struct {
saneLabels map[string]model.LabelName
client emitter
timeNow func() time.Time
in chan config.GenericMap
exitChan <-chan struct{}
metrics *metrics
}
Expand Down Expand Up @@ -208,21 +205,9 @@ func getFloat64(timestamp interface{}) (ft float64, ok bool) {
// Write writes a flow before being stored
func (l *Loki) Write(entry config.GenericMap) {
log.Debugf("entering Loki Write")
l.in <- entry
}

func (l *Loki) processRecords() {
for {
select {
case <-l.exitChan:
log.Debugf("exiting writeLoki because of signal")
return
case record := <-l.in:
err := l.ProcessRecord(record)
if err != nil {
log.Errorf("Write (Loki) error %v", err)
}
}
err := l.ProcessRecord(entry)
if err != nil {
log.Errorf("Write (Loki) error %v", err)
}
}

Expand Down Expand Up @@ -266,10 +251,6 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo
}
}

// TODO / FIXME / FIGUREOUT: seems like we have 2 input channels for Loki? (this one, and see also pipeline_builder.go / getStageNode / StageWrite)
in := make(chan config.GenericMap, channelSize)
opMetrics.CreateInQueueSizeGauge(params.Name+"-2", func() int { return len(in) })

l := &Loki{
lokiConfig: lokiConfig,
apiConfig: lokiConfigIn,
Expand All @@ -278,11 +259,8 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo
client: client,
timeNow: time.Now,
exitChan: pUtils.ExitChannel(),
in: in,
metrics: newMetrics(opMetrics, params.Name),
}

go l.processRecords()

return l, nil
}

0 comments on commit 15efbb8

Please sign in to comment.