Skip to content

Commit

Permalink
Merge pull request elastic#9 from henningandersen/spacetime_transacti…
Browse files Browse the repository at this point in the history
…ons_hook_commit

Hook commit and replay ops.
  • Loading branch information
henningandersen authored Nov 19, 2021
2 parents 1c2f707 + 4c165b6 commit 572d7d4
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public void testPrepareConflict() throws Exception {
transportServiceIterable.forEach(ts -> ((MockTransportService) ts).addSendBehavior(new StubbableTransport.SendRequestBehavior() {
@Override
public void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException {
if (action.startsWith(ShardPrepareCommitAction.NAME)) {
if (action.equals(ShardPrepareCommitAction.NAME)) {
txes.add(((ShardPrepareCommitRequest) request).txid());
new Thread(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -81,7 +80,6 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ

private final UpdateHelper updateHelper;
private final MappingUpdatedAction mappingUpdatedAction;
private static final ShardTransactionRegistry transactionRegistry = new ShardTransactionRegistry();

@Inject
public TransportShardBulkAction(
Expand Down Expand Up @@ -177,8 +175,14 @@ public static void performOnPrimary(
ThreadPool threadPool,
String executorName
) {
primary.registerTransaction(request.txID(),
Arrays.stream(request.items()).map(BulkItemRequest::request).map(DocWriteRequest::id).collect(Collectors.toSet()));
Translog.Location[] transactionId = new Translog.Location[1];
try {
transactionId[0] = primary.startTransaction(request.txID(),
Arrays.stream(request.items()).map(BulkItemRequest::request).map(DocWriteRequest::id).collect(Collectors.toSet()));
} catch (IOException e) {
assert false;
throw new RuntimeException(e);
}
new ActionRunnable<>(listener) {

private final Executor executor = threadPool.executor(executorName);
Expand All @@ -189,11 +193,7 @@ public static void performOnPrimary(

@Override
protected void doRun() throws Exception {
TxID txID1 = TxID.create();
Translog.Location[] transactionId = new Translog.Location[1];
try {
transactionId[0] = primary.startTransaction(txID1.id());
transactionRegistry.registerTransaction(txID1, Set.of(transactionId[0].id()));

while (context.hasMoreOperationsToExecute()) {
if (executeBulkItemRequest(
Expand All @@ -212,11 +212,12 @@ protected void doRun() throws Exception {
assert context.isInitial(); // either completed and moved to next or reset
}

primary.commitTransaction(transactionId);
primary.loggingComplete(request.txID(), transactionId[0]);
transactionId[0] = primary.commitTransaction(request.txID());
primary.closeTransaction(transactionId);
} catch (Exception x) {
logger.warn("Encountered an error while executing bulk transaction", x);
primary.rollbackTransaction(transactionId);
} finally {
primary.closeTransaction(transactionId);
}
primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

Expand Down Expand Up @@ -323,7 +324,8 @@ public Condition newCondition() {

public abstract Translog.Location startTransaction(String id) throws IOException;

public abstract Translog.Location commitTransaction(Translog.Location prevId) throws IOException;
public abstract Translog.Location commitTransaction(Translog.Location prevId,
Function<Translog.Operation, Engine.Result> applier) throws IOException;

public abstract Translog.Location rollbackTransaction(Translog.Location prevId) throws IOException;

Expand Down Expand Up @@ -1312,6 +1314,7 @@ public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionTy
}

public enum Origin {
TRANSACTION,
PRIMARY,
REPLICA,
PEER_RECOVERY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2074,7 +2074,10 @@ public Translog.Location startTransaction(String id) throws IOException {
}

@Override
public Translog.Location commitTransaction(Translog.Location prevId) throws IOException {
public Translog.Location commitTransaction(Translog.Location prevId, Function<Translog.Operation, Engine.Result> applier) throws IOException {
Translog.Location commitLocation = translog.add(
new Translog.TxCommit(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId));

Translog.Location loc = prevId;

while (loc != null) {
Expand All @@ -2087,6 +2090,8 @@ public Translog.Location commitTransaction(Translog.Location prevId) throws IOEx
logger.info("Committing op " + op);

if (op instanceof Translog.TransactionMember) {
// todo: lots of things here, but maybe this works for now...
applier.apply(op);
loc = ((Translog.TransactionMember)op).getTransactionId();
} else if (op instanceof Translog.TxStart) {
break;
Expand All @@ -2096,8 +2101,7 @@ public Translog.Location commitTransaction(Translog.Location prevId) throws IOEx
}
}

return translog.add(
new Translog.TxCommit(doGenerateSeqNoForOperation(new TxOp(System.nanoTime())), prevId));
return commitLocation;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

Expand Down Expand Up @@ -533,7 +534,8 @@ public Translog.Location startTransaction(String id) throws IOException {
}

@Override
public Translog.Location commitTransaction(Translog.Location transactionId) throws IOException {
public Translog.Location commitTransaction(Translog.Location transactionId,
Function<Translog.Operation, Result> applier) throws IOException {
return null;
}

Expand Down
22 changes: 17 additions & 5 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@
import java.util.Collections;
import java.util.Deque;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -887,9 +886,15 @@ public Translog.Location startTransaction(String id) throws IOException {
return getEngine().startTransaction(id);
}

public boolean commitTransaction(Translog.Location[] transactionId) throws IOException {
transactionId[0] = getEngine().commitTransaction(transactionId[0]);
return true;
public Translog.Location commitTransaction(TxID txID) throws IOException {
return getEngine().commitTransaction(transactionRegistry.translogHead(txID), operation -> {
try {
return applyTranslogOperation(operation, Engine.Operation.Origin.TRANSACTION);
} catch (IOException e) {
assert false;
throw new RuntimeException(e);
}
});
}

public boolean rollbackTransaction(Translog.Location[] transactionId) throws IOException {
Expand Down Expand Up @@ -1613,6 +1618,10 @@ static Engine.Searcher wrapSearcher(
); // completes stats recording
}

public void loggingComplete(TxID txID, Translog.Location headOfTranslogList) {
transactionRegistry.loggingComplete(txID, headOfTranslogList);
}

private static final class NonClosingReaderWrapper extends FilterDirectoryReader {

private NonClosingReaderWrapper(DirectoryReader in) throws IOException {
Expand Down Expand Up @@ -2150,6 +2159,8 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn
assert assertPrimaryMode();
} else if (origin == Engine.Operation.Origin.REPLICA) {
assert assertReplicationTarget();
} else if (origin == Engine.Operation.Origin.TRANSACTION) {
// empty
} else {
assert origin == Engine.Operation.Origin.LOCAL_RESET;
assert getActiveOperationsCount() == OPERATIONS_BLOCKED
Expand Down Expand Up @@ -4134,8 +4145,9 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
return retentionLeaseSyncer;
}

public void registerTransaction(TxID id, Set<String> keys) {
public Translog.Location startTransaction(TxID id, Set<String> keys) throws IOException {
transactionRegistry.registerTransaction(id, keys);
return getEngine().startTransaction(id.id());
}

public Map<TxID, Boolean> prepareCommit(TxID txID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.index.shard;

import org.elasticsearch.action.bulk.TxID;
import org.elasticsearch.index.translog.Translog;

import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -21,6 +22,8 @@ public class ShardTransactionRegistry {
private final Map<String, Set<TxID>> byKey = new HashMap<>();
private final Map<TxID, Set<String>> byTxID = new HashMap<>();
private final Map<TxID, Set<String>> conflictingKeysByTxID = new HashMap<>();
private final Map<TxID, Translog.Location> translogHeads = new HashMap<>();

private final Set<TxID> prepared = new HashSet<>();
// todo: less locking and perhaps totally different content...
public synchronized void registerTransaction(TxID txID, Set<String> ids) {
Expand All @@ -37,13 +40,20 @@ public synchronized void registerTransaction(TxID txID, Set<String> ids) {
assert invariant();
}


public synchronized void releaseTransaction(TxID txID) {
byTxID.remove(txID).forEach(id -> cleanByKey(id, txID));
prepared.remove(txID);
conflictingKeysByTxID.remove(txID);
assert invariant();
}

public synchronized void loggingComplete(TxID txID, Translog.Location headOfTranslogList) {
assert translogHeads.containsKey(txID) == false;

translogHeads.put(txID, headOfTranslogList);
}

public synchronized Map<TxID, Boolean> prepare(TxID txID) {
assert byTxID.containsKey(txID);
assert prepared.contains(txID) == false;
Expand Down Expand Up @@ -99,4 +109,8 @@ Set<String> keys(TxID txID) {
public int size() {
return byTxID.size();
}

public Translog.Location translogHead(TxID txID) {
return translogHeads.get(txID);
}
}

0 comments on commit 572d7d4

Please sign in to comment.