diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java index bc61d23045b8..d8a5e735a279 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentAllocateAction.java @@ -23,9 +23,11 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.CriticalAction; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.indexing.overlord.LockResult; import io.druid.java.util.common.IAE; @@ -269,14 +271,31 @@ private SegmentIdentifier tryAllocate( } if (lockResult.isOk()) { - final SegmentIdentifier identifier = toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment( - dataSource, - sequenceName, - previousSegmentId, - tryInterval, - lockResult.getTaskLock().getVersion(), - skipSegmentLineageCheck - ); + final SegmentIdentifier identifier; + try { + identifier = toolbox.getTaskLockbox().doInCriticalSection( + task, + ImmutableList.of(tryInterval), + CriticalAction.builder() + .onValidLocks( + () -> toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment( + dataSource, + sequenceName, + previousSegmentId, + tryInterval, + lockResult.getTaskLock().getVersion(), + skipSegmentLineageCheck + ) + ).onInvalidLocks( + () -> null + ) + .build() + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + if (identifier != null) { return identifier; } else { diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 6ecafd03a181..e4a1d7957b0c 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -394,11 +394,11 @@ public SegmentIdentifier allocatePendingSegment( Preconditions.checkNotNull(interval, "interval"); Preconditions.checkNotNull(maxVersion, "maxVersion"); - return connector.retryTransaction( - new TransactionCallback() + return connector.retryWithHandle( + new HandleCallback() { @Override - public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception + public SegmentIdentifier withHandle(Handle handle) throws Exception { return skipSegmentLineageCheck ? allocatePendingSegment(handle, dataSource, sequenceName, interval, maxVersion) : @@ -411,9 +411,7 @@ public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transact maxVersion ); } - }, - ALLOCATE_SEGMENT_QUIET_TRIES, - SQLMetadataConnector.DEFAULT_MAX_TRIES + } ); } diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java index e648e928f6e5..4491e9ac18f2 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java @@ -32,6 +32,7 @@ import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.TransactionCallback; +import org.skife.jdbi.v2.TransactionIsolationLevel; import org.skife.jdbi.v2.TransactionStatus; import org.skife.jdbi.v2.exceptions.DBIException; import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; @@ -150,16 +151,13 @@ public T retryWithHandle(final HandleCallback callback) public T retryTransaction(final TransactionCallback callback, final int quietTries, final int maxTries) { - final Callable call = new Callable() - { - @Override - public T call() throws Exception - { - return getDBI().inTransaction(callback); - } - }; try { - return RetryUtils.retry(call, shouldRetry, quietTries, maxTries); + return RetryUtils.retry( + () -> getDBI().inTransaction(TransactionIsolationLevel.READ_COMMITTED, callback), + shouldRetry, + quietTries, + maxTries + ); } catch (Exception e) { throw Throwables.propagate(e);