Skip to content

Commit

Permalink
Add Transport Package with Event Publishing Components (#4721)
Browse files Browse the repository at this point in the history
## Overview
This PR introduces a new `transport` package that bridges BoltDB event
logs with NATS messaging:

- `Dispatcher`: Ensures reliable delivery of events from compute nodes
to orchestrator
  - Links BoltDB local event log with NATS publisher
  - Provides ordering and durability guarantees
  - Handles recovery and retries during network issues

- `Forwarder`: Best-effort publishing from orchestrator to compute nodes
  - Temporary solution until proper node join handshake is implemented
  - Prevents single compute node from blocking event flow to others
  - Simple pass-through without reliability guarantees

## Key Aspects
- Dispatcher maintains sequence tracking between BoltDB and NATS
- Forwarder is deliberately simple to avoid complexity in
orchestrator->compute path
- Both components share common utilities but serve different reliability
needs

## Next Steps
- Implement proper compute node join handshake protocol
- Replace forwarder with more robust solution once handshake is in place

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

## Release Notes

- **New Features**
- Introduced a new term `Lenf` in the spell checker dictionary for
enhanced recognition.
- Added a centralized error handling mechanism in message processing for
improved logging and error management.
- Implemented a `ProtocolRouter` for better protocol management and
routing of commands.
- Added a `Forwarder` for forwarding events without delivery guarantees.

- **Bug Fixes**
- Improved error handling in event processing to ensure consistent
behavior and logging.

- **Tests**
- Comprehensive test suites created for the `NCLMessageCreator`,
`ProtocolRouter`, and `Forwarder` to validate functionality and error
handling.
- Enhanced existing tests for robustness and clarity in error reporting.

- **Documentation**
- Updated configuration structures and validation methods for the
dispatcher to improve setup and error handling.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
  • Loading branch information
wdbaruni and coderabbitai[bot] authored Nov 26, 2024
1 parent a294c3c commit e04a386
Show file tree
Hide file tree
Showing 56 changed files with 4,353 additions and 1,407 deletions.
1 change: 1 addition & 0 deletions .cspell/custom-dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -438,3 +438,4 @@ buildvcs
Nilf
IMDS
tlsca
Lenf
10 changes: 10 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ go.opentelemetry.io/contrib/propagators/ot v1.21.1/go.mod h1:oy0MYCbS/b3cqUDW37w
go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE=
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 h1:D7UpUy2Xc2wsi1Ras6V40q806WM07rqoCWzXu7Sqy+4=
go.opentelemetry.io/otel/exporters/jaeger v1.17.0/go.mod h1:nPCqOnEH9rNLKqH/+rrUjiMzHJdV1BlpKcTwRTyKkKI=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.40.0 h1:MZbjiZeMmn5wFMORhozpouGKDxj9POHTuU5UA8msBQk=
Expand All @@ -613,17 +614,22 @@ go.opentelemetry.io/otel/exporters/prometheus v0.42.0/go.mod h1:f3bYiqNqhoPxkvI2
go.opentelemetry.io/otel/metric v1.26.0/go.mod h1:SY+rHOI4cEawI9a7N1A4nIg/nTQXe1ccCNWYOJUrpX4=
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY=
go.opentelemetry.io/otel/sdk v1.26.0/go.mod h1:0p8MXpqLeJ0pzcszQQN4F0S5FVjBLgypeGSngLsmirs=
go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0=
go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0=
go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca h1:VdD38733bfYv5tUZwEIskMM93VanwNIi5bIKnDrJdEY=
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca/go.mod h1:jxU+3+j+71eXOW14274+SmmuW82qJzl6iZSeqEtTGds=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
Expand All @@ -644,13 +650,15 @@ golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2 h1:IRJeR9r1pYWsHKTRe/IInb7lYvbBVIqOgsX/u0mbOWY=
golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457 h1:zf5N6UOrA487eEFacMePxjXAJctxKmyjKUsjA11Uzuk=
golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457/go.mod h1:pRgIJT+bRLFKnoM1ldnzKoxTIn14Yxz928LQRYYgIN0=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.11.0/go.mod h1:anzJrxPjNtfgiYQYirP2CPGzGLxrH2u2QBhn6Bf3qY8=
golang.org/x/tools v0.12.0/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM=
golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
Expand All @@ -664,12 +672,14 @@ google.golang.org/genproto v0.0.0-20240701130421-f6361c86f094 h1:6whtk83KtD3FkGr
google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094/go.mod h1:fJ/e3If/Q67Mj99hin0hMhiNyCRmt6BQ2aWIJshUSJw=
google.golang.org/genproto/googleapis/api v0.0.0-20240808171019-573a1156607a h1:KyUe15n7B1YCu+kMmPtlXxgkLQbp+Dw0tCRZf9Sd+CE=
google.golang.org/genproto/googleapis/api v0.0.0-20240808171019-573a1156607a/go.mod h1:4+X6GvPs+25wZKbQq9qyAXrwIRExv7w0Ea6MgZLZiDM=
google.golang.org/genproto/googleapis/api v0.0.0-20241021214115-324edc3d5d38/go.mod h1:vuAjtvlwkDKF6L1GQ0SokiRLCGFfeBUXWr/aFFkHACc=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240617180043-68d350f18fd4 h1:Rie8vnNXn/RjOgFacUrolQKaHsN10UPAXBb3IkfDdE4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a h1:EKiZZXueP9/T68B8Nl0GAx9cjbQnCId0yP3qPMgaaHs=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 h1:M1YKkFIboKNieVO5DLUEVzQfGwJD30Nv2jfUgzb5UcE=
gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
Expand Down
29 changes: 23 additions & 6 deletions pkg/compute/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,37 @@ func (m *MessageHandler) ShouldProcess(ctx context.Context, message *envelope.Me
}

// HandleMessage handles incoming messages
// TODO: handle messages arriving out of order gracefully
func (m *MessageHandler) HandleMessage(ctx context.Context, message *envelope.Message) error {
var err error

switch message.Metadata.Get(envelope.KeyMessageType) {
case messages.AskForBidMessageType:
return m.handleAskForBid(ctx, message)
err = m.handleAskForBid(ctx, message)
case messages.BidAcceptedMessageType:
return m.handleBidAccepted(ctx, message)
err = m.handleBidAccepted(ctx, message)
case messages.BidRejectedMessageType:
return m.handleBidRejected(ctx, message)
err = m.handleBidRejected(ctx, message)
case messages.CancelExecutionMessageType:
return m.handleCancel(ctx, message)
default:
err = m.handleCancel(ctx, message)
}

return m.handleError(ctx, message, err)
}

// handleError logs the error with context and returns nil.
// In the future, this can be extended to handle different error types differently.
func (m *MessageHandler) handleError(ctx context.Context, message *envelope.Message, err error) error {
if err == nil {
return nil
}

// For now, just log the error and return nil
logger := log.Ctx(ctx).Error()
for key, value := range message.Metadata.ToMap() {
logger = logger.Str(key, value)
}
logger.Err(err).Msg("Error handling message")
return nil
}

func (m *MessageHandler) handleAskForBid(ctx context.Context, message *envelope.Message) error {
Expand Down
3 changes: 3 additions & 0 deletions pkg/compute/watchers/bprotocol_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func (d *BProtocolDispatcher) HandleEvent(ctx context.Context, event watcher.Eve
}

execution := upsert.Current
if execution.OrchestrationProtocol() != models.ProtocolBProtocolV2 {
return nil
}

// Prepare base response with common fields
routingMetadata := legacy.RoutingMetadata{
Expand Down
3 changes: 1 addition & 2 deletions pkg/compute/watchers/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package watchers

// Error components
const (
dispatcherErrComponent = "Dispatcher"
nclDispatcherErrComponent = "NCLDispatcher"
nclDispatcherErrComponent = "NCLMessageCreator"
bprotocolErrComponent = "BProtocolDispatcher"
)
62 changes: 0 additions & 62 deletions pkg/compute/watchers/dispatcher.go

This file was deleted.

153 changes: 0 additions & 153 deletions pkg/compute/watchers/dispatcher_test.go

This file was deleted.

Loading

0 comments on commit e04a386

Please sign in to comment.