Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote txlog gbbafna #26

Open
wants to merge 23 commits into
base: before-txlog
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1239,7 +1239,7 @@ private void createConfiguration() {
if (nodeName != null) {
baseConfig.put("node.name", nodeName);
}
baseConfig.put("path.repo", confPathRepo.toAbsolutePath().toString());
baseConfig.put("path.repo", "/Users/gbbafna/git/OpenSearch/build/testclusters/repo");
baseConfig.put("path.data", confPathData.toAbsolutePath().toString());
baseConfig.put("path.logs", confPathLogs.toAbsolutePath().toString());
baseConfig.put("path.shared_data", workingDir.resolve("sharedData").toString());
Expand Down
10 changes: 10 additions & 0 deletions server/src/main/java/org/opensearch/ExceptionsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,16 @@ public static <T extends Throwable> T useOrSuppress(T first, T second) {
return first;
}

public static <T extends Throwable> RuntimeException multiple(List<T> exceptions) {
RuntimeException multiple = new RuntimeException();
if (exceptions != null) {
for (Throwable t : exceptions) {
multiple.addSuppressed(t);
}
}
return multiple;
}

private static final List<Class<? extends IOException>> CORRUPTION_EXCEPTIONS = Arrays.asList(
CorruptIndexException.class,
IndexFormatTooOldException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ public Iterator<Setting<?>> settings() {
public static final String SETTING_REMOTE_STORE_REPOSITORY = "index.remote_store.repository";

public static final String SETTING_REMOTE_TRANSLOG_STORE_ENABLED = "index.remote_store.translog.enabled";

public static final String SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY = "index.remote_store.translog.repository";

/**
* Used to specify if the index data should be persisted in the remote store.
*/
Expand Down Expand Up @@ -396,6 +399,39 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

public static final Setting<String> INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING = Setting.simpleString(
SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY,
new Setting.Validator<>() {

@Override
public void validate(final String value) {}

@Override
public void validate(final String value, final Map<Setting<?>, Object> settings) {
if (value != null && !value.isEmpty()) {
final Boolean isRemoteTranslogStoreEnabled = (Boolean) settings.get(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
if (isRemoteTranslogStoreEnabled == null || isRemoteTranslogStoreEnabled == false) {
throw new IllegalArgumentException(
"Settings "
+ INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey()
+ " can only be set/enabled when "
+ INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING.getKey()
+ " is set to true"
);
}
}
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
return settings.iterator();
}
},
Property.IndexScope,
Property.Final
);

public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,11 @@ List<String> getIndexSettingsValidationErrors(
IndexMetadata.SETTING_NUMBER_OF_REPLICAS,
INDEX_NUMBER_OF_REPLICAS_SETTING.getDefault(Settings.EMPTY)
);

AutoExpandReplicas autoExpandReplicas = AutoExpandReplicas.SETTING.get(settings);



Optional<String> error = awarenessReplicaBalance.validate(replicaCount);
if (error.isPresent()) {
validationErrors.add(error.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
Arrays.asList(
IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING
IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class FeatureFlags {
* and false otherwise.
*/
public static boolean isEnabled(String featureFlagName) {
return "true".equalsIgnoreCase(System.getProperty(featureFlagName));
return "true".equalsIgnoreCase(System.getProperty(featureFlagName)) || featureFlagName.equals(REPLICATION_TYPE)
|| featureFlagName.equals(REMOTE_STORE);
}
}
8 changes: 6 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -94,6 +95,7 @@
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* IndexModule represents the central extension point for index level custom implementations like:
Expand Down Expand Up @@ -486,7 +488,8 @@ public IndexService newIndexService(
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -541,7 +544,8 @@ public IndexService newIndexService(
allowExpensiveQueries,
expressionResolver,
valuesSourceRegistry,
recoveryStateFactory
recoveryStateFactory,
repositoriesServiceSupplier
);
success = true;
return indexService;
Expand Down
16 changes: 13 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,17 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -173,6 +176,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexNameExpressionResolver expressionResolver;
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private final Supplier<RepositoriesService> repositoriesServiceSupplier;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -204,7 +208,8 @@ public IndexService(
BooleanSupplier allowExpensiveQueries,
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -276,6 +281,7 @@ public IndexService(
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -518,6 +524,11 @@ public synchronized IndexShard createShard(
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
}

TranslogFactory translogFactory = this.indexSettings.isRemoteTranslogStoreEnabled()
? new RemoteBlobStoreInternalTranslogFactory(repositoriesServiceSupplier, threadPool,
this.indexSettings.getRemoteStoreTranslogRepository())
: new InternalTranslogFactory();

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
store = new Store(
shardId,
Expand Down Expand Up @@ -548,8 +559,7 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
// TODO Replace with remote translog factory in the follow up PR
this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory(),
translogFactory,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
);
Expand Down
6 changes: 6 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ public final class IndexSettings {
private final ReplicationType replicationType;
private final boolean isRemoteStoreEnabled;
private final boolean isRemoteTranslogStoreEnabled;
private final String remoteStoreTranslogRepository;
private final String remoteStoreRepository;
// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
private volatile Settings settings;
Expand Down Expand Up @@ -722,6 +723,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
Expand Down Expand Up @@ -988,6 +990,10 @@ public String getRemoteStoreRepository() {
return remoteStoreRepository;
}

public String getRemoteStoreTranslogRepository() {
return remoteStoreTranslogRepository;
}

/**
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
* index settings and the node settings where node settings are overwritten by index settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,7 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
* @return true if checkpoint should be processed
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
assert replicationTracker.isPrimaryMode();
if (state().equals(IndexShardState.STARTED) == false) {
logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state()));
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
*
* @opensearch.internal
*/
final class Checkpoint {
final public class Checkpoint {

final long offset;
final int numOps;
Expand Down Expand Up @@ -262,6 +262,10 @@ public synchronized byte[] toByteArray() {
return byteOutputStream.toByteArray();
}

public long getMinTranslogGeneration() {
return minTranslogGeneration;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public Translog newTranslog(
LongConsumer persistedSequenceNumberConsumer
) throws IOException {

return new Translog(
return new LocalTranslog(
translogConfig,
translogUUID,
translogDeletionPolicy,
Expand Down
Loading