added sync folder and file manager
All checks were successful
Locusworks Team/aws-s3-sync/pipeline/head This commit looks good

This commit is contained in:
2020-11-02 20:35:40 -06:00
parent b2a8a83988
commit 572c067b9d
6 changed files with 157 additions and 310 deletions

2
.gitignore vendored
View File

@ -3,3 +3,5 @@
/logs/
.project
/.settings/
**/*.csv
*.properties

View File

@ -1,6 +1,5 @@
package net.locusworks.s3sync;
import java.nio.file.Paths;
import net.locusworks.logger.ApplicationLogger;
import net.locusworks.logger.ApplicationLoggerFactory;
import net.locusworks.logger.ApplicationLoggerInitializer;
@ -38,7 +37,7 @@ public class Entry {
try {
S3Client client = new S3Client(ConfigurationManager.getInstance());
client.uploadFile(Paths.get("D:\\OneDrive\\Documents\\config.docx"));
client.syncFolder();
} catch (Exception | Error e) {
logger.error(e);
System.exit(-1);

View File

@ -1,10 +1,8 @@
package net.locusworks.s3sync.client;
import java.nio.file.Path;
public class FileDetail {
private Path file;
private String file;
private String hash;
private boolean uploaded;
@ -14,7 +12,7 @@ public class FileDetail {
* @param hash
* @param uploaded
*/
public FileDetail(Path file, String hash, boolean uploaded) {
public FileDetail(String file, String hash, boolean uploaded) {
this.file = file;
this.hash = hash;
this.uploaded = uploaded;
@ -22,13 +20,13 @@ public class FileDetail {
/**
* @return the file
*/
public synchronized final Path getFile() {
public synchronized final String getFile() {
return file;
}
/**
* @param file the file to set
*/
public synchronized final void setFile(Path file) {
public synchronized final void setFile(String file) {
this.file = file;
}
/**

View File

@ -0,0 +1,98 @@
package net.locusworks.s3sync.client;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.codec.digest.DigestUtils;
import net.locusworks.logger.ApplicationLogger;
import net.locusworks.logger.ApplicationLoggerFactory;
public class FileManager implements AutoCloseable {
private ApplicationLogger logger = ApplicationLoggerFactory.getLogger(FileManager.class);
public static final String FILE_CSV = "upload.csv";
private Map<String, FileDetail> detailMap;
private static FileManager instance;
private FileManager() throws IOException {
detailMap = new LinkedHashMap<String, FileDetail>();
readFile();
}
private void readFile() throws IOException {
Path file = Paths.get(FILE_CSV);
if (Files.notExists(file)) return;
try(BufferedReader reader = Files.newBufferedReader(file)) {
String line = null;
while((line = reader.readLine()) != null) {
String[] values = line.split(",");
if (values.length != 3) {
logger.warn("Invalid entry detected: " + line);
continue;
}
FileDetail fd = new FileDetail(values[0], values[1], Boolean.valueOf(values[2]));
detailMap.put(values[0], fd);
}
}
}
private void saveFile() throws IOException {
try(BufferedWriter writer = Files.newBufferedWriter(Paths.get(FILE_CSV))) {
writer.write("FILE_NAME,HASH,STATUS\n");
for(FileDetail v : detailMap.values()) {
writer.write(String.format("%s,%s,%s%n", v.getFile(), v.getHash(), v.isUploaded()));
};
}
}
public boolean addEntry(String file, String hash, boolean uploaded) {
return addEntry(new FileDetail(file, hash, uploaded));
}
public boolean addEntry(FileDetail fd) {
return detailMap.put(fd.getFile(), fd) != null;
}
public FileDetail uploadFile(Path path) throws IOException {
boolean newFile = false;
String file = path.toString();
FileDetail fd = null;
if (detailMap.containsKey(file)) {
fd = detailMap.get(file);
} else {
newFile = true;
fd = new FileDetail(file, DigestUtils.sha1Hex(Files.newInputStream(path)), false);
}
String sha1 = newFile ? fd.getHash() : DigestUtils.sha1Hex(Files.newInputStream(path));
if (!sha1.equals(fd.getHash())) {
fd.setUploaded(false);
}
return fd;
}
@Override
public void close() throws Exception {
saveFile();
}
public static FileManager getInstance() throws IOException {
if (instance == null) {
instance = new FileManager();
}
return instance;
}
}

View File

@ -1,14 +1,13 @@
package net.locusworks.s3sync.client;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.client.builder.ExecutorFactory;
import com.amazonaws.AmazonClientException;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.Bucket;
import com.amazonaws.services.s3.transfer.MultipleFileUpload;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
@ -22,7 +21,7 @@ public class S3Client {
private AmazonS3 s3Client;
private String bucket;
private Path synchFolder;
private Path syncFolder;
public S3Client(ConfigurationManager conf) {
String region = conf.getRegion();
@ -36,41 +35,62 @@ public class S3Client {
}
logger.info("Found Bucket: %s", bucket);
this.bucket = conf.getBucketName();
this.synchFolder = conf.getSyncFolder();
this.syncFolder = conf.getSyncFolder();
}
public void uploadFile(Path file) {
logger.info("Uploading file: %s", file);
TransferManager xferMgr = TransferManagerBuilder.standard().withS3Client(s3Client).build();
try {
Upload xfer = xferMgr.upload(bucket, file.getFileName().toString(), file.toFile());
// loop with Transfer.isDone()
XferMgrProgress.showTransferProgress(xfer);
// or block with Transfer.waitForCompletion()
XferMgrProgress.waitForCompletion(xfer);
logger.info("Done uploading %s", file);
} catch (AmazonServiceException e) {
logger.error(e.getErrorMessage());
System.exit(1);
}
uploadFile(xferMgr, file);
xferMgr.shutdownNow();
}
public void syncFolder() {
TransferManager xferMgr = TransferManagerBuilder.standard()
.withS3Client(s3Client)
.build();
MultipleFileUpload xfer = xferMgr.uploadDirectory(bucket, null, synchFolder.toFile(), true);
public void uploadFile(TransferManager xferMgr, Path file) {
boolean xferMgrNull = xferMgr == null;
xferMgr = !xferMgrNull ? xferMgr : TransferManagerBuilder.standard().withS3Client(s3Client).build();
FileDetail fd = null;
try {
fd = FileManager.getInstance().uploadFile(file);
if (fd.isUploaded()) return;
logger.info("Uploading file: %s", file);
Upload xfer = xferMgr.upload(bucket, getPath(file), file.toFile());
xfer.waitForCompletion();
fd.setUploaded(true);
FileManager.getInstance().addEntry(fd);
logger.info("Done uploading %s", file);
} catch (AmazonClientException | InterruptedException | IOException e) {
if (fd != null) {
fd.setUploaded(false);
try {
FileManager.getInstance().addEntry(fd);
} catch (IOException e1) {
logger.error("Unable to save file to file manager: " + e1.getMessage());
}
}
logger.error(e.getMessage());
} finally {
if (xferMgrNull) {
xferMgr.shutdownNow();
}
}
}
public void syncFolder() throws IOException {
TransferManager xferMgr = TransferManagerBuilder.standard().withS3Client(s3Client).build();
try (FileManager manager = FileManager.getInstance()) {
Files.walk(syncFolder)
.filter(f -> Files.isRegularFile(f))
.forEach(f -> uploadFile(xferMgr, syncFolder.resolve(f)));
} catch (Exception e) {
logger.error("Unable to load file Manager: " + e.getMessage());
}
xferMgr.shutdownNow();
}
private String getPath(Path file) {
if (file.getParent() == null) return file.getFileName().toString();
Path relative = syncFolder.relativize(file);
return relative.toString().replace("\\", "/");
}
}

View File

@ -1,270 +0,0 @@
//snippet-sourcedescription:[XferMgrProgress.java demonstrates how to use the S3 transfermanager to upload files to a bucket and show progress of the upload.]
//snippet-keyword:[Java]
//snippet-sourcesyntax:[java]
//snippet-keyword:[Code Sample]
//snippet-keyword:[Amazon S3]
//snippet-keyword:[TransferProgress]
//snippet-keyword:[TransferManager]
//snippet-service:[s3]
//snippet-sourcetype:[full-example]
//snippet-sourcedate:[]
//snippet-sourceauthor:[soo-aws]
/*
Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
This file is licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License. A copy of
the License is located at
http://aws.amazon.com/apache2.0/
This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
*/
package net.locusworks.s3sync.client;
// snippet-start:[s3.java1.s3_xfer_mgr_progress.import]
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.transfer.*;
import com.amazonaws.services.s3.transfer.Transfer.TransferState;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
// snippet-end:[s3.java1.s3_xfer_mgr_progress.import]
// snippet-start:[s3.java1.s3_xfer_mgr_progress.complete]
public class XferMgrProgress {
// waits for the transfer to complete, catching any exceptions that occur.
public static void waitForCompletion(Transfer xfer) {
// snippet-start:[s3.java1.s3_xfer_mgr_progress.wait_for_transfer]
try {
xfer.waitForCompletion();
} catch (AmazonServiceException e) {
System.err.println("Amazon service error: " + e.getMessage());
System.exit(1);
} catch (AmazonClientException e) {
System.err.println("Amazon client error: " + e.getMessage());
System.exit(1);
} catch (InterruptedException e) {
System.err.println("Transfer interrupted: " + e.getMessage());
System.exit(1);
}
// snippet-end:[s3.java1.s3_xfer_mgr_progress.wait_for_transfer]
}
// Prints progress while waiting for the transfer to finish.
public static void showTransferProgress(Transfer xfer) {
// snippet-start:[s3.java1.s3_xfer_mgr_progress.poll]
// print the transfer's human-readable description
System.out.println(xfer.getDescription());
// print an empty progress bar...
printProgressBar(0.0);
// update the progress bar while the xfer is ongoing.
do {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
return;
}
// Note: so_far and total aren't used, they're just for
// documentation purposes.
TransferProgress progress = xfer.getProgress();
long so_far = progress.getBytesTransferred();
long total = progress.getTotalBytesToTransfer();
double pct = progress.getPercentTransferred();
eraseProgressBar();
printProgressBar(pct);
} while (xfer.isDone() == false);
// print the final state of the transfer.
TransferState xfer_state = xfer.getState();
System.out.println(": " + xfer_state);
// snippet-end:[s3.java1.s3_xfer_mgr_progress.poll]
}
// Prints progress of a multiple file upload while waiting for it to finish.
public static void showMultiUploadProgress(MultipleFileUpload multi_upload) {
// print the upload's human-readable description
System.out.println(multi_upload.getDescription());
// snippet-start:[s3.java1.s3_xfer_mgr_progress.substranferes]
Collection<? extends Upload> sub_xfers = new ArrayList<Upload>();
sub_xfers = multi_upload.getSubTransfers();
do {
System.out.println("\nSubtransfer progress:\n");
for (Upload u : sub_xfers) {
System.out.println(" " + u.getDescription());
if (u.isDone()) {
TransferState xfer_state = u.getState();
System.out.println(" " + xfer_state);
} else {
TransferProgress progress = u.getProgress();
double pct = progress.getPercentTransferred();
printProgressBar(pct);
System.out.println();
}
}
// wait a bit before the next update.
try {
Thread.sleep(200);
} catch (InterruptedException e) {
return;
}
} while (multi_upload.isDone() == false);
// print the final state of the transfer.
TransferState xfer_state = multi_upload.getState();
System.out.println("\nMultipleFileUpload " + xfer_state);
// snippet-end:[s3.java1.s3_xfer_mgr_progress.substranferes]
}
// prints a simple text progressbar: [##### ]
public static void printProgressBar(double pct) {
// if bar_size changes, then change erase_bar (in eraseProgressBar) to
// match.
final int bar_size = 40;
final String empty_bar = " ";
final String filled_bar = "########################################";
int amt_full = (int) (bar_size * (pct / 100.0));
System.out.format(" [%s%s]", filled_bar.substring(0, amt_full),
empty_bar.substring(0, bar_size - amt_full));
}
// erases the progress bar.
public static void eraseProgressBar() {
// erase_bar is bar_size (from printProgressBar) + 4 chars.
final String erase_bar = "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b";
System.out.format(erase_bar);
}
public static void uploadFileWithListener(String file_path,
String bucket_name, String key_prefix, boolean pause) {
System.out.println("file: " + file_path +
(pause ? " (pause)" : ""));
String key_name = null;
if (key_prefix != null) {
key_name = key_prefix + '/' + file_path;
} else {
key_name = file_path;
}
// snippet-start:[s3.java1.s3_xfer_mgr_progress.progress_listener]
File f = new File(file_path);
TransferManager xfer_mgr = TransferManagerBuilder.standard().build();
try {
Upload u = xfer_mgr.upload(bucket_name, key_name, f);
// print an empty progress bar...
printProgressBar(0.0);
u.addProgressListener(new ProgressListener() {
public void progressChanged(ProgressEvent e) {
double pct = e.getBytesTransferred() * 100.0 / e.getBytes();
eraseProgressBar();
printProgressBar(pct);
}
});
// block with Transfer.waitForCompletion()
XferMgrProgress.waitForCompletion(u);
// print the final state of the transfer.
TransferState xfer_state = u.getState();
System.out.println(": " + xfer_state);
} catch (AmazonServiceException e) {
System.err.println(e.getErrorMessage());
System.exit(1);
}
xfer_mgr.shutdownNow();
// snippet-end:[s3.java1.s3_xfer_mgr_progress.progress_listener]
}
public static void uploadDirWithSubprogress(String dir_path,
String bucket_name, String key_prefix, boolean recursive,
boolean pause) {
System.out.println("directory: " + dir_path + (recursive ?
" (recursive)" : "") + (pause ? " (pause)" : ""));
TransferManager xfer_mgr = new TransferManager();
try {
MultipleFileUpload multi_upload = xfer_mgr.uploadDirectory(
bucket_name, key_prefix, new File(dir_path), recursive);
// loop with Transfer.isDone()
XferMgrProgress.showMultiUploadProgress(multi_upload);
// or block with Transfer.waitForCompletion()
XferMgrProgress.waitForCompletion(multi_upload);
} catch (AmazonServiceException e) {
System.err.println(e.getErrorMessage());
System.exit(1);
}
xfer_mgr.shutdownNow();
}
public static void main(String[] args) {
final String USAGE = "\n" +
"Usage:\n" +
" XferMgrProgress [--recursive] [--pause] <s3_path> <local_path>\n\n" +
"Where:\n" +
" --recursive - Only applied if local_path is a directory.\n" +
" Copies the contents of the directory recursively.\n\n" +
" --pause - Attempt to pause+resume the upload. This may not work for\n" +
" small files.\n\n" +
" s3_path - The S3 destination (bucket/path) to upload the file(s) to.\n\n" +
" local_path - The path to a local file or directory path to upload to S3.\n\n" +
"Examples:\n" +
" XferMgrProgress public_photos/cat_happy.png my_photos/funny_cat.png\n" +
" XferMgrProgress public_photos my_photos/cat_sad.png\n" +
" XferMgrProgress public_photos my_photos\n\n";
if (args.length < 2) {
System.out.println(USAGE);
System.exit(1);
}
int cur_arg = 0;
boolean recursive = false;
boolean pause = false;
// first, parse any switches
while (args[cur_arg].startsWith("--")) {
if (args[cur_arg].equals("--recursive")) {
recursive = true;
} else if (args[cur_arg].equals("--pause")) {
pause = true;
} else {
System.out.println("Unknown argument: " + args[cur_arg]);
System.out.println(USAGE);
System.exit(1);
}
cur_arg += 1;
}
// only the first '/' character is of interest to get the bucket name.
// Subsequent ones are part of the key name.
String[] s3_path = args[cur_arg].split("/", 2);
cur_arg += 1;
String bucket_name = s3_path[0];
String key_prefix = null;
if (s3_path.length > 1) {
key_prefix = s3_path[1];
}
String local_path = args[cur_arg];
// check to see if local path is a directory or file...
File f = new File(args[cur_arg]);
if (f.exists() == false) {
System.out.println("Input path doesn't exist: " + args[cur_arg]);
System.exit(1);
} else if (f.isDirectory()) {
uploadDirWithSubprogress(local_path, bucket_name, key_prefix,
recursive, pause);
} else {
uploadFileWithListener(local_path, bucket_name, key_prefix, pause);
}
}
}
// snippet-end:[s3.java1.s3_xfer_mgr_progress.complete]