Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1530] support MPU for S3 #2830

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build/make-distribution.sh
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ function sbt_build_service {
echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE"
echo "Build flags: $@" >> "$DIST_DIR/RELEASE"

if [[ $@ == *"hadoop-aws"* ]]; then
SBT_MAVEN_PROFILES="hadoop-aws"
if [[ $@ == *"aws-mpu"* ]]; then
export SBT_MAVEN_PROFILES="aws-mpu"
fi
BUILD_COMMAND=("$SBT" clean package)

Expand Down
20 changes: 0 additions & 20 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -209,25 +209,5 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hadoop-aws</id>
<activation>
<property>
<name>hadoop-aws-deps</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se

def s3SecretKey: String = get(S3_SECRET_KEY).getOrElse("")

def s3Endpoint: String = get(S3_ENDPOINT).getOrElse("")
def s3EndpointRegion: String = get(S3_ENDPOINT_REGION).getOrElse("")

def s3Dir: String = {
get(S3_DIR).map {
Expand Down Expand Up @@ -3019,8 +3019,8 @@ object CelebornConf extends Logging {
.stringConf
.createOptional

val S3_ENDPOINT: OptionalConfigEntry[String] =
buildConf("celeborn.storage.s3.endpoint")
val S3_ENDPOINT_REGION: OptionalConfigEntry[String] =
buildConf("celeborn.storage.s3.endpoint.region")
.categories("worker", "master", "client")
.version("0.6.0")
.doc("S3 endpoint for Celeborn to store shuffle data.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ object CelebornHadoopUtils extends Logging {
}

if (conf.s3Dir.nonEmpty) {
if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || conf.s3Endpoint.isEmpty) {
if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || conf.s3EndpointRegion.isEmpty) {
throw new CelebornException(
"S3 storage is enabled but s3AccessKey, s3SecretKey, or s3Endpoint is not set")
"S3 storage is enabled but s3AccessKey, s3SecretKey, or s3EndpointRegion is not set")
}
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set(
"fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey)
hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey)
hadoopConf.set("fs.s3a.endpoint", conf.s3Endpoint)
hadoopConf.set("fs.s3a.endpoint.region", conf.s3EndpointRegion)
}
appendSparkHadoopConfigs(conf, hadoopConf)
hadoopConf
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,6 @@ license: |
| celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | |
| celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint region for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | |
<!--end-include-->
2 changes: 1 addition & 1 deletion docs/configuration/master.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,6 @@ license: |
| celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | false | Kerberos principal for HDFS storage connection. | 0.3.2 | |
| celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint region for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | |
<!--end-include-->
2 changes: 1 addition & 1 deletion docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ license: |
| celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | false | Kerberos principal for HDFS storage connection. | 0.3.2 | |
| celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint region for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.worker.activeConnection.max | &lt;undefined&gt; | false | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 | |
| celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size of the application registry on Workers. | 0.5.0 | |
Expand Down
82 changes: 82 additions & 0 deletions multipart-uploader/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements. See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License. You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.celeborn</groupId>
    <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
    <version>${project.version}</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

  <artifactId>celeborn-multipart-uploader_${scala.binary.version}</artifactId>
  <packaging>jar</packaging>
  <name>Celeborn Multipart Uploader</name>

  <dependencies>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
    </dependency>
    <!-- Declared exactly once; an earlier revision duplicated this dependency. -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-1.2-api</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.celeborn</groupId>
      <artifactId>celeborn-service_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

  <profiles>
    <!-- AWS SDK dependencies are pulled in only when the aws-mpu profile is active
         (activated via the aws-mpu-deps property set by the root pom's aws-mpu profile). -->
    <profile>
      <id>aws-mpu</id>
      <activation>
        <property>
          <name>aws-mpu-deps</name>
        </property>
      </activation>
      <dependencies>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-aws</artifactId>
          <version>${hadoop.version}</version>
        </dependency>
        <dependency>
          <groupId>com.amazonaws</groupId>
          <artifactId>aws-java-sdk-bundle</artifactId>
          <version>${aws.version}</version>
        </dependency>
      </dependencies>
    </profile>
  </profiles>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.retry.PredefinedRetryPolicies;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ListPartsRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PartListing;
import com.amazonaws.services.s3.model.PartSummary;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.UploadPartRequest;

import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
import org.apache.celeborn.server.common.service.mpu.bean.AWSCredentials;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * {@link MultipartUploadHandler} implementation backed by the AWS SDK v1 {@link AmazonS3} client.
 *
 * <p>Lifecycle: {@link #startUpload()} initiates the multipart upload (MPU) and records the
 * upload id; {@link #putPart(InputStream, Long, Integer)} uploads individual parts;
 * {@link #complete()} lists the uploaded parts and completes the MPU — or, when no part was
 * uploaded, aborts the MPU and writes an empty object instead; {@link #abort()} cancels the MPU.
 *
 * <p>NOTE(review): {@code startUpload()} must be called before any other method — otherwise
 * {@code uploadId} is {@code null} and the subsequent S3 requests will fail. This class assumes
 * single-threaded use per instance; confirm against callers if parts are uploaded concurrently.
 */
public class S3MultipartUploadHandler implements MultipartUploadHandler {

  private static final Logger logger = LoggerFactory.getLogger(S3MultipartUploadHandler.class);

  private final AWSCredentials awsCredentials;
  // Assigned by startUpload(); identifies the in-flight multipart upload.
  private String uploadId;
  private final AmazonS3 s3Client;
  // Object key within awsCredentials.getBucketName() that the upload targets.
  private final String key;

  /**
   * Builds an S3 client from static credentials with bounded retries/timeouts and remembers the
   * target object key.
   *
   * @param awsCredentials access key, secret key, endpoint region, and bucket name
   * @param key object key to upload to
   */
  public S3MultipartUploadHandler(AWSCredentials awsCredentials, String key) {
    this.awsCredentials = awsCredentials;
    BasicAWSCredentials basicAWSCredentials =
        new BasicAWSCredentials(awsCredentials.getS3AccessKey(), awsCredentials.getS3SecretKey());
    // Bounded retries and timeouts so a wedged S3 endpoint cannot stall the worker indefinitely.
    ClientConfiguration clientConfig = new ClientConfiguration()
        .withRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(5))
        .withMaxErrorRetry(5)
        .withConnectionTimeout(10000)
        .withRequestTimeout(50000);
    this.s3Client =
        AmazonS3ClientBuilder.standard()
            .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
            .withRegion(awsCredentials.getS3EndpointRegion())
            .withClientConfiguration(clientConfig)
            .build();
    this.key = key;
  }

  /** Initiates the multipart upload and records the returned upload id. */
  @Override
  public void startUpload() {
    InitiateMultipartUploadRequest initRequest =
        new InitiateMultipartUploadRequest(awsCredentials.getBucketName(), key);
    InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
    uploadId = initResponse.getUploadId();
  }

  /**
   * Uploads one part of the multipart upload.
   *
   * @param inputStream part content; not closed by this method
   * @param lengthInBytes exact byte length of the part
   * @param partNumber 1-based part index
   * @throws RuntimeException if the part upload fails — propagated (not swallowed) so the caller
   *     can retry or abort; a silently-dropped part would let {@link #complete()} assemble a
   *     corrupt, incomplete object
   */
  @Override
  public void putPart(InputStream inputStream, Long lengthInBytes, Integer partNumber) {
    try {
      UploadPartRequest uploadRequest =
          new UploadPartRequest()
              .withBucketName(awsCredentials.getBucketName())
              .withKey(key)
              .withUploadId(uploadId)
              .withPartNumber(partNumber)
              .withInputStream(inputStream)
              .withPartSize(lengthInBytes);
      s3Client.uploadPart(uploadRequest);
    } catch (RuntimeException e) {
      // Log for diagnostics, then rethrow: swallowing the failure here would surface later as
      // a missing part (or silent data corruption) at complete() time.
      logger.error("Failed to upload part {} of uploadId {}", partNumber, uploadId, e);
      throw e;
    }
  }

  /**
   * Completes the multipart upload using the parts S3 actually received (paged via ListParts).
   * When no part was uploaded, the MPU is aborted and an empty object is written at the key so
   * the destination still exists.
   */
  @Override
  public void complete() {
    List<PartETag> partETags = new ArrayList<>();
    ListPartsRequest listPartsRequest =
        new ListPartsRequest(awsCredentials.getBucketName(), key, uploadId);
    PartListing partListing;
    // ListParts is paginated; loop until S3 reports the listing is no longer truncated.
    do {
      partListing = s3Client.listParts(listPartsRequest);
      for (PartSummary part : partListing.getParts()) {
        partETags.add(new PartETag(part.getPartNumber(), part.getETag()));
      }
      listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker());
    } while (partListing.isTruncated());
    if (partETags.isEmpty()) {
      // S3 rejects CompleteMultipartUpload with zero parts: abort the MPU and emit an
      // explicit zero-length object instead.
      logger.debug("UploadId {} has no parts uploaded, aborting upload", uploadId);
      abort();
      ObjectMetadata metadata = new ObjectMetadata();
      metadata.setContentLength(0);
      PutObjectRequest putRequest =
          new PutObjectRequest(
              awsCredentials.getBucketName(), key, new ByteArrayInputStream(new byte[0]), metadata);
      s3Client.putObject(putRequest);
      return;
    }
    CompleteMultipartUploadRequest compRequest =
        new CompleteMultipartUploadRequest(
            awsCredentials.getBucketName(), key, uploadId, partETags);
    logger.debug("UploadId {} upload completing and partSize is {}", uploadId, partETags.size());
    s3Client.completeMultipartUpload(compRequest);
  }

  /** Aborts the in-flight multipart upload; S3 discards any parts already uploaded. */
  @Override
  public void abort() {
    AbortMultipartUploadRequest abortMultipartUploadRequest =
        new AbortMultipartUploadRequest(awsCredentials.getBucketName(), key, uploadId);
    s3Client.abortMultipartUpload(abortMultipartUploadRequest);
  }
}
20 changes: 6 additions & 14 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<module>service</module>
<module>master</module>
<module>worker</module>
<module>web</module>
<module>cli</module>
</modules>

Expand Down Expand Up @@ -1340,23 +1341,14 @@
</dependencies>
</profile>
<profile>
<id>hadoop-aws</id>
<id>aws-mpu</id>
<modules>
<module>multipart-uploader</module>
</modules>
<properties>
<hadoop-aws-deps>true</hadoop-aws-deps>
<aws-mpu-deps>true</aws-mpu-deps>
<aws.version>1.12.367</aws.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-2.4</id>
Expand Down
Loading
Loading