Skip to content

Commit

Permalink
Add more logs when secondary processor has issues (#6323)
Browse files Browse the repository at this point in the history
  • Loading branch information
neil-xie authored Oct 2, 2024
1 parent 75bacb0 commit 3cacfbb
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
6 changes: 6 additions & 0 deletions service/worker/indexer/esProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ func (p *ESProcessorImpl) shadowBulkAfterAction(id int64, requests []bulk.Generi
// This happens after configured retry, which means something bad happens on cluster or index
// When cluster back to live, bulkProcessor will re-commit those failure requests
p.logger.Error("Error commit bulk request in secondary processor.", tag.Error(err.Details))

for _, request := range requests {
p.logger.Error("ES request failed in secondary processor",
tag.ESResponseStatus(err.Status),
tag.ESRequest(request.String()))
}
}
}

Expand Down
52 changes: 52 additions & 0 deletions service/worker/indexer/esProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,55 @@ func (m *MockBulkableRequest) String() string {
func (m *MockBulkableRequest) Source() ([]string, error) {
return nil, fmt.Errorf("simulated source error")
}

func (s *esProcessorSuite) TestBulkAfterAction_Nack_Shadow_WithError() {
version := int64(3)
testKey := "testKey"
request := &mocks2.GenericBulkableRequest{}
request.On("String").Return("")
request.On("Source").Return([]string{string(`{"delete":{"_id":"testKey"}}`)}, nil)
requests := []bulk.GenericBulkableRequest{request}

mFailed := map[string]*bulk.GenericBulkResponseItem{
"index": {
Index: testIndex,
Type: testType,
ID: testID,
Version: version,
Status: 400,
},
}
response := &bulk.GenericBulkResponse{
Took: 3,
Errors: false,
Items: []map[string]*bulk.GenericBulkResponseItem{mFailed},
}

// Mock error to be passed to the after action functions
mockErr := &bulk.GenericError{
Status: 500,
Details: fmt.Errorf("Test error occurred"),
}

wid := "test-workflowID"
rid := "test-runID"
domainID := "test-domainID"
payload := s.getEncodedMsg(wid, rid, domainID)

mockKafkaMsg := &msgMocks.Message{}
mapVal := newKafkaMessageWithMetrics(mockKafkaMsg, &testStopWatch)
s.esProcessor.mapToKafkaMsg.Put(testKey, mapVal)

// Add mocked secondary processor
secondaryProcessor := &mocks2.GenericBulkProcessor{}
s.esProcessor.bulkProcessor = append(s.esProcessor.bulkProcessor, secondaryProcessor)

// Mock Kafka message Nack and Value
mockKafkaMsg.On("Nack").Return(nil).Once()
mockKafkaMsg.On("Value").Return(payload).Once()
s.mockScope.On("IncCounter", mock.AnythingOfType("int")).Return()
// Execute bulkAfterAction for primary processor with error
s.esProcessor.bulkAfterAction(0, requests, response, mockErr)
// Mocking secondary processor to test shadowBulkAfterAction with error
s.esProcessor.shadowBulkAfterAction(0, requests, response, mockErr)
}

0 comments on commit 3cacfbb

Please sign in to comment.