From 731516a7712f5522c496c1a6d5f14f74f4932360 Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Mon, 23 Oct 2023 09:38:07 +1000 Subject: [PATCH] feat(outputs): allow multiple connections for xatu --- docs/sentry.md | 2 ++ example_sentry.yaml | 1 + pkg/output/xatu/config.go | 1 + pkg/output/xatu/exporter.go | 63 ++++++++++++++++++++++++++++--------- pkg/output/xatu/xatu.go | 2 +- 5 files changed, 53 insertions(+), 16 deletions(-) diff --git a/docs/sentry.md b/docs/sentry.md index 58aa6f67..13a06510 100644 --- a/docs/sentry.md +++ b/docs/sentry.md @@ -67,6 +67,7 @@ Output configuration to send sentry events to a [Xatu server](./server.md). | outputs[].config.batchTimeout | string | `5s` | The maximum duration for constructing a batch. Processor forcefully sends available events when timeout is reached | | outputs[].config.exportTimeout | string | `30s` | The maximum duration for exporting events. If the timeout is reached, the export will be cancelled | | outputs[].config.maxExportBatchSize | int | `512` | MaxExportBatchSize is the maximum number of events to process in a single batch. If there are more than one batch worth of events then it processes multiple batches of events one batch after the other without any delay | +| outputs[].config.connections | int | `1` | Connections is the number of simultaneous connections to xatu to use | ### Output `http` configuration @@ -156,6 +157,7 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 + connections: 5 ``` ## Running locally diff --git a/example_sentry.yaml b/example_sentry.yaml index efb0cc10..9d458be7 100644 --- a/example_sentry.yaml +++ b/example_sentry.yaml @@ -80,3 +80,4 @@ outputs: batchTimeout: 5s exportTimeout: 30s maxExportBatchSize: 512 + connections: 5 diff --git a/pkg/output/xatu/config.go b/pkg/output/xatu/config.go index 432e2329..a434738f 100644 --- a/pkg/output/xatu/config.go +++ b/pkg/output/xatu/config.go @@ -13,6 +13,7 @@ type Config struct { BatchTimeout time.Duration `yaml:"batchTimeout" default:"5s"` ExportTimeout time.Duration `yaml:"exportTimeout" default:"30s"` MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"` + Connections int `yaml:"connections" default:"1"` } func (c *Config) Validate() error { diff --git a/pkg/output/xatu/exporter.go b/pkg/output/xatu/exporter.go index 9fa5f166..be489785 100644 --- a/pkg/output/xatu/exporter.go +++ b/pkg/output/xatu/exporter.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "sync" "github.com/ethpandaops/xatu/pkg/observability" pb "github.com/ethpandaops/xatu/pkg/proto/xatu" @@ -22,8 +23,10 @@ type ItemExporter struct { config *Config log logrus.FieldLogger - client pb.EventIngesterClient - conn *grpc.ClientConn + clients []pb.EventIngesterClient + conns []*grpc.ClientConn + mu sync.Mutex + currentConn int } func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) { @@ -40,26 +43,50 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } - conn, err := grpc.Dial(config.Address, opts...) - if err != nil { - return ItemExporter{}, fmt.Errorf("fail to dial: %v", err) + var conns []*grpc.ClientConn + + var clients []pb.EventIngesterClient + + for i := 0; i < config.Connections; i++ { + conn, err := grpc.Dial(config.Address, opts...) + if err != nil { + for _, c := range conns { + c.Close() + } + + return ItemExporter{}, fmt.Errorf("fail to dial: %v", err) + } + + conns = append(conns, conn) + clients = append(clients, pb.NewEventIngesterClient(conn)) } return ItemExporter{ - config: config, - log: log.WithField("output_name", name).WithField("output_type", SinkType), - conn: conn, - client: pb.NewEventIngesterClient(conn), + config: config, + log: log.WithField("output_name", name).WithField("output_type", SinkType), + conns: conns, + clients: clients, }, nil } -func (e ItemExporter) ExportItems(ctx context.Context, items []*pb.DecoratedEvent) error { +func (e *ItemExporter) getNextConn() int { + e.mu.Lock() + defer e.mu.Unlock() + + conn := e.currentConn + e.currentConn = (e.currentConn + 1) % len(e.conns) + + return conn +} + +func (e *ItemExporter) ExportItems(ctx context.Context, items []*pb.DecoratedEvent) error { _, span := observability.Tracer().Start(ctx, "XatuItemExporter.ExportItems", trace.WithAttributes(attribute.Int64("num_events", int64(len(items))))) defer span.End() e.log.WithField("events", len(items)).Debug("Sending batch of events to xatu sink") - if err := e.sendUpstream(ctx, items); err != nil { + connIndex := e.getNextConn() + if err := e.sendUpstream(ctx, items, connIndex); err != nil { e.log. WithError(err). WithField("num_events", len(items)). @@ -73,11 +100,17 @@ func (e ItemExporter) ExportItems(ctx context.Context, items []*pb.DecoratedEven return nil } -func (e ItemExporter) Shutdown(ctx context.Context) error { - return e.conn.Close() +func (e *ItemExporter) Shutdown(ctx context.Context) error { + for _, conn := range e.conns { + if err := conn.Close(); err != nil { + return err + } + } + + return nil } -func (e *ItemExporter) sendUpstream(ctx context.Context, items []*pb.DecoratedEvent) error { +func (e *ItemExporter) sendUpstream(ctx context.Context, items []*pb.DecoratedEvent, connIndex int) error { req := &pb.CreateEventsRequest{ Events: items, } @@ -85,7 +118,7 @@ func (e *ItemExporter) sendUpstream(ctx context.Context, items []*pb.DecoratedEv md := metadata.New(e.config.Headers) ctx = metadata.NewOutgoingContext(ctx, md) - rsp, err := e.client.CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name)) + rsp, err := e.clients[connIndex].CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name)) if err != nil { return err } diff --git a/pkg/output/xatu/xatu.go b/pkg/output/xatu/xatu.go index 9ec75fb2..ba1fc1be 100644 --- a/pkg/output/xatu/xatu.go +++ b/pkg/output/xatu/xatu.go @@ -38,7 +38,7 @@ func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu return nil, err } - proc, err := processor.NewBatchItemProcessor[xatu.DecoratedEvent](exporter, + proc, err := processor.NewBatchItemProcessor[xatu.DecoratedEvent](&exporter, xatu.ImplementationLower()+"_output_"+SinkType+"_"+name, log, processor.WithMaxQueueSize(config.MaxQueueSize),