2 Commits

Author SHA1 Message Date
6d908b43d7 Initial switch to h2 database instead of csv
All checks were successful
Locusworks Team/aws-s3-sync/pipeline/head This commit looks good
2020-11-27 23:27:48 -06:00
6d6f0ed891 Added logic to remove output dir and changed logging 2020-11-25 00:49:36 -06:00
20 changed files with 923 additions and 317 deletions

1
.gitignore vendored
View File

@ -5,3 +5,4 @@
/.settings/
**/*.csv
*.properties
*.db

195
pom.xml
View File

@ -13,6 +13,14 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<dep.check.version>6.0.2</dep.check.version>
<jackson.version>2.11.3</jackson.version>
<flyway.version>7.3.0</flyway.version>
<h2.version>1.4.200</h2.version>
<hibernate.version>5.4.24.Final</hibernate.version>
<spring.version>5.3.1</spring.version>
<spring.boot.version>2.4.0</spring.boot.version>
<spring.data.version>2.4.1</spring.data.version>
<maven.enforcer.version>3.0.0-M2</maven.enforcer.version>
</properties>
<build>
@ -20,7 +28,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>3.0.0-M2</version>
<version>${maven.enforcer.version}</version>
<configuration>
<rules>
<dependencyConvergence />
@ -53,6 +61,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId>
<version>2.7</version>
<configuration>
<generateBackupPoms>false</generateBackupPoms>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
@ -68,7 +84,7 @@
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>net.locusworks.s3sync.Entry</mainClass>
<mainClass>net.locusworks.s3sync.S3SyncLauncher</mainClass>
</transformer>
</transformers>
<filters>
@ -85,6 +101,39 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-maven-plugin</artifactId>
<version>${flyway.version}</version>
<configuration>
<serverId>flyway-h2</serverId>
<user>sa</user>
<url>jdbc:h2:file:./s3sync;MODE=MYSQL;DATABASE_TO_UPPER=false</url>
<outOfOrder>false</outOfOrder>
<schemas>
<schema>s3sync</schema>
</schemas>
<table>_flyway_migration</table>
<locations>
<location>filesystem:${basedir}/src/main/resources/database/migration</location>
</locations>
</configuration>
<executions>
<execution>
<phase>process-sources</phase>
<goals>
<goal>migrate</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
@ -95,6 +144,27 @@
<version>1.11.892</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
@ -127,6 +197,127 @@
<version>0.1.55</version>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-core</artifactId>
<version>${hibernate.version}</version>
<exclusions>
<exclusion>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-entitymanager</artifactId>
<version>${hibernate.version}</version>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-hikaricp</artifactId>
<version>${hibernate.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
<version>${spring.boot.version}</version>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-autoconfigure -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<!-- Spring Framework jars to be shared across projects -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-jpa</artifactId>
<version>${spring.data.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/javax.annotation/javax.annotation-api -->
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.flywaydb/flyway-core -->
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
<version>${flyway.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.h2database/h2 -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
</dependency>
</dependencies>
<distributionManagement>

View File

@ -1,48 +0,0 @@
package net.locusworks.s3sync;
import net.locusworks.logger.ApplicationLogger;
import net.locusworks.logger.ApplicationLoggerFactory;
import net.locusworks.logger.ApplicationLoggerInitializer;
import net.locusworks.logger.LogLevel;
import net.locusworks.s3sync.client.FileManager;
import net.locusworks.s3sync.client.S3Client;
import net.locusworks.s3sync.conf.ConfigurationManager;
public class Entry {
public static void main(String[] args) {
ConfigurationManager conf = ConfigurationManager.getInstance();
ApplicationLoggerFactory.init(new ApplicationLoggerInitializer() {
@Override
public LogLevel initialize() {
return conf.getLogLevel();
}
});
ApplicationLogger logger = ApplicationLoggerFactory.getLogger(Entry.class);
if (System.getenv("AWS_ACCESS_KEY_ID") == null) {
logger.error("AWS_ACCESS_KEY_ID is not set in environment variables");
System.exit(-1);
}
if (System.getenv("AWS_SECRET_ACCESS_KEY") == null) {
logger.error("AWS_SECRET_ACCESS_KEY is not set in environment variables");
System.exit(-1);
}
logger.info("Starting S3 Sync");
try (S3Client client = new S3Client(ConfigurationManager.getInstance())) {
FileManager manager = FileManager.newInstance(client);
client.syncFolder();
manager.removeOrphanedFiles();
} catch (Exception | Error e) {
logger.error(e);
System.exit(-1);
}
}
}

View File

@ -0,0 +1,55 @@
package net.locusworks.s3sync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
import net.locusworks.logger.ApplicationLogger;
import net.locusworks.logger.ApplicationLoggerFactory;
import net.locusworks.s3sync.client.S3Client;
@SpringBootApplication(scanBasePackages= {"net.locusworks.s3sync"})
@ComponentScan("net.locusworks.s3sync")
@Component
@EnableScheduling
public class S3SyncLauncher implements ApplicationRunner {
@Autowired
private S3Client client;
public static void main(String[] args) {
new SpringApplicationBuilder(S3SyncLauncher.class).headless(false).run(args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
ApplicationLogger logger = ApplicationLoggerFactory.getLogger(S3SyncLauncher.class);
if (System.getenv("AWS_ACCESS_KEY_ID") == null) {
logger.error("AWS_ACCESS_KEY_ID is not set in environment variables");
System.exit(-1);
}
if (System.getenv("AWS_SECRET_ACCESS_KEY") == null) {
logger.error("AWS_SECRET_ACCESS_KEY is not set in environment variables");
System.exit(-1);
}
logger.info("Starting S3 Sync");
try {
client.syncFolder();
} catch (Exception | Error e) {
logger.error(e);
System.exit(-1);
} finally {
client.close();
}
}
}

View File

@ -1,57 +0,0 @@
package net.locusworks.s3sync.client;
public class FileDetail {
private String file;
private String hash;
private boolean uploaded;
public FileDetail() {}
/**
* @param file
* @param hash
* @param uploaded
*/
public FileDetail(String file, String hash, boolean uploaded) {
this.file = file;
this.hash = hash;
this.uploaded = uploaded;
}
/**
* @return the file
*/
public synchronized final String getFile() {
return file;
}
/**
* @param file the file to set
*/
public synchronized final void setFile(String file) {
this.file = file;
}
/**
* @return the hash
*/
public synchronized final String getHash() {
return hash;
}
/**
* @param hash the hash to set
*/
public synchronized final void setHash(String hash) {
this.hash = hash;
}
/**
* @return the uploaded
*/
public synchronized final boolean isUploaded() {
return uploaded;
}
/**
* @param uploaded the uploaded to set
*/
public synchronized final void setUploaded(boolean uploaded) {
this.uploaded = uploaded;
}
}

View File

@ -1,130 +1,52 @@
package net.locusworks.s3sync.client;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.Date;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import com.amazonaws.services.s3.model.S3Object;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import net.locusworks.s3sync.database.entities.FileInfo;
import net.locusworks.s3sync.database.repos.FileInfoRepository;
import net.locusworks.logger.ApplicationLogger;
import net.locusworks.logger.ApplicationLoggerFactory;
public class FileManager implements AutoCloseable {
@Service
public class FileManager {
private ApplicationLogger logger = ApplicationLoggerFactory.getLogger(FileManager.class);
public static final String FILE_CSV = "upload.csv";
private Map<String, FileDetail> detailMap;
private Set<String> s3Files;
@Autowired
private FileInfoRepository fileInfoRepo;
private S3Client client;
public static final String S3SYNC_DB_FILE = "s3sync.mv.db";
private String bucket;
private static FileManager instance;
private FileManager(S3Client client) throws IOException {
detailMap = new LinkedHashMap<String, FileDetail>();
s3Files = new HashSet<String>();
this.client = client;
this.bucket = client.getBucket();
readFile();
public FileInfo addEntry(Path file, String hash, boolean uploaded) {
return addEntry(new FileInfo(null, file.getFileName().toString(), file.getParent().toString(), hash, new Date(), uploaded));
}
private void readFile() throws IOException {
public FileInfo addEntry(FileInfo fd) {
logger.debug("Saving file: " + fd);
if (fd.getUploadDate() == null) fd.setUploadDate(new Date());
return fileInfoRepo.save(fd);
}
public FileInfo getFileDetail(Path file, String relativePath) throws IOException {
String sha1 = DigestUtils.sha1Hex(Files.newInputStream(file));
S3Object hashFile = client.getObject(bucket, FILE_CSV);
if (hashFile == null) return;
Path file = Paths.get(FILE_CSV);
FileInfo fi = fileInfoRepo.findByFileHash(sha1);
client.downloadFile(FILE_CSV, file);
s3Files = client.getFileList();
if (Files.notExists(file)) return;
try(BufferedReader reader = Files.newBufferedReader(file)) {
String line = null;
while((line = reader.readLine()) != null) {
String[] values = line.split(",");
if (values.length != 3) {
logger.warn("Invalid entry detected: " + line);
continue;
}
FileDetail fd = new FileDetail(values[0], values[1], Boolean.valueOf(values[2]));
detailMap.put(values[0], fd);
}
if (fi == null) {
fi = new FileInfo();
fi.setFileHash(sha1);
fi.setFilePath(relativePath);
fi.setFileName(file.getFileName().toString());
fi.setUploaded(false);
}
}
private void saveFile() throws IOException {
try(BufferedWriter writer = Files.newBufferedWriter(Paths.get(FILE_CSV))) {
writer.write("FILE_NAME,HASH,STATUS\n");
for(FileDetail v : detailMap.values()) {
writer.write(String.format("%s,%s,%s%n", v.getFile(), v.getHash(), v.isUploaded()));
};
}
}
public boolean addEntry(String file, String hash, boolean uploaded) {
return addEntry(new FileDetail(file, hash, uploaded));
}
public boolean addEntry(FileDetail fd) {
return detailMap.put(fd.getFile(), fd) != null;
}
public FileDetail getFileDetail(Path path, String key) throws IOException {
boolean newFile = false;
FileDetail fd = null;
if (detailMap.containsKey(key)) {
fd = detailMap.get(key);
} else {
newFile = true;
fd = new FileDetail(key, DigestUtils.sha1Hex(Files.newInputStream(path)), false);
}
String sha1 = newFile ? fd.getHash() : DigestUtils.sha1Hex(Files.newInputStream(path));
if (sha1.equals(fd.getHash()) && s3Files.contains(key)) {
fd.setUploaded(true);
s3Files.remove(key);
} else {
fd.setUploaded(false);
}
return fd;
}
public void removeOrphanedFiles() {
client.removeFiles(s3Files);
}
@Override
public void close() throws Exception {
saveFile();
client.uploadFile(Paths.get(FILE_CSV));
Files.deleteIfExists(Paths.get(FILE_CSV));
}
public static FileManager newInstance(S3Client client) throws IOException {
instance = new FileManager(client);
return instance;
}
public static FileManager getInstance() throws IOException {
if (instance == null || instance.client == null || StringUtils.isBlank(instance.bucket)) {
throw new IOException("a call to newInstance() was not made. Please call newInstance first");
}
return instance;
return fi;
}
}

View File

@ -1,10 +1,15 @@
package net.locusworks.s3sync.client;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.regions.Regions;
@ -17,31 +22,33 @@ import com.amazonaws.services.s3.transfer.Download;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import net.locusworks.logger.ApplicationLogger;
import net.locusworks.logger.ApplicationLoggerFactory;
import net.locusworks.s3sync.conf.ConfigurationManager;
import net.locusworks.s3sync.conf.TransferType;
import net.locusworks.s3sync.database.entities.FileInfo;
import net.locusworks.s3sync.scp.AbstractSshMessage;
import net.locusworks.s3sync.scp.LocalFileMessage;
import net.locusworks.s3sync.scp.ScpFromMessage;
public class S3Client implements AutoCloseable {
@Component
public class S3Client {
private ApplicationLogger logger = ApplicationLoggerFactory.getLogger(S3Client.class);
private AmazonS3 s3Client;
private String bucket;
private String remoteFolder;
private String host;
private String identity;
private String user;
private Integer port;
private Path localFolder;
public S3Client(ConfigurationManager conf) {
@Autowired
private ConfigurationManager conf;
@Autowired
private FileManager fileManager;
public S3Client() {}
@PostConstruct
private void init() {
String region = conf.getRegion();
this.s3Client = AmazonS3ClientBuilder.standard().withRegion(Regions.fromName(region)).build();
Bucket bucket = s3Client.listBuckets().stream()
@ -52,50 +59,40 @@ public class S3Client implements AutoCloseable {
System.exit(-1);
}
logger.info("Found Bucket: %s", bucket);
this.bucket = conf.getBucketName();
this.remoteFolder = conf.getRemoteFolder();
this.localFolder = conf.getLocalFolder();
this.host = conf.getHost();
this.identity = conf.getIdentity();
this.user = conf.getUser();
this.port = conf.getPort();
}
public String getBucket() {
return this.bucket;
return this.conf.getBucketName();
}
public void uploadFile(Path file) {
public void uploadFile(Path file, Path localFolder) {
TransferManager xferMgr = TransferManagerBuilder.standard().withS3Client(s3Client).build();
uploadFile(xferMgr, file);
uploadFile(xferMgr, file, localFolder);
xferMgr.shutdownNow(false);
}
public void uploadFile(TransferManager xferMgr, Path file) {
public void uploadFile(TransferManager xferMgr, Path file, Path localFolder) {
boolean xferMgrNull = xferMgr == null;
xferMgr = !xferMgrNull ? xferMgr : TransferManagerBuilder.standard().withS3Client(s3Client).build();
FileDetail fd = null;
FileInfo fd = null;
try {
String key = getPath(file);
fd = FileManager.getInstance().getFileDetail(file, key);
if (fd.isUploaded()) return;
String key = getPath(file, localFolder);
fd = fileManager.getFileDetail(file, key);
if (fd.getUploaded()) return;
logger.info("Uploading file: %s", file);
Upload xfer = xferMgr.upload(bucket, key, file.toFile());
Upload xfer = xferMgr.upload(getBucket(), key, file.toFile());
xfer.waitForCompletion();
fd.setUploaded(true);
FileManager.getInstance().addEntry(fd);
logger.info("Done uploading %s", file);
} catch (AmazonClientException | InterruptedException | IOException e) {
if (fd != null) {
fd.setUploaded(false);
try {
FileManager.getInstance().addEntry(fd);
} catch (IOException e1) {
logger.error("Unable to save file to file manager: " + e1.getMessage());
}
}
logger.error(e.getMessage());
} finally {
if (fd != null) {
fileManager.addEntry(fd);
}
if (xferMgrNull) {
xferMgr.shutdownNow(false);
}
@ -113,7 +110,7 @@ public class S3Client implements AutoCloseable {
xferMgr = !xferMgrNull ? xferMgr : TransferManagerBuilder.standard().withS3Client(s3Client).build();
try {
logger.info("Downloading file: %s", file);
Download xfer = xferMgr.download(bucket, key, file.toFile());
Download xfer = xferMgr.download(getBucket(), key, file.toFile());
xfer.waitForCompletion();
logger.info("Done downloading %s", file);
} catch (AmazonClientException | InterruptedException e) {
@ -127,23 +124,26 @@ public class S3Client implements AutoCloseable {
public void syncFolder() throws IOException, JSchException {
TransferManager xferMgr = TransferManagerBuilder.standard().withS3Client(s3Client).build();
Path localFolder = conf.getLocalFolder();
if (Files.notExists(localFolder)) {
logger.info("Creating local folder: " + localFolder);
Files.createDirectory(localFolder);
}
AbstractSshMessage aMsg = conf.getTransferType() == TransferType.REMOTE ?
new ScpFromMessage(true, ScpFromMessage.newSession(conf.getIdentity(), conf.getUser(), conf.getHost(), conf.getPort()), conf.getRemoteFolder(), localFolder, true) :
new LocalFileMessage(localFolder);
JSch.setConfig("StrictHostKeyChecking", "no");
JSch jsch = new JSch();
jsch.addIdentity(identity);
Session session = jsch.getSession(user, host, port);
session.connect();
try(ScpFromMessage msg = new ScpFromMessage(true, session, remoteFolder, localFolder, true);
FileManager manager = FileManager.getInstance()) {
try(AbstractSshMessage msg = aMsg) {
msg.fileCallback(p -> {
logger.info(p+"");
if (Files.isRegularFile(p)) {
uploadFile(xferMgr, p);
uploadFile(xferMgr, p, localFolder);
try {
if (conf.deleteLocal())
Files.delete(p);
} catch (Exception ex) {
logger.warn("Unable to delete local file %s: %s", p, ex.getMessage());
}
}
});
msg.execute();
@ -152,6 +152,11 @@ public class S3Client implements AutoCloseable {
}
xferMgr.shutdownNow(false);
if (conf.deleteLocal())
Files.walk(localFolder)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
}
public S3Object getObject(String bucketName, String key) {
@ -180,13 +185,12 @@ public class S3Client implements AutoCloseable {
}
}
private String getPath(Path file) {
private String getPath(Path file, Path localFolder) {
if (file.getParent() == null) return file.getFileName().toString();
Path relative = localFolder.relativize(file);
return relative.toString().replace("\\", "/");
}
@Override
public void close() throws Exception {
if (s3Client != null) {
s3Client.shutdown();

View File

@ -5,19 +5,17 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import net.locusworks.logger.LogLevel;
@Component
public class ConfigurationManager {
private static final String CONF_FILE= "s3.properties";
private Properties conf;
private static ConfigurationManager confManager;
private ConfigurationManager() {
init();
}
@PostConstruct
private void init() {
try {
Properties defaultProps = PropertiesManager.loadConfiguration(ConfigurationManager.class, CONF_FILE);
@ -38,6 +36,14 @@ public class ConfigurationManager {
}
}
public TransferType getTransferType() {
return TransferType.getEnum(conf.getProperty("transferType"));
}
public boolean deleteLocal() {
return Boolean.getBoolean(conf.getProperty("deleteLocalFiles"));
}
public String getRegion() {
return conf.getProperty("region");
}
@ -74,11 +80,4 @@ public class ConfigurationManager {
return LogLevel.getEnum(conf.getProperty("logLevel"));
}
public static ConfigurationManager getInstance() {
if (confManager == null) {
confManager = new ConfigurationManager();
}
return confManager;
}
}

View File

@ -0,0 +1,15 @@
package net.locusworks.s3sync.conf;
public enum TransferType {
REMOTE,
LOCAL;
public static TransferType getEnum(String value) {
for (TransferType s : values()) {
if (s.name().toLowerCase().equals(value.toLowerCase().trim())) return s;
}
throw new RuntimeException("No transfer type of " + value);
}
}

View File

@ -0,0 +1,195 @@
package net.locusworks.s3sync.database;
import java.util.HashSet;
import java.util.Properties;
import javax.sql.DataSource;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.configuration.FluentConfiguration;
import org.flywaydb.core.api.logging.Log;
import org.flywaydb.core.api.logging.LogCreator;
import org.flywaydb.core.api.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.hibernate5.LocalSessionFactoryBean;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.Database;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import net.locusworks.logger.ApplicationLogger;
import net.locusworks.logger.ApplicationLoggerFactory;
import net.locusworks.logger.ApplicationLoggerInitializer;
import net.locusworks.logger.LogLevel;
import net.locusworks.s3sync.conf.ConfigurationManager;
@Configuration(value="net.locusworks.s3sync.database.S3SyncBeanConfiguration")
@EnableTransactionManagement
@EnableJpaRepositories(
basePackages = {"net.locusworks.s3sync.database.repos"},
entityManagerFactoryRef = "s3syncEntityManagerFactory",
transactionManagerRef = "s3syncTransactionManager"
)
public class S3SyncBeanConfiguration {
public static final String PERSISTENCE_UNIT = "s3sync_pu";
public static final HashSet<String> cacheNames = new HashSet<>();
private static final String[] PACKAGES_TO_SCAN = {"net.locusworks.s3sync.database.entities", "net.locusworks.s3sync.database.repos"};
private static final Properties hibernateProperties;
static {
hibernateProperties = new Properties();
hibernateProperties.setProperty("hibernate.connection.zeroDateTimeBehavior", "convertToNull");
hibernateProperties.setProperty("hibernate.dbcp.maxActive", "50");
hibernateProperties.setProperty("hibernate.dbcp.maxIdle", "10");
hibernateProperties.setProperty("hibernate.dbcp.maxWait", "5000");
hibernateProperties.setProperty("hibernate.jdbc.batch_size property", "50");
hibernateProperties.setProperty("hibernate.connection.charSet", "UTF-8");
hibernateProperties.setProperty("hibernate.connection.characterEncoding", "UTF-8");
hibernateProperties.setProperty("hibernate.connection.useUnicode", "true");
}
@Autowired
private DataSource dataSource;
/**
* Create the entity manager factory bean used in the database connection
* @return entityManagerFactoryBean
*/
@Primary
@Bean
@DependsOn("flyway")
public LocalContainerEntityManagerFactoryBean s3syncEntityManagerFactory() {
LocalContainerEntityManagerFactoryBean lef = new LocalContainerEntityManagerFactoryBean();
lef.setDataSource(dataSource);
lef.setJpaVendorAdapter(s3syncJpaVendorAdapter());
lef.setPackagesToScan(PACKAGES_TO_SCAN);
lef.setJpaProperties(hibernateProperties);
lef.setPersistenceUnitName(PERSISTENCE_UNIT);
return lef;
}
/**
* Create the session factory bean
* @return sessionFactoryBean
*/
@Bean
public LocalSessionFactoryBean s3syncSessionFactory() {
LocalSessionFactoryBean sessionBean = new LocalSessionFactoryBean();
sessionBean.setDataSource(dataSource);
sessionBean.setPackagesToScan(PACKAGES_TO_SCAN);
sessionBean.setHibernateProperties(hibernateProperties);
return sessionBean;
}
/**
* Create the JPA Vendor adaptor bean that is used in the entity manager factory
* @return jpaVendorAdaptor
*/
@Primary
@Bean
public JpaVendorAdapter s3syncJpaVendorAdapter() {
HibernateJpaVendorAdapter hibernateJpaVendorAdapter = new HibernateJpaVendorAdapter();
hibernateJpaVendorAdapter.setShowSql(false);
hibernateJpaVendorAdapter.setGenerateDdl(false);
hibernateJpaVendorAdapter.setDatabase(Database.H2);
return hibernateJpaVendorAdapter;
}
/**
* Create the transaction manager bean
* @return transactionManager
*/
@Primary
@Bean
public PlatformTransactionManager s3syncTransactionManager() {
JpaTransactionManager manager = new JpaTransactionManager();
manager.setEntityManagerFactory(s3syncEntityManagerFactory().getObject());
return new JpaTransactionManager();
}
@Bean(name="flyway", initMethod="migrate")
@DependsOn({"initializer"})
public Flyway flyway() {
LogFactory.setLogCreator(new FlywayLogCreator());
FluentConfiguration fc = Flyway.configure();
fc.schemas("s3sync");
fc.table("_flyway_migration");
fc.locations("database/migration");
fc.outOfOrder(false);
fc.dataSource(dataSource);
return fc.load();
}
@Bean(name="initializer")
public Boolean initializer(ConfigurationManager confManager) {
ApplicationLoggerFactory.init(new ApplicationLoggerInitializer() {
@Override
public LogLevel initialize() {
return confManager.getLogLevel();
}
});
return true;
}
private class FlywayLogCreator implements LogCreator {
@Override
public Log createLogger(Class<?> clazz) {
return new FlywayLog(clazz);
}
}
private class FlywayLog implements Log {
private ApplicationLogger logger;
public FlywayLog(Class<?> clazz) {
logger = ApplicationLoggerFactory.getLogger(clazz);
}
@Override
public boolean isDebugEnabled() {
return logger.isDebugEnabled();
}
@Override
public void debug(String message) {
logger.debug(message);
}
@Override
public void info(String message) {
logger.info(message);
}
@Override
public void warn(String message) {
logger.warn(message);
}
@Override
public void error(String message) {
logger.error(message);
}
@Override
public void error(String message, Exception e) {
logger.error(message, e);
}
}
}

View File

@ -0,0 +1,46 @@
package net.locusworks.s3sync.database;
import javax.sql.DataSource;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import net.locusworks.logger.ApplicationLogger;
import net.locusworks.logger.ApplicationLoggerFactory;
@Primary
@Component
public class S3SyncDataSource {
private static final String DRIVER = "org.h2.Driver";
private static final String JNDI_STRING = "jdbc:h2:file:./s3sync;MODE=MYSQL;DATABASE_TO_UPPER=false";
/**
* Create the data source bean
* @return the data source bean
* @throws Exception
*/
@Bean
public DataSource dataSource() throws Exception {
return getDataSource();
}
private DataSource getDataSource() throws Exception {
ApplicationLogger logger = ApplicationLoggerFactory.getLogger(this.getClass());
String url = JNDI_STRING;
logger.debug(String.format("Database connection string: %s", url));
return DataSourceBuilder
.create()
.username("sa")
.password("")
.url(url)
.driverClassName(DRIVER)
.build();
}
}

View File

@ -0,0 +1,183 @@
package net.locusworks.s3sync.database.entities;
import java.io.Serializable;
import java.util.Date;
import javax.persistence.Basic;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
@Entity
@Table(name ="file_info", catalog="S3SYNC", schema="s3sync")
@NamedQueries({
@NamedQuery(name = "FileInfo.findAll", query="SELECT f FROM FileInfo f")
})
public class FileInfo implements Serializable {
private static final long serialVersionUID = 1L;
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Basic(optional = false)
@Column(name = "ID")
private Long id;
@Basic(optional = false)
@Column(name = "FILE_NAME")
private String fileName;
@Basic(optional = false)
@Column(name = "FILE_HASH")
private String fileHash;
@Basic(optional = false)
@Column(name = "FILE_PATH")
private String filePath;
@Basic(optional = true)
@Column(name = "UPLOAD_DATE")
@Temporal(TemporalType.TIMESTAMP)
private Date uploadDate;
@Basic(optional = false)
@Column(name = "UPLOADED")
private Boolean uploaded;
public FileInfo() {
}
public FileInfo(Long id) {
this.id = id;
}
/**
* @param id
* @param fileName
* @param fileHash
* @param fileType
* @param uploadDate
* @param uploaded
*/
public FileInfo(Long id, String fileName, String fileHash, String filePath, Date uploadDate, Boolean uploaded) {
this.id = id;
this.fileName = fileName;
this.fileHash = fileHash;
this.filePath = filePath;
this.uploadDate = uploadDate;
this.uploaded = uploaded;
}
/**
* @return the id
*/
public synchronized final Long getId() {
return id;
}
/**
* @param id the id to set
*/
public synchronized final void setId(Long id) {
this.id = id;
}
/**
* @return the fileName
*/
public synchronized final String getFileName() {
return fileName;
}
/**
* @param fileName the fileName to set
*/
public synchronized final void setFileName(String fileName) {
this.fileName = fileName;
}
/**
* @return the fileHash
*/
public synchronized final String getFileHash() {
return fileHash;
}
/**
* @param fileHash the fileHash to set
*/
public synchronized final void setFileHash(String fileHash) {
this.fileHash = fileHash;
}
/**
* @return the fileType
*/
public synchronized final String getFilePath() {
return filePath;
}
/**
* @param fileType the fileType to set
*/
public synchronized final void setFilePath(String filePath) {
this.filePath = filePath;
}
/**
* @return the uploadDate
*/
public synchronized final Date getUploadDate() {
return uploadDate;
}
/**
* @param uploadDate the uploadDate to set
*/
public synchronized final void setUploadDate(Date uploadDate) {
this.uploadDate = uploadDate;
}
/**
* @return the uploaded
*/
public synchronized final Boolean getUploaded() {
return uploaded;
}
/**
* @param uploaded the uploaded to set
*/
public synchronized final void setUploaded(Boolean uploaded) {
this.uploaded = uploaded;
}
@Override
public int hashCode() {
int hash = 0;
hash += (id != null ? id.hashCode() : 0);
return hash;
}
@Override
public boolean equals(Object object) {
// TODO: Warning - this method won't work in the case the id fields are not set
if (!(object instanceof FileInfo)) {
return false;
}
FileInfo other = (FileInfo) object;
if ((this.id == null && other.id != null) || (this.id != null && !this.id.equals(other.id))) {
return false;
}
return true;
}
@Override
public String toString() {
return "net.locusworks.battletech.s3sync.entities.FileInfo[ id=" + id + " ]";
}
}

View File

@ -0,0 +1,26 @@
package net.locusworks.s3sync.database.repos;
import javax.transaction.Transactional;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import net.locusworks.s3sync.database.entities.FileInfo;
/**
*
* @author isaac
*/
public interface FileInfoRepository extends CrudRepository<FileInfo, Long> {
FileInfo findByFileHash(String fileHash);
boolean existsByFileHash(String fileHash);
@Modifying
@Transactional
@Query("DELETE FROM FileInfo fh WHERE fh.fileHash = ?1")
void deleteByFileHash(String fileHash);
}

View File

@ -3,9 +3,9 @@ package net.locusworks.s3sync.scp;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.text.NumberFormat;
import java.util.function.Consumer;
import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.ChannelSftp;
@ -13,16 +13,19 @@ import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpProgressMonitor;
import net.locusworks.logger.ApplicationLogger;
import net.locusworks.logger.LogLevel;
public abstract class AbstractSshMessage {
public abstract class AbstractSshMessage implements AutoCloseable {
private ApplicationLogger logger;
private static final double ONE_SECOND = 1000.0;
protected Session session;
private Session session;
private final boolean verbose;
private final boolean compressed;
protected Consumer<Path> fileCallback;
/**
* Constructor for AbstractSshMessage
@ -138,8 +141,29 @@ public abstract class AbstractSshMessage {
* Log a message to the log listener.
* @param message the message to log
*/
protected void log(final String message) {
logger.info(message);
protected void log(final String message, LogLevel logLevel) {
switch (logLevel) {
case DEBUG:
logger.debug(message);
break;
case ERROR:
case FATAL:
logger.error(message);
break;
case INFO:
logger.info(message);
break;
case WARN:
logger.warn(message);
break;
case TRACE:
logger.trace(message);
break;
case OFF:
default:
logger.trace(message);
break;
}
}
/**
@ -153,7 +177,7 @@ public abstract class AbstractSshMessage {
final NumberFormat format = NumberFormat.getNumberInstance();
format.setMaximumFractionDigits(2);
format.setMinimumFractionDigits(1);
log("File transfer time: " + format.format(duration) + " Average Rate: " + format.format(totalLength / duration) + " B/s");
log("File transfer time: " + format.format(duration) + " Average Rate: " + format.format(totalLength / duration) + " B/s", LogLevel.DEBUG);
}
/**
@ -177,6 +201,27 @@ public abstract class AbstractSshMessage {
protected final Session getSession() {
return session;
}
protected void connect() throws JSchException {
if (session != null && !session.isConnected()) session.connect();
}
protected void disconnect() {
if (session != null && session.isConnected()) session.disconnect();
}
public Consumer<Path> fileCallback(Consumer<Path> callback) {
this.fileCallback = callback;
return this.fileCallback;
}
@Override
public void close() throws Exception {
if (session != null && session.isConnected()) {
session.disconnect();
session = null;
}
}
/**
* Track progress every 10% if 100kb &lt; filesize &lt; 1Mb. For larger

View File

@ -0,0 +1,29 @@
package net.locusworks.s3sync.scp;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import com.jcraft.jsch.JSchException;
import net.locusworks.logger.ApplicationLoggerFactory;
import net.locusworks.logger.LogLevel;
public class LocalFileMessage extends AbstractSshMessage {
private Path syncFolder;
public LocalFileMessage(Path localFolder) {
super(true, false, null, ApplicationLoggerFactory.getLogger(LocalFileMessage.class));
this.syncFolder = localFolder;
}
@Override
public void execute() throws IOException, JSchException {
Files.walk(syncFolder)
.filter(f -> Files.isRegularFile(f))
.forEach(f -> {
log("Found File: " + f, LogLevel.DEBUG);
if (fileCallback != null) fileCallback.accept(syncFolder.resolve(f));
});
}
}

View File

@ -25,9 +25,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.util.function.Consumer;
import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
@ -36,9 +34,10 @@ import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpATTRS;
import com.jcraft.jsch.SftpException;
import net.locusworks.logger.ApplicationLoggerFactory;
import net.locusworks.logger.LogLevel;
import net.locusworks.logger.ApplicationLogger;
public class ScpFromMessage extends AbstractSshMessage implements AutoCloseable {
public class ScpFromMessage extends AbstractSshMessage {
private static ApplicationLogger logger = ApplicationLoggerFactory.getLogger(ScpFromMessage.class);
@ -51,8 +50,6 @@ public class ScpFromMessage extends AbstractSshMessage implements AutoCloseable
private boolean isRecursive = false;
private boolean preserveLastModified = false;
private Consumer<Path> fileCallback;
/**
* Constructor for ScpFromMessage
* @param session the ssh session to use
@ -133,11 +130,6 @@ public class ScpFromMessage extends AbstractSshMessage implements AutoCloseable
this.preserveLastModified = preserveLastModified;
}
public Consumer<Path> fileCallback(Consumer<Path> callback) {
this.fileCallback = callback;
return this.fileCallback;
}
/**
* Carry out the transfer.
* @throws IOException on i/o errors
@ -145,6 +137,8 @@ public class ScpFromMessage extends AbstractSshMessage implements AutoCloseable
*/
@Override
public void execute() throws IOException, JSchException {
connect();
String command = "scp -f ";
if (isRecursive) {
command += "-r ";
@ -153,9 +147,8 @@ public class ScpFromMessage extends AbstractSshMessage implements AutoCloseable
command += "-C ";
}
command += remoteFile;
log(command);
log("Executing command: " + command, LogLevel.DEBUG);
final Channel channel = openExecChannel(command);
log(localFile + "");
try {
// get I/O streams for remote scp
final OutputStream out = channel.getOutputStream();
@ -170,7 +163,7 @@ public class ScpFromMessage extends AbstractSshMessage implements AutoCloseable
channel.disconnect();
}
}
log("done\n");
log("Done\n", LogLevel.INFO);
}
protected boolean getPreserveLastModified() {
@ -218,7 +211,7 @@ public class ScpFromMessage extends AbstractSshMessage implements AutoCloseable
if (Files.isDirectory(localFile)) {
final Path dir = localFile.resolve(directoryName);
Files.createDirectories(dir);
log("Creating: " + dir);
log("Creating: " + dir, LogLevel.DEBUG);
return dir;
}
return null;
@ -231,8 +224,8 @@ public class ScpFromMessage extends AbstractSshMessage implements AutoCloseable
end = serverResponse.indexOf(' ', start + 1);
final long filesize = Long.parseLong(serverResponse.substring(start, end));
final String filename = serverResponse.substring(end + 1);
log("Receiving: " + filename + " : " + filesize);
final Path transferFile = Files.isDirectory(localFile) ? localFile.resolve(filename) : localFile;
log("Receiving: " + transferFile + " : " + filesize, LogLevel.INFO);
fetchFile(transferFile, filesize, out, in);
waitForAck(in);
sendAck(out);
@ -316,26 +309,12 @@ public class ScpFromMessage extends AbstractSshMessage implements AutoCloseable
}
return index < 0 ? "" : remoteFile.substring(0, index + 1);
}
@Override
public void close() throws Exception {
if (session != null && session.isConnected()) {
session.disconnect();
session = null;
}
}
public static void main(String[] args) throws Exception {
public static Session newSession(String identity, String user, String host, Integer port) throws JSchException {
JSch.setConfig("StrictHostKeyChecking", "no");
JSch jsch = new JSch();
jsch.addIdentity(String.format("%s/.ssh/id_rsa",System.getProperty("user.home")));
Session session = jsch.getSession("iparenteau", "s3sync.locusworks.net", 22);
session.connect();
try(ScpFromMessage msg = new ScpFromMessage(true, session, "logs", Paths.get("downloads"), true)) {
msg.fileCallback(p -> {
System.out.println(p);
});
msg.execute();
}
jsch.addIdentity(identity);
Session session = jsch.getSession(user, host, port);
return session;
}
}

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<persistence version="2.1" xmlns="http://xmlns.jcp.org/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd">
<persistence-unit name="s3sync_pu" transaction-type="RESOURCE_LOCAL">
<provider>org.hibernate.ejb.HibernatePersistence</provider>
<class>net.locusworks.s3sync.database.entities.FileInfo</class>
<properties/>
</persistence-unit>
</persistence>

View File

@ -0,0 +1,10 @@
CREATE TABLE s3sync.file_info (
ID IDENTITY NOT NULL AUTO_INCREMENT,
FILE_NAME VARCHAR(1000) NOT NULL,
FILE_PATH VARCHAR(1000) NOT NULL,
FILE_HASH VARCHAR(40) NOT NULL,
UPLOAD_DATE TIMESTAMP,
UPLOADED BOOLEAN NOT NULL,
CONSTRAINT FILE_INFO_PK PRIMARY KEY (ID),
CONSTRAINT FILE_INFO_UN UNIQUE (FILE_HASH)
);

View File

@ -0,0 +1 @@
CREATE SCHEMA IF NOT EXISTS s3sync AUTHORIZATION sa;

View File

@ -1,9 +1,11 @@
region=us-east-2
bucketName=locus2k-backup
transferType=remote
remoteFolder=logs
logLevel=INFO
identity=${user.home}/.ssh/id_rsa
host=s3sync.locusworks.net
user=iparenteau
port=22
localFolder=Downloads
localFolder=Downloads
deleteLocalFiles=true