Skip to content

Commit

Permalink
Blob store compression fix (#39073)
Browse files Browse the repository at this point in the history
Blob store compression was not enabled for some of the files in
snapshots due to constructor accessing sub-class fields. Fixed to
instead accept compress field as constructor param. Also fixed chunk
size validation to work.

Deprecated repositories.fs.compress setting as well to be able to unify
in a future commit.
  • Loading branch information
henningandersen committed Feb 20, 2019
1 parent ab1f6c9 commit 3f09d61
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class URLRepository extends BlobStoreRepository {
*/
public URLRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry) {
super(metadata, environment.settings(), namedXContentRegistry);
super(metadata, environment.settings(), false, namedXContentRegistry);

if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
throw new RepositoryException(metadata.name(), "missing url");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down Expand Up @@ -82,16 +81,14 @@ public static final class Repository {

private final BlobPath basePath;
private final ByteSizeValue chunkSize;
private final boolean compress;
private final Environment environment;
private final AzureStorageService storageService;
private final boolean readonly;

public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
AzureStorageService storageService) {
super(metadata, environment.settings(), namedXContentRegistry);
super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry);
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.compress = Repository.COMPRESS_SETTING.get(metadata.settings());
this.environment = environment;
this.storageService = storageService;

Expand Down Expand Up @@ -132,7 +129,7 @@ protected AzureBlobStore createBlobStore() throws URISyntaxException, StorageExc

logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
blobStore, chunkSize, compress, basePath));
blobStore, chunkSize, isCompress(), basePath));
return blobStore;
}

Expand All @@ -141,14 +138,6 @@ protected BlobPath basePath() {
return basePath;
}

/**
* {@inheritDoc}
*/
@Override
protected boolean isCompress() {
return compress;
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,14 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
private final Settings settings;
private final GoogleCloudStorageService storageService;
private final BlobPath basePath;
private final boolean compress;
private final ByteSizeValue chunkSize;
private final String bucket;
private final String clientName;

GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry,
GoogleCloudStorageService storageService) {
super(metadata, environment.settings(), namedXContentRegistry);
super(metadata, environment.settings(), getSetting(COMPRESS, metadata), namedXContentRegistry);
this.settings = environment.settings();
this.storageService = storageService;

Expand All @@ -85,11 +84,10 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
this.basePath = BlobPath.cleanPath();
}

this.compress = getSetting(COMPRESS, metadata);
this.chunkSize = getSetting(CHUNK_SIZE, metadata);
this.bucket = getSetting(BUCKET, metadata);
this.clientName = CLIENT_NAME.get(metadata.settings());
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath, chunkSize, compress);
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath, chunkSize, isCompress());
}

@Override
Expand All @@ -102,11 +100,6 @@ protected BlobPath basePath() {
return basePath;
}

@Override
protected boolean isCompress() {
return compress;
}

@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public final class HdfsRepository extends BlobStoreRepository {

private final Environment environment;
private final ByteSizeValue chunkSize;
private final boolean compress;
private final BlobPath basePath = BlobPath.cleanPath();
private final URI uri;
private final String pathSetting;
Expand All @@ -69,11 +68,10 @@ public final class HdfsRepository extends BlobStoreRepository {

public HdfsRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry) {
super(metadata, environment.settings(), namedXContentRegistry);
super(metadata, environment.settings(), metadata.settings().getAsBoolean("compress", false), namedXContentRegistry);

this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
this.compress = metadata.settings().getAsBoolean("compress", false);

String uriSetting = getMetadata().settings().get("uri");
if (Strings.hasText(uriSetting) == false) {
Expand Down Expand Up @@ -239,11 +237,6 @@ protected BlobPath basePath() {
return basePath;
}

@Override
protected boolean isCompress() {
return compress;
}

@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.elasticsearch.repositories.s3;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -155,8 +155,6 @@ class S3Repository extends BlobStoreRepository {

private final ByteSizeValue chunkSize;

private final boolean compress;

private final BlobPath basePath;

private final boolean serverSideEncryption;
Expand All @@ -174,7 +172,7 @@ class S3Repository extends BlobStoreRepository {
final Settings settings,
final NamedXContentRegistry namedXContentRegistry,
final S3Service service) {
super(metadata, settings, namedXContentRegistry);
super(metadata, settings, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry);
this.service = service;

this.repositoryMetaData = metadata;
Expand All @@ -187,7 +185,6 @@ class S3Repository extends BlobStoreRepository {

this.bufferSize = BUFFER_SIZE_SETTING.get(metadata.settings());
this.chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings());
this.compress = COMPRESS_SETTING.get(metadata.settings());

// We make sure that chunkSize is bigger or equal than/to bufferSize
if (this.chunkSize.getBytes() < bufferSize.getBytes()) {
Expand Down Expand Up @@ -245,11 +242,6 @@ protected BlobPath basePath() {
return basePath;
}

@Override
protected boolean isCompress() {
return compress;
}

@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private final Settings settings;

private final boolean compress;

private final RateLimiter snapshotRateLimiter;

private final RateLimiter restoreRateLimiter;
Expand Down Expand Up @@ -226,33 +228,37 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*
* @param metadata The metadata for this repository including name and settings
* @param settings Settings for the node this repository object is created on
* @param compress true if metadata and snapshot files should be compressed
*/
protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry namedXContentRegistry) {
protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, boolean compress,
NamedXContentRegistry namedXContentRegistry) {
this.settings = settings;
this.compress = compress;
this.metadata = metadata;
this.namedXContentRegistry = namedXContentRegistry;
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
readOnly = metadata.settings().getAsBoolean("readonly", false);


indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
BlobStoreIndexShardSnapshot::fromXContent, namedXContentRegistry, isCompress());
BlobStoreIndexShardSnapshot::fromXContent, namedXContentRegistry, compress);
indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT,
BlobStoreIndexShardSnapshots::fromXContent, namedXContentRegistry, isCompress());
ByteSizeValue chunkSize = chunkSize();
if (chunkSize != null && chunkSize.getBytes() <= 0) {
throw new IllegalArgumentException("the chunk size cannot be negative: [" + chunkSize + "]");
}
BlobStoreIndexShardSnapshots::fromXContent, namedXContentRegistry, compress);
}

@Override
protected void doStart() {
ByteSizeValue chunkSize = chunkSize();
if (chunkSize != null && chunkSize.getBytes() <= 0) {
throw new IllegalArgumentException("the chunk size cannot be negative: [" + chunkSize + "]");
}
globalMetaDataFormat = new ChecksumBlobStoreFormat<>(METADATA_CODEC, METADATA_NAME_FORMAT,
MetaData::fromXContent, namedXContentRegistry, isCompress());
MetaData::fromXContent, namedXContentRegistry, compress);
indexMetaDataFormat = new ChecksumBlobStoreFormat<>(INDEX_METADATA_CODEC, METADATA_NAME_FORMAT,
IndexMetaData::fromXContent, namedXContentRegistry, isCompress());
IndexMetaData::fromXContent, namedXContentRegistry, compress);
snapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
SnapshotInfo::fromXContentInternal, namedXContentRegistry, isCompress());
SnapshotInfo::fromXContentInternal, namedXContentRegistry, compress);
}

@Override
Expand Down Expand Up @@ -347,8 +353,8 @@ protected BlobStore blobStore() {
*
* @return true if compression is needed
*/
protected boolean isCompress() {
return false;
protected final boolean isCompress() {
return compress;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,19 @@ public class FsRepository extends BlobStoreRepository {
new ByteSizeValue(Long.MAX_VALUE), new ByteSizeValue(5), new ByteSizeValue(Long.MAX_VALUE), Property.NodeScope);
public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope);
public static final Setting<Boolean> REPOSITORIES_COMPRESS_SETTING =
Setting.boolSetting("repositories.fs.compress", false, Property.NodeScope);
Setting.boolSetting("repositories.fs.compress", false, Property.NodeScope, Property.Deprecated);
private final Environment environment;

private ByteSizeValue chunkSize;

private final BlobPath basePath;

private boolean compress;

/**
* Constructs a shared file system repository.
*/
public FsRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry) {
super(metadata, environment.settings(), namedXContentRegistry);
super(metadata, environment.settings(), calculateCompress(metadata, environment), namedXContentRegistry);
this.environment = environment;
String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
if (location.isEmpty()) {
Expand Down Expand Up @@ -105,23 +103,21 @@ public FsRepository(RepositoryMetaData metadata, Environment environment,
} else {
this.chunkSize = REPOSITORIES_CHUNK_SIZE_SETTING.get(environment.settings());
}
this.compress = COMPRESS_SETTING.exists(metadata.settings())
? COMPRESS_SETTING.get(metadata.settings()) : REPOSITORIES_COMPRESS_SETTING.get(environment.settings());
this.basePath = BlobPath.cleanPath();
}

private static boolean calculateCompress(RepositoryMetaData metadata, Environment environment) {
return COMPRESS_SETTING.exists(metadata.settings())
? COMPRESS_SETTING.get(metadata.settings()) : REPOSITORIES_COMPRESS_SETTING.get(environment.settings());
}

@Override
protected BlobStore createBlobStore() throws Exception {
final String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
final Path locationFile = environment.resolveRepoFile(location);
return new FsBlobStore(environment.settings(), locationFile);
}

@Override
protected boolean isCompress() {
return compress;
}

@Override
protected ByteSizeValue chunkSize() {
return chunkSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -232,6 +234,38 @@ public void testIncompatibleSnapshotsBlobExists() throws Exception {
assertEquals(0, repository.getRepositoryData().getIncompatibleSnapshotIds().size());
}

public void testBadChunksize() throws Exception {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
final String repositoryName = "test-repo";

expectThrows(RepositoryException.class, () ->
client.admin().cluster().preparePutRepository(repositoryName)
.setType(REPO_TYPE)
.setSettings(Settings.builder().put(node().settings())
.put("location", location)
.put("chunk_size", randomLongBetween(-10, 0), ByteSizeUnit.BYTES))
.get());
}

public void testFsRepositoryCompressDeprecated() {
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
final Settings settings = Settings.builder().put(node().settings()).put("location", location).build();
final RepositoryMetaData metaData = new RepositoryMetaData("test-repo", REPO_TYPE, settings);

Settings useCompressSettings = Settings.builder()
.put(node().getEnvironment().settings())
.put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), true)
.build();
Environment useCompressEnvironment =
new Environment(useCompressSettings, node().getEnvironment().configFile());

new FsRepository(metaData, useCompressEnvironment, null);

assertWarnings("[repositories.fs.compress] setting was deprecated in Elasticsearch and will be removed in a future release!" +
" See the breaking changes documentation for the next major version.");
}

private BlobStoreRepository setupRepo() {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
Expand Down

0 comments on commit 3f09d61

Please sign in to comment.