Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update AWS SDK to V2 #43

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.idea
.idea
.vscode
36 changes: 22 additions & 14 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.github.alexmojaki</groupId>
<artifactId>s3-stream-upload</artifactId>
<version>2.2.4</version>
<version>2.2.5-SNAPSHOT</version>

<name>S3 Stream Upload</name>
<description>Manages streaming of data to S3 without knowing the size beforehand and without keeping it all in
Expand Down Expand Up @@ -38,34 +38,40 @@

<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.gaul</groupId>
<artifactId>s3proxy</artifactId>
<version>1.7.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.11.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>3.6.28</version>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>1.11.782</version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.17.173</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand All @@ -74,6 +80,8 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<build>
Expand All @@ -83,8 +91,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void abort() {
/**
* Convenience method to wait for the callables to finish for when you don't care about the results.
*/
@SuppressWarnings({"unused"})
public void awaitCompletion() {
//noinspection StatementWithEmptyBody
for (V ignored : this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;

import static com.amazonaws.services.s3.internal.Constants.MB;
import static alex.mojaki.s3upload.StreamTransferManager.MB;


/**
* An {@code OutputStream} which packages data written to it into discrete {@link StreamPart}s which can be obtained
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/alex/mojaki/s3upload/StreamPart.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package alex.mojaki.s3upload;

import com.amazonaws.util.Base64;

import java.io.InputStream;

import software.amazon.awssdk.utils.BinaryUtils;

/**
* A simple class which holds some data which can be uploaded to S3 as part of a multipart upload and a part number
* identifying it.
Expand Down Expand Up @@ -40,7 +40,7 @@ public long size() {
}

public String getMD5Digest() {
return Base64.encodeAsString(stream.getMD5Digest());
return BinaryUtils.toBase64(stream.getMD5Digest());
}

@Override
Expand Down
125 changes: 66 additions & 59 deletions src/main/java/alex/mojaki/s3upload/StreamTransferManager.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package alex.mojaki.s3upload;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.*;
import com.amazonaws.util.BinaryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;
import software.amazon.awssdk.utils.BinaryUtils;

import java.io.ByteArrayInputStream;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.ArrayList;
Expand All @@ -15,7 +15,6 @@
import java.util.List;
import java.util.concurrent.*;

import static com.amazonaws.services.s3.internal.Constants.MB;

// @formatter:off
/**
Expand Down Expand Up @@ -85,7 +84,7 @@ public void run() {
* on what this class can accomplish. If order of data is important to you, then either use only one stream or ensure
* that you write at least 5 MB to every stream.
* <p>
* While performing the multipart upload this class will create instances of {@link InitiateMultipartUploadRequest},
* While performing the multipart upload this class will create instances of {@link CreateMultipartUploadRequest},
* {@link UploadPartRequest}, and {@link CompleteMultipartUploadRequest}, fill in the essential details, and send them
* off. If you need to add additional details then override the appropriate {@code customise*Request} methods and
* set the required properties within. Note that if no data is written (i.e. the object body is empty) then a normal (not multipart) upload will be performed and {@code customisePutEmptyObjectRequest} will be called instead.
Expand All @@ -105,16 +104,18 @@ public class StreamTransferManager {

private static final Logger log = LoggerFactory.getLogger(StreamTransferManager.class);

public static final int MB = 1024 * 1024;

protected final String bucketName;
protected final String putKey;
protected final AmazonS3 s3Client;
protected final S3Client s3Client;
protected String uploadId;
protected int numStreams = 1;
protected int numUploadThreads = 1;
protected int queueCapacity = 1;
protected int partSize = 5 * MB;
protected boolean checkIntegrity = false;
private final List<PartETag> partETags = Collections.synchronizedList(new ArrayList<PartETag>());
private final List<CompletedPart> partETags = Collections.synchronizedList(new ArrayList<>());
private List<MultiPartOutputStream> multiPartOutputStreams;
private ExecutorServiceResultsHandler<Void> executorServiceResultsHandler;
private ClosableQueue<StreamPart> queue;
Expand All @@ -126,7 +127,7 @@ public class StreamTransferManager {

public StreamTransferManager(String bucketName,
String putKey,
AmazonS3 s3Client) {
S3Client s3Client) {
this.bucketName = bucketName;
this.putKey = putKey;
this.s3Client = s3Client;
Expand Down Expand Up @@ -292,12 +293,12 @@ private void ensureCanSet() {
}

/**
* Deprecated constructor kept for backward compatibility. Use {@link StreamTransferManager#StreamTransferManager(String, String, AmazonS3)} and then chain the desired setters.
* Deprecated constructor kept for backward compatibility. Use {@link StreamTransferManager#StreamTransferManager(String, String, S3Client)} and then chain the desired setters.
*/
@Deprecated
public StreamTransferManager(String bucketName,
String putKey,
AmazonS3 s3Client,
S3Client s3Client,
int numStreams,
int numUploadThreads,
int queueCapacity,
Expand All @@ -320,15 +321,15 @@ public List<MultiPartOutputStream> getMultiPartOutputStreams() {
return multiPartOutputStreams;
}

queue = new ClosableQueue<StreamPart>(queueCapacity);
queue = new ClosableQueue<>(queueCapacity);
log.debug("Initiating multipart upload to {}/{}", bucketName, putKey);
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, putKey);
customiseInitiateRequest(initRequest);
InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
uploadId = initResponse.getUploadId();
CreateMultipartUploadRequest initRequest = CreateMultipartUploadRequest.builder()
.bucket(bucketName).key(putKey).applyMutation(this::customiseInitiateRequest).build();
CreateMultipartUploadResponse initResponse = s3Client.createMultipartUpload(initRequest);
uploadId = initResponse.uploadId();
log.info("Initiated multipart upload to {}/{} with full ID {}", bucketName, putKey, uploadId);
try {
multiPartOutputStreams = new ArrayList<MultiPartOutputStream>();
multiPartOutputStreams = new ArrayList<>();
ExecutorService threadPool = Executors.newFixedThreadPool(numUploadThreads);

int partNumberStart = 1;
Expand All @@ -340,7 +341,7 @@ public List<MultiPartOutputStream> getMultiPartOutputStreams() {
multiPartOutputStreams.add(multiPartOutputStream);
}

executorServiceResultsHandler = new ExecutorServiceResultsHandler<Void>(threadPool);
executorServiceResultsHandler = new ExecutorServiceResultsHandler<>(threadPool);
for (int i = 0; i < numUploadThreads; i++) {
executorServiceResultsHandler.submit(new UploadTask());
}
Expand Down Expand Up @@ -373,25 +374,26 @@ public void complete() {
log.debug("{}: Aborting upload of empty stream", this);
abort();
log.info("{}: Putting empty object", this);
ByteArrayInputStream emptyStream = new ByteArrayInputStream(new byte[]{});
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(0);
PutObjectRequest request = new PutObjectRequest(bucketName, putKey, emptyStream, metadata);
customisePutEmptyObjectRequest(request);
s3Client.putObject(request);
PutObjectRequest request = PutObjectRequest.builder()
.bucket(bucketName)
.key(putKey)
.contentLength(0L)
.applyMutation(this::customisePutEmptyObjectRequest)
.build();
s3Client.putObject(request, RequestBody.empty());
} else {
List<PartETag> sortedParts = new ArrayList<PartETag>(partETags);
List<CompletedPart> sortedParts = new ArrayList<CompletedPart>(partETags);
Collections.sort(sortedParts, new PartNumberComparator());
CompleteMultipartUploadRequest completeRequest = new
CompleteMultipartUploadRequest(
bucketName,
putKey,
uploadId,
sortedParts);
customiseCompleteRequest(completeRequest);
CompleteMultipartUploadResult completeMultipartUploadResult = s3Client.completeMultipartUpload(completeRequest);
CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
.bucket(bucketName)
.key(putKey)
.uploadId(uploadId)
.multipartUpload(b -> b.parts(partETags))
alexwhiteoval marked this conversation as resolved.
Show resolved Hide resolved
.applyMutation(this::customiseCompleteRequest)
.build();
CompleteMultipartUploadResponse completeMultipartUploadResult = s3Client.completeMultipartUpload(completeRequest);
if (checkIntegrity) {
checkCompleteFileIntegrity(completeMultipartUploadResult.getETag(), sortedParts);
checkCompleteFileIntegrity(completeMultipartUploadResult.eTag(), sortedParts);
}
}
log.info("{}: Completed", this);
Expand All @@ -403,7 +405,7 @@ public void complete() {
}
}

private void checkCompleteFileIntegrity(String s3ObjectETag, List<PartETag> sortedParts) {
private void checkCompleteFileIntegrity(String s3ObjectETag, List<CompletedPart> sortedParts) {
String expectedETag = computeCompleteFileETag(sortedParts);
if (!expectedETag.equals(s3ObjectETag)) {
throw new IntegrityCheckException(String.format(
Expand All @@ -412,13 +414,13 @@ private void checkCompleteFileIntegrity(String s3ObjectETag, List<PartETag> sort
}
}

private String computeCompleteFileETag(List<PartETag> parts) {
private String computeCompleteFileETag(List<CompletedPart> parts) {
// When S3 combines the parts of a multipart upload into the final object, the ETag value is set to the
// hex-encoded MD5 hash of the concatenated binary-encoded (raw bytes) MD5 hashes of each part followed by
// "-" and the number of parts.
MessageDigest md = Utils.md5();
for (PartETag partETag : parts) {
md.update(BinaryUtils.fromHex(partETag.getETag()));
for (CompletedPart partETag : parts) {
md.update(BinaryUtils.fromHex(partETag.eTag()));
}
// Represent byte array as a 32-digit number hexadecimal format followed by "-<partCount>".
return String.format("%032x-%d", new BigInteger(1, md.digest()), parts.size());
Expand Down Expand Up @@ -466,8 +468,8 @@ public void abort() {
}
if (uploadId != null) {
log.debug("{}: Aborting", this);
AbortMultipartUploadRequest abortMultipartUploadRequest = new AbortMultipartUploadRequest(
bucketName, putKey, uploadId);
AbortMultipartUploadRequest abortMultipartUploadRequest = AbortMultipartUploadRequest.builder()
.bucket(bucketName).key(putKey).uploadId(uploadId).build();
s3Client.abortMultipartUpload(abortMultipartUploadRequest);
log.info("{}: Aborted", this);
}
Expand Down Expand Up @@ -544,18 +546,23 @@ part remaining, which S3 can accept. It is uploaded in the complete() method.
private void uploadStreamPart(StreamPart part) {
log.debug("{}: Uploading {}", this, part);

UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(bucketName).withKey(putKey)
.withUploadId(uploadId).withPartNumber(part.getPartNumber())
.withInputStream(part.getInputStream())
.withPartSize(part.size());
if (checkIntegrity) {
uploadRequest.setMd5Digest(part.getMD5Digest());
UploadPartRequest.Builder builder = UploadPartRequest.builder()
.bucket(bucketName)
.key(putKey)
.uploadId(uploadId)
.partNumber(part.getPartNumber());
if(checkIntegrity) {
builder.contentMD5(part.getMD5Digest());
}
customiseUploadPartRequest(uploadRequest);

UploadPartResult uploadPartResult = s3Client.uploadPart(uploadRequest);
PartETag partETag = uploadPartResult.getPartETag();
UploadPartRequest uploadRequest = builder
.applyMutation(this::customiseUploadPartRequest)
.build();

UploadPartResponse uploadPartResult = s3Client.uploadPart(
uploadRequest,
RequestBody.fromInputStream(part.getInputStream(), part.size()));
CompletedPart partETag = CompletedPart.builder()
.partNumber(part.getPartNumber()).eTag(uploadPartResult.eTag()).build();
partETags.add(partETag);
log.info("{}: Finished uploading {}", this, part);
}
Expand All @@ -569,26 +576,26 @@ public String toString() {
// These methods are intended to be overridden for more specific interactions with the AWS API.

@SuppressWarnings("unused")
public void customiseInitiateRequest(InitiateMultipartUploadRequest request) {
public void customiseInitiateRequest(CreateMultipartUploadRequest.Builder requestBuilder) {
}

@SuppressWarnings("unused")
public void customiseUploadPartRequest(UploadPartRequest request) {
public void customiseUploadPartRequest(UploadPartRequest.Builder requestBuilder) {
}

@SuppressWarnings("unused")
public void customiseCompleteRequest(CompleteMultipartUploadRequest request) {
public void customiseCompleteRequest(CompleteMultipartUploadRequest.Builder requestBuilder) {
}

@SuppressWarnings("unused")
public void customisePutEmptyObjectRequest(PutObjectRequest request) {
public void customisePutEmptyObjectRequest(PutObjectRequest.Builder requestBuilder) {
}

private static class PartNumberComparator implements Comparator<PartETag> {
private static class PartNumberComparator implements Comparator<CompletedPart> {
@Override
public int compare(PartETag o1, PartETag o2) {
int partNumber1 = o1.getPartNumber();
int partNumber2 = o2.getPartNumber();
public int compare(CompletedPart o1, CompletedPart o2) {
int partNumber1 = o1.partNumber();
int partNumber2 = o2.partNumber();

if (partNumber1 == partNumber2) {
return 0;
Expand Down
Loading