From b87f244760ce1024eeda6c89a11d6279622b2ad6 Mon Sep 17 00:00:00 2001 From: Armin Date: Tue, 18 Jun 2019 12:20:58 +0200 Subject: [PATCH] Some Cleanup in BlobStoreRepository * Extracted from #42833: * Dry up index and shard path handling * Shorten XContent handling --- .../repositories/RepositoryData.java | 4 +- .../blobstore/BlobStoreRepository.java | 65 ++++++++----------- 2 files changed, 28 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index 589e0432c032d..3d0daff98936e 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -442,8 +442,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser, /** * Writes the incompatible snapshot ids to x-content. */ - public void incompatibleSnapshotsToXContent(final XContentBuilder builder) throws IOException { - + public XContentBuilder incompatibleSnapshotsToXContent(XContentBuilder builder) throws IOException { builder.startObject(); // write the incompatible snapshots list builder.startArray(INCOMPATIBLE_SNAPSHOTS); @@ -452,6 +451,7 @@ public void incompatibleSnapshotsToXContent(final XContentBuilder builder) throw } builder.endArray(); builder.endObject(); + return builder; } /** diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index d71192fb07b1c..037fef4a317da 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -51,8 +51,6 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.metrics.CounterMetric; @@ -63,7 +61,6 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; @@ -374,10 +371,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met // write the index metadata for each index in the snapshot for (IndexId index : indices) { - final IndexMetaData indexMetaData = clusterMetaData.index(index.getName()); - final BlobPath indexPath = basePath().add("indices").add(index.getId()); - final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath); - indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotId.getUUID()); + indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID()); } } catch (IOException ex) { throw new SnapshotCreationException(metadata.name(), snapshotId, ex); @@ -425,7 +419,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action snapshotId, ActionListener.map(listener, v -> { try { - blobStore().blobContainer(basePath().add("indices")).deleteBlobsIgnoringIfNotExists( + blobStore().blobContainer(indicesPath()).deleteBlobsIgnoringIfNotExists( unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList())); } catch (IOException e) { logger.warn(() -> @@ -477,9 +471,8 @@ protected void doRun() { } private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) { - BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId())); try { - indexMetaDataFormat.delete(indexMetaDataBlobContainer, snapshotId.getUUID()); + indexMetaDataFormat.delete(indexContainer(indexId), snapshotId.getUUID()); } catch (IOException ex) { logger.warn(() -> new ParameterizedMessage("[{}] failed to delete metadata for index [{}]", snapshotId, indexId.getName()), ex); @@ -540,8 +533,19 @@ public MetaData getSnapshotGlobalMetaData(final SnapshotId snapshotId) { @Override public IndexMetaData getSnapshotIndexMetaData(final SnapshotId snapshotId, final IndexId index) throws IOException { - final BlobPath indexPath = basePath().add("indices").add(index.getId()); - return indexMetaDataFormat.read(blobStore().blobContainer(indexPath), snapshotId.getUUID()); + return indexMetaDataFormat.read(indexContainer(index), snapshotId.getUUID()); + } + + private BlobPath indicesPath() { + return basePath().add("indices"); + } + + private BlobContainer indexContainer(IndexId indexId) { + return blobStore().blobContainer(indicesPath().add(indexId.getId())); + } + + private BlobContainer shardContainer(IndexId indexId, ShardId snapshotShardId) { + return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(snapshotShardId.getId()))); } /** @@ -589,10 +593,9 @@ public String startVerification() { String seed = UUIDs.randomBase64UUID(); byte[] testBytes = Strings.toUTF8Bytes(seed); BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed))); - String blobName = "master.dat"; BytesArray bytes = new BytesArray(testBytes); try (InputStream stream = bytes.streamInput()) { - testContainer.writeBlobAtomic(blobName, stream, bytes.length(), true); + testContainer.writeBlobAtomic("master.dat", stream, bytes.length(), true); } return seed; } @@ -665,7 +668,7 @@ public RepositoryData getRepositoryData() { } } - public static String testBlobPrefix(String seed) { + private static String testBlobPrefix(String seed) { return TESTS_FILE + seed; } @@ -685,19 +688,11 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep "] - possibly due to simultaneous snapshot deletion requests"); } final long newGen = currentGen + 1; - final BytesReference snapshotsBytes; - try (BytesStreamOutput bStream = new BytesStreamOutput()) { - try (StreamOutput stream = new OutputStreamStreamOutput(bStream)) { - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream); - repositoryData.snapshotsToXContent(builder); - builder.close(); - } - snapshotsBytes = bStream.bytes(); - } // write the index file final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen); logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob); - writeAtomic(indexBlob, snapshotsBytes, true); + writeAtomic( + indexBlob, BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.contentBuilder(XContentType.JSON))), true); // write the current generation to the index-latest file final BytesReference genBytes; try (BytesStreamOutput bStream = new BytesStreamOutput()) { @@ -724,16 +719,10 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep */ void writeIncompatibleSnapshots(RepositoryData repositoryData) throws IOException { assert isReadOnly() == false; // can not write to a read only repository - final BytesReference bytes; - try (BytesStreamOutput bStream = new BytesStreamOutput()) { - try (StreamOutput stream = new OutputStreamStreamOutput(bStream); - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream)) { - repositoryData.incompatibleSnapshotsToXContent(builder); - } - bytes = bStream.bytes(); - } // write the incompatible snapshots blob - writeAtomic(INCOMPATIBLE_SNAPSHOTS_BLOB, bytes, false); + writeAtomic(INCOMPATIBLE_SNAPSHOTS_BLOB, + BytesReference.bytes(repositoryData.incompatibleSnapshotsToXContent(XContentFactory.contentBuilder(XContentType.JSON))), + false); } /** @@ -826,9 +815,8 @@ public void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { ShardId shardId = store.shardId(); final Context context = new Context(snapshotId, indexId, shardId, snapshotShardId); - BlobPath path = basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId())); - BlobContainer blobContainer = blobStore().blobContainer(path); - final RestoreContext snapshotContext = new RestoreContext(shardId, snapshotId, recoveryState, blobContainer); + final RestoreContext snapshotContext = + new RestoreContext(shardId, snapshotId, recoveryState, shardContainer(indexId, snapshotShardId)); try { BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); @@ -904,8 +892,7 @@ private class Context { Context(SnapshotId snapshotId, IndexId indexId, ShardId shardId, ShardId snapshotShardId) { this.snapshotId = snapshotId; this.shardId = shardId; - blobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId()) - .add(Integer.toString(snapshotShardId.getId()))); + blobContainer = shardContainer(indexId, snapshotShardId); } /**