diff --git a/pom.xml b/pom.xml index 71c374c..6455b4d 100644 --- a/pom.xml +++ b/pom.xml @@ -120,6 +120,13 @@ 1.0.2-RELEASE + + + com.jcraft + jsch + 0.1.55 + + diff --git a/src/main/java/net/locusworks/s3sync/client/S3Client.java b/src/main/java/net/locusworks/s3sync/client/S3Client.java index e2c01f1..ef7f91a 100644 --- a/src/main/java/net/locusworks/s3sync/client/S3Client.java +++ b/src/main/java/net/locusworks/s3sync/client/S3Client.java @@ -17,17 +17,29 @@ import com.amazonaws.services.s3.transfer.Download; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManagerBuilder; import com.amazonaws.services.s3.transfer.Upload; +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; import net.locusworks.logger.ApplicationLogger; import net.locusworks.logger.ApplicationLoggerFactory; import net.locusworks.s3sync.conf.ConfigurationManager; +import net.locusworks.s3sync.scp.ScpFromMessage; public class S3Client implements AutoCloseable { private ApplicationLogger logger = ApplicationLoggerFactory.getLogger(S3Client.class); private AmazonS3 s3Client; + private String bucket; - private Path syncFolder; + private String remoteFolder; + private String host; + private String identity; + private String user; + + private Integer port; + + private Path localFolder; public S3Client(ConfigurationManager conf) { String region = conf.getRegion(); @@ -41,7 +53,12 @@ public class S3Client implements AutoCloseable { } logger.info("Found Bucket: %s", bucket); this.bucket = conf.getBucketName(); - this.syncFolder = conf.getSyncFolder(); + this.remoteFolder = conf.getRemoteFolder(); + this.localFolder = conf.getLocalFolder(); + this.host = conf.getHost(); + this.identity = conf.getIdentity(); + this.user = conf.getUser(); + this.port = conf.getPort(); } public String getBucket() { @@ -108,16 +125,32 @@ public class S3Client implements AutoCloseable { } } - public void syncFolder() throws IOException { + public void syncFolder() throws IOException, JSchException { TransferManager xferMgr = TransferManagerBuilder.standard().withS3Client(s3Client).build(); - try (FileManager manager = FileManager.getInstance()) { - Files.walk(syncFolder) - .filter(f -> Files.isRegularFile(f)) - .forEach(f -> uploadFile(xferMgr, syncFolder.resolve(f))); + + if (Files.notExists(localFolder)) { + logger.info("Creating local folder: " + localFolder); + Files.createDirectory(localFolder); + } + + JSch.setConfig("StrictHostKeyChecking", "no"); + JSch jsch = new JSch(); + jsch.addIdentity(identity); + Session session = jsch.getSession(user, host, port); + session.connect(); + try(ScpFromMessage msg = new ScpFromMessage(true, session, remoteFolder, localFolder, true); + FileManager manager = FileManager.getInstance()) { + msg.fileCallback(p -> { + logger.info(p+""); + if (Files.isRegularFile(p)) { + uploadFile(xferMgr, p); + } + }); + msg.execute(); } catch (Exception e) { logger.error("Unable to load file Manager: " + e.getMessage()); } - + xferMgr.shutdownNow(false); } @@ -149,7 +182,7 @@ public class S3Client implements AutoCloseable { private String getPath(Path file) { if (file.getParent() == null) return file.getFileName().toString(); - Path relative = syncFolder.relativize(file); + Path relative = localFolder.relativize(file); return relative.toString().replace("\\", "/"); } diff --git a/src/main/java/net/locusworks/s3sync/conf/ConfigurationManager.java b/src/main/java/net/locusworks/s3sync/conf/ConfigurationManager.java index f3b6f35..44646f8 100644 --- a/src/main/java/net/locusworks/s3sync/conf/ConfigurationManager.java +++ b/src/main/java/net/locusworks/s3sync/conf/ConfigurationManager.java @@ -46,8 +46,28 @@ public class ConfigurationManager { return conf.getProperty("bucketName"); } - public Path getSyncFolder() { - return Paths.get(conf.getProperty("syncFolder")); + public String getRemoteFolder() { + return conf.getProperty("remoteFolder"); + } + + public Path getLocalFolder() { + return Paths.get(conf.getProperty("localFolder")); + } + + public String getUser() { + return conf.getProperty("user"); + } + + public String getIdentity() { + return conf.getProperty("identity"); + } + + public String getHost() { + return conf.getProperty("host"); + } + + public Integer getPort() { + return Integer.valueOf(conf.getProperty("port")); } public LogLevel getLogLevel() { diff --git a/src/main/java/net/locusworks/s3sync/scp/AbstractSshMessage.java b/src/main/java/net/locusworks/s3sync/scp/AbstractSshMessage.java new file mode 100644 index 0000000..b0685d4 --- /dev/null +++ b/src/main/java/net/locusworks/s3sync/scp/AbstractSshMessage.java @@ -0,0 +1,259 @@ +package net.locusworks.s3sync.scp; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.text.NumberFormat; + + +import com.jcraft.jsch.Channel; +import com.jcraft.jsch.ChannelExec; +import com.jcraft.jsch.ChannelSftp; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import com.jcraft.jsch.SftpProgressMonitor; +import net.locusworks.logger.ApplicationLogger; + +public abstract class AbstractSshMessage { + + private ApplicationLogger logger; + + private static final double ONE_SECOND = 1000.0; + + protected Session session; + private final boolean verbose; + private final boolean compressed; + + /** + * Constructor for AbstractSshMessage + * @param session the ssh session to use + */ + public AbstractSshMessage(final Session session, ApplicationLogger logger) { + this(false, session, logger); + } + + /** + * Constructor for AbstractSshMessage + * @param verbose if true do verbose logging + * @param session the ssh session to use + * @since Ant 1.6.2 + */ + public AbstractSshMessage(final boolean verbose, final Session session, ApplicationLogger logger) { + this(verbose, false, session, logger); + } + + /** + * Constructor for AbstractSshMessage + * @param verbose if true do verbose logging + * @param compressed if true use compression + * @param session the ssh session to use + * @since Ant 1.9.8 + */ + public AbstractSshMessage(boolean verbose, boolean compressed, Session session, ApplicationLogger logger) { + this.verbose = verbose; + this.compressed = compressed; + this.session = session; + this.logger = logger; + } + + /** + * Open an ssh channel. + * @param command the command to use + * @return the channel + * @throws JSchException on error + */ + protected Channel openExecChannel(final String command) throws JSchException { + final ChannelExec channel = (ChannelExec) session.openChannel("exec"); + channel.setCommand(command); + + return channel; + } + + /** + * Open an ssh sftp channel. + * @return the channel + * @throws JSchException on error + */ + protected ChannelSftp openSftpChannel() throws JSchException { + return (ChannelSftp) session.openChannel("sftp"); + } + + /** + * Send an ack. + * @param out the output stream to use + * @throws IOException on error + */ + protected void sendAck(final OutputStream out) throws IOException { + final byte[] buf = new byte[1]; + buf[0] = 0; + out.write(buf); + out.flush(); + } + + /** + * Reads the response, throws a BuildException if the response + * indicates an error. + * @param in the input stream to use + * @throws IOException on I/O error + * @throws BuildException on other errors + */ + protected void waitForAck(final InputStream in) throws IOException, JSchException { + final int b = in.read(); + + // b may be 0 for success, + // 1 for error, + // 2 for fatal error, + + if (b == -1) { + // didn't receive any response + throw new JSchException("No response from server"); + } + if (b != 0) { + final StringBuilder sb = new StringBuilder(); + + int c = in.read(); + while (c > 0 && c != '\n') { + sb.append((char) c); + c = in.read(); + } + + if (b == 1) { + throw new JSchException("server indicated an error: " + sb.toString()); + } else if (b == 2) { + throw new JSchException("server indicated a fatal error: " + sb.toString()); + } else { + throw new JSchException("unknown response, code " + b + " message: " + sb.toString()); + } + } + } + + /** + * Carry out the transfer. + * @throws IOException on I/O errors + * @throws JSchException on ssh errors + */ + public abstract void execute() throws IOException, JSchException; + + /** + * Log a message to the log listener. + * @param message the message to log + */ + protected void log(final String message) { + logger.info(message); + } + + /** + * Log transfer stats to the log listener. + * @param timeStarted the time started + * @param timeEnded the finishing time + * @param totalLength the total length + */ + protected void logStats(final long timeStarted, final long timeEnded, final long totalLength) { + final double duration = (timeEnded - timeStarted) / ONE_SECOND; + final NumberFormat format = NumberFormat.getNumberInstance(); + format.setMaximumFractionDigits(2); + format.setMinimumFractionDigits(1); + log("File transfer time: " + format.format(duration) + " Average Rate: " + format.format(totalLength / duration) + " B/s"); + } + + /** + * Is the verbose attribute set. + * @return true if the verbose attribute is set + * @since Ant 1.6.2 + */ + protected final boolean getVerbose() { + return verbose; + } + + /** + * Is the compressed attribute set. + * @return true if the compressed attribute is set + * @since Ant 1.9.8 + */ + protected final boolean getCompressed() { + return compressed; + } + + protected final Session getSession() { + return session; + } + + /** + * Track progress every 10% if 100kb < filesize < 1Mb. For larger + * files track progress for every percent transmitted. + * @param filesize the size of the file been transmitted + * @param totalLength the total transmission size + * @param percentTransmitted the current percent transmitted + * @return the percent that the file is of the total + */ + protected final int trackProgress(final long filesize, final long totalLength, + final int percentTransmitted) { + + // CheckStyle:MagicNumber OFF + final int percent = (int) Math.round(Math.floor((totalLength / (double) filesize) * 100)); + + if (percent > percentTransmitted) { + if (filesize < 1048576) { + if (percent % 10 == 0) { + if (percent == 100) { + System.out.println(" 100%"); + } else { + System.out.print("*"); + } + } + } else { + if (percent == 50) { + System.out.println(" 50%"); + } else if (percent == 100) { + System.out.println(" 100%"); + } else { + System.out.print("."); + } + } + } + // CheckStyle:MagicNumber ON + + return percent; + } + + private ProgressMonitor monitor = null; + + /** + * Get the progress monitor. + * @return the progress monitor. + */ + protected SftpProgressMonitor getProgressMonitor() { + if (monitor == null) { + monitor = new ProgressMonitor(); + } + return monitor; + } + + private class ProgressMonitor implements SftpProgressMonitor { + private long initFileSize = 0; + private long totalLength = 0; + private int percentTransmitted = 0; + + @Override + public void init(final int op, final String src, final String dest, final long max) { + initFileSize = max; + } + + @Override + public boolean count(final long len) { + totalLength += len; + percentTransmitted = trackProgress(initFileSize, totalLength, percentTransmitted); + return true; + } + + @Override + public void end() { + } + + @SuppressWarnings("unused") + public long getTotalLength() { + return totalLength; + } + } + +} diff --git a/src/main/java/net/locusworks/s3sync/scp/ScpFromMessage.java b/src/main/java/net/locusworks/s3sync/scp/ScpFromMessage.java new file mode 100644 index 0000000..333a105 --- /dev/null +++ b/src/main/java/net/locusworks/s3sync/scp/ScpFromMessage.java @@ -0,0 +1,341 @@ +package net.locusworks.s3sync.scp; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.FileTime; +import java.util.function.Consumer; +import com.jcraft.jsch.Channel; +import com.jcraft.jsch.ChannelSftp; +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import com.jcraft.jsch.SftpATTRS; +import com.jcraft.jsch.SftpException; +import net.locusworks.logger.ApplicationLoggerFactory; +import net.locusworks.logger.ApplicationLogger; + +public class ScpFromMessage extends AbstractSshMessage implements AutoCloseable { + + private static ApplicationLogger logger = ApplicationLoggerFactory.getLogger(ScpFromMessage.class); + + private static final int HUNDRED_KILOBYTES = 102400; + private static final byte LINE_FEED = 0x0a; + private static final int BUFFER_SIZE = 100 * 1024; + + private String remoteFile; + private Path localFile; + private boolean isRecursive = false; + private boolean preserveLastModified = false; + + private Consumer fileCallback; + + /** + * Constructor for ScpFromMessage + * @param session the ssh session to use + */ + public ScpFromMessage(final Session session) { + super(session, logger); + } + + /** + * Constructor for ScpFromMessage + * @param verbose if true do verbose logging + * @param session the ssh session to use + * @since Ant 1.7 + */ + public ScpFromMessage(final boolean verbose, final Session session) { + super(verbose, session, logger); + } + + /** + * Constructor for ScpFromMessage. + * @param verbose if true log extra information + * @param session the Scp session to use + * @param aRemoteFile the remote file name + * @param aLocalFile the local file + * @param recursive if true use recursion (-r option to scp) + * @since Ant 1.6.2 + */ + public ScpFromMessage(final boolean verbose, final Session session, final String aRemoteFile, + final Path aLocalFile, final boolean recursive) { + this(false, session, aRemoteFile, aLocalFile, recursive, false); + } + + /** + * Constructor for ScpFromMessage. + * @param session the Scp session to use + * @param aRemoteFile the remote file name + * @param aLocalFile the local file + * @param recursive if true use recursion (-r option to scp) + */ + public ScpFromMessage(final Session session, final String aRemoteFile, final Path aLocalFile, final boolean recursive) { + this(false, session, aRemoteFile, aLocalFile, recursive); + } + + /** + * Constructor for ScpFromMessage. + * @param verbose if true log extra information + * @param session the Scp session to use + * @param aRemoteFile the remote file name + * @param aLocalFile the local file + * @param recursive if true use recursion (-r option to scp) + * @param preserveLastModified whether to preserve file + * modification times + * @since Ant 1.8.0 + */ + public ScpFromMessage(final boolean verbose, final Session session, final String aRemoteFile, final Path aLocalFile, + final boolean recursive, final boolean preserveLastModified) { + this(verbose, session, aRemoteFile, aLocalFile, recursive, preserveLastModified, false); + } + + /** + * Constructor for ScpFromMessage. + * @param verbose if true log extra information + * @param session the Scp session to use + * @param aRemoteFile the remote file name + * @param aLocalFile the local file + * @param recursive if true use recursion (-r option to scp) + * @param preserveLastModified whether to preserve file + * @param compressed if true use compression (-C option to scp) + * modification times + * @since Ant 1.9.8 + */ + public ScpFromMessage(boolean verbose, Session session, String aRemoteFile, Path aLocalFile, boolean recursive, + boolean preserveLastModified, boolean compressed) { + super(verbose, compressed, session, logger); + this.remoteFile = aRemoteFile; + this.localFile = aLocalFile; + this.isRecursive = recursive; + this.preserveLastModified = preserveLastModified; + } + + public Consumer fileCallback(Consumer callback) { + this.fileCallback = callback; + return this.fileCallback; + } + + /** + * Carry out the transfer. + * @throws IOException on i/o errors + * @throws JSchException on errors detected by scp + */ + @Override + public void execute() throws IOException, JSchException { + String command = "scp -f "; + if (isRecursive) { + command += "-r "; + } + if (getCompressed()) { + command += "-C "; + } + command += remoteFile; + log(command); + final Channel channel = openExecChannel(command); + log(localFile + ""); + try { + // get I/O streams for remote scp + final OutputStream out = channel.getOutputStream(); + final InputStream in = channel.getInputStream(); + + channel.connect(); + + sendAck(out); + startRemoteCpProtocol(in, out, localFile); + } finally { + if (channel != null) { + channel.disconnect(); + } + } + log("done\n"); + } + + protected boolean getPreserveLastModified() { + return preserveLastModified; + } + + private void startRemoteCpProtocol(final InputStream in, final OutputStream out, final Path localFile) throws IOException, JSchException { + Path startFile = localFile; + while (true) { + // C0644 filesize filename - header for a regular file + // T time 0 time 0\n - present if perserve time. + // D directory - this is the header for a directory. + final ByteArrayOutputStream stream = new ByteArrayOutputStream(); + while (true) { + final int read = in.read(); + if (read < 0) { + return; + } + if ((byte) read == LINE_FEED) { + break; + } + stream.write(read); + } + final String serverResponse = stream.toString("UTF-8"); + if (serverResponse.charAt(0) == 'C') { + parseAndFetchFile(serverResponse, startFile, out, in); + } else if (serverResponse.charAt(0) == 'D') { + startFile = parseAndCreateDirectory(serverResponse, startFile); + sendAck(out); + } else if (serverResponse.charAt(0) == 'E') { + startFile = startFile.getParent(); + sendAck(out); + } else if (serverResponse.charAt(0) == '\01' || serverResponse.charAt(0) == '\02') { + // this indicates an error. + throw new IOException(serverResponse.substring(1)); + } + } + } + + private Path parseAndCreateDirectory(final String serverResponse, final Path localFile) throws IOException { + int start = serverResponse.indexOf(' '); + // appears that the next token is not used and it's zero. + start = serverResponse.indexOf(' ', start + 1); + final String directoryName = serverResponse.substring(start + 1); + if (Files.isDirectory(localFile)) { + final Path dir = localFile.resolve(directoryName); + Files.createDirectories(dir); + log("Creating: " + dir); + return dir; + } + return null; + } + + private void parseAndFetchFile(final String serverResponse, final Path localFile, final OutputStream out, final InputStream in) throws IOException, JSchException { + int start = 0; + int end = serverResponse.indexOf(' ', start + 1); + start = end + 1; + end = serverResponse.indexOf(' ', start + 1); + final long filesize = Long.parseLong(serverResponse.substring(start, end)); + final String filename = serverResponse.substring(end + 1); + log("Receiving: " + filename + " : " + filesize); + final Path transferFile = Files.isDirectory(localFile) ? localFile.resolve(filename) : localFile; + fetchFile(transferFile, filesize, out, in); + waitForAck(in); + sendAck(out); + } + + private void fetchFile(final Path localFile, long filesize, final OutputStream out, final InputStream in) throws IOException, JSchException { + final byte[] buf = new byte[BUFFER_SIZE]; + sendAck(out); + + // read a content of lfile + final OutputStream fos = Files.newOutputStream(localFile); + int length; + long totalLength = 0; + final long startTime = System.currentTimeMillis(); + + // only track progress for files larger than 100kb in verbose mode + final boolean trackProgress = getVerbose() && filesize > HUNDRED_KILOBYTES; + // since filesize keeps on decreasing we have to store the + // initial filesize + final long initFilesize = filesize; + int percentTransmitted = 0; + boolean goodXfer = false; + try { + + while (true) { + length = in.read(buf, 0, BUFFER_SIZE < filesize ? BUFFER_SIZE : (int) filesize); + if (length < 0) { + throw new EOFException("Unexpected end of stream."); + } + fos.write(buf, 0, length); + filesize -= length; + totalLength += length; + if (trackProgress) { + percentTransmitted = trackProgress(initFilesize, totalLength, percentTransmitted); + } + if (filesize == 0) { + goodXfer = true; + break; + } + } + } finally { + final long endTime = System.currentTimeMillis(); + logStats(startTime, endTime, totalLength); + fos.flush(); + fos.close(); + } + + if (getPreserveLastModified()) { + setLastModified(localFile); + } + + if (goodXfer && fileCallback != null) { + fileCallback.accept(localFile); + } + } + + private void setLastModified(final Path localFile) throws JSchException { + SftpATTRS fileAttributes = null; + final ChannelSftp channel = openSftpChannel(); + channel.connect(); + try { + fileAttributes = channel.lstat(remoteDir(remoteFile) + localFile.getFileName().toString()); + } catch (final SftpException e) { + throw new JSchException("failed to stat remote file", e); + } + + try { + Files.setLastModifiedTime(localFile, FileTime.fromMillis(fileAttributes.getMTime() * 1000)); + } catch (IOException e) { + logger.warn(e.getMessage()); + } + } + + /** + * returns the directory part of the remote file, if any. + */ + private static String remoteDir(final String remoteFile) { + int index = remoteFile.lastIndexOf('/'); + if (index < 0) { + index = remoteFile.lastIndexOf('\\'); + } + return index < 0 ? "" : remoteFile.substring(0, index + 1); + } + + @Override + public void close() throws Exception { + if (session != null && session.isConnected()) { + session.disconnect(); + session = null; + } + } + + public static void main(String[] args) throws Exception { + JSch.setConfig("StrictHostKeyChecking", "no"); + JSch jsch = new JSch(); + jsch.addIdentity(String.format("%s/.ssh/id_rsa",System.getProperty("user.home"))); + Session session = jsch.getSession("iparenteau", "s3sync.locusworks.net", 22); + session.connect(); + try(ScpFromMessage msg = new ScpFromMessage(true, session, "logs", Paths.get("downloads"), true)) { + msg.fileCallback(p -> { + System.out.println(p); + }); + msg.execute(); + } + } +} diff --git a/src/main/resources/s3.properties b/src/main/resources/s3.properties index 61e8b71..3ad57bc 100644 --- a/src/main/resources/s3.properties +++ b/src/main/resources/s3.properties @@ -1,4 +1,9 @@ region=us-east-2 bucketName=locus2k-backup -syncFolder=D:/OneDrive -logLevel=INFO \ No newline at end of file +remoteFolder=logs +logLevel=INFO +identity=${user.home}/.ssh/id_rsa +host=s3sync.locusworks.net +user=iparenteau +port=22 +localFolder=Downloads \ No newline at end of file