Skip to content

Commit

Permalink
Check index setting for source mode in SourceOnlySnapshotRepository (#…
Browse files Browse the repository at this point in the history
…116002) (#116014)

* Check index setting for source mode in SourceOnlySnapshotRepository

* update

* Revert "update"

This reverts commit 9bbf049.

(cherry picked from commit 37a4ee3)
  • Loading branch information
kkrik-es authored Oct 31, 2024
1 parent 1151ef4 commit af02795
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public SourceFieldMapper build() {
}

private Mode resolveSourceMode() {
// If the `index.mapper.source.mode` exists it takes precedence to determine the source mode for `_source`
// If the `index.mapping.source.mode` exists it takes precedence to determine the source mode for `_source`
// otherwise the mode is determined according to `_source.mode`.
if (INDEX_MAPPER_SOURCE_MODE_SETTING.exists(settings)) {
return INDEX_MAPPER_SOURCE_MODE_SETTING.get(settings);
Expand Down Expand Up @@ -439,6 +439,10 @@ public static boolean isSynthetic(IndexSettings indexSettings) {
return INDEX_MAPPER_SOURCE_MODE_SETTING.get(indexSettings.getSettings()) == SourceFieldMapper.Mode.SYNTHETIC;
}

public static boolean isStored(IndexSettings indexSettings) {
return INDEX_MAPPER_SOURCE_MODE_SETTING.get(indexSettings.getSettings()) == Mode.STORED;
}

public boolean isDisabled() {
return mode == Mode.DISABLED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.repositories.FilterRepository;
Expand Down Expand Up @@ -139,8 +140,9 @@ private static Metadata metadataToSnapshot(Collection<IndexId> indices, Metadata
@Override
public void snapshotShard(SnapshotShardContext context) {
final MapperService mapperService = context.mapperService();
if (mapperService.documentMapper() != null // if there is no mapping this is null
&& mapperService.documentMapper().sourceMapper().isComplete() == false) {
if ((mapperService.documentMapper() != null // if there is no mapping this is null
&& mapperService.documentMapper().sourceMapper().isComplete() == false)
|| (mapperService.documentMapper() == null && SourceFieldMapper.isStored(mapperService.getIndexSettings()) == false)) {
context.onFailure(
new IllegalStateException(
"Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.seqno.SeqNoStats;
Expand Down Expand Up @@ -149,6 +150,55 @@ public void testSourceIncomplete() throws IOException {
closeShards(shard);
}

public void testSourceIncompleteSyntheticSourceNoDoc() throws IOException {
ShardRouting shardRouting = shardRoutingBuilder(
new ShardId("index", "_na_", 0),
randomAlphaOfLength(10),
true,
ShardRoutingState.INITIALIZING
).withRecoverySource(RecoverySource.EmptyStoreRecoverySource.INSTANCE).build();
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(SourceFieldMapper.INDEX_MAPPER_SOURCE_MODE_SETTING.getKey(), "synthetic")
.build();
IndexMetadata metadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, primaryTerm).build();
IndexShard shard = newShard(shardRouting, metadata, null, new InternalEngineFactory());
recoverShardFromStore(shard);
SnapshotId snapshotId = new SnapshotId("test", "test");
IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
repository.start();
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(new ShardGeneration(-1L));
final PlainActionFuture<ShardSnapshotResult> future = new PlainActionFuture<>();
runAsSnapshot(
shard.getThreadPool(),
() -> repository.snapshotShard(
new SnapshotShardContext(
shard.store(),
shard.mapperService(),
snapshotId,
indexId,
new SnapshotIndexCommit(snapshotRef),
null,
indexShardSnapshotStatus,
IndexVersion.current(),
randomMillisUpToYear9999(),
future
)
)
);
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet);
assertEquals(
"Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source",
illegalStateException.getMessage()
);
}
closeShards(shard);
}

public void testIncrementalSnapshot() throws IOException {
IndexShard shard = newStartedShard();
for (int i = 0; i < 10; i++) {
Expand Down

0 comments on commit af02795

Please sign in to comment.