Skip to content

Commit

Permalink
[Remote Cluster State] Parallel and Multipart IndexMetadata uploads
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Aug 31, 2023
1 parent 79e5aee commit 7a6cd4e
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Nullable;
Expand All @@ -21,10 +23,12 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

Expand All @@ -35,7 +39,11 @@
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -127,23 +135,12 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) thro
}
ensureRepositorySet();

final List<ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndexMetadata = new ArrayList<>();
// todo parallel upload
// any validations before/after upload ?
for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
final String indexMetadataKey = writeIndexMetadata(
clusterState.getClusterName().value(),
clusterState.getMetadata().clusterUUID(),
indexMetadata,
indexMetadataFileName(indexMetadata)
);
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(
indexMetadata.getIndex().getName(),
indexMetadata.getIndexUUID(),
indexMetadataKey
);
allUploadedIndexMetadata.add(uploadedIndexMetadata);
final List<UploadedIndexMetadata> allUploadedIndexMetadata;
try {
allUploadedIndexMetadata = writeIndexMetadata(clusterState, new ArrayList<>(clusterState.metadata().indices().values()));
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
}
final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, false);
final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
Expand Down Expand Up @@ -194,6 +191,9 @@ public ClusterMetadataManifest writeIncrementalMetadata(
final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndexMetadata = previousManifest.getIndices()
.stream()
.collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity()));

List<IndexMetadata> toUpload = new ArrayList<>();

for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName());
if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
Expand All @@ -204,24 +204,19 @@ public ClusterMetadataManifest writeIncrementalMetadata(
indexMetadata.getVersion()
);
numIndicesUpdated++;
final String indexMetadataKey = writeIndexMetadata(
clusterState.getClusterName().value(),
clusterState.getMetadata().clusterUUID(),
indexMetadata,
indexMetadataFileName(indexMetadata)
);
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(
indexMetadata.getIndex().getName(),
indexMetadata.getIndexUUID(),
indexMetadataKey
);
allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata);
toUpload.add(indexMetadata);
} else {
numIndicesUnchanged++;
}
previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName());
}

try {
writeIndexMetadata(clusterState, toUpload);
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
}

for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) {
allUploadedIndexMetadata.remove(removedIndexName);
}
Expand Down Expand Up @@ -251,6 +246,72 @@ public ClusterMetadataManifest writeIncrementalMetadata(
return manifest;
}

private List<UploadedIndexMetadata> writeIndexMetadata(ClusterState clusterState, List<IndexMetadata> toUpload) throws IOException,
InterruptedException, TimeoutException {
List<Exception> exceptionList = new ArrayList<>(toUpload.size());
final CountDownLatch latch = new CountDownLatch(toUpload.size());
List<UploadedIndexMetadata> result = new ArrayList<>(toUpload.size());

LatchedActionListener<IndexMetadata> latchedActionListener = new LatchedActionListener<>(ActionListener.wrap((IndexMetadata t) -> {
logger.trace(String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", t.getIndex().toString()));
}, ex -> {
assert ex instanceof IndexMetadataTransferException;
logger.error(
() -> new ParameterizedMessage(
"Exception during transfer of IndexMetadata to Remote {}",
((IndexMetadataTransferException) ex).getIndexMetadata().getIndex().toString()
),
ex
);
exceptionList.add(ex);
}), latch);

for (IndexMetadata indexMetadata : toUpload) {
final UploadedIndexMetadata uploadedIndexMetadata = writeIndexMetadata(clusterState, latchedActionListener, indexMetadata);
result.add(uploadedIndexMetadata);
}

try {
if (latch.await(20000, TimeUnit.MILLISECONDS) == false) {
TimeoutException ex = new TimeoutException("Timed out waiting for transfer of index metadata to complete");
exceptionList.forEach(ex::addSuppressed);
throw ex;
}
} catch (InterruptedException ex) {
exceptionList.forEach(ex::addSuppressed);
Thread.currentThread().interrupt();
throw ex;
}

return result;
}

private UploadedIndexMetadata writeIndexMetadata(
ClusterState clusterState,
LatchedActionListener<IndexMetadata> latchedActionListener,
IndexMetadata indexMetadata
) throws IOException {
final BlobContainer indexMetadataContainer = indexMetadataContainer(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID(),
indexMetadata.getIndexUUID()
);

ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(indexMetadata),
ex -> latchedActionListener.onFailure(new IndexMetadataTransferException(indexMetadata, ex))
);
INDEX_METADATA_FORMAT.writeAsync(
indexMetadata,
indexMetadataContainer,
indexMetadataFileName(indexMetadata),
blobStoreRepository.getCompressor(),
completionListener
);
final String indexMetadataKey = indexMetadataContainer.path().buildAsString() + indexMetadataFileName(indexMetadata);
return new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(), indexMetadataKey);
}

@Nullable
public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest)
throws IOException {
Expand Down Expand Up @@ -282,7 +343,12 @@ void ensureRepositorySet() {
}
final String remoteStoreRepo = REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.get(settings);
assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
final Repository repository;
try {
repository = repositoriesService.get().repository(remoteStoreRepo);
} catch (RepositoryMissingException e) {
return;
}
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
}
Expand Down Expand Up @@ -367,4 +433,17 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) {
return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis()));
}

class IndexMetadataTransferException extends RuntimeException {

private final IndexMetadata indexMetadata;

public IndexMetadataTransferException(IndexMetadata indexMetadata, Throwable cause) {
super(cause);
this.indexMetadata = indexMetadata;
}

public IndexMetadata getIndexMetadata() {
return indexMetadata;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,18 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.io.Streams;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.lucene.store.IndexOutputOutputStream;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.MediaTypeRegistry;
Expand Down Expand Up @@ -167,6 +172,46 @@ public void write(final T obj, final BlobContainer blobContainer, final String n
blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false);
}

/**
* Writes blob with resolving the blob name using {@link #blobName} method.
* Leverages the multipart upload if supported by the blobContainer.
*
* @param obj object to be serialized
* @param blobContainer blob container
* @param name blob name
* @param compressor whether to use compression
* @param listener listener to listen to write result
*/
public void writeAsync(
final T obj,
final BlobContainer blobContainer,
final String name,
final Compressor compressor,
ActionListener<Void> listener
) throws IOException {
if (blobContainer instanceof VerifyingMultiStreamBlobContainer == false) {
write(obj, blobContainer, name, compressor);
return;
}
final String blobName = blobName(name);
final BytesReference bytes = serialize(obj, blobName, compressor);

IndexInput input = new ByteArrayIndexInput("", BytesReference.toBytes(bytes));

RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
blobName,
blobName,
bytes.length(),
true,
WritePriority.HIGH,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
CodecUtil.checksumEntireFile(input),
true
);

((VerifyingMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
}

public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException {
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
try (
Expand Down

0 comments on commit 7a6cd4e

Please sign in to comment.