From 7bbc8c3c42302416c1fe5724716910441095d5b9 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 28 Feb 2024 16:11:57 +0530 Subject: [PATCH 1/3] Concurrent replace should work with supervisors using concurrent locks --- .../overlord/supervisor/SupervisorManager.java | 11 +++++++++-- .../supervisor/SupervisorManagerTest.java | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 207ff56f28f8..a8002b1e26bd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -87,11 +87,18 @@ public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String final Supervisor supervisor = entry.getValue().lhs; final SupervisorSpec supervisorSpec = entry.getValue().rhs; - TaskLockType taskLockType = null; + boolean useConcurrentLocks = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; + TaskLockType taskLockType = Tasks.DEFAULT_TASK_LOCK_TYPE; if (supervisorSpec instanceof SeekableStreamSupervisorSpec) { SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = (SeekableStreamSupervisorSpec) supervisorSpec; Map context = seekableStreamSupervisorSpec.getContext(); if (context != null) { + useConcurrentLocks = QueryContexts.getAsBoolean( + Tasks.USE_CONCURRENT_LOCKS, + context.get(Tasks.USE_CONCURRENT_LOCKS), + Tasks.DEFAULT_USE_CONCURRENT_LOCKS + ); + taskLockType = QueryContexts.getAsEnum( Tasks.TASK_LOCK_TYPE, context.get(Tasks.TASK_LOCK_TYPE), @@ -103,7 +110,7 @@ public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String if (supervisor instanceof SeekableStreamSupervisor && !supervisorSpec.isSuspended() && supervisorSpec.getDataSources().contains(datasource) - && TaskLockType.APPEND.equals(taskLockType)) { + && (useConcurrentLocks || TaskLockType.APPEND.equals(taskLockType))) { return Optional.of(supervisorId); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index e8c5d839cf19..3dcff3e2d627 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -468,6 +468,21 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() EasyMock.replay(activeSpec); metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks = EasyMock.mock(SeekableStreamSupervisorSpec.class); + Supervisor activeSupervisorWithConcurrentLocks = EasyMock.mock(SeekableStreamSupervisor.class); + EasyMock.expect(activeSpecWithConcurrentLocks.getId()).andReturn("activeSpecWithConcurrentLocks").anyTimes(); + EasyMock.expect(activeSpecWithConcurrentLocks.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(activeSpecWithConcurrentLocks.getDataSources()) + .andReturn(ImmutableList.of("activeConcurrentLocksDS")).anyTimes(); + EasyMock.expect(activeSpecWithConcurrentLocks.createSupervisor()) + .andReturn(activeSupervisorWithConcurrentLocks).anyTimes(); + EasyMock.expect(activeSpecWithConcurrentLocks.createAutoscaler(activeSupervisorWithConcurrentLocks)) + .andReturn(null).anyTimes(); + EasyMock.expect(activeSpecWithConcurrentLocks.getContext()) + .andReturn(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true)).anyTimes(); + EasyMock.replay(activeSpecWithConcurrentLocks); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + SeekableStreamSupervisorSpec activeAppendSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); Supervisor activeAppendSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes(); @@ -499,6 +514,9 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() manager.createOrUpdateAndStartSupervisor(activeAppendSpec); Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeAppendDS").isPresent()); + manager.createOrUpdateAndStartSupervisor(activeSpecWithConcurrentLocks); + Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeConcurrentLocksDS").isPresent()); + verifyAll(); } From 261aa3c062c5cb6efe617d190442f29026630ba6 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 28 Feb 2024 16:25:49 +0530 Subject: [PATCH 2/3] Ignore supervisors with useConcurrentLocks set to false --- .../supervisor/SupervisorManager.java | 18 +++++++------- .../supervisor/SupervisorManagerTest.java | 24 +++++++++++++++++++ 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index a8002b1e26bd..270bc5967765 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -87,7 +87,7 @@ public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String final Supervisor supervisor = entry.getValue().lhs; final SupervisorSpec supervisorSpec = entry.getValue().rhs; - boolean useConcurrentLocks = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; + Boolean useConcurrentLocks = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; TaskLockType taskLockType = Tasks.DEFAULT_TASK_LOCK_TYPE; if (supervisorSpec instanceof SeekableStreamSupervisorSpec) { SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = (SeekableStreamSupervisorSpec) supervisorSpec; @@ -95,15 +95,17 @@ public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String if (context != null) { useConcurrentLocks = QueryContexts.getAsBoolean( Tasks.USE_CONCURRENT_LOCKS, - context.get(Tasks.USE_CONCURRENT_LOCKS), - Tasks.DEFAULT_USE_CONCURRENT_LOCKS + context.get(Tasks.USE_CONCURRENT_LOCKS) ); - taskLockType = QueryContexts.getAsEnum( - Tasks.TASK_LOCK_TYPE, - context.get(Tasks.TASK_LOCK_TYPE), - TaskLockType.class - ); + if (useConcurrentLocks == null) { + useConcurrentLocks = false; + taskLockType = QueryContexts.getAsEnum( + Tasks.TASK_LOCK_TYPE, + context.get(Tasks.TASK_LOCK_TYPE), + TaskLockType.class + ); + } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 3dcff3e2d627..5ffbd4b94608 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -497,6 +497,25 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() EasyMock.replay(activeAppendSpec); metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + // A supervisor with useConcurrentLocks set to false explicitly must not use an append lock + SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse = EasyMock.mock(SeekableStreamSupervisorSpec.class); + Supervisor supervisorWithUseConcurrentLocksFalse = EasyMock.mock(SeekableStreamSupervisor.class); + EasyMock.expect(specWithUseConcurrentLocksFalse.getId()).andReturn("useConcurrentLocksFalse").anyTimes(); + EasyMock.expect(specWithUseConcurrentLocksFalse.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(specWithUseConcurrentLocksFalse.getDataSources()) + .andReturn(ImmutableList.of("dsWithuseConcurrentLocksFalse")).anyTimes(); + EasyMock.expect(specWithUseConcurrentLocksFalse.createSupervisor()).andReturn(supervisorWithUseConcurrentLocksFalse).anyTimes(); + EasyMock.expect(specWithUseConcurrentLocksFalse.createAutoscaler(supervisorWithUseConcurrentLocksFalse)) + .andReturn(null).anyTimes(); + EasyMock.expect(specWithUseConcurrentLocksFalse.getContext()).andReturn(ImmutableMap.of( + Tasks.USE_CONCURRENT_LOCKS, + false, + Tasks.TASK_LOCK_TYPE, + TaskLockType.APPEND.name() + )).anyTimes(); + EasyMock.replay(specWithUseConcurrentLocksFalse); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + replayAll(); manager.start(); @@ -517,6 +536,11 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() manager.createOrUpdateAndStartSupervisor(activeSpecWithConcurrentLocks); Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeConcurrentLocksDS").isPresent()); + manager.createOrUpdateAndStartSupervisor(specWithUseConcurrentLocksFalse); + Assert.assertFalse( + manager.getActiveSupervisorIdForDatasourceWithAppendLock("dsWithUseConcurrentLocksFalse").isPresent() + ); + verifyAll(); } From 11ce1bf8127e66d360addadebc7f0efed8082184 Mon Sep 17 00:00:00 2001 From: Amatya Date: Thu, 29 Feb 2024 08:48:38 +0530 Subject: [PATCH 3/3] Apply feedback --- .../supervisor/SupervisorManager.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 270bc5967765..810a991c2f22 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -87,24 +87,30 @@ public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String final Supervisor supervisor = entry.getValue().lhs; final SupervisorSpec supervisorSpec = entry.getValue().rhs; - Boolean useConcurrentLocks = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; - TaskLockType taskLockType = Tasks.DEFAULT_TASK_LOCK_TYPE; + boolean hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; if (supervisorSpec instanceof SeekableStreamSupervisorSpec) { SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = (SeekableStreamSupervisorSpec) supervisorSpec; Map context = seekableStreamSupervisorSpec.getContext(); if (context != null) { - useConcurrentLocks = QueryContexts.getAsBoolean( + Boolean useConcurrentLocks = QueryContexts.getAsBoolean( Tasks.USE_CONCURRENT_LOCKS, context.get(Tasks.USE_CONCURRENT_LOCKS) ); - if (useConcurrentLocks == null) { - useConcurrentLocks = false; - taskLockType = QueryContexts.getAsEnum( + TaskLockType taskLockType = QueryContexts.getAsEnum( Tasks.TASK_LOCK_TYPE, context.get(Tasks.TASK_LOCK_TYPE), TaskLockType.class ); + if (taskLockType == null) { + hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; + } else if (taskLockType == TaskLockType.APPEND) { + hasAppendLock = true; + } else { + hasAppendLock = false; + } + } else { + hasAppendLock = useConcurrentLocks; } } } @@ -112,7 +118,7 @@ public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String if (supervisor instanceof SeekableStreamSupervisor && !supervisorSpec.isSuspended() && supervisorSpec.getDataSources().contains(datasource) - && (useConcurrentLocks || TaskLockType.APPEND.equals(taskLockType))) { + && (hasAppendLock)) { return Optional.of(supervisorId); } }