Skip to content

Commit

Permalink
enhance: add streaming client metrics
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Sep 27, 2024
1 parent 7c2cb8c commit c57d20f
Show file tree
Hide file tree
Showing 62 changed files with 1,715 additions and 271 deletions.
22 changes: 12 additions & 10 deletions internal/distributed/streaming/internal/consumer/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler/consumer"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

Expand All @@ -37,6 +35,7 @@ func NewResumableConsumer(factory factory, opts *ConsumerOptions) ResumableConsu
},
factory: factory,
consumeErr: syncutil.NewFuture[error](),
metrics: newConsumerMetrics(opts.PChannel),
}
go consumer.resumeLoop()
return consumer
Expand All @@ -54,6 +53,7 @@ type resumableConsumerImpl struct {
mh *timeTickOrderMessageHandler
factory factory
consumeErr *syncutil.Future[error]
metrics *consumerMetrics
}

type factory = func(ctx context.Context, opts *handler.ConsumerOptions) (consumer.Consumer, error)
Expand All @@ -63,6 +63,7 @@ func (rc *resumableConsumerImpl) resumeLoop() {
defer func() {
// close the message handler.
rc.mh.Close()
rc.metrics.IntoUnavailable()
rc.logger.Info("resumable consumer is closed")
close(rc.resumingExitCh)
}()
Expand All @@ -71,11 +72,17 @@ func (rc *resumableConsumerImpl) resumeLoop() {
deliverPolicy := rc.opts.DeliverPolicy
deliverFilters := rc.opts.DeliverFilters
// consumer need to resume when error occur, so message handler shouldn't close if the internal consumer encounter failure.
nopCloseMH := message.NopCloseHandler{
nopCloseMH := nopCloseHandler{
Handler: rc.mh,
HandleInterceptor: func(msg message.ImmutableMessage, handle func(message.ImmutableMessage)) {
g := rc.metrics.StartConsume(msg.EstimateSize())
handle(msg)
g.Finish()
},
}

for {
rc.metrics.IntoUnavailable()
// Get last checkpoint sent.
// Consume ordering is always time tick order now.
if rc.mh.lastConfirmedMessageID != nil {
Expand Down Expand Up @@ -104,6 +111,7 @@ func (rc *resumableConsumerImpl) resumeLoop() {
rc.consumeErr.Set(err)
return
}
rc.metrics.IntoAvailable()

// Wait until the consumer is unavailable or context canceled.
if err := rc.waitUntilUnavailable(consumer); err != nil {
Expand All @@ -114,10 +122,6 @@ func (rc *resumableConsumerImpl) resumeLoop() {
}

func (rc *resumableConsumerImpl) createNewConsumer(opts *handler.ConsumerOptions) (consumer.Consumer, error) {
// Mark as unavailable.
metrics.StreamingServiceClientConsumerTotal.WithLabelValues(paramtable.GetStringNodeID(), metrics.StreamingServiceClientProducerUnAvailable).Inc()
defer metrics.StreamingServiceClientConsumerTotal.WithLabelValues(paramtable.GetStringNodeID(), metrics.StreamingServiceClientProducerUnAvailable).Dec()

logger := rc.logger.With(zap.Any("deliverPolicy", opts.DeliverPolicy))

backoff := backoff.NewExponentialBackOff()
Expand Down Expand Up @@ -145,14 +149,11 @@ func (rc *resumableConsumerImpl) createNewConsumer(opts *handler.ConsumerOptions

// waitUntilUnavailable is used to wait until the consumer is unavailable or context canceled.
func (rc *resumableConsumerImpl) waitUntilUnavailable(consumer handler.Consumer) error {
// Mark as available.
metrics.StreamingServiceClientConsumerTotal.WithLabelValues(paramtable.GetStringNodeID(), metrics.StreamingServiceClientProducerAvailable).Inc()
defer func() {
consumer.Close()
if consumer.Error() != nil {
rc.logger.Warn("consumer is closed with error", zap.Error(consumer.Error()))
}
metrics.StreamingServiceClientConsumerTotal.WithLabelValues(paramtable.GetStringNodeID(), metrics.StreamingServiceClientProducerAvailable).Dec()
}()

select {
Expand Down Expand Up @@ -191,6 +192,7 @@ func (rc *resumableConsumerImpl) Close() {
// force close is applied by cancel context if graceful close is failed.
rc.cancel()
<-rc.resumingExitCh
rc.metrics.Close()
}

// Done returns a channel which will be closed when scanner is finished or closed.
Expand Down
16 changes: 16 additions & 0 deletions internal/distributed/streaming/internal/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/internal/mocks/streamingnode/client/handler/mock_consumer"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
Expand Down Expand Up @@ -69,3 +70,18 @@ func TestResumableConsumer(t *testing.T) {
rc.Close()
<-rc.Done()
}

func TestHandler(t *testing.T) {
	msgCh := make(chan message.ImmutableMessage, 100)
	h := nopCloseHandler{
		Handler: message.ChanMessageHandler(msgCh),
	}

	// Handle must forward the message to the wrapped channel handler.
	h.Handle(nil)
	assert.Nil(t, <-msgCh)

	// Close must be a no-op: the wrapped channel stays open afterwards.
	h.Close()
	select {
	case <-msgCh:
		panic("should not be closed")
	default:
	}
}
22 changes: 22 additions & 0 deletions internal/distributed/streaming/internal/consumer/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package consumer

import "github.com/milvus-io/milvus/pkg/streaming/util/message"

// nopCloseHandler wraps a message.Handler so that Close is a no-op; the
// inner handler remains usable after this wrapper is "closed". It is used
// when a consumer must be resumed after failure without tearing down the
// shared message handler.
type nopCloseHandler struct {
	message.Handler
	// HandleInterceptor, when non-nil, is invoked instead of the embedded
	// Handler.Handle; it receives the message plus the underlying handle
	// function to delegate to (e.g. to record metrics around handling).
	HandleInterceptor func(msg message.ImmutableMessage, handle func(message.ImmutableMessage))
}

// Handle forwards msg to the embedded handler, routing the call through
// HandleInterceptor when one is configured.
func (nch nopCloseHandler) Handle(msg message.ImmutableMessage) {
	if nch.HandleInterceptor == nil {
		nch.Handler.Handle(msg)
		return
	}
	nch.HandleInterceptor(msg, nch.Handler.Handle)
}

// Close is called after all messages are handled or handling is interrupted.
// It intentionally does nothing: the wrapped handler's lifetime is managed
// by its owner, so closing this wrapper must not close the inner handler.
func (nch nopCloseHandler) Close() {
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package consumer

import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

// timeTickOrderMessageHandler is a message handler that will do metrics and record the last sent message id.
Expand All @@ -16,14 +14,11 @@ type timeTickOrderMessageHandler struct {
func (mh *timeTickOrderMessageHandler) Handle(msg message.ImmutableMessage) {
lastConfirmedMessageID := msg.LastConfirmedMessageID()
timetick := msg.TimeTick()
messageSize := msg.EstimateSize()

mh.inner.Handle(msg)

mh.lastConfirmedMessageID = lastConfirmedMessageID
mh.lastTimeTick = timetick
// Do a metric here.
metrics.StreamingServiceClientConsumeBytes.WithLabelValues(paramtable.GetStringNodeID()).Observe(float64(messageSize))
}

func (mh *timeTickOrderMessageHandler) Close() {
Expand Down
79 changes: 79 additions & 0 deletions internal/distributed/streaming/internal/consumer/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package consumer

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

// newConsumerMetrics creates the metrics set for a consumer bound to the
// given pchannel. The consumer starts registered in the unavailable state.
func newConsumerMetrics(pchannel string) *consumerMetrics {
	labels := prometheus.Labels{
		metrics.NodeIDLabelName:     paramtable.GetStringNodeID(),
		metrics.WALChannelLabelName: pchannel,
	}
	m := &consumerMetrics{
		available:     false,
		clientTotal:   metrics.StreamingServiceClientConsumerTotal.MustCurryWith(labels),
		inflightTotal: metrics.StreamingServiceClientConsumeInflightTotal.With(labels),
		bytes:         metrics.StreamingServiceClientConsumeBytes.With(labels),
	}
	// Reflect the initial (unavailable) state on the status gauge.
	m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc()
	return m
}

// consumerMetrics is the metrics set for a single resumable consumer.
type consumerMetrics struct {
	available     bool                 // last availability state pushed to clientTotal
	clientTotal   *prometheus.GaugeVec // consumer count, curried by node/channel, labeled by status
	inflightTotal prometheus.Gauge     // number of messages currently being handled
	bytes         prometheus.Observer  // per-message consumed size distribution
}

// IntoUnavailable flips the consumer status gauges to the unavailable state.
// It is a no-op when the metrics are already marked unavailable.
func (m *consumerMetrics) IntoUnavailable() {
	if !m.available {
		return
	}
	m.available = false
	m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc()
	m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec()
}

// IntoAvailable flips the consumer status gauges to the available state.
// It is a no-op when the metrics are already marked available.
func (m *consumerMetrics) IntoAvailable() {
	if m.available {
		return
	}
	m.available = true
	m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Inc()
	m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec()
}

// StartConsume records the beginning of a consume operation of the given
// estimated size and returns a guard whose Finish must be called once the
// message has been handled.
func (m *consumerMetrics) StartConsume(bytes int) consumerMetricsGuard {
	g := consumerMetricsGuard{
		metrics: m,
		bytes:   bytes,
	}
	m.inflightTotal.Inc()
	return g
}

// Close deregisters this consumer from the status gauge, decrementing
// whichever state it was last recorded in.
func (m *consumerMetrics) Close() {
	state := metrics.StreamingServiceClientStatusUnavailable
	if m.available {
		state = metrics.StreamingServiceClientStatusAvailable
	}
	m.clientTotal.WithLabelValues(state).Dec()
}

// consumerMetricsGuard tracks a single in-flight consume operation; Finish
// must be called exactly once when the message has been handled.
type consumerMetricsGuard struct {
	metrics *consumerMetrics
	bytes   int // estimated size of the message being consumed
}

// Finish completes the consume operation: the message size is observed and
// the operation leaves the in-flight gauge.
func (g consumerMetricsGuard) Finish() {
	g.metrics.bytes.Observe(float64(g.bytes))
	g.metrics.inflightTotal.Dec()
}
101 changes: 101 additions & 0 deletions internal/distributed/streaming/internal/producer/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package producer

import (
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

// newProducerMetrics creates the metrics set for a producer bound to the
// given pchannel. The producer starts registered in the unavailable state.
func newProducerMetrics(pchannel string) *producerMetrics {
	labels := prometheus.Labels{
		metrics.NodeIDLabelName:     paramtable.GetStringNodeID(),
		metrics.WALChannelLabelName: pchannel,
	}
	m := &producerMetrics{
		available:       false,
		clientTotal:     metrics.StreamingServiceClientProducerTotal.MustCurryWith(labels),
		inflightTotal:   metrics.StreamingServiceClientProduceInflightTotal.With(labels),
		bytes:           metrics.StreamingServiceClientProduceBytes.MustCurryWith(labels),
		durationSeconds: metrics.StreamingServiceClientProduceDurationSeconds.MustCurryWith(labels),
	}
	// Reflect the initial (unavailable) state on the status gauge.
	m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc()
	return m
}

// producerMetrics is the metrics set for a single resumable producer.
type producerMetrics struct {
	available       bool                   // last availability state pushed to clientTotal
	clientTotal     *prometheus.GaugeVec   // producer count, curried by node/channel, labeled by status
	inflightTotal   prometheus.Gauge       // number of produce operations currently in flight
	bytes           prometheus.ObserverVec // produced message size distribution, labeled by result status
	durationSeconds prometheus.ObserverVec // produce latency distribution, labeled by result status
}

// IntoUnavailable flips the producer status gauges to the unavailable state.
// It is a no-op when the metrics are already marked unavailable.
func (m *producerMetrics) IntoUnavailable() {
	if !m.available {
		return
	}
	m.available = false
	m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Inc()
	m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Dec()
}

// IntoAvailable flips the producer status gauges to the available state.
// It is a no-op when the metrics are already marked available.
func (m *producerMetrics) IntoAvailable() {
	if m.available {
		return
	}
	m.available = true
	m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusAvailable).Inc()
	m.clientTotal.WithLabelValues(metrics.StreamingServiceClientStatusUnavailable).Dec()
}

// StartProduce records the beginning of a produce operation of the given
// size and returns a guard whose Finish must be called with the produce
// result error (nil on success).
func (m *producerMetrics) StartProduce(bytes int) produceMetricsGuard {
	g := produceMetricsGuard{
		start:   time.Now(),
		bytes:   bytes,
		metrics: m,
	}
	m.inflightTotal.Inc()
	return g
}

// Close deregisters this producer from the status gauge, decrementing
// whichever state it was last recorded in.
func (m *producerMetrics) Close() {
	state := metrics.StreamingServiceClientStatusUnavailable
	if m.available {
		state = metrics.StreamingServiceClientStatusAvailable
	}
	m.clientTotal.WithLabelValues(state).Dec()
}

// produceMetricsGuard tracks a single in-flight produce operation; Finish
// must be called exactly once with the operation's result error.
type produceMetricsGuard struct {
	start   time.Time // when the produce operation began
	bytes   int       // size of the message being produced
	metrics *producerMetrics
}

// Finish records the outcome of the produce operation: size and latency are
// observed under the status label derived from err, and the operation leaves
// the in-flight gauge.
func (g produceMetricsGuard) Finish(err error) {
	st := parseError(err)
	elapsed := time.Since(g.start)
	g.metrics.bytes.WithLabelValues(st).Observe(float64(g.bytes))
	g.metrics.durationSeconds.WithLabelValues(st).Observe(elapsed.Seconds())
	g.metrics.inflightTotal.Dec()
}

// parseError maps a produce result error to its client status label.
func parseError(err error) string {
	switch {
	case err == nil:
		return metrics.StreamingServiceClientStatusOK
	case status.IsCanceled(err):
		return metrics.StreamingServiceClientStatusCancel
	default:
		// NOTE(review): "Streamign" looks like a typo carried over from the
		// constant's declaration in pkg/metrics — rename it there first, then
		// update this reference.
		return metrics.StreamignServiceClientStatusError
	}
}
Loading

0 comments on commit c57d20f

Please sign in to comment.