Skip to content

Commit

Permalink
Enable trace log in FollowerFailOverIT (elastic#38148)
Browse files Browse the repository at this point in the history
This suite still fails one per week sometimes with a worrying assertion.
Sadly we are still unable to find the actual source.

Expected: <SeqNoStats{maxSeqNo=229, localCheckpoint=86, globalCheckpoint=86}>
but: was   <SeqNoStats{maxSeqNo=229, localCheckpoint=-1, globalCheckpoint=86}>

This change enables trace log in the suite so we will have a better
picture if this fails again.

Relates elastic#3333
  • Loading branch information
dnhatn committed Feb 3, 2019
1 parent 4cf0391 commit ba1ec32
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public TransportBulkShardOperationsAction(
@Override
protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
final BulkShardOperationsRequest request, final IndexShard primary) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("index [{}] on the following primary shard {}", request.getOperations(), primary.routingEntry());
}
return shardOperationOnPrimary(request.shardId(), request.getHistoryUUID(), request.getOperations(),
request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
}
Expand Down Expand Up @@ -137,6 +140,10 @@ public static CcrWritePrimaryResult shardOperationOnPrimary(
// replicated to replicas but with the existing primary term (not the current primary term) in order
// to guarantee the consistency between the primary and replicas, and between translog and Lucene index.
final AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure();
if (logger.isTraceEnabled()) {
logger.trace("operation [{}] was processed before on following primary shard {} with existing term {}",
targetOp, primary.routingEntry(), failure.getExistingPrimaryTerm());
}
assert failure.getSeqNo() == targetOp.seqNo() : targetOp.seqNo() + " != " + failure.getSeqNo();
if (failure.getExistingPrimaryTerm().isPresent()) {
appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong()));
Expand All @@ -159,6 +166,9 @@ public static CcrWritePrimaryResult shardOperationOnPrimary(
@Override
protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("index [{}] on the following replica shard {}", request.getOperations(), replica.routingEntry());
}
return shardOperationOnReplica(request, replica, logger);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
if (hasBeenProcessedBefore(index)) {
if (logger.isTraceEnabled()) {
logger.trace("index operation [id={} seq_no={} origin={}] was processed before", index.id(), index.seqNo(), index.origin());
}
if (index.origin() == Operation.Origin.PRIMARY) {
/*
* The existing operation in this engine was probably assigned the term of the previous primary shard which is different
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,13 @@ public static ResumeFollowAction.Request resumeFollow(String followerIndex) {
* on the follower equal the leader's; then verifies the existing pairs of (docId, seqNo) on the follower also equal the leader.
*/
protected void assertIndexFullyReplicatedToFollower(String leaderIndex, String followerIndex) throws Exception {
logger.info("--> asserting <<docId,seqNo>> between {} and {}", leaderIndex, followerIndex);
assertBusy(() -> {
Map<Integer, List<DocIdSeqNoAndTerm>> docsOnFollower = getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex);
logger.info("--> docs on the follower {}", docsOnFollower);
assertThat(docsOnFollower, equalTo(getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex)));
}, 120, TimeUnit.SECONDS);

logger.info("--> asserting seq_no_stats between {} and {}", leaderIndex, followerIndex);
assertBusy(() -> {
Map<Integer, SeqNoStats> leaderStats = new HashMap<>();
Expand All @@ -463,13 +470,8 @@ protected void assertIndexFullyReplicatedToFollower(String leaderIndex, String f
}
followerStats.put(shardStat.getShardRouting().shardId().id(), shardStat.getSeqNoStats());
}
assertThat(leaderStats, equalTo(followerStats));
}, 60, TimeUnit.SECONDS);
logger.info("--> asserting <<docId,seqNo>> between {} and {}", leaderIndex, followerIndex);
assertBusy(() -> {
assertThat(getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex),
equalTo(getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex)));
}, 60, TimeUnit.SECONDS);
assertThat(followerStats, equalTo(leaderStats));
}, 120, TimeUnit.SECONDS);
}

private Map<Integer, List<DocIdSeqNoAndTerm>> getDocIdAndSeqNos(InternalTestCluster cluster, String index) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
Expand All @@ -44,6 +45,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

@TestLogging("org.elasticsearch.xpack.ccr:TRACE,org.elasticsearch.index.shard:DEBUG")
public class FollowerFailOverIT extends CcrIntegTestCase {

@Override
Expand Down

0 comments on commit ba1ec32

Please sign in to comment.