Add Transport Package with Event Publishing Components #4721
Conversation
Walkthrough

The changes in this pull request encompass a wide range of modifications across multiple files. Key updates include the addition of new methods for error handling in message processing, the introduction of a dispatcher for event handling, and the restructuring of existing classes to enhance functionality. Additionally, several test suites have been introduced or modified to ensure comprehensive coverage of the new and existing features. Constants and types have been added or renamed to reflect the updated logic and improve clarity in the codebase.
Actionable comments posted: 37
🧹 Outside diff range and nitpick comments (90)
pkg/orchestrator/watchers/constants.go (1)
Lines 5-8: Consider adding documentation for error component responsibilities

To improve maintainability, consider adding comments explaining:
- The role of each error component
- The relationship between these components
- When each error component should be used

```diff
 // Error components
+// protocolRouterErrComponent: Used for routing-related errors in the protocol layer
+// nclDispatcherErrComponent: Used for NCL (Node Communication Layer) dispatch errors
+// bprotocolErrComponent: Used for BProtocol dispatch-related errors
+// executionCancellerErrComponent: Used for execution cancellation-related errors
 const (
     protocolRouterErrComponent     = "ProtocolRouter"
     nclDispatcherErrComponent      = "NCLDispatcher"
     bprotocolErrComponent          = "BProtocolDispatcher"
     executionCancellerErrComponent = "ExecutionCanceller"
 )
```

pkg/transport/dispatcher/errors.go (2)
Lines 5-8: Add package-level documentation for the error type.

While the fields are documented, consider adding a detailed doc comment for the ErrDispatcher type explaining its purpose in the context of the dispatcher package and when it should be used.

```diff
+// ErrDispatcher represents an error that occurs during dispatcher operations.
+// It provides context about the operation that failed and wraps the underlying error.
+// This type is used throughout the dispatcher package to provide consistent error handling
+// and reporting for event publishing operations.
 type ErrDispatcher struct {
     Op  string // Operation that failed
     Err error  // Underlying error
 }
```
Lines 10-15: LGTM! Consider performance optimization for complex error chains.

The error interface implementation is correct and handles the nil case appropriately. For future scalability, consider using strings.Builder if the error chain becomes more complex.

```diff
 func (e *ErrDispatcher) Error() string {
     if e.Err == nil {
         return fmt.Sprintf("dispatcher error during %s", e.Op)
     }
-    return fmt.Sprintf("dispatcher error during %s: %v", e.Op, e.Err)
+    var b strings.Builder
+    b.WriteString("dispatcher error during ")
+    b.WriteString(e.Op)
+    b.WriteString(": ")
+    b.WriteString(e.Err.Error())
+    return b.String()
 }
```

pkg/transport/types.go (2)
Lines 11-13: Add documentation for the KeySeqNum constant.

While the constant name is self-explanatory, adding documentation would help clarify its purpose in the event publishing system, especially regarding message ordering guarantees mentioned in the PR objectives.

```diff
 const (
+    // KeySeqNum is the metadata key used to track message sequence numbers
+    // for ensuring ordered event delivery between BoltDB and NATS
     KeySeqNum = "Bacalhau-SeqNum"
 )
```
Lines 25-28: Enhance helper function with documentation and validation.

While the function is straightforward, it could benefit from more detailed documentation and input validation.

```diff
-// GenerateMsgID Message ID generation helper
+// GenerateMsgID generates a unique message ID based on the event's sequence number.
+// The ID format is "seq-{number}" which helps with message tracking and debugging.
 func GenerateMsgID(event watcher.Event) string {
+    if event.SeqNum < 0 {
+        // Handle invalid sequence numbers gracefully
+        return fmt.Sprintf("seq-invalid-%d", event.SeqNum)
+    }
     return fmt.Sprintf("seq-%d", event.SeqNum)
 }
```

pkg/orchestrator/watchers/execution_canceller.go (2)
Lines 14-16: Add documentation for the ExecutionCanceller type

Consider adding a doc comment explaining the type's purpose, responsibilities, and any important implementation details.

```diff
+// ExecutionCanceller watches for execution state changes and handles cancellation
+// logic by updating the execution state in the job store when necessary.
 type ExecutionCanceller struct {
     jobStore jobstore.Store
 }
```
Lines 25-55: Add method documentation

The HandleEvent method lacks documentation explaining its purpose, parameters, and return values.

```diff
+// HandleEvent processes execution state change events and handles cancellation logic.
+// It implements the watcher.EventHandler interface.
+//
+// Parameters:
+//   - ctx: The context for the operation
+//   - event: The event to process, expected to contain a models.ExecutionUpsert
+//
+// Returns an error if the event processing fails.
 func (d *ExecutionCanceller) HandleEvent(ctx context.Context, event watcher.Event) error {
```

pkg/node/constants.go (2)
Lines 28-30: Fix grammatical error in comment and improve documentation

The current comment has a grammatical error and lacks clarity about the cancellation logic. Apply this diff to fix the comment:

```diff
-    // and cancels them the execution's observed state
+    // and cancels executions based on their observed state
```
Lines 8-30: Consider adding protocol documentation

With the introduction of multiple protocol-specific watchers (BProtocol and NCL), it would be helpful to add documentation (either in this file or in a separate README) explaining:
- The purpose and differences between these protocols
- When each protocol should be used
- How they relate to the transport package's reliability guarantees
Would you like me to help create a documentation template for this?
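As a starting point, here is a sketch of what such a package-level doc comment could look like. The protocol names appear in this PR; the one-line descriptions are assumptions for the authors to verify:

```go
// Package watchers wires job store events to the transport layer.
//
// Two orchestration protocols are referenced in this PR:
//
//   - ProtocolBProtocolV2: the legacy request/response protocol, handled
//     by BProtocolDispatcher with no transport-level delivery guarantees.
//   - ProtocolNCLV1: the NATS-based protocol, where events are turned
//     into messages by NCLMessageCreator and delivered through the
//     transport package's Dispatcher (reliable, ordered) or Forwarder
//     (best-effort).
//
// ProtocolRouter selects between them based on the protocols the target
// node reports as supported.
package watchers
```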
pkg/orchestrator/watchers/utils_test.go (4)
Lines 9-24: LGTM with suggestions for improvement

The function is well-structured and serves its purpose for test setup. Consider these enhancements:
- Add parameter validation for invalid state transitions
- Document the initial state provided by mock.Execution()

```diff
 // setupNewExecution creates an upsert for a new execution with no previous state
+// The base execution is created using mock.Execution() which initializes with default values.
+// Parameters:
+//   desiredState: The target desired state for the execution
+//   computeState: The target compute state for the execution
+//   events: Optional events to attach to the execution
 func setupNewExecution(
     desiredState models.ExecutionDesiredStateType,
     computeState models.ExecutionStateType,
     events ...*models.Event,
 ) models.ExecutionUpsert {
+    // Validate state transition
+    if !models.IsValidStateTransition(models.ExecutionStateType(""), computeState) {
+        panic("Invalid compute state transition in test setup")
+    }
+
     execution := mock.Execution()
     execution.ComputeState = models.NewExecutionState(computeState)
     execution.DesiredState = models.NewExecutionDesiredState(desiredState)
```
Lines 26-50: LGTM with suggestions for robustness

The function effectively maintains execution identity during state transitions. Consider these enhancements:
- Add validation for state transition legitimacy
- Document the relationship with the Dispatcher component mentioned in PR objectives

```diff
 // setupStateTransition creates an upsert for an execution state transition
+// This function is particularly useful for testing the Dispatcher component's
+// handling of state transitions between BoltDB and NATS messaging.
+// Parameters:
+//   prevDesiredState: The initial desired state
+//   prevComputeState: The initial compute state
+//   newDesiredState: The target desired state
+//   newComputeState: The target compute state
+//   events: Optional events to attach to the transition
 func setupStateTransition(
     prevDesiredState models.ExecutionDesiredStateType,
     prevComputeState models.ExecutionStateType,
     newDesiredState models.ExecutionDesiredStateType,
     newComputeState models.ExecutionStateType,
     events ...*models.Event,
 ) models.ExecutionUpsert {
+    // Validate state transition
+    if !models.IsValidStateTransition(prevComputeState, newComputeState) {
+        panic("Invalid compute state transition in test setup")
+    }
+
     previous := mock.Execution()
```
Lines 52-57: LGTM with minor documentation enhancement

The helper function is well-implemented. Consider adding context about its role in testing the transport package's event publishing components.

```diff
-// createExecutionEvent is a helper to create watcher.Event from an ExecutionUpsert
+// createExecutionEvent is a helper to create watcher.Event from an ExecutionUpsert.
+// This helper is particularly useful for testing the transport package's event
+// publishing components (Dispatcher and Forwarder) by creating events that
+// simulate state transitions in the execution lifecycle.
 func createExecutionEvent(upsert models.ExecutionUpsert) watcher.Event {
```
Lines 1-57: Consider expanding test utilities for error scenarios

While the current utilities effectively support happy path testing, consider adding utilities specifically designed for testing error handling in the transport package. This could include:
- Functions to simulate network failures
- Utilities for testing retry mechanisms
- Helpers for testing partial state transitions
This would ensure comprehensive testing of the reliability guarantees mentioned in the PR objectives, particularly for the Dispatcher component.
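A minimal sketch of one such utility; the failingPublisher name and Publish signature are illustrative assumptions, not part of this PR:

```go
package watchers_test

import (
	"context"
	"fmt"
	"sync"
)

// failingPublisher is a hypothetical test double that rejects the first
// `failures` publish attempts and succeeds afterwards, letting tests
// drive retry and recovery paths deterministically.
type failingPublisher struct {
	mu       sync.Mutex
	failures int // number of initial calls that should fail
	calls    int
}

func (p *failingPublisher) Publish(ctx context.Context, payload []byte) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.calls++
	if p.calls <= p.failures {
		return fmt.Errorf("simulated network failure on attempt %d", p.calls)
	}
	return nil
}
```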
pkg/transport/forwarder/forwarder.go (5)
Lines 13-22: Consider adding field documentation.

While the type documentation is clear, consider adding documentation for individual fields to improve code maintainability.

```diff
 // Forwarder forwards events from a watcher to a destination in order.
 // Unlike Dispatcher, it provides no delivery guarantees or recovery mechanisms.
 type Forwarder struct {
+    // watcher provides the event stream to forward
     watcher watcher.Watcher
+    // creator transforms events into messages
     creator transport.MessageCreator
+    // publisher sends messages to the destination
     publisher ncl.OrderedPublisher
+    // running tracks the operational state of the forwarder
     running bool
+    // mu protects the running state
     mu sync.RWMutex
 }
```
Lines 24-47: Enhance error messages with wrapping.

While the error handling is good, consider wrapping the validation errors to provide more context about the component.

```diff
 func New(
     publisher ncl.OrderedPublisher,
     watcher watcher.Watcher,
     creator transport.MessageCreator) (*Forwarder, error) {
     if publisher == nil {
-        return nil, fmt.Errorf("publisher cannot be nil")
+        return nil, fmt.Errorf("forwarder: %w", fmt.Errorf("publisher cannot be nil"))
     }
     if watcher == nil {
-        return nil, fmt.Errorf("watcher cannot be nil")
+        return nil, fmt.Errorf("forwarder: %w", fmt.Errorf("watcher cannot be nil"))
     }
     if creator == nil {
-        return nil, fmt.Errorf("message creator cannot be nil")
+        return nil, fmt.Errorf("forwarder: %w", fmt.Errorf("message creator cannot be nil"))
     }
```
Lines 49-59: Consider using defer for mutex unlock.

While the mutex handling is correct, using defer would make the code more robust against future modifications.

```diff
 func (f *Forwarder) Start(ctx context.Context) error {
     f.mu.Lock()
+    defer f.mu.Unlock()
     if f.running {
-        f.mu.Unlock()
         return fmt.Errorf("forwarder already running")
     }
     f.running = true
-    f.mu.Unlock()
     return f.watcher.Start(ctx)
 }
```
Lines 74-98: Consider checking for context cancellation.

The method should check if the context is cancelled before proceeding with expensive operations.

```diff
 func (f *Forwarder) HandleEvent(ctx context.Context, event watcher.Event) error {
+    if err := ctx.Err(); err != nil {
+        return fmt.Errorf("context error: %w", err)
+    }
+
     message, err := f.creator.CreateMessage(event)
     if err != nil {
         return fmt.Errorf("create message failed: %w", err)
```
Lines 1-98: Architecture aligns well with PR objectives.

The implementation successfully achieves the goal of providing best-effort publishing from orchestrator to compute nodes. The code maintains simplicity while including necessary thread safety and error handling, making it a suitable temporary solution until the proper node join handshake is implemented.
pkg/compute/watchers/ncl_message_creator.go (3)
Lines 14-19: Well-structured refactor following SRP

The renaming from NCLDispatcher to NCLMessageCreator and removal of the publisher field better reflects the single responsibility principle. The struct is now focused solely on message creation, which aligns well with the transport package architecture described in the PR objectives. This separation of concerns will make it easier to implement the future node join handshake protocol mentioned in the PR objectives, as the message creation logic is now decoupled from the publishing mechanism.
Lines 44-62: Consider enhancing error context in log messages

While the logging is consistent, consider adding more context about the state transitions and any relevant error information. Example enhancement:

```diff
-log.Debug().Msgf("Accepting bid for execution %s", execution.ID)
+log.Debug().
+    Str("job_id", execution.JobID).
+    Str("job_type", string(execution.Job.Type)).
+    Msgf("Accepting bid for execution %s", execution.ID)
```
Line range hint 1-70: Architecture aligns well with PR objectives

The refactoring of this component from NCLDispatcher to NCLMessageCreator strongly supports the PR's goal of establishing a clean separation between event log connectivity and message handling. This separation will facilitate:
- The future implementation of the node join handshake
- Independent evolution of transport and message creation logic
- Better testability of message creation logic in isolation
Consider documenting the architectural decisions and constraints in a README or design doc to help future maintainers understand the rationale behind these changes.
pkg/node/ncl.go (1)
Lines 46-58: Consider adding documentation about subject hierarchy

While the individual functions are well-documented, it would be helpful to add package-level documentation explaining the overall subject hierarchy and routing patterns, especially given the removal of wildcards. This would help maintainers understand the message flow between compute nodes and orchestrator. Add this documentation at the package level:

```diff
 package node
+
+// Subject Hierarchy for Node Communication
+//
+// The following subject patterns are used for node communication:
+//   - compute.*.out.msgs: Messages from compute nodes to orchestrator
+//   - compute.<node_id>.in.msgs: Messages from orchestrator to specific compute node
+//   - compute.*.out.heartbeat: Heartbeat messages from compute nodes
```

pkg/transport/dispatcher/config_test.go (2)
Lines 28-96: Consider adding more validation test cases.

The test suite has good coverage, but consider adding these additional test cases:
- Validation for CheckpointTimeout
- Validation for ProcessInterval
- Validation for SeekTimeout

Here's a suggested addition to the test cases:

```diff
 testCases := []struct {
     name        string
     mutate      func(*Config)
     expectError string
 }{
     // ... existing cases ...
+    {
+        name:        "zero checkpoint timeout",
+        mutate:      func(c *Config) { c.CheckpointTimeout = 0 },
+        expectError: "CheckpointTimeout must be positive",
+    },
+    {
+        name:        "zero process interval",
+        mutate:      func(c *Config) { c.ProcessInterval = 0 },
+        expectError: "ProcessInterval must be positive",
+    },
+    {
+        name:        "zero seek timeout",
+        mutate:      func(c *Config) { c.SeekTimeout = 0 },
+        expectError: "SeekTimeout must be positive",
+    },
 }
```
Lines 1-100: Consider alignment with dispatcher's reliability requirements.

Given that the dispatcher is responsible for ensuring reliable delivery of events from compute nodes to the orchestrator (as per PR objectives), consider if the current configuration validation rules sufficiently support this reliability requirement. For example (see the sketch after this list):
- Should there be validation for minimum acceptable intervals to prevent too aggressive retries?
- Should there be a relationship check between CheckpointInterval and ProcessInterval to ensure proper event ordering?
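A minimal sketch of what such relationship checks could look like, assuming the Config field names used in these tests and a hypothetical minRetryInterval constant:

```go
func (c *Config) Validate() error {
	if c.ProcessInterval <= 0 {
		return fmt.Errorf("ProcessInterval must be positive")
	}
	if c.CheckpointInterval <= 0 {
		return fmt.Errorf("CheckpointInterval must be positive")
	}
	// Guard against overly aggressive retries.
	if c.BaseRetryInterval < minRetryInterval {
		return fmt.Errorf("BaseRetryInterval must be at least %s", minRetryInterval)
	}
	// Checkpointing more often than events are processed can record
	// sequence numbers that were never actually handled.
	if c.CheckpointInterval < c.ProcessInterval {
		return fmt.Errorf("CheckpointInterval must not be shorter than ProcessInterval")
	}
	return nil
}
```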
pkg/compute/watchers/bprotocol_dispatcher.go (2)
Lines 36-38: Add debug logging for skipped protocols

While the protocol check is correct, consider adding debug logging when skipping non-BProtocolV2 events to improve observability and debugging.

```diff
 if execution.OrchestrationProtocol() != models.ProtocolBProtocolV2 {
+    log.Ctx(ctx).Debug().
+        Str("protocol", string(execution.OrchestrationProtocol())).
+        Str("execution_id", execution.ID).
+        Msg("Skipping execution with non-BProtocolV2 protocol")
     return nil
 }
```
Line range hint 1-110: Consider using const for error component

The error component string bprotocolErrComponent appears to be used as a literal. Consider defining it as a package-level constant for better maintainability and reuse.

```diff
+const (
+    bprotocolErrComponent = "bprotocol_dispatcher"
+)
```

pkg/orchestrator/watchers/execution_canceller_test.go (3)
Lines 19-36: Add TearDownTest method to clean up resources.

While the setup is comprehensive, it's recommended to add a TearDownTest method to properly clean up resources, especially the mock controller. Add the following method:

```diff
+func (s *ExecutionCancellerTestSuite) TearDownTest() {
+    s.ctrl.Finish()
+}
```
Lines 38-43: Enhance error type verification.

The test verifies that an error occurs but doesn't check the specific error type or message. Consider using errors.Is() or verifying the error message to ensure the correct error is being returned.

```diff
-s.Error(err)
+s.ErrorIs(err, models.ErrInvalidType) // or
+s.EqualError(err, "expected *models.ExecutionUpsert, got string")
```
Lines 72-91: Enhance state transition verification.

While the test verifies the basic state transition, it could be more thorough in checking the complete execution state. Consider verifying additional fields that might be affected during cancellation.

```diff
 s.jobStore.EXPECT().UpdateExecution(s.ctx, gomock.Any()).DoAndReturn(
     func(_ context.Context, req jobstore.UpdateExecutionRequest) error {
         s.Equal(upsert.Current.ID, req.ExecutionID)
         s.Equal(models.ExecutionStateCancelled, req.NewValues.ComputeState.StateType)
+        // Verify additional state fields
+        s.Equal(models.ExecutionDesiredStateStopped, req.NewValues.DesiredState)
+        s.NotEmpty(req.NewValues.ComputeState.StateEnteredTime)
         return nil
     })
```

pkg/transport/forwarder/forwarder_test.go (2)
Lines 16-35: Consider adding documentation for mock dependencies.

While the test suite setup is well-structured, it would be helpful to add documentation explaining the purpose of each mock dependency (publisher, watcher, creator) and their roles in the test scenarios.

```diff
 type ForwarderUnitTestSuite struct {
     suite.Suite
+    // ctrl manages the lifecycle of mock objects
     ctrl *gomock.Controller
+    // ctx provides the test context
     ctx context.Context
+    // publisher mocks the OrderedPublisher interface for reliable message delivery
     publisher *ncl.MockOrderedPublisher
+    // watcher mocks the event watching interface
     watcher *watcher.MockWatcher
+    // creator mocks the message creation interface
     creator *transport.MockMessageCreator
 }
```
Lines 96-151: Add tests for concurrent operations and context handling.

Consider adding test cases for:
- Concurrent start/stop operations to ensure thread safety
- Context cancellation handling during startup/shutdown
- Graceful shutdown behavior with in-flight messages

Example test to add:

```go
func (s *ForwarderUnitTestSuite) TestConcurrentStartStop() {
	s.watcher.EXPECT().SetHandler(gomock.Any()).Return(nil)
	s.watcher.EXPECT().Start(gomock.Any()).Return(nil).AnyTimes()
	s.watcher.EXPECT().Stop(gomock.Any()).AnyTimes()

	f, err := New(s.publisher, s.watcher, s.creator)
	s.Require().NoError(err)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(2)
		go func() {
			defer wg.Done()
			_ = f.Start(s.ctx)
		}()
		go func() {
			defer wg.Done()
			_ = f.Stop(s.ctx)
		}()
	}
	wg.Wait()
}
```

pkg/transport/dispatcher/handler_test.go (3)
Lines 29-36: Consider adding error handling in SetupTest.

While the setup is comprehensive, it would be beneficial to handle potential initialization errors, especially for the dispatcher state. Consider updating the setup:

```diff
 func (suite *HandlerTestSuite) SetupTest() {
     suite.ctrl = gomock.NewController(suite.T())
     suite.ctx = context.Background()
     suite.creator = transport.NewMockMessageCreator(suite.ctrl)
     suite.publisher = ncl.NewMockOrderedPublisher(suite.ctrl)
-    suite.state = newDispatcherState()
+    state, err := newDispatcherState()
+    suite.Require().NoError(err, "Failed to initialize dispatcher state")
+    suite.state = state
     suite.handler = newMessageHandler(suite.creator, suite.publisher, suite.state)
 }
```
Lines 42-93: Consider using table-driven tests for error scenarios.

The error handling test cases (TestHandleEventCreatorError and TestHandleEventPublishError) follow similar patterns and could be refactored into a table-driven test for better maintainability. Here's a suggested refactor:

```go
type errorTestCase struct {
	name        string
	setupMocks  func(*HandlerTestSuite)
	expectedErr string
}

func (suite *HandlerTestSuite) TestHandleEventErrors() {
	cases := []errorTestCase{
		{
			name: "creator error",
			setupMocks: func(s *HandlerTestSuite) {
				s.creator.EXPECT().
					CreateMessage(gomock.Any()).
					Return(nil, fmt.Errorf("creation failed"))
			},
			expectedErr: "creation failed",
		},
		{
			name: "publish error",
			setupMocks: func(s *HandlerTestSuite) {
				s.creator.EXPECT().
					CreateMessage(gomock.Any()).
					Return(envelope.NewMessage("msg"), nil)
				s.publisher.EXPECT().
					PublishAsync(s.ctx, gomock.Any()).
					Return(nil, fmt.Errorf("publish failed"))
			},
			expectedErr: "publish failed",
		},
	}

	for _, tc := range cases {
		suite.Run(tc.name, func() {
			event := watcher.Event{SeqNum: 1}
			tc.setupMocks(suite)

			err := suite.handler.HandleEvent(suite.ctx, event)
			suite.Error(err)
			suite.ErrorContains(err, tc.expectedErr)
			suite.Equal(uint64(0), suite.state.lastObservedSeq)
			suite.Equal(0, suite.state.pending.Size())
		})
	}
}
```
Lines 95-124: Add timeout scenario test and enhance metadata assertions.

The success case is well-tested but could be enhanced with:
- A timeout scenario test using context cancellation
- Additional assertions for message metadata consistency

Consider adding this test case:

```go
func (suite *HandlerTestSuite) TestHandleEventTimeout() {
	event := watcher.Event{SeqNum: 1}
	msg := envelope.NewMessage("msg")

	ctx, cancel := context.WithCancel(suite.ctx)
	suite.creator.EXPECT().
		CreateMessage(event).
		Return(msg, nil)
	suite.publisher.EXPECT().
		PublishAsync(ctx, gomock.Any()).
		DoAndReturn(func(_ context.Context, _ ncl.PublishRequest) (ncl.PubFuture, error) {
			cancel() // Simulate timeout
			return nil, context.Canceled
		})

	err := suite.handler.HandleEvent(ctx, event)
	suite.Error(err)
	suite.ErrorIs(err, context.Canceled)
}
```

pkg/lib/ncl/publisher_config.go (2)
Lines 11-20: LGTM! Consider enhancing documentation.

The AckMode type and its constants are well-defined and follow Go idioms. Consider adding examples to the documentation to illustrate when each mode should be used:

```diff
 // AckMode determines how published messages should be acknowledged
 type AckMode int

 const (
-    // ExplicitAck requires explicit acknowledgment from one subscriber
+    // ExplicitAck requires explicit acknowledgment from one subscriber.
+    // Use this mode when message delivery confirmation is critical,
+    // e.g., for important system events or state changes.
     ExplicitAck AckMode = iota
-    // NoAck means the message is considered delivered as soon as it's published
+    // NoAck means the message is considered delivered as soon as it's published.
+    // Use this mode for non-critical messages where best-effort delivery is acceptable,
+    // e.g., metrics or status updates.
     NoAck
 )
```
Lines 11-20: Implementation aligns with PR objectives.

The new AckMode type and its integration into OrderedPublisherConfig effectively supports both:
- Reliable delivery through ExplicitAck (for Dispatcher's guaranteed delivery)
- Best-effort publishing through NoAck (for Forwarder's simpler pass-through)

This implementation aligns perfectly with the PR's goal of providing different reliability guarantees for different components. Consider documenting these architectural decisions in the package documentation to help future maintainers understand the relationship between acknowledgment modes and component responsibilities.

Also applies to: 93-96
pkg/compute/message_handler.go (1)
Lines 52-65: Enhance error handling structure

While the logging implementation is good, consider these improvements:
- Add structured error fields (e.g., error type, severity)
- Implement basic error categorization now to make future extensions easier

Consider this enhancement:

```diff
 func (m *MessageHandler) handleError(ctx context.Context, message *envelope.Message, err error) error {
     if err == nil {
         return nil
     }

     // For now, just log the error and return nil
     logger := log.Ctx(ctx).Error()
     for key, value := range message.Metadata.ToMap() {
         logger = logger.Str(key, value)
     }
+    // Add structured error information
+    if typed, ok := err.(bacerrors.Error); ok {
+        logger = logger.Str("error_component", typed.Component())
+    }
+    logger = logger.Str("error_type", reflect.TypeOf(err).String())
     logger.Err(err).Msg("Error handling message")
     return nil
 }
```

pkg/transport/dispatcher/dispatcher_test.go (4)
Lines 1-2: Add a descriptive comment for the build tag.

Consider adding a comment explaining the purpose of the build tags to improve maintainability.

```diff
-//go:build unit || !integration
+// Package dispatcher_test contains unit tests for the dispatcher package.
+//go:build unit || !integration
```
Lines 19-28: Document the test suite struct fields.

Consider adding documentation to explain the purpose of each field in the DispatcherTestSuite struct to improve maintainability.

```diff
 type DispatcherTestSuite struct {
     suite.Suite
+    // ctrl manages the lifecycle of mock objects
     ctrl *gomock.Controller
+    // ctx is the base context for all test operations
     ctx context.Context
+    // publisher is a mock for testing ordered message publishing
     publisher *ncl.MockOrderedPublisher
+    // watcher is a mock for testing event watching
     watcher *watcher.MockWatcher
+    // creator is a mock for testing message creation
     creator *transport.MockMessageCreator
+    // config holds the dispatcher configuration
     config Config
+    // handler stores the event handler for verification
     handler watcher.EventHandler
 }
```
Lines 43-86: Add positive test case to TestNewDispatcher.

The table-driven tests only cover error cases. Consider adding a positive test case to verify successful dispatcher creation.

```diff
 tests := []struct {
     name        string
     setup       func() (*Dispatcher, error)
     expectError string
 }{
+    {
+        name: "successful creation",
+        setup: func() (*Dispatcher, error) {
+            suite.watcher.EXPECT().SetHandler(gomock.Any()).Return(nil)
+            return New(suite.publisher, suite.watcher, suite.creator, suite.config)
+        },
+        expectError: "",
+    },
     {
         name: "nil publisher",
```
Lines 1-177: Enhance test coverage for reliability guarantees.

Given that the PR objectives emphasize reliable event delivery and recovery mechanisms, consider enhancing the test suite to specifically verify (see the sketch after this list):
- Event ordering guarantees during normal operation
- Recovery behavior during network issues
- Retry mechanism effectiveness
- Sequence tracking between BoltDB and NATS
This will ensure the test coverage aligns with the core reliability requirements outlined in the PR objectives.
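For the ordering checks in particular, a small self-contained helper along these lines could back several of the suggested tests (a sketch, not part of this PR):

```go
// assertMonotonicSeqNums fails the test if the observed sequence numbers
// are not strictly increasing.
func assertMonotonicSeqNums(t *testing.T, seqs []uint64) {
	t.Helper()
	for i := 1; i < len(seqs); i++ {
		if seqs[i] <= seqs[i-1] {
			t.Fatalf("ordering violated at index %d: got %d after %d", i, seqs[i], seqs[i-1])
		}
	}
}
```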
pkg/transport/dispatcher/recovery_test.go (5)
Lines 29-44: Consider parameterizing test configuration values.

The retry intervals are hardcoded in the test setup. Consider extracting these as constants or test parameters for better maintainability.

```diff
+const (
+    testBaseRetryInterval = 50 * time.Millisecond
+    testMaxRetryInterval  = 200 * time.Millisecond
+)

 func (suite *RecoveryTestSuite) SetupTest() {
     // ...
     suite.recovery = newRecovery(
         suite.publisher,
         suite.watcher,
         suite.state,
         Config{
-            BaseRetryInterval: 50 * time.Millisecond,
-            MaxRetryInterval:  200 * time.Millisecond,
+            BaseRetryInterval: testBaseRetryInterval,
+            MaxRetryInterval:  testMaxRetryInterval,
         },
     )
 }
```
Line 31: Consider using a context with timeout for tests.

Using a background context without timeout could lead to hanging tests if something goes wrong. Consider using context.WithTimeout for better test reliability.

```diff
-suite.ctx = context.Background()
+ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+suite.ctx = ctx
+defer cancel()
```
Lines 80-83: Consider extracting test timeout values.

The timeout values in Eventually should be extracted as constants for consistency and maintainability.

```diff
+const (
+    testEventualTimeout  = time.Second
+    testEventualInterval = 50 * time.Millisecond
+)

-}, 1*time.Second, 50*time.Millisecond)
+}, testEventualTimeout, testEventualInterval)
```
Lines 112-128: Add test cases for edge cases in recovery loop.

The recovery loop tests could be more comprehensive. Consider adding test cases for:
- Maximum retry count exceeded
- Zero/negative retry intervals
- Multiple concurrent recovery attempts
Lines 159-175: Consider testing concurrent reset operations.

The TestReset method tests the basic reset functionality, but it might be worth adding tests for concurrent reset operations to ensure thread safety. A sketch follows.
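A sketch of such a test; the unexported reset method name is an assumption (the same path TestReset appears to exercise), and the test is mainly meaningful under `go test -race`:

```go
func (suite *RecoveryTestSuite) TestConcurrentReset() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			suite.recovery.reset() // assumed to be the method behind TestReset
		}()
	}
	wg.Wait()
}
```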
18-45
: Consider adding a TearDown method.While the setup is comprehensive, consider adding a
TearDown
method to ensure proper cleanup of resources, particularly the mock controller:+func (s *ProtocolRouterTestSuite) TearDownTest() { + s.ctrl.Finish() +}This ensures that all mock expectations are verified after each test.
Lines 47-90: Enhance validation test coverage.

Consider adding these test cases to TestNewProtocolRouter_ValidationErrors:

```diff
 tests := []struct {
     name        string
     params      ProtocolRouterParams
     shouldError bool
 }{
+    {
+        name: "invalid_protocol",
+        params: ProtocolRouterParams{
+            NodeStore:          s.nodeStore,
+            SupportedProtocols: []models.Protocol{"invalid"},
+        },
+        shouldError: true,
+    },
+    {
+        name: "duplicate_protocols",
+        params: ProtocolRouterParams{
+            NodeStore: s.nodeStore,
+            SupportedProtocols: []models.Protocol{
+                models.ProtocolNCLV1,
+                models.ProtocolNCLV1,
+            },
+        },
+        shouldError: true,
+    },
```
16-31
: Consider documenting the test suite structure.The test suite setup follows best practices with proper initialization and cleanup. Consider adding a brief comment describing the purpose of the StateTestSuite and its components for better maintainability.
+// StateTestSuite tests the dispatcher state management functionality. +// It validates state updates, checkpointing, and pending message handling. type StateTestSuite struct { suite.Suite ctrl *gomock.Controller state *dispatcherState pending *pendingMessageStore }
Lines 33-112: Consider adding error case tests for state management.

While the current tests cover the happy path comprehensively, consider adding test cases for (see the sketch after this list):
- Invalid sequence numbers (e.g., attempting to update with a lower sequence number)
- Edge cases around uint64 boundaries
- Concurrent state updates
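A table-driven sketch of the first two cases. newDispatcherState and lastObservedSeq appear in this PR's tests; updateLastObserved is an assumed setter name, and the stale-update expectation encodes the contract proposed above rather than verified behaviour:

```go
func (s *StateTestSuite) TestSequenceEdgeCases() {
	cases := []struct {
		name    string
		updates []uint64
		want    uint64
	}{
		{"monotonic", []uint64{1, 2, 3}, 3},
		{"stale update ignored", []uint64{5, 3}, 5},
		{"uint64 boundary", []uint64{math.MaxUint64}, math.MaxUint64},
	}
	for _, tc := range cases {
		s.Run(tc.name, func() {
			st := newDispatcherState()
			for _, seq := range tc.updates {
				st.updateLastObserved(seq) // assumed setter name
			}
			s.Equal(tc.want, st.lastObservedSeq)
		})
	}
}
```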
pkg/compute/watchers/ncl_message_creator_test.go (3)
Lines 17-28: Consider adding TearDownTest method for completeness.

While the current setup is sufficient, adding a TearDownTest method would make the test suite more maintainable for future additions.

```diff
+func (s *NCLMessageCreatorTestSuite) TearDownTest() {
+    s.creator = nil
+}
```
Lines 30-187: Consider adding more test coverage for edge cases and concurrency.

While the existing test cases are well-structured and cover the main functionality, consider adding:
- Table-driven tests for similar state transitions (e.g., BidAccepted, BidRejected)
- Edge cases:
  - Nil execution
  - Invalid state transitions
  - Concurrent access to NCLMessageCreator

Example of table-driven test approach:

```go
func (s *NCLMessageCreatorTestSuite) TestCreateMessage_States() {
	tests := []struct {
		name         string
		state        models.ExecutionStateType
		expectedType string
		shouldBeNil  bool
	}{
		{
			name:         "bid accepted",
			state:        models.ExecutionStateAskForBidAccepted,
			expectedType: messages.BidResultMessageType,
		},
		{
			name:         "bid rejected",
			state:        models.ExecutionStateAskForBidRejected,
			expectedType: messages.BidResultMessageType,
		},
		// Add more cases
	}

	for _, tt := range tests {
		s.Run(tt.name, func() {
			execution := mock.Execution()
			execution.Job.Meta[models.MetaOrchestratorProtocol] = models.ProtocolNCLV1.String()
			execution.ComputeState = models.State[models.ExecutionStateType]{
				StateType: tt.state,
			}

			msg, err := s.creator.CreateMessage(watcher.Event{
				Object: models.ExecutionUpsert{
					Current: execution,
				},
			})
			s.NoError(err)
			if tt.shouldBeNil {
				s.Nil(msg)
				return
			}
			s.Equal(tt.expectedType, msg.Metadata.Get(envelope.KeyMessageType))
		})
	}
}
```
Lines 67-81: Enhance assertion messages for better test failure debugging.

Consider adding descriptive messages to key assertions to make test failures more informative. Example improvement:

```diff
-s.Require().NoError(err)
+s.Require().NoError(err, "should create message without error")

-s.Equal(messages.BidResultMessageType, msg.Metadata.Get(envelope.KeyMessageType))
+s.Equal(messages.BidResultMessageType, msg.Metadata.Get(envelope.KeyMessageType),
+    "should create message with correct type")
```

Also applies to: 97-110, 128-142, 158-170
pkg/lib/ncl/publisher_ordered.go (5)
Lines 59-81: Improve error handling in constructor

While the conditional initialization based on AckMode is good, we should ensure proper cleanup if initialization fails. If subscription succeeds but a subsequent operation fails, we should clean up the subscription. Consider wrapping the initialization in a defer cleanup:

```go
func NewOrderedPublisher(nc *nats.Conn, config OrderedPublisherConfig) (OrderedPublisher, error) {
	config.setDefaults()

	p := &orderedPublisher{
		publisher: &publisher{
			nc:     nc,
			config: *config.toPublisherConfig(),
		},
		config:    config,
		queue:     make(chan *pendingMsg, queueSize),
		shutdown:  make(chan struct{}),
		reset:     make(chan struct{}),
		resetDone: make(chan struct{}),
	}

	// Validate
	if err := p.validate(); err != nil {
		return nil, fmt.Errorf("invalid ordered publisher config: %w", err)
	}

	var cleanup func()

	// Always need publishLoop
	p.wg.Add(1)
	go p.publishLoop()

	// Start timeoutLoop and subscribe to inbox if ack mode is enabled
	if config.AckMode != NoAck {
		// Subscribe to responses
		p.inbox = nc.NewInbox()
		sub, err := nc.Subscribe(p.inbox+".*", p.handleResponse)
		if err != nil {
			return nil, fmt.Errorf("failed to create response subscription: %w", err)
		}
		cleanup = func() { sub.Unsubscribe() }
		defer func() {
			if err != nil && cleanup != nil {
				cleanup()
			}
		}()
		p.subscription = sub

		// Start timeout loop
		p.wg.Add(1)
		go p.timeoutLoop()
	}

	return p, nil
}
```
Lines 168-171: Consider collecting multiple errors during cleanup

The nil check is good, but we might want to collect all errors that occur during cleanup rather than returning on the first error. Consider using errors.Join to collect all cleanup errors:

```diff
-    if p.subscription != nil {
-        if err := p.subscription.Unsubscribe(); err != nil {
-            return fmt.Errorf("failed to unsubscribe: %w", err)
-        }
-    }
+    var errs []error
+    if p.subscription != nil {
+        if err := p.subscription.Unsubscribe(); err != nil {
+            errs = append(errs, fmt.Errorf("failed to unsubscribe: %w", err))
+        }
+    }
+    // Add other cleanup operations here
+    if len(errs) > 0 {
+        return errors.Join(errs...)
+    }
```
Lines 246-249: Handle potential UUID generation errors

While UUID generation failures are extremely rare, for completeness, consider using uuid.Must to make the failure case explicit, or handle any potential errors. Consider this change:

```diff
-    msg.Reply = p.inbox + "." + uuid.NewString()
+    msg.Reply = p.inbox + "." + uuid.Must(uuid.NewRandom()).String()
```
Lines 271-277: Make Result object creation more explicit

Consider making the Result object creation more explicit by defining what fields are being initialized. Consider this change:

```diff
-    pubMsg.future.setResult(&Result{})
+    pubMsg.future.setResult(&Result{
+        // Explicitly show which fields are being initialized
+        Error: "",
+        // Add any other relevant fields
+    })
```
Line range hint 1-341: Consider adding more detailed logging for operational visibility

Given this is a critical component for event delivery, consider adding more detailed logging to help with monitoring and debugging:
- Log when switching between ack modes
- Log message processing states
- Add metrics for success/failure rates
This will help with:
- Monitoring system health
- Debugging delivery issues
- Performance optimization
pkg/orchestrator/message_handler.go (2)
Lines 39-50: Consider adding message processing metrics and tracing.

Given that this is part of a transport package focused on reliability, consider adding metrics and tracing to track message processing attempts, successes, and failures. This would help with monitoring and debugging in production. A minimal sketch follows.
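A sketch of the counting half using plain atomics, as a first step before committing to a metrics library; the handle method name is an assumption standing in for the real dispatch logic:

```go
var (
	msgsProcessed atomic.Uint64
	msgsFailed    atomic.Uint64
)

func (m *MessageHandler) HandleMessage(ctx context.Context, message *envelope.Message) error {
	if err := m.handle(ctx, message); err != nil { // assumed internal dispatch
		msgsFailed.Add(1)
		return m.handleError(ctx, message, err)
	}
	msgsProcessed.Add(1)
	return nil
}
```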
Lines 52-65: Add context values for message tracking.

To support the reliability goals mentioned in the PR objectives, consider adding context values for tracking message processing attempts and correlation IDs.

```diff
+const (
+    ctxKeyAttempt       = "attempt"
+    ctxKeyCorrelationID = "correlation_id"
+)

 func (m *MessageHandler) handleError(ctx context.Context, message *envelope.Message, err error) error {
     if err == nil {
         return nil
     }

     logger := log.Ctx(ctx).Error()
+    if attempt := ctx.Value(ctxKeyAttempt); attempt != nil {
+        logger = logger.Int("attempt", attempt.(int))
+    }
+    if correlationID := ctx.Value(ctxKeyCorrelationID); correlationID != nil {
+        logger = logger.Str("correlation_id", correlationID.(string))
+    }
     for key, value := range message.Metadata.ToMap() {
         logger = logger.Str(key, value)
     }
```

pkg/orchestrator/watchers/ncl_message_creator_test.go (3)
Lines 43-45: Consider enhancing subject pattern testing

The current subject pattern is very simple. Consider adding test cases with more complex patterns that might occur in production, such as hierarchical subjects or patterns with special characters.
Lines 91-135: Consider adding concurrent access tests

Given that the NCLMessageCreator might be accessed concurrently in production, consider adding tests that verify thread-safety. Example test case:

```go
func (s *NCLMessageCreatorTestSuite) TestCreateMessage_ConcurrentAccess() {
	const goroutines = 10
	var wg sync.WaitGroup
	wg.Add(goroutines)

	s.nodeStore.EXPECT().Get(gomock.Any(), gomock.Any()).Return(
		models.NodeState{
			Info: models.NodeInfo{
				SupportedProtocols: []models.Protocol{models.ProtocolNCLV1},
			},
		}, nil).Times(goroutines)

	for i := 0; i < goroutines; i++ {
		go func() {
			defer wg.Done()
			upsert := setupNewExecution(
				models.ExecutionDesiredStatePending,
				models.ExecutionStateNew,
			)
			_, err := s.creator.CreateMessage(createExecutionEvent(upsert))
			s.NoError(err)
		}()
	}
	wg.Wait()
}
```
Lines 168-196: Add edge cases for execution state transitions

Consider adding test cases for edge cases in state transitions, such as:
- Invalid state transitions
- Multiple state changes in quick succession
- State transitions with missing or corrupted events
pkg/transport/forwarder/forwarder_e2e_test.go (4)
Lines 40-96: Consider enhancing setup and teardown robustness.

While the setup and teardown are comprehensive, consider these improvements:
- Initialize the cleanup functions slice with a capacity hint to avoid reallocations
- Add a timeout context for teardown operations
- Add error handling for subscriber operations

```diff
 func (s *ForwarderE2ETestSuite) SetupTest() {
     logger.ConfigureTestLogging(s.T())
     s.ctx, s.cancel = context.WithCancel(context.Background())
+    s.cleanupFuncs = make([]func(), 0, 5) // Add capacity hint
     // ... rest of the setup
 }

 func (s *ForwarderE2ETestSuite) TearDownTest() {
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
     for i := len(s.cleanupFuncs) - 1; i >= 0; i-- {
         s.cleanupFuncs[i]()
     }
     s.cleanupFuncs = nil
     if s.subscriber != nil {
-        s.Require().NoError(s.subscriber.Close(context.Background()))
+        if err := s.subscriber.Close(ctx); err != nil {
+            s.T().Errorf("failed to close subscriber: %v", err)
+        }
     }
     // ... rest of the teardown
 }
```
Lines 98-278: Enhance helper methods robustness.

The helper methods are well-structured but could benefit from additional error handling and type safety:

```diff
 func (s *ForwarderE2ETestSuite) startForwarder() *forwarder.Forwarder {
     // ... existing setup code ...
-    s.Require().NoError(f.Start(s.ctx))
+    if err := f.Start(s.ctx); err != nil {
+        s.T().Fatalf("failed to start forwarder: %v", err)
+    }
     return f
 }

 func (s *ForwarderE2ETestSuite) verifyMsg(msg *envelope.Message, i int) {
-    payload, ok := msg.GetPayload("")
+    payload, ok := msg.GetPayload("string") // Use explicit type
     s.Require().True(ok, "payload missing or not a string")
+    s.Require().IsType("", payload, "payload is not a string")
     s.Contains(payload, fmt.Sprintf("event-%d", i))
     s.Require().Equal(fmt.Sprintf("%d", i), msg.Metadata.Get(transport.KeySeqNum))
 }
```
Lines 124-256: Consider enhancing test robustness and coverage.

While the test cases are comprehensive, consider these improvements:
- Define timeout and polling interval constants
- Add more descriptive assertion messages
- Consider additional edge cases

```diff
+const (
+    eventuallyTimeout  = time.Second
+    eventuallyInterval = 10 * time.Millisecond
+    messageWaitTime    = 100 * time.Millisecond
+)

 func (s *ForwarderE2ETestSuite) TestEventFlow() {
     s.startForwarder()
     s.storeEvents(5)
     s.Eventually(func() bool {
         return len(s.received) == 5
-    }, time.Second, 10*time.Millisecond)
+    }, eventuallyTimeout, eventuallyInterval, "expected 5 messages, got %d", len(s.received))
     // ... rest of the test
 }
```

Consider adding these test cases:
- Test with large messages
- Test with rapid message publishing
- Test with slow subscribers

Would you like me to help implement these additional test cases?
Lines 280-285: Consider enhancing test message creator validation.

The test message creator could benefit from additional validation:

```diff
 func (c *testMessageCreator) CreateMessage(event watcher.Event) (*envelope.Message, error) {
+    if event.Object == nil {
+        return nil, fmt.Errorf("event object cannot be nil")
+    }
     return envelope.NewMessage(event.Object), nil
 }
```

pkg/transport/dispatcher/dispatcher_e2e_test.go (2)
Lines 215-221: Extract checkpoint verification to helper method

The checkpoint verification logic is complex and could be reused. Consider extracting it to a helper method.

```diff
+func (s *DispatcherE2ETestSuite) waitForCheckpoint(expected uint64) bool {
+    checkpoint, err := s.store.GetCheckpoint(s.ctx, "test-watcher")
+    if err != nil {
+        return false
+    }
+    return checkpoint == expected
+}

 // In test:
-s.Eventually(func() bool {
-    checkpoint, err := s.store.GetCheckpoint(s.ctx, "test-watcher")
-    if err != nil {
-        return false
-    }
-    return checkpoint == 5
-}, time.Second, 10*time.Millisecond)
+s.Eventually(func() bool {
+    return s.waitForCheckpoint(5)
+}, time.Second, 10*time.Millisecond)
```
Lines 126-243: Consider adding more edge cases to test suite

While the current test cases cover the main scenarios (event flow, recovery, and checkpointing), consider adding tests for (see the sketch after this list):
- Concurrent event publishing
- Message size limits
- Network latency simulation
- Error scenarios in message creation
- Partial checkpoint recovery
These additional test cases would help ensure the dispatcher's reliability under various conditions.
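For the concurrent-publishing case, a sketch in this suite's style; storeEvent is a hypothetical single-event variant of the storeEvents helper, and received mirrors the subscriber buffer polled in these e2e tests:

```go
func (s *DispatcherE2ETestSuite) TestConcurrentEventPublishing() {
	const n = 20
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.storeEvent(i) // hypothetical helper
		}(i)
	}
	wg.Wait()

	s.Eventually(func() bool {
		return len(s.received) == n
	}, 5*time.Second, 10*time.Millisecond)
}
```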
pkg/models/execution.go (1)
Lines 255-259: LGTM! Consider adding documentation.

The implementation is clean and follows the codebase's patterns. Consider adding a doc comment to explain the method's purpose and relationship with the transport package's Dispatcher/Forwarder components.

```diff
+// OrchestrationProtocol returns the protocol used to orchestrate this execution,
+// which determines whether to use the Dispatcher or Forwarder for event transport.
 func (e *Execution) OrchestrationProtocol() Protocol {
```

pkg/models/job.go (1)
Lines 382-394: Document the Protocol type and add migration plan

The implementation looks good with proper nil checks and default handling. However, a few suggestions (a doc-comment sketch follows this list):
- Consider adding godoc comments to document:
  - The Protocol type and its possible values
  - The purpose of this method
  - The significance of the default protocol
- Since this is a temporary solution (as per TODO), consider:
  - Adding a target date/version for removal
  - Creating a tracking issue for the migration
Would you like me to help create a tracking issue for the migration plan?
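A doc-comment sketch for the first suggestion; the one-line descriptions are assumptions to verify rather than text taken from the code:

```go
// Protocol identifies the orchestration protocol spoken between the
// orchestrator and a compute node for a given job.
//
// Values referenced in this PR:
//   - ProtocolBProtocolV2: legacy request/response protocol (the default)
//   - ProtocolNCLV1: NATS-based protocol used by the transport package
//
// OrchestrationProtocol is a temporary metadata lookup; it is expected to
// be removed once protocol negotiation happens during the node join
// handshake (see the TODO on the method).
```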
pkg/orchestrator/watchers/protocol_router.go (1)
Lines 17-18: Clarify usage of supportedProtocols map

Using a map[models.Protocol]bool to represent a set of supported protocols is functional but can be less clear to readers. Consider defining a type alias or using a dedicated set structure for clarity:

```go
type ProtocolSet map[models.Protocol]struct{}
```

Then initialize it as:

```go
supportedProtocols := make(ProtocolSet)
for _, protocol := range params.SupportedProtocols {
	supportedProtocols[protocol] = struct{}{}
}
```

This makes it explicit that supportedProtocols is used as a set.

pkg/transport/dispatcher/recovery.go (3)
Line 80: Fix typo in log message

The log message on line 80 contains a duplicated word "state":

```go
log.Ctx(ctx).Debug().Msg("Reset dispatcher state state after publish failure")
```

Please remove the duplicate word for clarity. Apply this diff to correct the typo:

```diff
-log.Ctx(ctx).Debug().Msg("Reset dispatcher state state after publish failure")
+log.Ctx(ctx).Debug().Msg("Reset dispatcher state after publish failure")
```
Lines 86-116: Enhance context cancellation handling in recoveryLoop

In recoveryLoop, when ctx.Err() != nil, the loop exits silently. Consider adding a log message to indicate that the recovery loop is exiting due to context cancellation. This will improve observability and aid in debugging. Apply this diff to add a log statement:

```diff
 if ctx.Err() != nil {
+    log.Info().Msg("Context canceled. Exiting recovery loop.")
     return
 }
```
Lines 127-130: Review the necessity of the unused getState method

The getState method is annotated with //nolint:unused, indicating it is not currently used:

```go
// getState returns current recovery state
//
//nolint:unused
func (r *recovery) getState() (bool, time.Time, int) {
```

If this method is not needed, consider removing it to reduce unused code. If it's intended for future use or for testing purposes, it can remain as is.
pkg/transport/dispatcher/state.go (2)
Line 109: Initialize msgs slice to nil instead of an empty slice

When initializing the msgs slice in newPendingMessageStore, consider setting it to nil rather than an empty slice to optimize memory usage. Suggestion:

```diff
 func newPendingMessageStore() *pendingMessageStore {
     return &pendingMessageStore{
-        msgs: make([]*pendingMessage, 0),
+        msgs: nil,
     }
 }
```
Line 146: Set msgs slice to nil in Clear method for better memory management

In the Clear method, setting s.msgs to nil instead of an empty slice can help with memory optimization. Suggestion:

```diff
 func (s *pendingMessageStore) Clear() {
     s.mu.Lock()
     defer s.mu.Unlock()
-    s.msgs = make([]*pendingMessage, 0)
+    s.msgs = nil
 }
```
78-125
: Refactor message creation methods to reduce code duplicationThe methods
createAskForBidMessage
,createBidAcceptedMessage
,createBidRejectedMessage
, andcreateCancelMessage
have similar structures and repetitive code patterns. Refactoring them can improve maintainability and reduce the likelihood of errors.Consider creating a generic helper function that handles common logic. For example:
func (d *NCLMessageCreator) createMessage( upsert models.ExecutionUpsert, request interface{}, messageType string, ) *envelope.Message { return envelope.NewMessage(request). WithMetadataValue(envelope.KeyMessageType, messageType) }Then, update the specific methods:
func (d *NCLMessageCreator) createAskForBidMessage(upsert models.ExecutionUpsert) *envelope.Message { log.Debug(). Str("nodeID", upsert.Current.NodeID). Str("executionID", upsert.Current.ID). Msg("Asking for bid") return d.createMessage(upsert, messages.AskForBidRequest{ BaseRequest: messages.BaseRequest{Events: upsert.Events}, Execution: upsert.Current, }, messages.AskForBidMessageType) }Repeat this pattern for the other message creation methods.
pkg/transport/dispatcher/config.go (1)
Lines 28-72: Align default value comments with constants to prevent discrepancies

The comments for each field in the Config struct specify default values, such as "// Default: 5 seconds". To ensure consistency and prevent discrepancies if the default constants change, consider referencing the constants directly in the comments or generating documentation that incorporates the constant values. This approach enhances maintainability and ensures the documentation remains accurate over time.

pkg/orchestrator/watchers/bprotocol_dispatcher.go (1)
Lines 59-61: Consider adding a log message when skipping events due to protocol mismatch

Currently, when the preferred protocol is not models.ProtocolBProtocolV2, the function returns without any logging. Consider adding a log message to indicate that the event has been skipped due to protocol mismatch. This can aid in debugging and monitoring the dispatcher's behavior. Apply this diff to add a log message:

```diff
 if preferredProtocol != models.ProtocolBProtocolV2 {
+    log.Ctx(ctx).Debug().
+        Str("executionID", execution.ID).
+        Str("preferredProtocol", string(preferredProtocol)).
+        Msg("Skipping event due to protocol mismatch")
     return nil
 }
```

pkg/transport/dispatcher/dispatcher.go (4)
Lines 82-88: Use defer for mutex unlocking in the Start method

Currently, the mutex is manually unlocked, which can be error-prone. Using defer ensures the mutex is always unlocked, even if an error occurs. Apply this diff:

```diff
 func (d *Dispatcher) Start(ctx context.Context) error {
     d.mu.Lock()
+    defer d.mu.Unlock()
     if d.running {
-        d.mu.Unlock()
         return fmt.Errorf("dispatcher already running")
     }
     d.running = true
-    d.mu.Unlock()

     d.routinesWg.Add(3) // For the three goroutines
     // ...
```

Similarly, consider applying the same pattern in the Stop method to ensure consistent and safe mutex handling.
Lines 165-167: Avoid logging entire message objects to prevent sensitive data exposure

Logging the entire msg object using EmbedObject(msg) may inadvertently expose sensitive information in the logs. Apply this diff to log only necessary details:

```diff
 func (d *Dispatcher) handlePublishSuccess(ctx context.Context, msg *pendingMessage) {
-    log.Ctx(ctx).Debug().EmbedObject(msg).Msg("Message published successfully")
+    log.Ctx(ctx).Debug().
+        Uint64("eventSeqNum", msg.eventSeqNum).
+        Msg("Message published successfully")

     // Remove all messages up to and including this one
     // ...
```

This approach safeguards sensitive data while still providing useful logging information.
Lines 131-160: Optimize pending messages processing to improve performance

Processing all pending messages on each tick could impact performance, especially with many pending messages. Consider batching or limiting the number of messages processed at a time, or adjusting the ProcessInterval based on the number of pending messages. A sketch of one option follows.
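One possible shape for the batching option; maxBatch, GetAll, and processMessage are assumptions based on names seen elsewhere in this review, so treat this as a sketch rather than the implementation:

```go
const maxBatch = 256 // hypothetical per-tick cap

func (d *Dispatcher) processPendingBatch(ctx context.Context) {
	msgs := d.state.pending.GetAll()
	if len(msgs) > maxBatch {
		msgs = msgs[:maxBatch] // leave the rest for the next tick
	}
	for _, msg := range msgs {
		if ctx.Err() != nil {
			return // stop promptly on shutdown
		}
		d.processMessage(ctx, msg)
	}
}
```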
Lines 73-74: Use consistent error wrapping for better stack traces

In the New function, errors are wrapped using fmt.Errorf with %w, and elsewhere errors.Wrap is used. For consistency and richer error context, consider using errors.Wrap throughout. Apply this diff:

```diff
-    return nil, fmt.Errorf("failed to set handler: %w", err)
+    return nil, errors.Wrap(err, "failed to set handler")
```

This provides a consistent error-handling approach and includes stack traces if using pkg/errors.

pkg/orchestrator/watchers/bprotocol_dispatcher_test.go (5)
: Enhance flexibility by parameterizingexpectProtocolSupport
The
expectProtocolSupport
helper function currently hard-codes the supported protocols to[models.ProtocolBProtocolV2]
. To make the tests more flexible and reusable, consider adding a parameter to specify different supported protocols for each test case.You can modify the function as follows:
- func (s *BProtocolDispatcherSuite) expectProtocolSupport(execution *models.Execution) { + func (s *BProtocolDispatcherSuite) expectProtocolSupport(execution *models.Execution, protocols []models.Protocol) { s.nodeStore.EXPECT().Get(s.ctx, execution.NodeID).Return( models.NodeState{ Info: models.NodeInfo{ - SupportedProtocols: []models.Protocol{models.ProtocolBProtocolV2}, + SupportedProtocols: protocols, }, }, nil) ) }This allows you to specify the supported protocols when calling
expectProtocolSupport
in different tests, improving clarity and maintainability.
Lines 158-159: Apply parameterized protocol support in TestHandleEvent_AskForBid

After refactoring expectProtocolSupport, update its usage in TestHandleEvent_AskForBid to specify the supported protocols explicitly. Modify the call to:

```go
s.expectProtocolSupport(upsert.Current, []models.Protocol{models.ProtocolBProtocolV2})
```

This makes the test's assumptions clear and aligns with the refactored helper function.
Lines 183-184: Apply parameterized protocol support in TestHandleEvent_BidAccepted

Similarly, update the expectProtocolSupport call in TestHandleEvent_BidAccepted to specify the supported protocols:

```go
s.expectProtocolSupport(upsert.Current, []models.Protocol{models.ProtocolBProtocolV2})
```

This enhances test clarity and consistency.
Lines 205-206: Apply parameterized protocol support in TestHandleEvent_BidRejected

Again, update the expectProtocolSupport call in TestHandleEvent_BidRejected:

```go
s.expectProtocolSupport(upsert.Current, []models.Protocol{models.ProtocolBProtocolV2})
```

This follows the refactored approach and improves test readability.
Lines 227-228: Apply parameterized protocol support in TestHandleEvent_Cancel

In TestHandleEvent_Cancel, adjust the expectProtocolSupport call:

```go
s.expectProtocolSupport(upsert.Current, []models.Protocol{models.ProtocolBProtocolV2})
```

This maintains consistency across your tests.
pkg/node/compute.go (1)
Lines 433-454: Review the use of RetryStrategyBlock in nclDispatcherWatcher

The nclDispatcherWatcher is configured with RetryStrategyBlock, which will block processing on failure. Ensure that this strategy is acceptable and won't cause unintended delays or bottlenecks in event processing. Consider implementing a retry strategy with backoff, or monitor the watcher for potential blocking issues to improve resilience.
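A self-contained sketch of capped exponential backoff that a handler could use between attempts instead of blocking indefinitely (not tied to the watcher library's RetryStrategy API):

```go
// backoff returns base doubled per attempt, capped at max.
func backoff(attempt int, base, max time.Duration) time.Duration {
	d := base << uint(attempt) // doubles each attempt
	if d <= 0 || d > max {     // d <= 0 guards shift overflow
		return max
	}
	return d
}
```

For example, sleeping with `time.Sleep(backoff(i, 100*time.Millisecond, 10*time.Second))` between attempts i = 0, 1, 2, ... spaces retries without ever exceeding the ten-second cap.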
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (2)
- go.sum is excluded by !**/*.sum
- go.work.sum is excluded by !**/*.sum
📒 Files selected for processing (55)
- .cspell/custom-dictionary.txt (1 hunks)
- go.mod (1 hunks)
- pkg/compute/message_handler.go (1 hunks)
- pkg/compute/watchers/bprotocol_dispatcher.go (1 hunks)
- pkg/compute/watchers/constants.go (1 hunks)
- pkg/compute/watchers/dispatcher.go (0 hunks)
- pkg/compute/watchers/dispatcher_test.go (0 hunks)
- pkg/compute/watchers/ncl_dispatcher_test.go (0 hunks)
- pkg/compute/watchers/ncl_message_creator.go (2 hunks)
- pkg/compute/watchers/ncl_message_creator_test.go (1 hunks)
- pkg/lib/ncl/mocks.go (1 hunks)
- pkg/lib/ncl/publisher_config.go (3 hunks)
- pkg/lib/ncl/publisher_ordered.go (4 hunks)
- pkg/lib/ncl/types.go (1 hunks)
- pkg/models/execution.go (1 hunks)
- pkg/models/job.go (1 hunks)
- pkg/node/compute.go (8 hunks)
- pkg/node/constants.go (1 hunks)
- pkg/node/ncl.go (1 hunks)
- pkg/node/requester.go (7 hunks)
- pkg/orchestrator/message_handler.go (1 hunks)
- pkg/orchestrator/watchers/bprotocol_dispatcher.go (2 hunks)
- pkg/orchestrator/watchers/bprotocol_dispatcher_test.go (7 hunks)
- pkg/orchestrator/watchers/constants.go (1 hunks)
- pkg/orchestrator/watchers/dispatcher.go (0 hunks)
- pkg/orchestrator/watchers/dispatcher_test.go (0 hunks)
- pkg/orchestrator/watchers/execution_canceller.go (1 hunks)
- pkg/orchestrator/watchers/execution_canceller_test.go (1 hunks)
- pkg/orchestrator/watchers/ncl_dispatcher.go (0 hunks)
- pkg/orchestrator/watchers/ncl_dispatcher_test.go (0 hunks)
- pkg/orchestrator/watchers/ncl_message_creator.go (1 hunks)
- pkg/orchestrator/watchers/ncl_message_creator_test.go (1 hunks)
- pkg/orchestrator/watchers/protocol_router.go (1 hunks)
- pkg/orchestrator/watchers/protocol_router_test.go (1 hunks)
- pkg/orchestrator/watchers/utils_test.go (1 hunks)
- pkg/test/utils/utils.go (2 hunks)
- pkg/transport/dispatcher/config.go (1 hunks)
- pkg/transport/dispatcher/config_test.go (1 hunks)
- pkg/transport/dispatcher/constants.go (1 hunks)
- pkg/transport/dispatcher/dispatcher.go (1 hunks)
- pkg/transport/dispatcher/dispatcher_e2e_test.go (1 hunks)
- pkg/transport/dispatcher/dispatcher_test.go (1 hunks)
- pkg/transport/dispatcher/errors.go (1 hunks)
- pkg/transport/dispatcher/handler.go (1 hunks)
- pkg/transport/dispatcher/handler_test.go (1 hunks)
- pkg/transport/dispatcher/recovery.go (1 hunks)
- pkg/transport/dispatcher/recovery_test.go (1 hunks)
- pkg/transport/dispatcher/state.go (1 hunks)
- pkg/transport/dispatcher/state_test.go (1 hunks)
- pkg/transport/dispatcher/utils.go (1 hunks)
- pkg/transport/forwarder/forwarder.go (1 hunks)
- pkg/transport/forwarder/forwarder_e2e_test.go (1 hunks)
- pkg/transport/forwarder/forwarder_test.go (1 hunks)
- pkg/transport/mocks.go (1 hunks)
- pkg/transport/types.go (1 hunks)
💤 Files with no reviewable changes (7)
- pkg/compute/watchers/dispatcher.go
- pkg/compute/watchers/dispatcher_test.go
- pkg/compute/watchers/ncl_dispatcher_test.go
- pkg/orchestrator/watchers/dispatcher.go
- pkg/orchestrator/watchers/dispatcher_test.go
- pkg/orchestrator/watchers/ncl_dispatcher.go
- pkg/orchestrator/watchers/ncl_dispatcher_test.go
✅ Files skipped from review due to trivial changes (6)
- .cspell/custom-dictionary.txt
- pkg/compute/watchers/constants.go
- pkg/lib/ncl/mocks.go
- pkg/lib/ncl/types.go
- pkg/transport/dispatcher/constants.go
- pkg/transport/mocks.go
🔇 Additional comments (73)
pkg/orchestrator/watchers/constants.go (1)
5-8
: Verify consistent usage of error components across the codebase
The renaming from dispatcherErrComponent to protocolRouterErrComponent suggests an architectural shift. Let's ensure all error components are being used consistently.
✅ Verification successful
Error components are used consistently across the codebase
Each error component is appropriately used in its corresponding file:
- protocolRouterErrComponent in protocol_router.go
- nclDispatcherErrComponent in ncl_message_creator.go
- bprotocolErrComponent in bprotocol_dispatcher.go and ncl_message_creator.go
- executionCancellerErrComponent in execution_canceller.go
The components follow a clear pattern and are used correctly for error wrapping with their respective components.
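For context, the pattern referenced here looks like the following, condensed from the message creators reviewed further down. The asUpsert wrapper function is hypothetical; the bacerrors calls are as they appear in the source:
// Hypothetical helper illustrating the component-tagged error wrapping pattern.
func asUpsert(event watcher.Event) (models.ExecutionUpsert, error) {
	upsert, ok := event.Object.(models.ExecutionUpsert)
	if !ok {
		return models.ExecutionUpsert{}, bacerrors.New(
			"failed to process event: expected models.ExecutionUpsert, got %T", event.Object).
			WithComponent(nclDispatcherErrComponent)
	}
	return upsert, nil
}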
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check usage of error components across the codebase
# Expected: All error components should be used in their respective files
echo "Checking usage of error components..."
for component in "protocolRouterErrComponent" "nclDispatcherErrComponent" "bprotocolErrComponent" "executionCancellerErrComponent"; do
echo "=== Checking $component ==="
rg -A 2 "$component" pkg/orchestrator/
done
Length of output: 4538
pkg/transport/dispatcher/utils.go (2)
9-13
: LGTM! Clean and focused implementation.
The function provides a simple and effective way to generate unique message IDs based on event sequence numbers, which aligns well with the deduplication requirements.
11-13
: Consider adding event validation.
The function should guard against zero-value or otherwise invalid events (watcher.Event is a struct, so it cannot itself be nil). This is especially important as this function will be used in message handling paths.
Consider adding validation:
func generateMsgID(event watcher.Event) string {
+ // Defensive check for zero-value event
+ if event.SeqNum <= 0 {
+ return "seq-invalid"
+ }
return fmt.Sprintf("seq-%d", event.SeqNum)
}
Also, consider documenting the behavior for edge cases:
- How should sequence number wrapping be handled?
- What's the expected behavior for zero or negative sequence numbers?
Let's check how this function is used in the codebase:
pkg/transport/dispatcher/errors.go (2)
1-4
: LGTM!
The package declaration and imports are clean and appropriate.
1-20
: Verify consistent error handling patterns across the transport package.
Given the PR's focus on reliability and recovery mechanisms, it's important to ensure consistent error handling patterns across both the Dispatcher and Forwarder components.
✅ Verification successful
Error handling patterns are consistent across the transport package
The error handling implementation in the transport package follows a consistent pattern:
- The dispatcher component properly wraps errors using the ErrDispatcher type with operation context
- The forwarder component uses standard error handling with no custom error types, which is appropriate for its simpler role
- Error propagation is consistent, using standard Go error handling patterns
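If ErrDispatcher does not already define Unwrap, errors.Is and errors.As will not see through the wrapper. The conventional addition would be the following; this is a suggested sketch, not confirmed against the source:
// Hypothetical addition so errors.Is/errors.As can reach the wrapped error.
func (e *ErrDispatcher) Unwrap() error {
	return e.Err
}

// With Unwrap in place:
//   err := &ErrDispatcher{Op: "publish", Err: context.DeadlineExceeded}
//   errors.Is(err, context.DeadlineExceeded) // true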
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check error handling patterns across the transport package
# Look for error type usage
echo "Checking error type usage patterns:"
rg "ErrDispatcher" -A 3
# Check for direct error returns vs wrapped errors
echo "Checking error handling patterns:"
ast-grep --pattern 'return err'
ast-grep --pattern 'return &ErrDispatcher'
# Look for other error types in the package
echo "Checking for other error types:"
rg "type.*Error.*struct" pkg/transport/
Length of output: 23444
Script:
#!/bin/bash
# Let's check for error handling patterns in the transport package more specifically
# Look for error types in the transport package
echo "Error types in transport package:"
rg "type.*Error" pkg/transport/
# Check error wrapping patterns in dispatcher and forwarder
echo -e "\nError handling in dispatcher:"
rg "func.*Error|return.*error" pkg/transport/dispatcher/
echo -e "\nError handling in forwarder:"
rg "func.*Error|return.*error" pkg/transport/forwarder/
# Look for error constructors
echo -e "\nError constructors:"
ast-grep --pattern 'func new$_Error'
Length of output: 1880
pkg/transport/types.go (3)
1-9
: LGTM! Well-structured file with appropriate imports and mock generation.
The file structure follows best practices with proper mock generation setup and necessary imports for the transport package functionality.
15-23
: Excellent interface design with clear documentation!
The MessageCreator interface is well-designed with:
- Clear documentation explaining its role as an extension point
- Single responsibility (converting events to messages)
- Explicit handling of nil cases and errors
- Flexibility for different implementations
This aligns well with the PR objectives for connecting BoltDB events with NATS messaging.
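To make the contract concrete, a minimal implementation might look like this. The interface shape (CreateMessage returning a possibly-nil message) is from the source; the type name, the envelope.NewMessage constructor call, and the payload choice are assumptions for illustration:
// Minimal MessageCreator sketch: nil message + nil error means "skip this event".
type skipUnchangedCreator struct{}

func (skipUnchangedCreator) CreateMessage(event watcher.Event) (*envelope.Message, error) {
	upsert, ok := event.Object.(models.ExecutionUpsert)
	if !ok || !upsert.HasStateChange() {
		return nil, nil // nothing to publish for this event
	}
	// Payload construction is illustrative; real creators build protocol messages.
	return envelope.NewMessage(upsert), nil
}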
1-28
: Verify integration with Dispatcher and Forwarder components.
The types defined here form the foundation for the transport package. Let's verify their usage in the Dispatcher and Forwarder components mentioned in the PR objectives.
✅ Verification successful
Integration with Dispatcher and Forwarder components is properly implemented
The verification confirms that the types defined in types.go are correctly integrated:
- Both Forwarder and Dispatcher properly initialize with MessageCreator in their constructors
- KeySeqNum is consistently used for sequence tracking in both components
- Message handling follows the intended pattern with proper metadata assignment
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify how MessageCreator is used in Dispatcher and Forwarder
# Check for MessageCreator usage
echo "Checking MessageCreator usage in Dispatcher and Forwarder:"
rg -l "MessageCreator" pkg/transport/
# Check for KeySeqNum usage
echo "Checking KeySeqNum usage for sequence tracking:"
rg -l "KeySeqNum" pkg/transport/
Length of output: 950
Script:
#!/bin/bash
# Let's examine how MessageCreator is used in both Forwarder and Dispatcher
echo "=== Forwarder Implementation ==="
rg "MessageCreator" pkg/transport/forwarder/forwarder.go -B 2 -A 2
echo -e "\n=== Dispatcher Implementation ==="
rg "MessageCreator" pkg/transport/dispatcher/dispatcher.go -B 2 -A 2
echo -e "\n=== Handler Implementation ==="
rg "MessageCreator" pkg/transport/dispatcher/handler.go -B 2 -A 2
echo -e "\n=== KeySeqNum Usage ==="
rg "KeySeqNum" pkg/transport/forwarder/forwarder.go -B 2 -A 2
rg "KeySeqNum" pkg/transport/dispatcher/handler.go -B 2 -A 2
Length of output: 1833
pkg/orchestrator/watchers/execution_canceller.go (2)
1-12
: LGTM: Clean package structure and imports
The package name and imports are well-organized and appropriate for the functionality.
38-39
: Add missing newExecutionTransitions function
The newExecutionTransitions function and its shouldCancel method are not visible in the provided code. Please ensure these are properly defined and tested.
✅ Verification successful
Both newExecutionTransitions and shouldCancel are properly defined
The newExecutionTransitions function and its shouldCancel method are correctly implemented in pkg/orchestrator/watchers/execution_transitions.go. The function creates a new instance of executionTransitions with the provided upsert parameter, and the shouldCancel method is properly defined with clear cancellation logic for execution state transitions.
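For readers without that file handy, the helper's shape is roughly the following; the concrete shouldCancel predicate shown here is an assumption, the real checks live in execution_transitions.go:
// Rough sketch of the transitions helper; the predicate is assumed,
// not copied from the source.
type executionTransitions struct {
	upsert models.ExecutionUpsert
}

func newExecutionTransitions(upsert models.ExecutionUpsert) *executionTransitions {
	return &executionTransitions{upsert: upsert}
}

func (t *executionTransitions) shouldCancel() bool {
	// Assumed: the desired state moved to stopped while the execution
	// has not yet reached a terminal compute state.
	return t.upsert.Current.DesiredState.StateType == models.ExecutionDesiredStateStopped &&
		!t.upsert.Current.IsTerminalComputeState()
}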
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for the newExecutionTransitions function definition
ast-grep --pattern 'func newExecutionTransitions($$$) $$$'
Length of output: 354
Script:
#!/bin/bash
# Let's check the executionTransitions type and its shouldCancel method
ast-grep --pattern 'type executionTransitions struct {
$$$
}
func (e *executionTransitions) shouldCancel() $$$'
Length of output: 116
Script:
#!/bin/bash
# Let's check the content of the execution_transitions.go file
cat pkg/orchestrator/watchers/execution_transitions.go
Length of output: 2536
pkg/node/constants.go (2)
8-14
: Verify protocol-specific dispatcher usage and documentation
While the naming and structure are consistent, the comments could be more specific about the distinct purposes of BProtocol vs NCL dispatchers and how they relate to the new transport package's reliability guarantees.
Let's check how these constants are used in the codebase:
✅ Verification successful
Based on the shell script results, I can now generate a final response. The results show clear implementation details of both dispatchers, their integration with the transport package, and their usage patterns.
Comments are accurate and implementation aligns with design
The dispatcher implementations and their usage in the codebase confirm that:
- BProtocol dispatcher handles reliable delivery through a callback mechanism
- Both dispatchers are properly integrated with the transport package
- The naming and documentation accurately reflect their distinct purposes:
- BProtocol dispatcher for reliable delivery (uses callback and retry strategy)
- NCL dispatcher for event forwarding (uses latest iterator)
The comments in the constants file are sufficiently clear and accurately represent the implementation details found in the codebase.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the usage context of the new dispatcher watchers
# and their integration with the transport package.
# Check for BProtocol dispatcher usage
echo "=== BProtocol Dispatcher Usage ==="
rg -A 5 "computeBProtocolDispatcherWatcherID"
# Check for NCL dispatcher usage
echo "=== NCL Dispatcher Usage ==="
rg -A 5 "computeNCLDispatcherWatcherID"
# Check for transport package integration
echo "=== Transport Package Integration ==="
fd -e go -e md . | xargs rg "pkg/transport.*Dispatcher"
Length of output: 2357
Script:
#!/bin/bash
# Let's check for the implementation details of these dispatchers
# Check BProtocol dispatcher implementation
echo "=== BProtocol Dispatcher Implementation ==="
ast-grep --pattern 'type BProtocolDispatcher struct {
$$$
}'
echo -e "\n=== BProtocol Dispatcher Methods ==="
ast-grep --pattern 'func ($_ *BProtocolDispatcher) Handle($$$) {
$$$
}'
# Check NCL dispatcher implementation
echo -e "\n=== NCL Dispatcher Implementation ==="
ast-grep --pattern 'type NCLDispatcher struct {
$$$
}'
echo -e "\n=== NCL Dispatcher Methods ==="
ast-grep --pattern 'func ($_ *NCLDispatcher) Handle($$$) {
$$$
}'
# Check for any transport-related imports
echo -e "\n=== Transport Package Imports ==="
rg "\".*transport\"" --type go
# Check for dispatcher factory or creation methods
echo -e "\n=== Dispatcher Creation ==="
rg "NewBProtocolDispatcher|NewNCLDispatcher" -A 5
Length of output: 5465
20-30
: Verify integration with the Forwarder component
According to the PR objectives, the Forwarder component handles best-effort publishing from orchestrator to compute nodes. Let's verify how these new watchers integrate with it.
✅ Verification successful
Let me gather more information about how these watchers are used and their relationship with the Forwarder component.
Let me check the implementation of the Forwarder component and its integration with the watchers.
Based on the gathered information, I can now provide a final response about the integration between the watchers and the Forwarder component.
Integration with Forwarder component is properly implemented
The watchers are correctly integrated with the Forwarder component for best-effort publishing:
- The NCL dispatcher watcher uses the Forwarder for reliable message delivery to compute nodes
- The Forwarder provides ordered message delivery without complex recovery mechanisms, which aligns with the "best-effort publishing" requirement
- The implementation includes proper message creation, sequencing, and publishing through NCL (Network Communication Layer)
- The BProtocol dispatcher and execution canceller watchers operate independently for backward compatibility
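The wiring itself is compact; per requester.go (quoted later in this review), it amounts to the following, where the variable names come from that file and the error message text is illustrative:
// Orchestrator-side wiring: watcher events -> message creator -> NCL publisher.
nclForwarder, err := forwarder.New(nclPublisher, nclDispatcherWatcher, nclMessageCreator)
if err != nil {
	return nil, nil, fmt.Errorf("failed to create forwarder: %w", err)
}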
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the integration between orchestrator watchers
# and the Forwarder component
# Check for Forwarder usage in relation to these watchers
echo "=== Forwarder Integration ==="
rg -A 10 "orchestratorBProtocolDispatcherWatcherID|orchestratorNCLDispatcherWatcherID|orchestratorExecutionCancellerWatcherID" | rg -A 5 "Forwarder"
# Check for transport package integration
echo "=== Transport Package Integration ==="
fd -e go . | xargs rg "pkg/transport/forwarder"
Length of output: 546
Script:
#!/bin/bash
# Let's check the implementation of these watchers and their integration
echo "=== Watcher Implementations ==="
rg -A 20 "func.*NewBProtocolDispatcherWatcher|func.*NewNCLDispatcherWatcher|func.*NewExecutionCancellerWatcher"
echo "=== Requester Integration ==="
# Since we found a reference to forwarder in requester.go, let's check its usage
cat pkg/node/requester.go
echo "=== Transport Package Structure ==="
# Let's understand the forwarder package structure
fd . pkg/transport/forwarder -t f
Length of output: 20007
Script:
#!/bin/bash
# Check the forwarder implementation
echo "=== Forwarder Implementation ==="
cat pkg/transport/forwarder/forwarder.go
echo "=== NCL Message Creator ==="
# Let's check how messages are created and forwarded
ast-grep --pattern 'type NCLMessageCreator struct {
$$$
}'
echo "=== Watcher Handler Implementation ==="
rg -A 10 "NewBProtocolDispatcher|NewNCLMessageCreator" pkg/orchestrator/watchers/
Length of output: 7020
pkg/transport/forwarder/forwarder.go (1)
1-11
: LGTM!
The package structure and imports are well-organized and follow Go best practices.
pkg/compute/watchers/ncl_message_creator.go (3)
11-11
: LGTM: Import aligns with transport package integration
The addition of the transport package import supports the architectural changes described in the PR objectives.
69-70
: LGTM: Good practice with compile-time interface check
The compile-time interface check ensures that NCLMessageCreator properly implements the transport.MessageCreator interface.
21-31
: Robust input validation and protocol handling
The early validation checks and protocol filtering ensure clean separation of concerns. The error handling is well-structured with appropriate context.
Let's verify the protocol handling is consistent across the codebase:
✅ Verification successful
Based on the search results, I can now provide a definitive response about the protocol handling consistency:
Protocol handling is correctly implemented and consistent
The protocol check in NCLMessageCreator is consistent with the codebase's protocol handling pattern. Evidence:
- The protocol check is symmetrically implemented in both message creators:
  - NCLMessageCreator checks for ProtocolNCLV1
  - BProtocolDispatcher checks for ProtocolBProtocolV2
- The protocol selection logic is centralized in the ProtocolRouter, which handles protocol negotiation
- The codebase has proper fallback mechanisms when no protocols are matched
- Protocol constants are properly defined in the models package and used consistently
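Condensed, the two guards look like this; the wrapper function names are hypothetical, while the protocol comparisons match what the search found in the source files:
// NCL side (ncl_message_creator.go): only handle NCL v1 executions.
func wantsNCL(execution *models.Execution) bool {
	return execution.OrchestrationProtocol() == models.ProtocolNCLV1
}

// BProtocol side (bprotocol_dispatcher.go): only handle bprotocol/v2 executions.
func wantsBProtocol(execution *models.Execution) bool {
	return execution.OrchestrationProtocol() == models.ProtocolBProtocolV2
}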
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other protocol checks to ensure consistency
ast-grep --pattern 'if $_.OrchestrationProtocol() $_ models.ProtocolNCLV1'
Length of output: 76
Script:
#!/bin/bash
# Let's try a broader search for protocol checks using ripgrep
rg -A 2 "OrchestrationProtocol\(\)"
# Also search for ProtocolNCLV1 usage
rg "ProtocolNCLV1"
# And let's check the models package for protocol-related code
fd "protocol.*go" --exec cat {}
Length of output: 37909
pkg/node/ncl.go (2)
49-51
: LGTM! Function rename improves clarity
The rename from orchestratorOutSubjectPrefix to orchestratorOutSubject better reflects its purpose, as it returns a complete subject string rather than just a prefix. This change aligns well with the function's actual behavior.
46-46
: Verify the impact of wildcard removal from subscription patterns
The removal of the '>' wildcard from the subscription pattern could affect message routing behavior. While this change makes the routing more precise, we should ensure it doesn't break existing message delivery patterns.
✅ Verification successful
Wildcard removal is safe and follows a consistent pattern
The removal of the '>' wildcard from the subscription pattern is safe and follows a well-structured pattern. The code in pkg/node/ncl.go shows a clear and consistent subject hierarchy:
- All subjects follow the pattern bacalhau.global.compute.<node_id>.(in|out).(msgs|heartbeat)
- The * wildcard in orchestratorInSubscription() correctly matches the node ID segment, allowing the orchestrator to receive messages from all compute nodes
- The subject structure maintains symmetry between publishers and subscribers:
  - Compute nodes publish to *.out.msgs, which the orchestrator subscribes to
  - The orchestrator publishes to <node_id>.in.msgs, which specific compute nodes subscribe to
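Spelled out as code, the hierarchy gives functions along these lines. The names follow pkg/node/ncl.go; the exact format strings are assumptions based on the pattern above:
// Per-node outbound subject a compute node publishes to.
func computeOutSubject(nodeID string) string {
	return fmt.Sprintf("bacalhau.global.compute.%s.out.msgs", nodeID)
}

// Wildcard subscription the orchestrator uses to receive from all nodes;
// '*' matches exactly one token (the node ID), unlike the removed '>'.
func orchestratorInSubscription() string {
	return "bacalhau.global.compute.*.out.msgs"
}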
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any NATS message handlers that might expect messages with additional subject segments
rg -A 5 "Subscribe.*bacalhau\.global\.compute.*\.out\.msgs"
# Look for any direct NATS publish calls that might include additional segments
rg "Publish.*bacalhau\.global\.compute.*\.out\.msgs"
Length of output: 116
Script:
#!/bin/bash
# Let's broaden the search to find NATS-related message patterns and subscriptions
# Search for any NATS subscription patterns in the codebase
rg -A 5 "Subscribe.*bacalhau\.global"
# Search for NATS publish patterns
rg -A 5 "Publish.*bacalhau\.global"
# Look for any other usage of the subject pattern
rg "bacalhau\.global\.compute"
# Check the entire file content to understand the context better
cat pkg/node/ncl.go
Length of output: 3288
pkg/transport/dispatcher/config_test.go (3)
1-10
: LGTM! Build tags and imports are well-structured.
The build tags are correctly placed and the imports are minimal and appropriate for the test suite.
98-100
: LGTM! Test suite setup is correct.
The test suite runner follows testify's best practices.
16-26
: Verify the existence of default constants.
The test references several default constants (e.g., defaultCheckpointInterval
, defaultCheckpointTimeout
, etc.). Let's ensure these are properly defined.
✅ Verification successful
All default constants are properly defined and documented
The verification shows that all default constants referenced in the test are properly defined in config.go
:
- All eight default constants are defined in a const block with clear grouping and documentation
- Each constant has a corresponding field in the Config struct with detailed documentation
- The values are used correctly in the DefaultConfig() function
- The configuration values have appropriate validation rules
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that all default constants used in the test are defined
# Expected: Find definitions for all default constants in the config.go file
rg -t go "^const\s+(default(Checkpoint|Stall|Process|Seek|BaseRetry|MaxRetry).*?)\s+=\s+" ./pkg/transport/dispatcher/
Length of output: 119
Script:
#!/bin/bash
# Let's try a broader search for these constants without the strict pattern matching
rg -t go "default(Checkpoint|Stall|Process|Seek|BaseRetry|MaxRetry)" ./pkg/transport/dispatcher/
Length of output: 2254
Script:
#!/bin/bash
# Let's also check the entire content of config.go to understand the structure
rg -t go "^(const|var)\s+" ./pkg/transport/dispatcher/config.go
Length of output: 73
Script:
#!/bin/bash
# Let's see the full content of config.go to understand how these defaults are defined
cat ./pkg/transport/dispatcher/config.go
Length of output: 4590
pkg/compute/watchers/bprotocol_dispatcher.go (1)
Line range hint 31-110
: Verify context handling in callback methods
The HandleEvent method properly accepts a context, but we should verify that the callback methods (OnBidComplete, OnRunComplete, OnComputeFailure) respect context cancellation.
pkg/orchestrator/watchers/execution_canceller_test.go (2)
1-18
: LGTM! Well-structured test file setup.
The file structure follows Go best practices with proper build tags and necessary imports.
1-109
: Consider adding missing test scenarios and documenting helpers.
- Missing test scenarios:
  - Concurrent execution handling
  - Edge cases like multiple rapid state transitions
  - Recovery scenarios after cancellation
- Helper functions:
  - createExecutionEvent and setupStateTransition are used but not defined in the provided code
  - Consider documenting these helpers or moving them to a shared test utilities package
pkg/test/utils/utils.go (3)
58-72
: LGTM! Well-configured for testing scenarios.
The implementation includes appropriate test-specific NATS client configurations:
- Disabled reconnect buffer for fast failure detection
- Reduced reconnect wait time
- Adjusted flusher timeout for faster tests
74-81
: LGTM! Clean abstraction for random port allocation.
The function provides a good public API while properly handling port allocation and error checking.
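The underlying trick is the usual one: bind to port 0, let the OS pick a free port, then release the listener. A self-contained sketch of what such a helper does (not the exact source):
// getFreePort asks the OS for an unused TCP port and releases it immediately.
// Note the inherent race window before the caller rebinds the port.
func getFreePort() (int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer l.Close()
	return l.Addr().(*net.TCPAddr).Port, nil
}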
83-95
: Consider handling port binding failures during server restart.
While the implementation is generally good, there's a potential race condition where another process could bind to the port between server shutdown and restart, leading to flaky tests.
Consider adding:
- Retry logic with backoff
- Port availability check before restart
Here's a suggested improvement:
func RestartNatsServer(t *testing.T, natsServer *natsserver.Server) (*natsserver.Server, *nats.Conn) {
    t.Helper()
+   maxRetries := 3
+
    natsServer.Shutdown()
    u, err := url.Parse(natsServer.ClientURL())
    require.NoError(t, err, "Failed to parse NATS server URL %s", natsServer.ClientURL())
    port, err := strconv.Atoi(u.Port())
    require.NoError(t, err, "Failed to convert port %s to int", u.Port())
-   return startNatsOnPort(t, port)
+   // Retry with exponential backoff: 100ms, 200ms, 400ms
+   for i := 0; i < maxRetries; i++ {
+       if server, conn := startNatsOnPort(t, port); server != nil {
+           return server, conn
+       }
+       time.Sleep(time.Duration(1<<uint(i)) * 100 * time.Millisecond)
+   }
+   t.Fatalf("Failed to restart NATS server after %d retries", maxRetries)
+   return nil, nil
}
pkg/transport/forwarder/forwarder_test.go (3)
1-14
: LGTM! Well-organized imports and package structure.
The test file follows Go best practices for package organization and import management.
153-155
: LGTM! Standard suite runner implementation.
1-155
: Verify integration test coverage with other transport components.
Since this is part of a larger transport package that connects BoltDB event logs with NATS messaging, we should ensure proper integration testing exists.
✅ Verification successful
Let me check the content of the end-to-end tests to verify the integration coverage.
Integration testing is comprehensive and well-implemented
The codebase has thorough end-to-end tests that verify the integration between components:
- The forwarder's E2E tests verify:
  - Event flow from BoltDB to NATS
  - Message ordering and delivery
  - Reconnection handling with NATS
  - Recovery scenarios
  - Restart behavior
- The dispatcher's E2E tests cover:
  - Event flow between components
  - Recovery mechanisms
  - Checkpointing functionality
  - Restart scenarios with BoltDB and NATS
Both components have robust tests for failure scenarios, reconnection handling, and data consistency across the transport pipeline.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for integration tests between transport components
# Look for integration tests
echo "Checking for integration tests between Forwarder and other components..."
rg -l "Integration|integration" --type go pkg/transport/
# Look for BoltDB and NATS interaction tests
echo "Checking for BoltDB and NATS interaction tests..."
rg -l "bolt|Bolt|NATS|nats" --type go pkg/transport/
# Look for test files that use multiple transport components
echo "Checking for tests using multiple transport components..."
ast-grep --pattern 'import {
$$$
"github.com/bacalhau-project/bacalhau/pkg/transport/forwarder"
$$$
"github.com/bacalhau-project/bacalhau/pkg/transport/dispatcher"
$$$
}' --lang go
Length of output: 1554
Script:
#!/bin/bash
# Check the content of e2e tests
echo "=== Forwarder E2E Test Content ==="
cat pkg/transport/forwarder/forwarder_e2e_test.go
echo -e "\n=== Dispatcher E2E Test Content ==="
cat pkg/transport/dispatcher/dispatcher_e2e_test.go
Length of output: 15651
pkg/transport/dispatcher/handler_test.go (3)
1-17
: LGTM! Build tags and imports are well-structured.
The build tags and imports are properly organized, following Go conventions.
126-128
: LGTM! Suite runner is correctly implemented.
The test suite runner follows testify's best practices.
1-128
: Verify test coverage and concurrent behavior.
Since this test file is for a component handling concurrent operations, let's verify:
- Test coverage of the handler package
- Potential race conditions in tests
✅ Verification successful
Tests adequately cover concurrent operations with proper synchronization
The static analysis reveals:
- Comprehensive test coverage with multiple test files covering different aspects:
  - handler_test.go: Core handler functionality
  - state_test.go: Includes concurrent access tests
  - dispatcher_test.go, dispatcher_e2e_test.go: Integration scenarios
  - recovery_test.go: Recovery mechanisms
- Proper synchronization mechanisms are in place:
  - All shared state is protected with sync.RWMutex
  - Background goroutines are managed with sync.WaitGroup
  - Concurrent access tests are implemented in state_test.go
- The handler tests cover the main scenarios:
  - Error cases (creator error, publish error)
  - Success cases
  - Edge cases (nil message)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check test coverage and run race detector
# Check test coverage
go test -coverprofile=coverage.out ./pkg/transport/dispatcher/...
go tool cover -func=coverage.out
# Run tests with race detector
go test -race ./pkg/transport/dispatcher/...
Length of output: 428
Script:
#!/bin/bash
# Let's analyze the test coverage and concurrent behavior through static analysis
# since we can't run the tests directly
# Find all test files in the dispatcher package
fd ".*_test.go" pkg/transport/dispatcher
# Look for test cases in handler_test.go
rg "^func.*Test.*\(" pkg/transport/dispatcher/handler_test.go
# Check for concurrent operations and potential race conditions
rg -A 3 "go |sync\.|atomic\." pkg/transport/dispatcher/
# Look for mock implementations to understand test coverage
ast-grep --pattern 'type Mock$_ struct {
$$$
}'
# Check if there are other test files that might cover concurrent scenarios
rg "race|concurrent|parallel" pkg/transport/dispatcher/
Length of output: 3525
pkg/compute/message_handler.go (2)
36-50
: Consider selective error propagation
While centralizing error handling is good, completely swallowing all errors might make it harder to detect and debug critical failures. Consider categorizing errors and selectively propagating critical ones while logging and suppressing others.
Let's verify if there are any critical errors that should be propagated:
36-50
: Consider adding message ordering guarantees
Given the PR objectives mention reliability and ordering guarantees, the HandleMessage implementation should consider:
- Sequence number validation
- Out-of-order message handling
- Integration with BoltDB sequence tracking
This would align better with the transport package's reliability goals.
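A per-sender guard of the following shape would cover the first two points; everything here (type, field, and method names) is a sketch, not the existing implementation:
// seqTracker accepts only strictly increasing sequence numbers.
type seqTracker struct {
	mu   sync.Mutex
	last uint64
}

func (t *seqTracker) accept(seq uint64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if seq <= t.last {
		return false // duplicate or out-of-order; drop, buffer, or log
	}
	t.last = seq
	return true
}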
Let's check if sequence handling exists elsewhere:
pkg/transport/dispatcher/dispatcher_test.go (1)
175-177
: LGTM!
The suite runner implementation follows the standard testify/suite pattern correctly.
pkg/transport/dispatcher/recovery_test.go (3)
1-17
: LGTM! Build tags and imports are well-structured.
The file is properly organized with appropriate build tags and necessary imports.
177-179
: LGTM! Suite runner is properly implemented.
The test suite runner follows the standard testify pattern.
1-179
: Verify test coverage for the recovery package.
While the test suite provides good coverage of the main scenarios, let's verify the actual coverage metrics.
✅ Verification successful
Test coverage appears comprehensive and well-structured
The test suite provides good coverage of all the main methods in the recovery implementation:
- handleError() is covered by TestHandleFirstError and TestHandleErrorWhileRecovering
- recoveryLoop() is covered by TestRecoveryLoopWithFailures, TestRecoveryLoopWithRunningWatcher, and TestRecoveryLoopWithContextCancellation
- reset() is covered by TestReset
- getState() is used throughout the test cases to verify state transitions
The tests cover the main scenarios including:
- Initial error handling
- Error during recovery
- Recovery with multiple failures
- Recovery with already running watcher
- Context cancellation
- State reset functionality
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check test coverage for the recovery package
# Run tests with coverage
cd pkg/transport/dispatcher
go test -coverprofile=coverage.out
go tool cover -func=coverage.out
Length of output: 287
Script:
#!/bin/bash
# Let's analyze the test coverage by examining the test cases and their corresponding implementations
# First, let's find all the methods in the recovery implementation
ast-grep --pattern 'func (r *recovery) $_($$$) $$$'
# Then, let's examine the test cases
ast-grep --pattern 'func (suite *RecoveryTestSuite) Test$_($$$) {'
Length of output: 5822
Script:
#!/bin/bash
# Let's search for the test implementations to match with the methods
rg "func.*RecoveryTestSuite.*Test" -A 2 pkg/transport/dispatcher/recovery_test.go
Length of output: 1071
pkg/orchestrator/watchers/protocol_router_test.go (3)
1-17
: LGTM! Well-structured test file setup.
The build tags and imports are properly organized, following Go best practices.
1-199
: Overall well-structured test suite with room for enhancement.
The test implementation follows good practices and provides solid coverage. Consider the suggested improvements:
- Add TearDown for proper cleanup
- Enhance validation test cases
- Add edge cases for protocol version compatibility
- Include race condition tests
These improvements will make the test suite more robust and comprehensive.
92-199
: 🛠️ Refactor suggestion
Add tests for version compatibility and concurrent access.
Consider adding these test scenarios:
- Node advertising an unsupported protocol version
- Node advertising multiple versions of the same protocol
- Concurrent protocol selection requests
Example test for version compatibility:
func (s *ProtocolRouterTestSuite) TestPreferredProtocol_UnsupportedVersion() {
	execution := mock.Execution()
	nodeState := models.NodeState{
		Info: models.NodeInfo{
			SupportedProtocols: []models.Protocol{"NCL/v2"}, // Unsupported version
		},
	}
	s.nodeStore.EXPECT().Get(s.ctx, execution.NodeID).Return(nodeState, nil)
	protocol, err := s.router.PreferredProtocol(s.ctx, execution)
	s.Error(err)
	s.Empty(protocol)
}
Additionally, consider adding a test with the race flag to verify thread safety:
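A sketch of such a test, to pair with go test -race. It reuses the suite fields from the example above; AnyTimes lets the shared expectation serve all goroutines:
func (s *ProtocolRouterTestSuite) TestPreferredProtocol_Concurrent() {
	execution := mock.Execution()
	nodeState := models.NodeState{
		Info: models.NodeInfo{
			SupportedProtocols: []models.Protocol{models.ProtocolNCLV1},
		},
	}
	s.nodeStore.EXPECT().Get(s.ctx, execution.NodeID).Return(nodeState, nil).AnyTimes()

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, err := s.router.PreferredProtocol(s.ctx, execution)
			s.NoError(err)
		}()
	}
	wg.Wait()
}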
pkg/transport/dispatcher/state_test.go (2)
1-14
: LGTM: Build tags and imports are well structured.
The build tags correctly separate unit and integration tests, and all necessary testing dependencies are properly imported.
218-220
: LGTM: Test suite runner is properly implemented.
The test suite runner follows testify's conventions correctly.
pkg/compute/watchers/ncl_message_creator_test.go (2)
1-15
: LGTM! Well-structured test file setup.
The file is properly organized with appropriate build tags and necessary imports.
1-187
: Verify test coverage for transport package objectives.
While these tests thoroughly cover the NCLMessageCreator, please ensure there are corresponding test files for the transport package components mentioned in the PR objectives, specifically:
- Dispatcher's reliability and ordering guarantees
- Forwarder's best-effort publishing
- Recovery and retry mechanisms
✅ Verification successful
Let me check the content of these test files to verify the test coverage for transport package objectives.
Let me check the actual test implementations to verify the coverage of specific requirements.
The transport package has comprehensive test coverage - no additional tests needed.
The test files demonstrate thorough coverage of the transport package objectives:
- Dispatcher reliability and recovery:
  - TestRecovery in dispatcher_e2e_test.go tests recovery with retry intervals
  - End-to-end tests verify message ordering and delivery guarantees
- Forwarder's best-effort publishing:
  - TestEventFlow and TestReconnection in forwarder_e2e_test.go verify publishing behavior
  - TestNoResponders covers best-effort scenarios
  - Unit tests cover edge cases like startup/shutdown
- Recovery mechanisms:
  - Both components have reconnection and recovery tests
  - Tests verify behavior during network failures and restarts
The test coverage aligns well with the PR objectives, including both unit and end-to-end tests for all key functionality.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for transport package test files
echo "Searching for transport package test files..."
fd -e go -e test.go "transport"
# Check for test coverage of key components
echo "Checking for Dispatcher and Forwarder test coverage..."
rg -l "type (Dispatcher|Forwarder)" --type go
rg -l "Test.*(Dispatcher|Forwarder)" --type go
Length of output: 889
Script:
#!/bin/bash
# Check Dispatcher test coverage
echo "=== Dispatcher Tests ==="
rg "func Test.*Dispatcher" -B 1 -A 5 pkg/transport/dispatcher/dispatcher_test.go pkg/transport/dispatcher/dispatcher_e2e_test.go
echo -e "\n=== Forwarder Tests ==="
# Check Forwarder test coverage
rg "func Test.*Forwarder" -B 1 -A 5 pkg/transport/forwarder/forwarder_test.go pkg/transport/forwarder/forwarder_e2e_test.go
# Check for reliability and retry tests
echo -e "\n=== Reliability and Retry Tests ==="
rg "(reliability|retry|ordering|recover)" -i pkg/transport/dispatcher/dispatcher_test.go pkg/transport/dispatcher/dispatcher_e2e_test.go pkg/transport/forwarder/forwarder_test.go pkg/transport/forwarder/forwarder_e2e_test.go
Length of output: 2263
Script:
#!/bin/bash
# Check Dispatcher recovery and retry implementation
echo "=== Dispatcher Recovery Test ==="
rg "TestRecovery" -A 30 pkg/transport/dispatcher/dispatcher_e2e_test.go
echo -e "\n=== Dispatcher Test Methods ==="
rg "func \(s \*DispatcherTestSuite\) Test" -A 5 pkg/transport/dispatcher/dispatcher_test.go
echo -e "\n=== Forwarder Test Methods ==="
rg "func \(s \*ForwarderUnitTestSuite\) Test" -A 5 pkg/transport/forwarder/forwarder_test.go
rg "func \(s \*ForwarderE2ETestSuite\) Test" -A 5 pkg/transport/forwarder/forwarder_e2e_test.go
Length of output: 3077
pkg/orchestrator/message_handler.go (1)
39-50
: Verify sequence tracking implementation.
The PR objectives mention maintaining sequence tracking between BoltDB and NATS, but this isn't evident in the current implementation. Please verify how sequence numbers are tracked and handled.
pkg/orchestrator/watchers/ncl_message_creator_test.go (1)
1-18
: LGTM! Well-structured imports and build tags
The file organization follows Go best practices with proper separation of unit and integration tests.
pkg/transport/forwarder/forwarder_e2e_test.go (2)
1-25
: LGTM! Build tags and imports are well organized.
The build tags appropriately categorize these tests as unit tests, and the imports are well-structured and necessary for the test suite.
1-289
: Verify test coverage for critical infrastructure.
The test suite provides good coverage of the forwarder's functionality, particularly around message delivery guarantees and reconnection handling. However, given this is critical infrastructure, consider:
- Adding metrics collection to verify performance characteristics
- Adding chaos testing scenarios (e.g., network partitions, message corruption)
- Testing with different NATS quality of service settings
pkg/transport/dispatcher/dispatcher_e2e_test.go (1)
1-24
: LGTM! Build tags and imports are well-organized.
The build tags and imports are properly structured, with clear separation between standard library, third-party, and internal packages.
pkg/models/execution.go (1)
255-259
: Verify usage in transport package.
Let's ensure this method is being used correctly to determine the event transport mechanism.
✅ Verification successful
Based on the search results, I can now generate the final response:
OrchestrationProtocol() is correctly used for protocol-specific message handling
The method is properly used in the transport-related code within the pkg/compute/watchers package:
- In ncl_message_creator.go: Guards NCL-specific message creation
- In bprotocol_dispatcher.go: Guards BProtocol-specific dispatch logic
Each implementation correctly checks the protocol before proceeding with protocol-specific logic, ensuring proper message handling based on the orchestration protocol.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the OrchestrationProtocol method is used in the transport package
# to determine whether to use Dispatcher or Forwarder.
# Search for OrchestrationProtocol usage
rg -A 5 "OrchestrationProtocol\(\)"
# Search for specific transport package implementations
ast-grep --pattern 'if $x.OrchestrationProtocol() {
$$$
}'
Length of output: 1730
pkg/models/job.go (1)
384-385
: Verify the protocol constant naming
The constant ProtocolBProtocolV2 seems to have an unusual name. Should it be ProtocolV2 instead? The "BProtocol" part appears redundant.
✅ Verification successful
Let me search for the protocol constant definitions to understand the naming convention better.
The protocol constant naming is correct
The constant ProtocolBProtocolV2 follows the established naming pattern in the codebase, representing the protocol identifier "bprotocol/v2". This is consistent with other protocol definitions like ProtocolNCLV1, which maps to "ncl/v1". The naming, while seemingly redundant, appears intentional to distinguish between different protocol types.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for protocol-related constants to verify naming convention
rg -A 1 "type Protocol|const.*Protocol"
Length of output: 597
Script:
#!/bin/bash
# Search for protocol constant definitions
rg "Protocol.*=.*\"" pkg/models/
Length of output: 250
go.mod (1)
Line range hint 1-300
: Dependencies are well-aligned with PR objectives
The module dependencies align well with the PR objectives:
- NATS server/client libraries for messaging
- BoltDB for event logs
- OpenTelemetry for observability
pkg/orchestrator/watchers/protocol_router.go (1)
31-33
: Compatibility of errors.Join
The errors.Join function is used to aggregate errors. Please verify that your project is using Go 1.20 or later, as errors.Join was introduced in Go 1.20. Using an earlier version of Go will result in a compilation error.
Confirm the Go version used in the project. If using an earlier version, consider alternatives like concatenating error messages or using a custom error aggregator.
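For reference, the Go 1.20+ pattern in question looks like this; the validate wrapper and message strings are illustrative, while the params fields match those used when constructing the router later in this review:
// errors.Join returns nil for an all-nil/empty slice, so validation reads cleanly.
func validate(params ProtocolRouterParams) error {
	var errs []error
	if params.NodeStore == nil {
		errs = append(errs, errors.New("node store cannot be nil"))
	}
	if len(params.SupportedProtocols) == 0 {
		errs = append(errs, errors.New("at least one supported protocol is required"))
	}
	return errors.Join(errs...)
}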
pkg/orchestrator/watchers/ncl_message_creator.go (1)
Line range hint 1-137
: Overall implementation aligns with design objectives
The NCLMessageCreator is well-structured, implementing the transport.MessageCreator interface effectively. The integration with ProtocolRouter and the handling of different execution transitions are appropriately designed.
pkg/transport/dispatcher/config.go (1)
90-118
: Ensure errors.Join() compatibility with Go version 1.20 or higher
The use of errors.Join() in the Validate method requires Go 1.20 or newer. Please verify that the project's Go version is set to 1.20 or higher to avoid build issues in environments using older Go versions.
Run the following script to check the Go version specified in the go.mod file:
✅ Verification successful
Go version compatibility check passed for errors.Join()
The project's Go version is set to 1.23 in go.mod, which fully supports the errors.Join() function introduced in Go 1.20. No changes are required.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that the Go version in 'go.mod' is 1.20 or higher.
# Extract the Go version from go.mod
go_version=$(grep '^go [0-9]\+\.[0-9]\+' go.mod | awk '{print $2}')
# Compare versions
required_version="1.20"
if [[ "$(printf '%s\n%s' "$required_version" "$go_version" | sort -V | head -n1)" == "$required_version" ]]; then
echo "Go version is $go_version, which is compatible with errors.Join()"
else
echo "Go version is $go_version, which is not compatible with errors.Join(). Please update to Go 1.20 or newer."
fi
Length of output: 303
pkg/orchestrator/watchers/bprotocol_dispatcher.go (4)
22-22: Addition of protocolRouter field to BProtocolDispatcher
The inclusion of protocolRouter *ProtocolRouter in BProtocolDispatcher is appropriate and aligns with the need to route protocols using the ProtocolRouter.
28-28: Addition of ProtocolRouter field to BProtocolDispatcherParams
Adding ProtocolRouter *ProtocolRouter to BProtocolDispatcherParams ensures proper initialization of the dispatcher with the necessary protocol routing capabilities.
35-35: Proper initialization of protocolRouter in constructor
The protocolRouter field is correctly initialized in NewBProtocolDispatcher, ensuring that the dispatcher has access to protocol routing functionality.
53-65: Implementation of protocol checking in HandleEvent
The added logic in HandleEvent to determine the preferred protocol before processing transitions is well-implemented. It ensures that only events relevant to models.ProtocolBProtocolV2 are processed, which is essential for maintaining compatibility with legacy nodes.
pkg/transport/dispatcher/dispatcher.go (1)
82-99
: Review context usage in goroutines started by 'Start' method
Passing the main context ctx to long-running goroutines can lead to premature cancellation if ctx is canceled elsewhere. This could stop essential background processes unexpectedly.
Consider creating a dedicated context for the goroutines:
func (d *Dispatcher) Start(ctx context.Context) error {
// ...
+ goroutineCtx, cancel := context.WithCancel(context.Background())
+ d.goroutineCancel = cancel
// Start background processing
- go d.processPublishResults(ctx)
- go d.checkStalledMessages(ctx)
- go d.checkpointLoop(ctx)
+ go d.processPublishResults(goroutineCtx)
+ go d.checkStalledMessages(goroutineCtx)
+ go d.checkpointLoop(goroutineCtx)
// ...
Ensure that d.goroutineCancel() is called during Stop() to terminate the goroutines appropriately.
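A matching Stop along these lines would complete the suggestion; the sync.WaitGroup field is an assumption about how the background loops are tracked:
// Companion sketch for Stop(): cancel the dedicated context, then wait for
// the background loops to exit, bounded by the caller's context.
func (d *Dispatcher) Stop(ctx context.Context) error {
	if d.goroutineCancel != nil {
		d.goroutineCancel()
	}
	done := make(chan struct{})
	go func() {
		d.wg.Wait() // assumes the loops register on a sync.WaitGroup
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}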
pkg/orchestrator/watchers/bprotocol_dispatcher_test.go (3)
76-88
: Verify no unintended interactions occur in TestHandleEvent_NoStateChange
In this test, you expect no actions to occur when there is no state change. While the absence of mock expectations implicitly checks this, it's safer to explicitly verify that no unexpected interactions happen with your mocks.
You can use s.ctrl.Finish() after the HandleEvent call to ensure that any unexpected interactions with the mocks will cause the test to fail:
err := s.dispatcher.HandleEvent(s.ctx, createExecutionEvent(upsert))
s.NoError(err)
s.ctrl.Finish() // Verifies no unexpected calls were made
This provides a stricter guarantee that the dispatcher behaves correctly when no state changes occur.
90-109
: Ensure proper handling when protocols are unsupported
In TestHandleEvent_UnsupportedProtocol, the test verifies that no action is taken when the node supports only an unsupported protocol. Confirm that the dispatcher logs appropriate messages or handles this scenario as intended, aiding in debugging and system monitoring.
Consider adding assertions or log checks to validate that the dispatcher gracefully handles unsupported protocols, possibly by logging a warning or metric for unsupported nodes.
310-310
: Confirm the order of mock setups in TestHandleEvent_ComputeErrors
In the TestHandleEvent_ComputeErrors test, you call s.expectProtocolSupport before tc.setupMock(). Ensure that the order of these calls does not introduce any unintended side effects, especially if setupMock alters the state or expectations of the mocks.
Consider restructuring the test to make the sequence of mock setups explicit and to prevent potential conflicts:
tc.setupMock()
s.expectProtocolSupport(tc.upsert.Current, []models.Protocol{models.ProtocolBProtocolV2})
Verify that this change aligns with the intended behavior of your tests.
pkg/node/compute.go (7)
34-34: Import of dispatcher package is appropriate
The addition of the dispatcher package import is necessary for the new functionality introduced.
247-248: Updated OrderedPublisherConfig parameters correctly
The change from DestinationPrefix to Destination in ncl.NewOrderedPublisher is appropriate and aligns with the updated API. Ensure that computeOutSubject(cfg.NodeID) returns the correct destination string.
268-268: Updated call to setupComputeWatchers reflects new return value
The function now returns nclDispatcher along with watcherRegistry, and the updated call captures this new return value correctly.
393-393: Function signature updated to return *dispatcher.Dispatcher
The setupComputeWatchers function now returns a *dispatcher.Dispatcher, which is appropriate for managing the dispatcher externally.
402-402: Adjusted error return to match updated function signature
The return statement now includes an additional nil to account for the new return value in case of an error, which aligns with the updated function signature.
Line range hint 405-416: Execution handler watcher set up correctly
The execution handler watcher is correctly configured with the appropriate handler, filters, retry strategy, and initial event iterator.
Line range hint 419-432: bprotocol dispatcher watcher configured appropriately
The bprotocol dispatcher watcher is properly set up with the necessary handler and configurations to process execution upsert events.
pkg/node/requester.go (1)
Line range hint 433-442
: Function signature and return values updated appropriately
The setupOrchestratorWatchers function signature has been updated to return an additional *forwarder.Forwarder, and the return statements have been adjusted accordingly. This change correctly integrates the Forwarder component as intended.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (1)
pkg/node/requester.go (1)
485-490: Enhance error handling for protocol router
The error handling for the protocol router creation could be more specific to help with debugging.
Improve error wrapping:
protocolRouter, err := watchers.NewProtocolRouter(watchers.ProtocolRouterParams{
	NodeStore:          nodeManager,
	SupportedProtocols: []models.Protocol{models.ProtocolBProtocolV2, models.ProtocolNCLV1},
})
if err != nil {
-	return nil, nil, fmt.Errorf("failed to create protocol router: %w", err)
+	return nil, nil, fmt.Errorf("failed to create protocol router with protocols %v: %w",
+		[]models.Protocol{models.ProtocolBProtocolV2, models.ProtocolNCLV1}, err)
}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
- pkg/node/compute.go (8 hunks)
- pkg/node/requester.go (7 hunks)
- pkg/orchestrator/watchers/execution_canceller.go (1 hunks)
- pkg/transport/forwarder/forwarder.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- pkg/orchestrator/watchers/execution_canceller.go
- pkg/transport/forwarder/forwarder.go
🔇 Additional comments (5)
pkg/node/compute.go (2)
435-453
: LGTM! Well-structured dispatcher setup
The NCL dispatcher setup is well-implemented with:
- Appropriate filter configuration for execution events
- Blocking retry strategy for reliability
- Proper error handling and startup sequence
247-249
: Verify NCL publisher configuration changes across the codebase
The parameter name change from DestinationPrefix to Destination might need similar updates in other files.
✅ Verification successful
All NCL publisher configurations are using the correct parameter
The verification shows that all NCL publisher instances in the codebase are consistently using Destination instead of DestinationPrefix. The remaining occurrences of DestinationPrefix are in test files and the NCL package itself, where it's maintained as a separate configuration option with different semantics (used to construct subjects by appending message type).
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find all NCL publisher configurations to ensure consistent parameter usage
# Search for NCL publisher configurations
rg -A 5 "ncl\.NewOrderedPublisher|ncl\.NewPublisher" --type go
# Search for any remaining usage of DestinationPrefix
rg "DestinationPrefix" --type go
Length of output: 4890
pkg/node/requester.go (3)
43-43
: LGTM: Import of new transport package
The addition of the forwarder package import aligns with the PR objectives.
Line range hint 246-294
: LGTM: Proper integration and cleanup of the forwarder component
The changes correctly integrate the new forwarder component and include proper cleanup handling, following the established patterns in the codebase.
527-533
: Consider adding configuration options for the forwarder
The forwarder is created with default settings. Consider adding configuration options for better flexibility.
Let's verify if the forwarder package supports configuration options:
If configuration options are available, consider passing them from the NodeConfig:
- nclForwarder, err := forwarder.New(nclPublisher, nclDispatcherWatcher, nclMessageCreator)
+ nclForwarder, err := forwarder.New(nclPublisher, nclDispatcherWatcher, nclMessageCreator, forwarder.WithConfig(cfg.ForwarderConfig))
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (6)
pkg/orchestrator/watchers/ncl_message_creator.go (4)
17-25: Add godoc comments for exported types.
Consider adding documentation comments for the exported types NCLMessageCreator and NCLMessageCreatorParams to improve code maintainability and help users understand their purpose and usage.
+// NCLMessageCreator creates messages for the NCL protocol based on execution events.
type NCLMessageCreator struct {
	protocolRouter *ProtocolRouter
	subjectFn      func(nodeID string) string
}

+// NCLMessageCreatorParams contains the parameters needed to create an NCLMessageCreator.
type NCLMessageCreatorParams struct {
	ProtocolRouter *ProtocolRouter
	SubjectFn      func(nodeID string) string
}
35-80: Improve context handling in CreateMessage method.
The method uses a background context for the PreferredProtocol call, which could lead to resource leaks or hanging operations. Consider accepting a context parameter to allow proper timeout and cancellation handling.
-func (d *NCLMessageCreator) CreateMessage(event watcher.Event) (*envelope.Message, error) {
+func (d *NCLMessageCreator) CreateMessage(ctx context.Context, event watcher.Event) (*envelope.Message, error) {
	upsert, ok := event.Object.(models.ExecutionUpsert)
	if !ok {
		return nil, bacerrors.New("failed to process event: expected models.ExecutionUpsert, got %T", event.Object).
			WithComponent(nclDispatcherErrComponent)
	}

	// Skip if there's no state change
	if !upsert.HasStateChange() {
		return nil, nil
	}

	if upsert.Current == nil {
		return nil, bacerrors.New("upsert.Current is nil").
			WithComponent(nclDispatcherErrComponent)
	}

	execution := upsert.Current
-	preferredProtocol, err := d.protocolRouter.PreferredProtocol(context.Background(), execution)
+	preferredProtocol, err := d.protocolRouter.PreferredProtocol(ctx, execution)
82-129: Enhance logging and adjust method visibility.
The message creation methods could benefit from more detailed logging and should be made private since they're only used internally.
-func (d *NCLMessageCreator) createAskForBidMessage(upsert models.ExecutionUpsert) *envelope.Message {
+func (d *NCLMessageCreator) createAskForBidMessage(upsert models.ExecutionUpsert) *envelope.Message {
	log.Debug().
		Str("nodeID", upsert.Current.NodeID).
		Str("executionID", upsert.Current.ID).
+		Str("state", string(upsert.Current.State)).
+		Int("event_count", len(upsert.Events)).
		Msg("Asking for bid")
Consider applying similar logging enhancements to other message creation methods.
1-132
: Implementation aligns well with PR objectives.
The NCLMessageCreator successfully implements the message creation logic needed for the dispatcher component mentioned in the PR objectives. It handles the various message types needed for reliable delivery between compute nodes and the orchestrator.
Consider documenting the relationship between this component and the broader transport package architecture in the package documentation.
pkg/executor/docker/executor_test.go (2)
409-409
: LGTM! Consider adding a comment about timing constraints.
The change to use Require().Eventually improves test reliability by failing fast when conditions aren't met. This is particularly important for container startup verification.
Consider adding a comment explaining the 10-second timeout and 100ms polling interval choices:
+ // Wait up to 10 seconds for the container to start, polling every 100ms.
+ // These timing values are chosen based on typical Docker container startup times.
  s.Require().Eventually(func() bool {
Line range hint 409-417: Document the race condition handling in the test.
The test handles a subtle race condition between marking a container as active and its actual startup. While the implementation correctly uses activeCh to handle this, the sequence isn't immediately clear to future maintainers.
Consider restructuring the test to make the timing dependencies more explicit:
+ // There's a potential race condition between marking a container as active
+ // and the container actually starting. The executor sets active=true before
+ // calling ContainerStart, so we need to:
+ // 1. Wait for the handler to be registered and marked as active
+ // 2. Wait for the activeCh signal to confirm the container has actually started
  s.Require().Eventually(func() bool {
      handler, ok := s.executor.handlers.Get(executionID)
      return ok && handler.active()
  }, time.Second*10, time.Millisecond*100, "Could not find a running container")
- // This is important to do. In our docker executor, we set active to true, before calling the docker client with ContainerStart
- // Hence there is a bit of time before the container actually gets started. The correct way of identifying that whether
- // a container has started or not is via activeCh. We want to make sure that container is started before canceling the execution.
+ // Wait for confirmation that the container has actually started
  handler, _ := s.executor.handlers.Get(executionID)
  <-handler.activeCh
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
- `pkg/executor/docker/executor_test.go` (1 hunks)
- `pkg/orchestrator/watchers/ncl_message_creator.go` (1 hunks)
- `pkg/transport/dispatcher/dispatcher.go` (1 hunks)
- `pkg/transport/forwarder/forwarder.go` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- pkg/transport/dispatcher/dispatcher.go
- pkg/transport/forwarder/forwarder.go
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (5)
pkg/orchestrator/watchers/ncl_message_creator.go (3)
`17-25`: **Add documentation for struct fields and consider params validation.**

Consider adding documentation for the struct fields to improve code maintainability. Also, the constructor should validate that neither field in `NCLMessageCreatorParams` is nil.

Apply this diff:

```diff
 type NCLMessageCreator struct {
+	// protocolRouter determines the preferred protocol for message creation
 	protocolRouter *ProtocolRouter
+	// subjectFn generates the subject string for a given node ID
 	subjectFn      func(nodeID string) string
 }

 type NCLMessageCreatorParams struct {
+	// ProtocolRouter determines the preferred protocol for message creation
 	ProtocolRouter *ProtocolRouter
+	// SubjectFn generates the subject string for a given node ID
 	SubjectFn      func(nodeID string) string
 }
```
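The diff above covers the documentation half; for the nil-validation half, a minimal constructor sketch. Whether `NewNCLMessageCreator` already exists with this signature, or returns an error at all, is an assumption, so treat the shape as illustrative:

```go
// NewNCLMessageCreator rejects nil dependencies up front so that
// misconfiguration fails at wiring time rather than on the first event.
func NewNCLMessageCreator(params NCLMessageCreatorParams) (*NCLMessageCreator, error) {
	if params.ProtocolRouter == nil {
		return nil, errors.New("NCLMessageCreatorParams.ProtocolRouter cannot be nil")
	}
	if params.SubjectFn == nil {
		return nil, errors.New("NCLMessageCreatorParams.SubjectFn cannot be nil")
	}
	return &NCLMessageCreator{
		protocolRouter: params.ProtocolRouter,
		subjectFn:      params.SubjectFn,
	}, nil
}
```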
`63-74`: **Consider using early returns to simplify the switch logic.**

The switch statement could be more readable with early returns instead of setting an intermediate message variable.

Apply this diff:

```diff
-	var message *envelope.Message
-
-	switch {
-	case transitions.shouldAskForPendingBid():
-		message = d.createAskForBidMessage(upsert)
-	case transitions.shouldAskForDirectBid():
-		message = d.createAskForBidMessage(upsert)
-	case transitions.shouldAcceptBid():
-		message = d.createBidAcceptedMessage(upsert)
-	case transitions.shouldRejectBid():
-		message = d.createBidRejectedMessage(upsert)
-	case transitions.shouldCancel():
-		message = d.createCancelMessage(upsert)
-	}
-
-	if message != nil {
-		message.WithMetadataValue(ncl.KeySubject, d.subjectFn(upsert.Current.NodeID))
-	}
-	return message, nil
+	var message *envelope.Message
+	switch {
+	case transitions.shouldAskForPendingBid(), transitions.shouldAskForDirectBid():
+		message = d.createAskForBidMessage(upsert)
+	case transitions.shouldAcceptBid():
+		message = d.createBidAcceptedMessage(upsert)
+	case transitions.shouldRejectBid():
+		message = d.createBidRejectedMessage(upsert)
+	case transitions.shouldCancel():
+		message = d.createCancelMessage(upsert)
+	default:
+		return nil, nil
+	}
+
+	return message.WithMetadataValue(ncl.KeySubject, d.subjectFn(upsert.Current.NodeID)), nil
```
`82-129`: **Consider adding error handling for edge cases in message creation methods.**

While the message creation methods are well-implemented, they might benefit from additional error handling for edge cases, such as validating the `Events` field in `BaseRequest`.

Example validation for one method:

```diff
 func (d *NCLMessageCreator) createAskForBidMessage(upsert models.ExecutionUpsert) *envelope.Message {
+	if len(upsert.Events) == 0 {
+		log.Warn().
+			Str("nodeID", upsert.Current.NodeID).
+			Str("executionID", upsert.Current.ID).
+			Msg("Creating ask for bid message with no events")
+	}
 	log.Debug().
 		Str("nodeID", upsert.Current.NodeID).
 		Str("executionID", upsert.Current.ID).
 		Msg("Asking for bid")
```
`409-409`: **LGTM! Consider adding a clarifying comment.**

The change to use `s.Require().Eventually` improves test reliability by failing fast with clear error messages. Consider adding a comment explaining why we need to wait for the handler to become active, as it's a crucial timing condition for the test.

```diff
+	// Wait for the container handler to become active, which indicates that the
+	// container is ready to be managed by the executor. This is necessary before
+	// attempting to cancel the execution.
 	s.Require().Eventually(func() bool {
```
Line range hint `1-24`: **Consider architectural improvements to the test suite.**

While the test suite is well-structured, consider these improvements for better maintainability:

- Group related network tests using `t.Run()` to create logical sub-test groups
- Document magic numbers (e.g., timeouts, sleep durations) with named constants and comments explaining their significance (see the constants sketch after the example below)

Example refactor for network tests:

```go
func (s *ExecutorTestSuite) TestDockerNetworking(t *testing.T) {
	tests := []struct {
		name     string
		network  *models.NetworkConfig
		wantErr  bool
		wantCode int
	}{
		{"FullNetwork", &models.NetworkConfig{Type: models.NetworkFull}, false, 0},
		{"NoNetwork", &models.NetworkConfig{Type: models.NetworkNone}, true, 1},
		// ... other network test cases
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// ... test implementation
		})
	}
}
```
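For the magic-number point above, a small sketch of named timing constants; the 10-second/100-millisecond values mirror the timings already used in this test file, and the constant names are invented:

```go
const (
	// containerStartTimeout bounds how long the suite waits for a container
	// to report as active before failing the test.
	containerStartTimeout = 10 * time.Second
	// containerPollInterval is how often Eventually re-checks the condition.
	containerPollInterval = 100 * time.Millisecond
)
```

Calls such as `s.Require().Eventually(cond, containerStartTimeout, containerPollInterval, "...")` then document their own timing assumptions.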
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
- `pkg/executor/docker/executor_test.go` (1 hunks)
- `pkg/orchestrator/watchers/ncl_message_creator.go` (1 hunks)
- `pkg/transport/dispatcher/dispatcher.go` (1 hunks)
- `pkg/transport/forwarder/forwarder.go` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- pkg/transport/dispatcher/dispatcher.go
- pkg/transport/forwarder/forwarder.go
🔇 Additional comments (1)
pkg/executor/docker/executor_test.go (1)
Line range hint `409-424`: **Enhance cancellation test verification.**
While the test verifies basic cancellation, consider adding these verifications:
- Verify that the container is actually stopped after cancellation
- Add timeout protection for channel operations
- Verify resource cleanup
Example enhancement:
```diff
 err = s.executor.Cancel(jobCtx, executionID)
 s.Require().NoError(err)

+// Add timeout protection for channel operations
 select {
 case err := <-errC:
 	s.Require().Failf("Executor run should have returned a result, but instead returned err: %w", err.Error())
 case result := <-resultC:
 	s.Require().NotNil(result)
+case <-time.After(10 * time.Second):
+	s.Require().Fail("Timed out waiting for execution result")
 }
+
+// Verify container cleanup
+s.Require().Eventually(func() bool {
+	handler, exists := s.executor.handlers.Get(executionID)
+	return !exists || !handler.active()
+}, 5*time.Second, 100*time.Millisecond, "Container wasn't properly cleaned up")
```
```go
return nil, bacerrors.New("failed to process event: expected models.ExecutionUpsert, got %T", event.Object).
	WithComponent(nclDispatcherErrComponent)
```
💡 Codebase verification

**Inconsistent error component constant values across packages**

The `nclDispatcherErrComponent` constant is defined in two locations with different values:

- `pkg/orchestrator/watchers/constants.go`: "NCLDispatcher"
- `pkg/compute/watchers/constants.go`: "NCLMessageCreator"

This inconsistency could lead to incorrect error attribution and debugging difficulties.

- Consider consolidating these constants in a shared location or ensuring the values are intentionally different for their respective use cases.
- Update the constant value in `pkg/compute/watchers/constants.go` to match "NCLDispatcher" if these should be consistent.
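If consolidation is the chosen route, one option is a single exported constant that both watcher packages import; the package name and location below are hypothetical:

```go
// Package transportconsts is a hypothetical shared home for component
// names used in error attribution; the real location is a project decision.
package transportconsts

// NCLDispatcherComponent is the single source of truth for the component
// name attached to NCL dispatch errors via WithComponent.
const NCLDispatcherComponent = "NCLDispatcher"
```

Both `pkg/orchestrator/watchers` and `pkg/compute/watchers` would then reference this constant instead of defining their own strings.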
🔗 Analysis chain
**Verify error component constant definitions.**

The `nclDispatcherErrComponent` constant is used but not defined in this file.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Search for the error component constant definition
rg "nclDispatcherErrComponent.*=.*" --type go
```
Length of output: 219
Script:
```shell
#!/bin/bash
# Get more context around the constant definitions to understand the potential conflict
rg "nclDispatcherErrComponent" --type go -B 2 -A 2
```
Length of output: 1983
Overview

This PR introduces a new `transport` package that bridges BoltDB event logs with NATS messaging:

- `Dispatcher`: Ensures reliable delivery of events from compute nodes to the orchestrator
- `Forwarder`: Best-effort publishing from orchestrator to compute nodes

Key Aspects

Next Steps
Summary by CodeRabbit

Release Notes

New Features

- Added `Lenf` in the spell checker dictionary for enhanced recognition.
- Introduced `ProtocolRouter` for better protocol management and routing of commands.
- Added `Forwarder` for forwarding events without delivery guarantees.

Bug Fixes

Tests

- Test suites for `NCLMessageCreator`, `ProtocolRouter`, and `Forwarder` to validate functionality and error handling.

Documentation