Skip to content

Commit

Permalink
update insert pending segments logic to synchronous (apache#6336)
Browse files Browse the repository at this point in the history
* 1. Mysql default transaction isolation is REPEATABLE_READ, treat it as READ_COMMITTED will reduce insert id conflict.
2. Add an index to 'dataSource used end' is work well for the most of scenarios(get recently segments), and it will speed up sync add pending segments in DB.
3. 'select and insert' is not need within transaction.

* Use TaskLockbox.doInCriticalSection instead of synchronized syntax to speed up insert pending segments.

* fix typo for NullPointerException
  • Loading branch information
FaxianZhao authored and gianm committed Nov 16, 2018
1 parent 0c29a6a commit 9f55c55
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.CriticalAction;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.LockResult;
import io.druid.java.util.common.IAE;
Expand Down Expand Up @@ -269,14 +271,31 @@ private SegmentIdentifier tryAllocate(
}

if (lockResult.isOk()) {
final SegmentIdentifier identifier = toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment(
dataSource,
sequenceName,
previousSegmentId,
tryInterval,
lockResult.getTaskLock().getVersion(),
skipSegmentLineageCheck
);
final SegmentIdentifier identifier;
try {
identifier = toolbox.getTaskLockbox().doInCriticalSection(
task,
ImmutableList.of(tryInterval),
CriticalAction.<SegmentIdentifier>builder()
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator().allocatePendingSegment(
dataSource,
sequenceName,
previousSegmentId,
tryInterval,
lockResult.getTaskLock().getVersion(),
skipSegmentLineageCheck
)
).onInvalidLocks(
() -> null
)
.build()
);
}
catch (Exception e) {
throw new RuntimeException(e);
}

if (identifier != null) {
return identifier;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,11 +394,11 @@ public SegmentIdentifier allocatePendingSegment(
Preconditions.checkNotNull(interval, "interval");
Preconditions.checkNotNull(maxVersion, "maxVersion");

return connector.retryTransaction(
new TransactionCallback<SegmentIdentifier>()
return connector.retryWithHandle(
new HandleCallback<SegmentIdentifier>()
{
@Override
public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
public SegmentIdentifier withHandle(Handle handle) throws Exception
{
return skipSegmentLineageCheck ?
allocatePendingSegment(handle, dataSource, sequenceName, interval, maxVersion) :
Expand All @@ -411,9 +411,7 @@ public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transact
maxVersion
);
}
},
ALLOCATE_SEGMENT_QUIET_TRIES,
SQLMetadataConnector.DEFAULT_MAX_TRIES
}
);
}

Expand Down
16 changes: 7 additions & 9 deletions server/src/main/java/io/druid/metadata/SQLMetadataConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionIsolationLevel;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
Expand Down Expand Up @@ -150,16 +151,13 @@ public <T> T retryWithHandle(final HandleCallback<T> callback)

public <T> T retryTransaction(final TransactionCallback<T> callback, final int quietTries, final int maxTries)
{
final Callable<T> call = new Callable<T>()
{
@Override
public T call() throws Exception
{
return getDBI().inTransaction(callback);
}
};
try {
return RetryUtils.retry(call, shouldRetry, quietTries, maxTries);
return RetryUtils.retry(
() -> getDBI().inTransaction(TransactionIsolationLevel.READ_COMMITTED, callback),
shouldRetry,
quietTries,
maxTries
);
}
catch (Exception e) {
throw Throwables.propagate(e);
Expand Down

0 comments on commit 9f55c55

Please sign in to comment.