Skip to content

Commit

Permalink
feat(outputs): allow multiple connections for xatu
Browse files Browse the repository at this point in the history
  • Loading branch information
Savid committed Oct 22, 2023
1 parent c9704a2 commit 731516a
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 16 deletions.
2 changes: 2 additions & 0 deletions docs/sentry.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Output configuration to send sentry events to a [Xatu server](./server.md).
| outputs[].config.batchTimeout | string | `5s` | The maximum duration for constructing a batch. Processor forcefully sends available events when timeout is reached |
| outputs[].config.exportTimeout | string | `30s` | The maximum duration for exporting events. If the timeout is reached, the export will be cancelled |
| outputs[].config.maxExportBatchSize | int | `512` | MaxExportBatchSize is the maximum number of events to process in a single batch. If there are more than one batch worth of events then it processes multiple batches of events one batch after the other without any delay |
| outputs[].config.connections | int | `1` | Connections is the number of simultaneous connections to xatu to use |

### Output `http` configuration

Expand Down Expand Up @@ -156,6 +157,7 @@ outputs:
batchTimeout: 5s
exportTimeout: 30s
maxExportBatchSize: 512
connections: 5
```
## Running locally
Expand Down
1 change: 1 addition & 0 deletions example_sentry.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,4 @@ outputs:
batchTimeout: 5s
exportTimeout: 30s
maxExportBatchSize: 512
connections: 5
1 change: 1 addition & 0 deletions pkg/output/xatu/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Config struct {
BatchTimeout time.Duration `yaml:"batchTimeout" default:"5s"`
ExportTimeout time.Duration `yaml:"exportTimeout" default:"30s"`
MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"`
Connections int `yaml:"connections" default:"1"`
}

func (c *Config) Validate() error {
Expand Down
63 changes: 48 additions & 15 deletions pkg/output/xatu/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"sync"

"github.com/ethpandaops/xatu/pkg/observability"
pb "github.com/ethpandaops/xatu/pkg/proto/xatu"
Expand All @@ -22,8 +23,10 @@ type ItemExporter struct {
config *Config
log logrus.FieldLogger

client pb.EventIngesterClient
conn *grpc.ClientConn
clients []pb.EventIngesterClient
conns []*grpc.ClientConn
mu sync.Mutex
currentConn int
}

func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemExporter, error) {
Expand All @@ -40,26 +43,50 @@ func NewItemExporter(name string, config *Config, log logrus.FieldLogger) (ItemE
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

conn, err := grpc.Dial(config.Address, opts...)
if err != nil {
return ItemExporter{}, fmt.Errorf("fail to dial: %v", err)
var conns []*grpc.ClientConn

var clients []pb.EventIngesterClient

for i := 0; i < config.Connections; i++ {
conn, err := grpc.Dial(config.Address, opts...)
if err != nil {
for _, c := range conns {
c.Close()
}

return ItemExporter{}, fmt.Errorf("fail to dial: %v", err)
}

conns = append(conns, conn)
clients = append(clients, pb.NewEventIngesterClient(conn))
}

return ItemExporter{
config: config,
log: log.WithField("output_name", name).WithField("output_type", SinkType),
conn: conn,
client: pb.NewEventIngesterClient(conn),
config: config,
log: log.WithField("output_name", name).WithField("output_type", SinkType),
conns: conns,
clients: clients,
}, nil
}

func (e ItemExporter) ExportItems(ctx context.Context, items []*pb.DecoratedEvent) error {
func (e *ItemExporter) getNextConn() int {
e.mu.Lock()
defer e.mu.Unlock()

conn := e.currentConn
e.currentConn = (e.currentConn + 1) % len(e.conns)

return conn
}

func (e *ItemExporter) ExportItems(ctx context.Context, items []*pb.DecoratedEvent) error {
_, span := observability.Tracer().Start(ctx, "XatuItemExporter.ExportItems", trace.WithAttributes(attribute.Int64("num_events", int64(len(items)))))
defer span.End()

e.log.WithField("events", len(items)).Debug("Sending batch of events to xatu sink")

if err := e.sendUpstream(ctx, items); err != nil {
connIndex := e.getNextConn()
if err := e.sendUpstream(ctx, items, connIndex); err != nil {
e.log.
WithError(err).
WithField("num_events", len(items)).
Expand All @@ -73,19 +100,25 @@ func (e ItemExporter) ExportItems(ctx context.Context, items []*pb.DecoratedEven
return nil
}

func (e ItemExporter) Shutdown(ctx context.Context) error {
return e.conn.Close()
func (e *ItemExporter) Shutdown(ctx context.Context) error {
for _, conn := range e.conns {
if err := conn.Close(); err != nil {
return err
}
}

return nil
}

func (e *ItemExporter) sendUpstream(ctx context.Context, items []*pb.DecoratedEvent) error {
func (e *ItemExporter) sendUpstream(ctx context.Context, items []*pb.DecoratedEvent, connIndex int) error {
req := &pb.CreateEventsRequest{
Events: items,
}

md := metadata.New(e.config.Headers)
ctx = metadata.NewOutgoingContext(ctx, md)

rsp, err := e.client.CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name))
rsp, err := e.clients[connIndex].CreateEvents(ctx, req, grpc.UseCompressor(gzip.Name))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/output/xatu/xatu.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu
return nil, err
}

proc, err := processor.NewBatchItemProcessor[xatu.DecoratedEvent](exporter,
proc, err := processor.NewBatchItemProcessor[xatu.DecoratedEvent](&exporter,
xatu.ImplementationLower()+"_output_"+SinkType+"_"+name,
log,
processor.WithMaxQueueSize(config.MaxQueueSize),
Expand Down

0 comments on commit 731516a

Please sign in to comment.