Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some Cleanup in BlobStoreRepository (#43323) #44043

Merged
merged 1 commit into from
Jul 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public List<IndexId> resolveNewIndices(final List<String> indicesToResolve) {
* Writes the snapshots metadata and the related indices metadata to x-content, omitting the
* incompatible snapshots.
*/
public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final ToXContent.Params params) throws IOException {
public XContentBuilder snapshotsToXContent(final XContentBuilder builder) throws IOException {
builder.startObject();
// write the snapshots list
builder.startArray(SNAPSHOTS);
Expand Down Expand Up @@ -453,14 +453,12 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
/**
* Writes the incompatible snapshot ids to x-content.
*/
public XContentBuilder incompatibleSnapshotsToXContent(final XContentBuilder builder, final ToXContent.Params params)
throws IOException {

public XContentBuilder incompatibleSnapshotsToXContent(XContentBuilder builder) throws IOException {
builder.startObject();
// write the incompatible snapshots list
builder.startArray(INCOMPATIBLE_SNAPSHOTS);
for (final SnapshotId snapshot : getIncompatibleSnapshotIds()) {
snapshot.toXContent(builder, params);
snapshot.toXContent(builder, ToXContent.EMPTY_PARAMS);
}
builder.endArray();
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.metrics.CounterMetric;
Expand All @@ -62,8 +60,6 @@
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
Expand Down Expand Up @@ -401,10 +397,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met

// write the index metadata for each index in the snapshot
for (IndexId index : indices) {
final IndexMetaData indexMetaData = clusterMetaData.index(index.getName());
final BlobPath indexPath = basePath().add("indices").add(index.getId());
final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath);
indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotId.getUUID());
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID());
}
} catch (IOException ex) {
throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
Expand Down Expand Up @@ -452,7 +445,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action
snapshotId,
ActionListener.map(listener, v -> {
try {
blobStore().blobContainer(basePath().add("indices")).deleteBlobsIgnoringIfNotExists(
blobStore().blobContainer(indicesPath()).deleteBlobsIgnoringIfNotExists(
unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList()));
} catch (IOException e) {
logger.warn(() ->
Expand Down Expand Up @@ -504,9 +497,8 @@ protected void doRun() {
}

private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) {
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId()));
try {
indexMetaDataFormat.delete(indexMetaDataBlobContainer, snapshotId.getUUID());
indexMetaDataFormat.delete(indexContainer(indexId), snapshotId.getUUID());
} catch (IOException ex) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete metadata for index [{}]",
snapshotId, indexId.getName()), ex);
Expand Down Expand Up @@ -570,8 +562,19 @@ public MetaData getSnapshotGlobalMetaData(final SnapshotId snapshotId) {

@Override
public IndexMetaData getSnapshotIndexMetaData(final SnapshotId snapshotId, final IndexId index) throws IOException {
final BlobPath indexPath = basePath().add("indices").add(index.getId());
return indexMetaDataFormat.read(blobStore().blobContainer(indexPath), snapshotId.getUUID());
return indexMetaDataFormat.read(indexContainer(index), snapshotId.getUUID());
}

private BlobPath indicesPath() {
return basePath().add("indices");
}

private BlobContainer indexContainer(IndexId indexId) {
return blobStore().blobContainer(indicesPath().add(indexId.getId()));
}

private BlobContainer shardContainer(IndexId indexId, ShardId shardId) {
return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId.getId())));
}

/**
Expand Down Expand Up @@ -619,10 +622,9 @@ public String startVerification() {
String seed = UUIDs.randomBase64UUID();
byte[] testBytes = Strings.toUTF8Bytes(seed);
BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
String blobName = "master.dat";
BytesArray bytes = new BytesArray(testBytes);
try (InputStream stream = bytes.streamInput()) {
testContainer.writeBlobAtomic(blobName, stream, bytes.length(), true);
testContainer.writeBlobAtomic("master.dat", stream, bytes.length(), true);
}
return seed;
}
Expand Down Expand Up @@ -695,7 +697,7 @@ public RepositoryData getRepositoryData() {
}
}

public static String testBlobPrefix(String seed) {
private static String testBlobPrefix(String seed) {
return TESTS_FILE + seed;
}

Expand All @@ -715,19 +717,10 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep
"] - possibly due to simultaneous snapshot deletion requests");
}
final long newGen = currentGen + 1;
final BytesReference snapshotsBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
try (StreamOutput stream = new OutputStreamStreamOutput(bStream)) {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
repositoryData.snapshotsToXContent(builder, ToXContent.EMPTY_PARAMS);
builder.close();
}
snapshotsBytes = bStream.bytes();
}
// write the index file
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob, snapshotsBytes, true);
writeAtomic(indexBlob, BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder())), true);
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
Expand All @@ -754,17 +747,9 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep
*/
void writeIncompatibleSnapshots(RepositoryData repositoryData) throws IOException {
assert isReadOnly() == false; // can not write to a read only repository
final BytesReference bytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
try (StreamOutput stream = new OutputStreamStreamOutput(bStream)) {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream);
repositoryData.incompatibleSnapshotsToXContent(builder, ToXContent.EMPTY_PARAMS);
builder.close();
}
bytes = bStream.bytes();
}
// write the incompatible snapshots blob
writeAtomic(INCOMPATIBLE_SNAPSHOTS_BLOB, bytes, false);
writeAtomic(INCOMPATIBLE_SNAPSHOTS_BLOB,
BytesReference.bytes(repositoryData.incompatibleSnapshotsToXContent(XContentFactory.jsonBuilder())), false);
}

/**
Expand Down Expand Up @@ -857,9 +842,8 @@ public void restoreShard(Store store, SnapshotId snapshotId,
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
ShardId shardId = store.shardId();
final Context context = new Context(snapshotId, indexId, shardId, snapshotShardId);
BlobPath path = basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId()));
BlobContainer blobContainer = blobStore().blobContainer(path);
final RestoreContext snapshotContext = new RestoreContext(shardId, snapshotId, recoveryState, blobContainer);
final RestoreContext snapshotContext =
new RestoreContext(shardId, snapshotId, recoveryState, shardContainer(indexId, snapshotShardId));
try {
BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
Expand Down Expand Up @@ -935,8 +919,7 @@ private class Context {
Context(SnapshotId snapshotId, IndexId indexId, ShardId shardId, ShardId snapshotShardId) {
this.snapshotId = snapshotId;
this.shardId = shardId;
blobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId())
.add(Integer.toString(snapshotShardId.getId())));
blobContainer = shardContainer(indexId, snapshotShardId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
Expand Down Expand Up @@ -61,7 +60,7 @@ public void testEqualsAndHashCode() {
public void testXContent() throws IOException {
RepositoryData repositoryData = generateRandomRepoData();
XContentBuilder builder = JsonXContent.contentBuilder();
repositoryData.snapshotsToXContent(builder, ToXContent.EMPTY_PARAMS);
repositoryData.snapshotsToXContent(builder);
try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
long gen = (long) randomIntBetween(0, 500);
RepositoryData fromXContent = RepositoryData.snapshotsFromXContent(parser, gen);
Expand Down Expand Up @@ -166,7 +165,7 @@ public void testIndexThatReferencesAnUnknownSnapshot() throws IOException {
final RepositoryData repositoryData = generateRandomRepoData();

XContentBuilder builder = XContentBuilder.builder(xContent);
repositoryData.snapshotsToXContent(builder, ToXContent.EMPTY_PARAMS);
repositoryData.snapshotsToXContent(builder);
RepositoryData parsedRepositoryData;
try (XContentParser xParser = createParser(builder)) {
parsedRepositoryData = RepositoryData.snapshotsFromXContent(xParser, repositoryData.getGenId());
Expand Down Expand Up @@ -197,7 +196,7 @@ public void testIndexThatReferencesAnUnknownSnapshot() throws IOException {
indexSnapshots, new ArrayList<>(parsedRepositoryData.getIncompatibleSnapshotIds()));

final XContentBuilder corruptedBuilder = XContentBuilder.builder(xContent);
corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, ToXContent.EMPTY_PARAMS);
corruptedRepositoryData.snapshotsToXContent(corruptedBuilder);

try (XContentParser xParser = createParser(corruptedBuilder)) {
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () ->
Expand Down