Skip to content

Commit

Permalink
Concurrent replace should work with supervisors using concurrent locks (
Browse files Browse the repository at this point in the history
#15995)

* Concurrent replace should work with supervisors using concurrent locks

* Ignore supervisors with useConcurrentLocks set to false

* Apply feedback
  • Loading branch information
AmatyaAvadhanula authored Feb 29, 2024
1 parent d6f59d1 commit 7c42e87
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,38 @@ public Optional<String> getActiveSupervisorIdForDatasourceWithAppendLock(String
final Supervisor supervisor = entry.getValue().lhs;
final SupervisorSpec supervisorSpec = entry.getValue().rhs;

TaskLockType taskLockType = null;
boolean hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
if (supervisorSpec instanceof SeekableStreamSupervisorSpec) {
SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = (SeekableStreamSupervisorSpec) supervisorSpec;
Map<String, Object> context = seekableStreamSupervisorSpec.getContext();
if (context != null) {
taskLockType = QueryContexts.getAsEnum(
Tasks.TASK_LOCK_TYPE,
context.get(Tasks.TASK_LOCK_TYPE),
TaskLockType.class
Boolean useConcurrentLocks = QueryContexts.getAsBoolean(
Tasks.USE_CONCURRENT_LOCKS,
context.get(Tasks.USE_CONCURRENT_LOCKS)
);
if (useConcurrentLocks == null) {
TaskLockType taskLockType = QueryContexts.getAsEnum(
Tasks.TASK_LOCK_TYPE,
context.get(Tasks.TASK_LOCK_TYPE),
TaskLockType.class
);
if (taskLockType == null) {
hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
} else if (taskLockType == TaskLockType.APPEND) {
hasAppendLock = true;
} else {
hasAppendLock = false;
}
} else {
hasAppendLock = useConcurrentLocks;
}
}
}

if (supervisor instanceof SeekableStreamSupervisor
&& !supervisorSpec.isSuspended()
&& supervisorSpec.getDataSources().contains(datasource)
&& TaskLockType.APPEND.equals(taskLockType)) {
&& (hasAppendLock)) {
return Optional.of(supervisorId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,21 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock()
EasyMock.replay(activeSpec);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());

SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks = EasyMock.mock(SeekableStreamSupervisorSpec.class);
Supervisor activeSupervisorWithConcurrentLocks = EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(activeSpecWithConcurrentLocks.getId()).andReturn("activeSpecWithConcurrentLocks").anyTimes();
EasyMock.expect(activeSpecWithConcurrentLocks.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(activeSpecWithConcurrentLocks.getDataSources())
.andReturn(ImmutableList.of("activeConcurrentLocksDS")).anyTimes();
EasyMock.expect(activeSpecWithConcurrentLocks.createSupervisor())
.andReturn(activeSupervisorWithConcurrentLocks).anyTimes();
EasyMock.expect(activeSpecWithConcurrentLocks.createAutoscaler(activeSupervisorWithConcurrentLocks))
.andReturn(null).anyTimes();
EasyMock.expect(activeSpecWithConcurrentLocks.getContext())
.andReturn(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true)).anyTimes();
EasyMock.replay(activeSpecWithConcurrentLocks);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());

SeekableStreamSupervisorSpec activeAppendSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
Supervisor activeAppendSupervisor = EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes();
Expand All @@ -482,6 +497,25 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock()
EasyMock.replay(activeAppendSpec);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());

// A supervisor with useConcurrentLocks set to false explicitly must not use an append lock
SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse = EasyMock.mock(SeekableStreamSupervisorSpec.class);
Supervisor supervisorWithUseConcurrentLocksFalse = EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(specWithUseConcurrentLocksFalse.getId()).andReturn("useConcurrentLocksFalse").anyTimes();
EasyMock.expect(specWithUseConcurrentLocksFalse.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(specWithUseConcurrentLocksFalse.getDataSources())
.andReturn(ImmutableList.of("dsWithuseConcurrentLocksFalse")).anyTimes();
EasyMock.expect(specWithUseConcurrentLocksFalse.createSupervisor()).andReturn(supervisorWithUseConcurrentLocksFalse).anyTimes();
EasyMock.expect(specWithUseConcurrentLocksFalse.createAutoscaler(supervisorWithUseConcurrentLocksFalse))
.andReturn(null).anyTimes();
EasyMock.expect(specWithUseConcurrentLocksFalse.getContext()).andReturn(ImmutableMap.of(
Tasks.USE_CONCURRENT_LOCKS,
false,
Tasks.TASK_LOCK_TYPE,
TaskLockType.APPEND.name()
)).anyTimes();
EasyMock.replay(specWithUseConcurrentLocksFalse);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());

replayAll();
manager.start();

Expand All @@ -499,6 +533,14 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock()
manager.createOrUpdateAndStartSupervisor(activeAppendSpec);
Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeAppendDS").isPresent());

manager.createOrUpdateAndStartSupervisor(activeSpecWithConcurrentLocks);
Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeConcurrentLocksDS").isPresent());

manager.createOrUpdateAndStartSupervisor(specWithUseConcurrentLocksFalse);
Assert.assertFalse(
manager.getActiveSupervisorIdForDatasourceWithAppendLock("dsWithUseConcurrentLocksFalse").isPresent()
);

verifyAll();
}

Expand Down

0 comments on commit 7c42e87

Please sign in to comment.