Merge AwsS3Service and InternalAwsS3Service in a S3Service class #31580

Merged 1 commit on Jun 28, 2018

AwsS3Service.java: This file was deleted.

S3BlobStore.java
@@ -25,7 +25,6 @@
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;

import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
@@ -40,7 +39,7 @@

class S3BlobStore extends AbstractComponent implements BlobStore {

- private final AwsS3Service service;
+ private final S3Service service;

private final String clientName;

@@ -54,7 +53,7 @@ class S3BlobStore extends AbstractComponent implements BlobStore {

private final StorageClass storageClass;

- S3BlobStore(Settings settings, AwsS3Service service, String clientName, String bucket, boolean serverSideEncryption,
+ S3BlobStore(Settings settings, S3Service service, String clientName, String bucket, boolean serverSideEncryption,
ByteSizeValue bufferSize, String cannedACL, String storageClass) {
super(settings);
this.service = service;
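The change in this file is purely the switch from the removed AwsS3Service interface to the concrete S3Service in the field and constructor above. As a rough sketch (not code from this PR), a caller holding the injected service borrows a reference-counted client per operation and releases it with try-with-resources, the same pattern the integration test further down uses; the bucketExists helper and the doesBucketExist call are illustrative assumptions only.

    // Sketch: borrow a client from the injected S3Service for one call and
    // release the reference when done, so a later settings refresh can destroy
    // the superseded client once nobody holds it anymore.
    private boolean bucketExists(final S3Service service, final String clientName, final String bucket) {
        try (AmazonS3Reference clientReference = service.client(clientName)) {
            return clientReference.client().doesBucketExist(bucket);
        }
    }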
S3Repository.java
@@ -20,7 +20,6 @@
package org.elasticsearch.repositories.s3;

import com.amazonaws.auth.BasicAWSCredentials;

import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
@@ -156,8 +155,10 @@ class S3Repository extends BlobStoreRepository {
/**
* Constructs an s3 backed repository
*/
- S3Repository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry namedXContentRegistry,
- AwsS3Service awsService) throws IOException {
+ S3Repository(final RepositoryMetaData metadata,
+ final Settings settings,
+ final NamedXContentRegistry namedXContentRegistry,
+ final S3Service service) throws IOException {
super(metadata, settings, namedXContentRegistry);

final String bucket = BUCKET_SETTING.get(metadata.settings());
@@ -188,9 +189,9 @@ class S3Repository extends BlobStoreRepository {
// deprecated behavior: override client credentials from the cluster state
// (repository settings)
if (S3ClientSettings.checkDeprecatedCredentials(metadata.settings())) {
- overrideCredentialsFromClusterState(awsService);
+ overrideCredentialsFromClusterState(service);
}
- blobStore = new S3BlobStore(settings, awsService, clientName, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass);
+ blobStore = new S3BlobStore(settings, service, clientName, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass);

final String basePath = BASE_PATH_SETTING.get(metadata.settings());
if (Strings.hasLength(basePath)) {
@@ -220,13 +221,13 @@ protected ByteSizeValue chunkSize() {
return chunkSize;
}

- void overrideCredentialsFromClusterState(AwsS3Service awsService) {
+ void overrideCredentialsFromClusterState(final S3Service s3Service) {
deprecationLogger.deprecated("Using s3 access/secret key from repository settings. Instead "
+ "store these in named clients and the elasticsearch keystore for secure settings.");
final BasicAWSCredentials insecureCredentials = S3ClientSettings.loadDeprecatedCredentials(metadata.settings());
// hack, but that's ok because the whole if branch should be axed
- final Map<String, S3ClientSettings> prevSettings = awsService.refreshAndClearCache(S3ClientSettings.load(Settings.EMPTY));
+ final Map<String, S3ClientSettings> prevSettings = s3Service.refreshAndClearCache(S3ClientSettings.load(Settings.EMPTY));
final Map<String, S3ClientSettings> newSettings = S3ClientSettings.overrideCredentials(prevSettings, insecureCredentials);
- awsService.refreshAndClearCache(newSettings);
+ s3Service.refreshAndClearCache(newSettings);
}
}
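For context on the deprecated branch above, this is a hedged sketch of repository settings that would send a request down that path; the access_key/secret_key names and all values are illustrative assumptions, and the authoritative setting definitions live in S3ClientSettings.

    // Hypothetical repository settings that make checkDeprecatedCredentials(...)
    // return true, so the constructor calls overrideCredentialsFromClusterState,
    // which rebuilds every cached client with the repository-supplied key pair.
    final Settings repositorySettings = Settings.builder()
        .put("bucket", "my-bucket")
        .put("access_key", "my-access-key")   // deprecated: prefer named clients plus the keystore
        .put("secret_key", "my-secret-key")   // deprecated: prefer named clients plus the keystore
        .build();
    final RepositoryMetaData metadata = new RepositoryMetaData("my-repo", "s3", repositorySettings);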
S3RepositoryPlugin.java
@@ -19,14 +19,6 @@

package org.elasticsearch.repositories.s3;

- import java.io.IOException;
- import java.security.AccessController;
- import java.security.PrivilegedAction;
- import java.util.Arrays;
- import java.util.Collections;
- import java.util.List;
- import java.util.Map;

import com.amazonaws.util.json.Jackson;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
@@ -39,6 +31,15 @@
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;

+ import java.io.IOException;
+ import java.security.AccessController;
+ import java.security.PrivilegedAction;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;

/**
* A plugin to add a repository type that writes to and from the AWS S3.
*/
@@ -60,33 +61,29 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
});
}

- private final AwsS3Service awsS3Service;
+ private final S3Service service;

- public S3RepositoryPlugin(Settings settings) {
- this.awsS3Service = getAwsS3Service(settings);
- // eagerly load client settings so that secure settings are read
- final Map<String, S3ClientSettings> clientsSettings = S3ClientSettings.load(settings);
- this.awsS3Service.refreshAndClearCache(clientsSettings);
+ public S3RepositoryPlugin(final Settings settings) {
+ this(settings, new S3Service(settings));
}

- protected S3RepositoryPlugin(AwsS3Service awsS3Service) {
- this.awsS3Service = awsS3Service;
- }
-
- // proxy method for testing
- protected S3Repository getS3Repository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry namedXContentRegistry)
- throws IOException {
- return new S3Repository(metadata, settings, namedXContentRegistry, awsS3Service);
+ S3RepositoryPlugin(final Settings settings, final S3Service service) {
+ this.service = Objects.requireNonNull(service, "S3 service must not be null");
+ // eagerly load client settings so that secure settings are read
+ final Map<String, S3ClientSettings> clientsSettings = S3ClientSettings.load(settings);
+ this.service.refreshAndClearCache(clientsSettings);
}

// proxy method for testing
- protected AwsS3Service getAwsS3Service(Settings settings) {
- return new InternalAwsS3Service(settings);
+ protected S3Repository createRepository(final RepositoryMetaData metadata,
+ final Settings settings,
+ final NamedXContentRegistry registry) throws IOException {
+ return new S3Repository(metadata, settings, registry, service);
}

@Override
- public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
- return Collections.singletonMap(S3Repository.TYPE, (metadata) -> getS3Repository(metadata, env.settings(), namedXContentRegistry));
+ public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry) {
+ return Collections.singletonMap(S3Repository.TYPE, (metadata) -> createRepository(metadata, env.settings(), registry));
}

@Override
@@ -112,11 +109,11 @@ public List<Setting<?>> getSettings() {
public void reload(Settings settings) {
// secure settings should be readable
final Map<String, S3ClientSettings> clientsSettings = S3ClientSettings.load(settings);
- awsS3Service.refreshAndClearCache(clientsSettings);
+ service.refreshAndClearCache(clientsSettings);
}

@Override
public void close() throws IOException {
- awsS3Service.close();
+ service.close();
}
}
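The new package-private constructor together with the protected createRepository proxy is what lets tests swap in a stubbed S3Service instead of building real AWS clients. A hypothetical test-only subclass (not part of this PR) could look like the sketch below; it has to live in the org.elasticsearch.repositories.s3 package to reach the package-private constructor.

    // Hypothetical test helper: inject a canned S3Service and optionally
    // intercept repository creation.
    class ProxyS3RepositoryPlugin extends S3RepositoryPlugin {
        ProxyS3RepositoryPlugin(Settings settings, S3Service service) {
            super(settings, service);
        }

        @Override
        protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings,
                                                NamedXContentRegistry registry) throws IOException {
            // a test could return a repository wired to a mocked AmazonS3 here
            return super.createRepository(metadata, settings, registry);
        }
    }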
S3Service.java (renamed from InternalAwsS3Service.java)
@@ -28,24 +28,25 @@
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;

+ import java.io.Closeable;
import java.io.IOException;
import java.util.Map;

import static java.util.Collections.emptyMap;


- class InternalAwsS3Service extends AbstractComponent implements AwsS3Service {
+ class S3Service extends AbstractComponent implements Closeable {

private volatile Map<String, AmazonS3Reference> clientsCache = emptyMap();
private volatile Map<String, S3ClientSettings> clientsSettings = emptyMap();

- InternalAwsS3Service(Settings settings) {
+ S3Service(Settings settings) {
super(settings);
}

@@ -55,7 +56,6 @@ class InternalAwsS3Service extends AbstractComponent implements AwsS3Service {
* clients are usable until released. On release they will be destroyed instead
* to being returned to the cache.
*/
- @Override
public synchronized Map<String, S3ClientSettings> refreshAndClearCache(Map<String, S3ClientSettings> clientsSettings) {
// shutdown all unused clients
// others will shutdown on their respective release
@@ -71,7 +71,6 @@ public synchronized Map<String, S3ClientSettings> refreshAndClearCache(Map<Strin
* Attempts to retrieve a client by name from the cache. If the client does not
* exist it will be created.
*/
- @Override
public AmazonS3Reference client(String clientName) {
AmazonS3Reference clientReference = clientsCache.get(clientName);
if ((clientReference != null) && clientReference.tryIncRef()) {
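With the AwsS3Service interface gone, S3Service is the single implementation and the @Override annotations above become removable. The Javadoc keeps its promise that in-flight references survive a settings refresh; the following is a sketch of that lifecycle under the assumption that an s3Service instance and freshly loaded newSettings are in scope (it is not code from this PR).

    try (AmazonS3Reference oldRef = s3Service.client("default")) {
        // reload client settings, for example after the keystore changed
        s3Service.refreshAndClearCache(S3ClientSettings.load(newSettings));
        oldRef.client().listBuckets();   // still usable: the reference is held
    }                                    // released here; the superseded client is destroyed
    try (AmazonS3Reference newRef = s3Service.client("default")) {
        newRef.client().listBuckets();   // built from the refreshed settings
    }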
S3 snapshot/restore integration test
@@ -65,82 +65,6 @@ public final void wipeAfter() {
cleanRepositoryFiles(basePath);
}

- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211")
- public void testSimpleWorkflow() {
- Client client = client();
- Settings.Builder settings = Settings.builder()
- .put(S3Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000));
-
- // We sometime test getting the base_path from node settings using repositories.s3.base_path
- settings.put(S3Repository.BASE_PATH_SETTING.getKey(), basePath);
-
- logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
- PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
- .setType("s3").setSettings(settings
- ).get();
- assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
-
- createIndex("test-idx-1", "test-idx-2", "test-idx-3");
- ensureGreen();
-
- logger.info("--> indexing some data");
- for (int i = 0; i < 100; i++) {
- index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
- index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i);
- index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i);
- }
- refresh();
- assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
- assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
- assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
-
- logger.info("--> snapshot");
- CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get();
- assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
- assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
-
- assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
-
- logger.info("--> delete some data");
- for (int i = 0; i < 50; i++) {
- client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get();
- }
- for (int i = 50; i < 100; i++) {
- client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get();
- }
- for (int i = 0; i < 100; i += 2) {
- client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get();
- }
- refresh();
- assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(50L));
- assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits(), equalTo(50L));
- assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits(), equalTo(50L));
-
- logger.info("--> close indices");
- client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get();
-
- logger.info("--> restore all indices from the snapshot");
- RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
- assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
-
- ensureGreen();
- assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
- assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
- assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits(), equalTo(50L));
-
- // Test restore after index deletion
- logger.info("--> delete indices");
- cluster().wipeIndices("test-idx-1", "test-idx-2");
- logger.info("--> restore one index after deletion");
- restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet();
- assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
- ensureGreen();
- assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
- ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
- assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
- assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
- }

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211")
public void testEncryption() {
Client client = client();
@@ -179,7 +103,7 @@ public void testEncryption() {

Settings settings = internalCluster().getInstance(Settings.class);
Settings bucket = settings.getByPrefix("repositories.s3.");
- try (AmazonS3Reference s3Client = internalCluster().getInstance(AwsS3Service.class).client("default")) {
+ try (AmazonS3Reference s3Client = internalCluster().getInstance(S3Service.class).client("default")) {
String bucketName = bucket.get("bucket");
logger.info("--> verify encryption for bucket [{}], prefix [{}]", bucketName, basePath);
List<S3ObjectSummary> summaries = s3Client.client().listObjects(bucketName, basePath).getObjectSummaries();
@@ -442,7 +366,7 @@ public void cleanRepositoryFiles(String basePath) {
// We check that settings has been set in elasticsearch.yml integration test file
// as described in README
assertThat("Your settings in elasticsearch.yml are incorrect. Check README file.", bucketName, notNullValue());
- try (AmazonS3Reference s3Client = internalCluster().getInstance(AwsS3Service.class).client("default")) {
+ try (AmazonS3Reference s3Client = internalCluster().getInstance(S3Service.class).client("default")) {
ObjectListing prevListing = null;
//From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
//we can do at most 1K objects per delete
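The helper is truncated at this point. As a hedged sketch, the paginated cleanup it describes could look like the following, walking the listing page by page because list and multi-object delete requests are capped at 1000 keys; the client lookup mirrors the test code above, while the per-key deleteObject call is an assumption rather than the test's exact approach.

    try (AmazonS3Reference ref = internalCluster().getInstance(S3Service.class).client("default")) {
        ObjectListing listing = ref.client().listObjects(bucketName, basePath);
        while (true) {
            for (S3ObjectSummary summary : listing.getObjectSummaries()) {
                ref.client().deleteObject(bucketName, summary.getKey());
            }
            if (listing.isTruncated() == false) {
                break;
            }
            listing = ref.client().listNextBatchOfObjects(listing);
        }
    }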