
Commit

Merge branch 'master' into increase_cov
agautam478 authored Apr 4, 2024
2 parents 102d4c3 + e040206 commit eb468e7
Showing 15 changed files with 1,011 additions and 484 deletions.
4 changes: 2 additions & 2 deletions common/domain/failover_watcher.go
@@ -119,7 +119,7 @@ func (p *failoverWatcherImpl) Stop() {

func (p *failoverWatcherImpl) refreshDomainLoop() {

timer := time.NewTimer(backoff.JitDuration(
timer := p.timeSource.NewTimer(backoff.JitDuration(
p.refreshInterval(),
p.refreshJitter(),
))
@@ -129,7 +129,7 @@ func (p *failoverWatcherImpl) refreshDomainLoop() {
select {
case <-p.shutdownChan:
return
case <-timer.C:
case <-timer.Chan():
domains := p.domainCache.GetAllDomain()
for _, domain := range domains {
p.handleFailoverTimeout(domain)
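The hunk above swaps the direct time.NewTimer call for a timer obtained from the injected timeSource, so tests can drive the refresh loop with a fake clock instead of waiting on real time. Below is a minimal, self-contained sketch of that pattern; the Timer and TimeSource interfaces are hypothetical stand-ins that only mirror the shape used here (NewTimer/Chan), not the repository's actual clock API.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical interfaces mirroring the shape used by refreshDomainLoop.
type Timer interface {
	Chan() <-chan time.Time
	Stop() bool
}

type TimeSource interface {
	NewTimer(d time.Duration) Timer
}

// realTimer and realTimeSource adapt the standard library for production use.
type realTimer struct{ t *time.Timer }

func (r realTimer) Chan() <-chan time.Time { return r.t.C }
func (r realTimer) Stop() bool             { return r.t.Stop() }

type realTimeSource struct{}

func (realTimeSource) NewTimer(d time.Duration) Timer { return realTimer{time.NewTimer(d)} }

// refreshLoop has the same shape as refreshDomainLoop: it blocks on either the
// injected timer or a shutdown signal, and re-arms the timer after each tick.
func refreshLoop(ts TimeSource, interval time.Duration, shutdown <-chan struct{}) {
	timer := ts.NewTimer(interval)
	defer func() { timer.Stop() }()
	for {
		select {
		case <-shutdown:
			return
		case <-timer.Chan():
			fmt.Println("refresh all domains")
			timer = ts.NewTimer(interval)
		}
	}
}

func main() {
	shutdown := make(chan struct{})
	go refreshLoop(realTimeSource{}, 50*time.Millisecond, shutdown)
	time.Sleep(120 * time.Millisecond) // let the loop tick a couple of times
	close(shutdown)
}
```

Because the loop only ever blocks on timer.Chan() or the shutdown channel, a test can substitute a time source whose timers fire on demand.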
77 changes: 76 additions & 1 deletion common/domain/failover_watcher_test.go
@@ -21,6 +21,7 @@
package domain

import (
"errors"
"log"
"os"
"testing"
@@ -76,7 +77,7 @@ func (s *failoverWatcherSuite) SetupTest() {
s.controller = gomock.NewController(s.T())

s.mockDomainCache = cache.NewMockDomainCache(s.controller)
s.timeSource = clock.NewRealTimeSource()
s.timeSource = clock.NewMockedTimeSource()
s.mockMetadataMgr = &mocks.MetadataManager{}

s.mockMetadataMgr.On("GetMetadata", mock.Anything).Return(&persistence.GetMetadataResponse{
@@ -243,3 +244,77 @@ func (s *failoverWatcherSuite) TestHandleFailoverTimeout() {
)
s.watcher.handleFailoverTimeout(domainEntry)
}

func (s *failoverWatcherSuite) TestStart() {
s.Assertions.Equal(common.DaemonStatusInitialized, s.watcher.status)
s.watcher.Start()
s.Assertions.Equal(common.DaemonStatusStarted, s.watcher.status)

// Verify that calling Start again does not change the status
s.watcher.Start()
s.Assertions.Equal(common.DaemonStatusStarted, s.watcher.status)
s.watcher.Stop()
}

func (s *failoverWatcherSuite) TestIsUpdateDomainRetryable() {
testCases := []struct {
name string
inputErr error
wantRetry bool
}{
{"nil error", nil, true},
{"non-nil error", errors.New("some error"), true},
}

for _, tc := range testCases {
s.Run(tc.name, func() {
retry := isUpdateDomainRetryable(tc.inputErr)
s.Equal(tc.wantRetry, retry)
})
}
}

func (s *failoverWatcherSuite) TestRefreshDomainLoop() {

domainName := "testDomain"
domainID := uuid.New()
failoverEndTime := common.Int64Ptr(time.Now().Add(-time.Hour).UnixNano()) // 1 hour in the past
mockTimeSource, _ := s.timeSource.(clock.MockedTimeSource)

domainInfo := &persistence.DomainInfo{ID: domainID, Name: domainName}
domainConfig := &persistence.DomainConfig{Retention: 1, EmitMetric: true}
replicationConfig := &persistence.DomainReplicationConfig{ActiveClusterName: "active", Clusters: []*persistence.ClusterReplicationConfig{{ClusterName: "active"}}}
domainEntry := cache.NewDomainCacheEntryForTest(domainInfo, domainConfig, true, replicationConfig, 1, failoverEndTime)

domainsMap := map[string]*cache.DomainCacheEntry{domainID: domainEntry}
s.mockDomainCache.EXPECT().GetAllDomain().Return(domainsMap).AnyTimes()

s.mockMetadataMgr.On("GetMetadata", mock.Anything).Return(&persistence.GetMetadataResponse{NotificationVersion: 1}, nil).Maybe()

s.mockMetadataMgr.On("GetDomain", mock.Anything, mock.AnythingOfType("*persistence.GetDomainRequest")).Return(&persistence.GetDomainResponse{
Info: domainInfo,
Config: domainConfig,
ReplicationConfig: replicationConfig,
IsGlobalDomain: true,
ConfigVersion: 1,
FailoverVersion: 1,
FailoverNotificationVersion: 1,
FailoverEndTime: failoverEndTime,
NotificationVersion: 1,
}, nil).Once()

s.mockMetadataMgr.On("UpdateDomain", mock.Anything, mock.Anything).Return(nil).Once()

s.watcher.Start()

// Delay to allow loop to start
time.Sleep(1 * time.Second)
mockTimeSource.Advance(12 * time.Second)
// Now stop the watcher, which should trigger the shutdown case in refreshDomainLoop
s.watcher.Stop()

// Allow enough time for the shutdown process to complete
time.Sleep(1 * time.Second)

s.mockMetadataMgr.AssertExpectations(s.T())
}
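TestRefreshDomainLoop above drives that loop with the mocked time source: Advance(12 * time.Second) moves the fake clock past the refresh interval so the timer fires without real waiting, while the sleeps only give the goroutine time to reach its select and to shut down. A rough sketch of how such a fake can work is below; fakeTimeSource and fakeTimer are hypothetical and are not the repository's clock.MockedTimeSource implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fakeTimer and fakeTimeSource are hypothetical stand-ins for a mocked time
// source: Advance moves the fake clock forward and fires every timer whose
// deadline has been reached.
type fakeTimer struct {
	deadline time.Time
	ch       chan time.Time
}

func (t *fakeTimer) Chan() <-chan time.Time { return t.ch }

type fakeTimeSource struct {
	mu     sync.Mutex
	now    time.Time
	timers []*fakeTimer
}

func (s *fakeTimeSource) NewTimer(d time.Duration) *fakeTimer {
	s.mu.Lock()
	defer s.mu.Unlock()
	t := &fakeTimer{deadline: s.now.Add(d), ch: make(chan time.Time, 1)}
	s.timers = append(s.timers, t)
	return t
}

func (s *fakeTimeSource) Advance(d time.Duration) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.now = s.now.Add(d)
	for _, t := range s.timers {
		if !s.now.Before(t.deadline) {
			select {
			case t.ch <- s.now: // fire once; buffered so the receiver can be late
			default:
			}
		}
	}
}

func main() {
	src := &fakeTimeSource{now: time.Unix(0, 0)}
	timer := src.NewTimer(10 * time.Second)

	done := make(chan struct{})
	go func() {
		<-timer.Chan() // blocks until the fake clock passes the deadline
		fmt.Println("timer fired")
		close(done)
	}()

	src.Advance(12 * time.Second) // mirrors mockTimeSource.Advance(12 * time.Second)
	<-done
}
```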
File renamed without changes.
174 changes: 173 additions & 1 deletion common/persistence/nosql/nosql_execution_store_test.go
@@ -1052,7 +1052,6 @@ func TestNosqlExecutionStore(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

store := tc.setupMock(ctrl)
err := tc.testFunc(store)
@@ -1067,6 +1066,179 @@
}
}
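The one-line deletion in the first hunk above drops the explicit defer ctrl.Finish(). The likely reason (an assumption, not stated in the commit) is that since gomock v1.5.0 NewController registers Finish through t.Cleanup whenever it is handed a *testing.T, so expectations are still verified when each subtest ends; the import path in the sketch below is likewise assumed.

```go
package nosql

import (
	"testing"

	"github.com/golang/mock/gomock"
)

// With gomock >= v1.5.0, NewController(t) arranges for ctrl.Finish to run via
// t.Cleanup, so an explicit "defer ctrl.Finish()" is redundant.
func TestControllerAutoFinish(t *testing.T) {
	ctrl := gomock.NewController(t)
	_ = ctrl // mocks built from ctrl are verified automatically at test end
}
```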

func TestDeleteReplicationTaskFromDLQ(t *testing.T) {
ctx := context.Background()
shardID := 1

tests := []struct {
name string
sourceCluster string
taskID int64
setupMock func(*nosqlplugin.MockDB)
expectedError error
}{
{
name: "success",
sourceCluster: "sourceCluster",
taskID: 1,
setupMock: func(mockDB *nosqlplugin.MockDB) {
mockDB.EXPECT().
DeleteReplicationDLQTask(ctx, shardID, "sourceCluster", int64(1)).
Return(nil)
},
expectedError: nil,
},
{
name: "database error",
sourceCluster: "sourceCluster",
taskID: 1,
setupMock: func(mockDB *nosqlplugin.MockDB) {
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes()
mockDB.EXPECT().
DeleteReplicationDLQTask(ctx, shardID, "sourceCluster", int64(1)).
Return(errors.New("database error"))
},
expectedError: &types.InternalServiceError{Message: "database error"},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
controller := gomock.NewController(t)

mockDB := nosqlplugin.NewMockDB(controller)
store := newTestNosqlExecutionStore(mockDB, log.NewNoop())

tc.setupMock(mockDB)

err := store.DeleteReplicationTaskFromDLQ(ctx, &persistence.DeleteReplicationTaskFromDLQRequest{
SourceClusterName: tc.sourceCluster,
TaskID: tc.taskID,
})

if tc.expectedError != nil {
require.ErrorAs(t, err, &tc.expectedError)
} else {
require.NoError(t, err)
}
})
}
}

func TestRangeDeleteReplicationTaskFromDLQ(t *testing.T) {
ctx := context.Background()
shardID := 1

tests := []struct {
name string
sourceCluster string
exclusiveBeginID int64
inclusiveEndID int64
setupMock func(*nosqlplugin.MockDB)
expectedError error
}{
{
name: "success",
sourceCluster: "sourceCluster",
exclusiveBeginID: 1,
inclusiveEndID: 100,
setupMock: func(mockDB *nosqlplugin.MockDB) {
mockDB.EXPECT().
RangeDeleteReplicationDLQTasks(ctx, shardID, "sourceCluster", int64(1), int64(100)).
Return(nil)
},
expectedError: nil,
},
{
name: "database error",
sourceCluster: "sourceCluster",
exclusiveBeginID: 1,
inclusiveEndID: 100,
setupMock: func(mockDB *nosqlplugin.MockDB) {
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes()
mockDB.EXPECT().
RangeDeleteReplicationDLQTasks(ctx, shardID, "sourceCluster", int64(1), int64(100)).
Return(errors.New("database error"))
},
expectedError: &types.InternalServiceError{Message: "database error"},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
controller := gomock.NewController(t)

mockDB := nosqlplugin.NewMockDB(controller)
store := newTestNosqlExecutionStore(mockDB, log.NewNoop())

tc.setupMock(mockDB)

_, err := store.RangeDeleteReplicationTaskFromDLQ(ctx, &persistence.RangeDeleteReplicationTaskFromDLQRequest{
SourceClusterName: tc.sourceCluster,
ExclusiveBeginTaskID: tc.exclusiveBeginID,
InclusiveEndTaskID: tc.inclusiveEndID,
})

if tc.expectedError != nil {
require.ErrorAs(t, err, &tc.expectedError)
} else {
require.NoError(t, err)
}
})
}
}

func TestCreateFailoverMarkerTasks(t *testing.T) {
ctx := context.Background()
shardID := 1

tests := []struct {
name string
rangeID int64
markers []*persistence.FailoverMarkerTask
setupMock func(*nosqlplugin.MockDB)
expectedError error
}{
{
name: "success",
rangeID: 123,
markers: []*persistence.FailoverMarkerTask{
{
TaskData: persistence.TaskData{},
DomainID: "testDomainID",
},
},
setupMock: func(mockDB *nosqlplugin.MockDB) {
mockDB.EXPECT().
InsertReplicationTask(ctx, gomock.Any(), nosqlplugin.ShardCondition{ShardID: shardID, RangeID: 123}).
Return(nil)
},
expectedError: nil,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
controller := gomock.NewController(t)

mockDB := nosqlplugin.NewMockDB(controller)
store := newTestNosqlExecutionStore(mockDB, log.NewNoop())

tc.setupMock(mockDB)

err := store.CreateFailoverMarkerTasks(ctx, &persistence.CreateFailoverMarkersRequest{
RangeID: tc.rangeID,
Markers: tc.markers,
})

if tc.expectedError != nil {
require.ErrorAs(t, err, &tc.expectedError)
} else {
require.NoError(t, err)
}
})
}
}
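The "database error" cases in the DLQ tests above declare that a raw error returned by the nosqlplugin DB layer should reach the caller as a *types.InternalServiceError. The helper below is a hypothetical sketch of that translation, not the store's actual error-conversion code; the errorChecker interface and the wrapStoreError name are invented for illustration.

```go
package nosql

import (
	"fmt"

	"github.com/uber/cadence/common/types"
)

// errorChecker captures the only piece of the DB interface this sketch needs.
type errorChecker interface {
	IsNotFoundError(err error) bool
}

// wrapStoreError turns a raw DB error into the internal service error the
// tests above expect. The DLQ tests stub IsNotFoundError to true and still
// declare an InternalServiceError, so this sketch does not special-case it.
func wrapStoreError(db errorChecker, operation string, err error) error {
	if err == nil {
		return nil
	}
	_ = db.IsNotFoundError(err) // a real implementation may branch here
	return &types.InternalServiceError{
		Message: fmt.Sprintf("%s failed. Error: %v", operation, err),
	}
}
```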

func newCreateWorkflowExecutionRequest() *persistence.InternalCreateWorkflowExecutionRequest {
return &persistence.InternalCreateWorkflowExecutionRequest{
RangeID: 123,
62 changes: 0 additions & 62 deletions common/persistence/pinotVisibilityTripleManager_test.go

This file was deleted.

