Skip to content

Commit

Permalink
Convert commitlog chan to batches
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Artoul committed Nov 6, 2018
1 parent 0fd715a commit bb6a808
Showing 1 changed file with 38 additions and 24 deletions.
62 changes: 38 additions & 24 deletions src/dbnode/persist/fs/commitlog/commit_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ var (
timeZero = time.Time{}
)

type WritesBatch []Write
type Write struct {
series Series
datapoint ts.Datapoint
unit xtime.Unit
annotation ts.Annotation
}

type newCommitLogWriterFn func(
flushFn flushFn,
opts Options,
Expand Down Expand Up @@ -193,13 +201,9 @@ func (r callbackResult) rotateLogsResult() (rotateLogsResult, error) {
}

type commitLogWrite struct {
eventType eventType

series Series
datapoint ts.Datapoint
unit xtime.Unit
annotation ts.Annotation
callbackFn callbackFn
eventType eventType
writesBatch WritesBatch
callbackFn callbackFn
}

// NewCommitLog creates a new commit log
Expand Down Expand Up @@ -443,18 +447,20 @@ func (l *commitLog) write() {
}
}

err := l.writerState.writer.Write(write.series,
write.datapoint, write.unit, write.annotation)
for i := 0; i < len(write.writesBatch); i++ {
write := write.writesBatch[i]
err := l.writerState.writer.Write(write.series,
write.datapoint, write.unit, write.annotation)
if err != nil {
l.metrics.errors.Inc(1)
l.log.Errorf("failed to write to commit log: %v", err)

if err != nil {
l.metrics.errors.Inc(1)
l.log.Errorf("failed to write to commit log: %v", err)
if l.commitLogFailFn != nil {
l.commitLogFailFn(err)
}

if l.commitLogFailFn != nil {
l.commitLogFailFn(err)
continue
}

continue
}
l.metrics.success.Inc(1)
}
Expand Down Expand Up @@ -564,10 +570,14 @@ func (l *commitLog) writeWait(
}

write := commitLogWrite{
series: series,
datapoint: datapoint,
unit: unit,
annotation: annotation,
writesBatch: []Write{
{
series: series,
datapoint: datapoint,
unit: unit,
annotation: annotation,
},
},
callbackFn: completion,
}

Expand Down Expand Up @@ -604,10 +614,14 @@ func (l *commitLog) writeBehind(
}

write := commitLogWrite{
series: series,
datapoint: datapoint,
unit: unit,
annotation: annotation,
writesBatch: []Write{
{
series: series,
datapoint: datapoint,
unit: unit,
annotation: annotation,
},
},
}

enqueued := false
Expand Down

0 comments on commit bb6a808

Please sign in to comment.