Updated program to use scp to get files
	
		
			
	
		
	
	
		
	
		
			All checks were successful
		
		
	
	
		
			
				
	
				Locusworks Team/aws-s3-sync/pipeline/head This commit looks good
				
			
		
		
	
	
				
					
				
			
		
			All checks were successful
		
		
	
	Locusworks Team/aws-s3-sync/pipeline/head This commit looks good
				
			This commit is contained in:
		| @@ -17,17 +17,29 @@ import com.amazonaws.services.s3.transfer.Download; | ||||
| import com.amazonaws.services.s3.transfer.TransferManager; | ||||
| import com.amazonaws.services.s3.transfer.TransferManagerBuilder; | ||||
| import com.amazonaws.services.s3.transfer.Upload; | ||||
| import com.jcraft.jsch.JSch; | ||||
| import com.jcraft.jsch.JSchException; | ||||
| import com.jcraft.jsch.Session; | ||||
| import net.locusworks.logger.ApplicationLogger; | ||||
| import net.locusworks.logger.ApplicationLoggerFactory; | ||||
| import net.locusworks.s3sync.conf.ConfigurationManager; | ||||
| import net.locusworks.s3sync.scp.ScpFromMessage; | ||||
|  | ||||
| public class S3Client implements AutoCloseable { | ||||
|  | ||||
|   private ApplicationLogger logger = ApplicationLoggerFactory.getLogger(S3Client.class); | ||||
|  | ||||
|   private AmazonS3 s3Client; | ||||
|    | ||||
|   private String bucket; | ||||
|   private Path syncFolder; | ||||
|   private String remoteFolder; | ||||
|   private String host; | ||||
|   private String identity; | ||||
|   private String user; | ||||
|    | ||||
|   private Integer port; | ||||
|    | ||||
|   private Path localFolder; | ||||
|  | ||||
|   public S3Client(ConfigurationManager conf) { | ||||
|     String region = conf.getRegion(); | ||||
| @@ -41,7 +53,12 @@ public class S3Client implements AutoCloseable { | ||||
|     } | ||||
|     logger.info("Found Bucket: %s", bucket); | ||||
|     this.bucket = conf.getBucketName(); | ||||
|     this.syncFolder = conf.getSyncFolder(); | ||||
|     this.remoteFolder = conf.getRemoteFolder(); | ||||
|     this.localFolder = conf.getLocalFolder(); | ||||
|     this.host = conf.getHost(); | ||||
|     this.identity = conf.getIdentity(); | ||||
|     this.user = conf.getUser(); | ||||
|     this.port = conf.getPort(); | ||||
|   } | ||||
|    | ||||
|   public String getBucket() { | ||||
| @@ -108,16 +125,32 @@ public class S3Client implements AutoCloseable { | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   public void syncFolder() throws IOException { | ||||
|   public void syncFolder() throws IOException, JSchException { | ||||
|     TransferManager xferMgr = TransferManagerBuilder.standard().withS3Client(s3Client).build(); | ||||
|     try (FileManager manager = FileManager.getInstance()) { | ||||
|       Files.walk(syncFolder) | ||||
|       .filter(f -> Files.isRegularFile(f)) | ||||
|       .forEach(f -> uploadFile(xferMgr, syncFolder.resolve(f))); | ||||
|      | ||||
|     if (Files.notExists(localFolder)) { | ||||
|       logger.info("Creating local folder: " + localFolder); | ||||
|       Files.createDirectory(localFolder); | ||||
|     } | ||||
|      | ||||
|     JSch.setConfig("StrictHostKeyChecking", "no"); | ||||
|     JSch jsch = new JSch(); | ||||
|     jsch.addIdentity(identity); | ||||
|     Session session = jsch.getSession(user, host, port); | ||||
|     session.connect(); | ||||
|     try(ScpFromMessage msg = new ScpFromMessage(true, session, remoteFolder, localFolder, true); | ||||
|         FileManager manager = FileManager.getInstance()) { | ||||
|       msg.fileCallback(p -> { | ||||
|         logger.info(p+""); | ||||
|         if (Files.isRegularFile(p)) { | ||||
|           uploadFile(xferMgr, p); | ||||
|         } | ||||
|       }); | ||||
|       msg.execute(); | ||||
|     } catch (Exception e) { | ||||
|       logger.error("Unable to load file Manager: " + e.getMessage()); | ||||
|     }  | ||||
|  | ||||
|      | ||||
|     xferMgr.shutdownNow(false); | ||||
|   } | ||||
|    | ||||
| @@ -149,7 +182,7 @@ public class S3Client implements AutoCloseable { | ||||
|   | ||||
|   private String getPath(Path file) { | ||||
|     if (file.getParent() == null) return file.getFileName().toString(); | ||||
|     Path relative = syncFolder.relativize(file); | ||||
|     Path relative = localFolder.relativize(file); | ||||
|     return relative.toString().replace("\\", "/"); | ||||
|   } | ||||
|    | ||||
|   | ||||
| @@ -46,8 +46,28 @@ public class ConfigurationManager { | ||||
|     return conf.getProperty("bucketName"); | ||||
|   } | ||||
|    | ||||
|   public Path getSyncFolder() { | ||||
|     return Paths.get(conf.getProperty("syncFolder")); | ||||
|   public String getRemoteFolder() { | ||||
|     return conf.getProperty("remoteFolder"); | ||||
|   } | ||||
|    | ||||
|   public Path getLocalFolder() { | ||||
|     return Paths.get(conf.getProperty("localFolder")); | ||||
|   } | ||||
|    | ||||
|   public String getUser() { | ||||
|     return conf.getProperty("user"); | ||||
|   } | ||||
|    | ||||
|   public String getIdentity() { | ||||
|     return conf.getProperty("identity"); | ||||
|   } | ||||
|    | ||||
|   public String getHost() { | ||||
|     return conf.getProperty("host"); | ||||
|   } | ||||
|    | ||||
|   public Integer getPort() { | ||||
|     return Integer.valueOf(conf.getProperty("port")); | ||||
|   } | ||||
|    | ||||
|   public LogLevel getLogLevel() { | ||||
|   | ||||
							
								
								
									
										259
									
								
								src/main/java/net/locusworks/s3sync/scp/AbstractSshMessage.java
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										259
									
								
								src/main/java/net/locusworks/s3sync/scp/AbstractSshMessage.java
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,259 @@ | ||||
| package net.locusworks.s3sync.scp; | ||||
|  | ||||
| import java.io.IOException; | ||||
| import java.io.InputStream; | ||||
| import java.io.OutputStream; | ||||
| import java.text.NumberFormat; | ||||
|  | ||||
|  | ||||
| import com.jcraft.jsch.Channel; | ||||
| import com.jcraft.jsch.ChannelExec; | ||||
| import com.jcraft.jsch.ChannelSftp; | ||||
| import com.jcraft.jsch.JSchException; | ||||
| import com.jcraft.jsch.Session; | ||||
| import com.jcraft.jsch.SftpProgressMonitor; | ||||
| import net.locusworks.logger.ApplicationLogger; | ||||
|  | ||||
| public abstract class AbstractSshMessage { | ||||
|  | ||||
|   private ApplicationLogger logger; | ||||
|  | ||||
|   private static final double ONE_SECOND = 1000.0; | ||||
|  | ||||
|   protected Session session; | ||||
|   private final boolean verbose; | ||||
|   private final boolean compressed; | ||||
|  | ||||
|   /** | ||||
|    * Constructor for AbstractSshMessage | ||||
|    * @param session the ssh session to use | ||||
|    */ | ||||
|   public AbstractSshMessage(final Session session, ApplicationLogger logger) { | ||||
|     this(false, session, logger); | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Constructor for AbstractSshMessage | ||||
|    * @param verbose if true do verbose logging | ||||
|    * @param session the ssh session to use | ||||
|    * @since Ant 1.6.2 | ||||
|    */ | ||||
|   public AbstractSshMessage(final boolean verbose, final Session session, ApplicationLogger logger) { | ||||
|     this(verbose, false, session, logger); | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Constructor for AbstractSshMessage | ||||
|    * @param verbose if true do verbose logging | ||||
|    * @param compressed if true use compression | ||||
|    * @param session the ssh session to use | ||||
|    * @since Ant 1.9.8 | ||||
|    */ | ||||
|   public AbstractSshMessage(boolean verbose, boolean compressed, Session session, ApplicationLogger logger) { | ||||
|     this.verbose = verbose; | ||||
|     this.compressed = compressed; | ||||
|     this.session = session; | ||||
|     this.logger = logger; | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Open an ssh channel. | ||||
|    * @param command the command to use | ||||
|    * @return the channel | ||||
|    * @throws JSchException on error | ||||
|    */ | ||||
|   protected Channel openExecChannel(final String command) throws JSchException { | ||||
|     final ChannelExec channel = (ChannelExec) session.openChannel("exec"); | ||||
|     channel.setCommand(command); | ||||
|  | ||||
|     return channel; | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Open an ssh sftp channel. | ||||
|    * @return the channel | ||||
|    * @throws JSchException on error | ||||
|    */ | ||||
|   protected ChannelSftp openSftpChannel() throws JSchException { | ||||
|     return (ChannelSftp) session.openChannel("sftp"); | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Send an ack. | ||||
|    * @param out the output stream to use | ||||
|    * @throws IOException on error | ||||
|    */ | ||||
|   protected void sendAck(final OutputStream out) throws IOException { | ||||
|     final byte[] buf = new byte[1]; | ||||
|     buf[0] = 0; | ||||
|     out.write(buf); | ||||
|     out.flush(); | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Reads the response, throws a BuildException if the response | ||||
|    * indicates an error. | ||||
|    * @param in the input stream to use | ||||
|    * @throws IOException on I/O error | ||||
|    * @throws BuildException on other errors | ||||
|    */ | ||||
|   protected void waitForAck(final InputStream in) throws IOException, JSchException { | ||||
|     final int b = in.read(); | ||||
|  | ||||
|     // b may be 0 for success, | ||||
|     //          1 for error, | ||||
|     //          2 for fatal error, | ||||
|  | ||||
|     if (b == -1) { | ||||
|       // didn't receive any response | ||||
|       throw new JSchException("No response from server"); | ||||
|     } | ||||
|     if (b != 0) { | ||||
|       final StringBuilder sb = new StringBuilder(); | ||||
|  | ||||
|       int c = in.read(); | ||||
|       while (c > 0 && c != '\n') { | ||||
|         sb.append((char) c); | ||||
|         c = in.read(); | ||||
|       } | ||||
|  | ||||
|       if (b == 1) { | ||||
|         throw new JSchException("server indicated an error: " + sb.toString()); | ||||
|       } else if (b == 2) { | ||||
|         throw new JSchException("server indicated a fatal error: " + sb.toString()); | ||||
|       } else { | ||||
|         throw new JSchException("unknown response, code " + b + " message: " + sb.toString()); | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Carry out the transfer. | ||||
|    * @throws IOException on I/O errors | ||||
|    * @throws JSchException on ssh errors | ||||
|    */ | ||||
|   public abstract void execute() throws IOException, JSchException; | ||||
|  | ||||
|   /** | ||||
|    * Log a message to the log listener. | ||||
|    * @param message the message to log | ||||
|    */ | ||||
|   protected void log(final String message) { | ||||
|     logger.info(message); | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Log transfer stats to the log listener. | ||||
|    * @param timeStarted the time started | ||||
|    * @param timeEnded   the finishing time | ||||
|    * @param totalLength the total length | ||||
|    */ | ||||
|   protected void logStats(final long timeStarted, final long timeEnded, final long totalLength) { | ||||
|     final double duration = (timeEnded - timeStarted) / ONE_SECOND; | ||||
|     final NumberFormat format = NumberFormat.getNumberInstance(); | ||||
|     format.setMaximumFractionDigits(2); | ||||
|     format.setMinimumFractionDigits(1); | ||||
|     log("File transfer time: " + format.format(duration) + " Average Rate: " + format.format(totalLength / duration) + " B/s"); | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Is the verbose attribute set. | ||||
|    * @return true if the verbose attribute is set | ||||
|    * @since Ant 1.6.2 | ||||
|    */ | ||||
|   protected final boolean getVerbose() { | ||||
|     return verbose; | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Is the compressed attribute set. | ||||
|    * @return true if the compressed attribute is set | ||||
|    * @since Ant 1.9.8 | ||||
|    */ | ||||
|   protected final boolean getCompressed() { | ||||
|     return compressed; | ||||
|   } | ||||
|    | ||||
|   protected final Session getSession() { | ||||
|     return session; | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Track progress every 10% if 100kb < filesize < 1Mb. For larger | ||||
|    * files track progress for every percent transmitted. | ||||
|    * @param filesize the size of the file been transmitted | ||||
|    * @param totalLength the total transmission size | ||||
|    * @param percentTransmitted the current percent transmitted | ||||
|    * @return the percent that the file is of the total | ||||
|    */ | ||||
|   protected final int trackProgress(final long filesize, final long totalLength, | ||||
|       final int percentTransmitted) { | ||||
|  | ||||
|     // CheckStyle:MagicNumber OFF | ||||
|     final int percent = (int) Math.round(Math.floor((totalLength / (double) filesize) * 100)); | ||||
|  | ||||
|     if (percent > percentTransmitted) { | ||||
|       if (filesize < 1048576) { | ||||
|         if (percent % 10 == 0) { | ||||
|           if (percent == 100) { | ||||
|             System.out.println(" 100%"); | ||||
|           } else { | ||||
|             System.out.print("*"); | ||||
|           } | ||||
|         } | ||||
|       } else { | ||||
|         if (percent == 50) { | ||||
|           System.out.println(" 50%"); | ||||
|         } else if (percent == 100) { | ||||
|           System.out.println(" 100%"); | ||||
|         } else { | ||||
|           System.out.print("."); | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|     // CheckStyle:MagicNumber ON | ||||
|  | ||||
|     return percent; | ||||
|   } | ||||
|  | ||||
|   private ProgressMonitor monitor = null; | ||||
|  | ||||
|   /** | ||||
|    * Get the progress monitor. | ||||
|    * @return the progress monitor. | ||||
|    */ | ||||
|   protected SftpProgressMonitor getProgressMonitor() { | ||||
|     if (monitor == null) { | ||||
|       monitor = new ProgressMonitor(); | ||||
|     } | ||||
|     return monitor; | ||||
|   } | ||||
|  | ||||
|   private class ProgressMonitor implements SftpProgressMonitor { | ||||
|     private long initFileSize = 0; | ||||
|     private long totalLength = 0; | ||||
|     private int percentTransmitted = 0; | ||||
|  | ||||
|     @Override | ||||
|     public void init(final int op, final String src, final String dest, final long max) { | ||||
|       initFileSize = max; | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public boolean count(final long len) { | ||||
|       totalLength += len; | ||||
|       percentTransmitted = trackProgress(initFileSize, totalLength, percentTransmitted); | ||||
|       return true; | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void end() { | ||||
|     } | ||||
|  | ||||
|     @SuppressWarnings("unused") | ||||
|     public long getTotalLength() { | ||||
|       return totalLength; | ||||
|     } | ||||
|   } | ||||
|  | ||||
| } | ||||
							
								
								
									
										341
									
								
								src/main/java/net/locusworks/s3sync/scp/ScpFromMessage.java
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										341
									
								
								src/main/java/net/locusworks/s3sync/scp/ScpFromMessage.java
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,341 @@ | ||||
| package net.locusworks.s3sync.scp; | ||||
|  | ||||
| /* | ||||
|  *  Licensed to the Apache Software Foundation (ASF) under one or more | ||||
|  *  contributor license agreements.  See the NOTICE file distributed with | ||||
|  *  this work for additional information regarding copyright ownership. | ||||
|  *  The ASF licenses this file to You under the Apache License, Version 2.0 | ||||
|  *  (the "License"); you may not use this file except in compliance with | ||||
|  *  the License.  You may obtain a copy of the License at | ||||
|  * | ||||
|  *      https://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  *  Unless required by applicable law or agreed to in writing, software | ||||
|  *  distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  *  See the License for the specific language governing permissions and | ||||
|  *  limitations under the License. | ||||
|  * | ||||
|  */ | ||||
|  | ||||
| import java.io.ByteArrayOutputStream; | ||||
| import java.io.EOFException; | ||||
| import java.io.IOException; | ||||
| import java.io.InputStream; | ||||
| import java.io.OutputStream; | ||||
| import java.nio.file.Files; | ||||
| import java.nio.file.Path; | ||||
| import java.nio.file.Paths; | ||||
| import java.nio.file.attribute.FileTime; | ||||
| import java.util.function.Consumer; | ||||
| import com.jcraft.jsch.Channel; | ||||
| import com.jcraft.jsch.ChannelSftp; | ||||
| import com.jcraft.jsch.JSch; | ||||
| import com.jcraft.jsch.JSchException; | ||||
| import com.jcraft.jsch.Session; | ||||
| import com.jcraft.jsch.SftpATTRS; | ||||
| import com.jcraft.jsch.SftpException; | ||||
| import net.locusworks.logger.ApplicationLoggerFactory; | ||||
| import net.locusworks.logger.ApplicationLogger; | ||||
|  | ||||
| public class ScpFromMessage extends AbstractSshMessage implements AutoCloseable { | ||||
|  | ||||
|   private static ApplicationLogger logger = ApplicationLoggerFactory.getLogger(ScpFromMessage.class); | ||||
|  | ||||
|   private static final int HUNDRED_KILOBYTES = 102400; | ||||
|   private static final byte LINE_FEED = 0x0a; | ||||
|   private static final int BUFFER_SIZE = 100 * 1024; | ||||
|  | ||||
|   private String remoteFile; | ||||
|   private Path localFile; | ||||
|   private boolean isRecursive = false; | ||||
|   private boolean preserveLastModified = false; | ||||
|  | ||||
|   private Consumer<Path> fileCallback; | ||||
|  | ||||
|   /** | ||||
|    * Constructor for ScpFromMessage | ||||
|    * @param session the ssh session to use | ||||
|    */ | ||||
|   public ScpFromMessage(final Session session) { | ||||
|     super(session, logger); | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Constructor for ScpFromMessage | ||||
|    * @param verbose if true do verbose logging | ||||
|    * @param session the ssh session to use | ||||
|    * @since Ant 1.7 | ||||
|    */ | ||||
|   public ScpFromMessage(final boolean verbose, final Session session) { | ||||
|     super(verbose, session, logger); | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Constructor for ScpFromMessage. | ||||
|    * @param verbose if true log extra information | ||||
|    * @param session the Scp session to use | ||||
|    * @param aRemoteFile the remote file name | ||||
|    * @param aLocalFile  the local file | ||||
|    * @param recursive   if true use recursion (-r option to scp) | ||||
|    * @since Ant 1.6.2 | ||||
|    */ | ||||
|   public ScpFromMessage(final boolean verbose, final Session session, final String aRemoteFile, | ||||
|       final Path aLocalFile, final boolean recursive) { | ||||
|     this(false, session, aRemoteFile, aLocalFile, recursive, false); | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Constructor for ScpFromMessage. | ||||
|    * @param session the Scp session to use | ||||
|    * @param aRemoteFile the remote file name | ||||
|    * @param aLocalFile  the local file | ||||
|    * @param recursive   if true use recursion (-r option to scp) | ||||
|    */ | ||||
|   public ScpFromMessage(final Session session, final String aRemoteFile, final Path aLocalFile, final boolean recursive) { | ||||
|     this(false, session, aRemoteFile, aLocalFile, recursive); | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Constructor for ScpFromMessage. | ||||
|    * @param verbose if true log extra information | ||||
|    * @param session the Scp session to use | ||||
|    * @param aRemoteFile the remote file name | ||||
|    * @param aLocalFile  the local file | ||||
|    * @param recursive   if true use recursion (-r option to scp) | ||||
|    * @param preserveLastModified whether to preserve file | ||||
|    * modification times | ||||
|    * @since Ant 1.8.0 | ||||
|    */ | ||||
|   public ScpFromMessage(final boolean verbose, final Session session, final String aRemoteFile, final Path aLocalFile, | ||||
|       final boolean recursive, final boolean preserveLastModified) { | ||||
|     this(verbose, session, aRemoteFile, aLocalFile, recursive, preserveLastModified, false); | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Constructor for ScpFromMessage. | ||||
|    * @param verbose if true log extra information | ||||
|    * @param session the Scp session to use | ||||
|    * @param aRemoteFile the remote file name | ||||
|    * @param aLocalFile  the local file | ||||
|    * @param recursive   if true use recursion (-r option to scp) | ||||
|    * @param preserveLastModified whether to preserve file | ||||
|    * @param compressed  if true use compression (-C option to scp) | ||||
|    * modification times | ||||
|    * @since Ant 1.9.8 | ||||
|    */ | ||||
|   public ScpFromMessage(boolean verbose, Session session, String aRemoteFile, Path aLocalFile, boolean recursive, | ||||
|       boolean preserveLastModified, boolean compressed) { | ||||
|     super(verbose, compressed, session, logger); | ||||
|     this.remoteFile = aRemoteFile; | ||||
|     this.localFile = aLocalFile; | ||||
|     this.isRecursive = recursive; | ||||
|     this.preserveLastModified = preserveLastModified; | ||||
|   } | ||||
|    | ||||
|   public Consumer<Path> fileCallback(Consumer<Path> callback) { | ||||
|     this.fileCallback = callback; | ||||
|     return this.fileCallback; | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Carry out the transfer. | ||||
|    * @throws IOException on i/o errors | ||||
|    * @throws JSchException on errors detected by scp | ||||
|    */ | ||||
|   @Override | ||||
|   public void execute() throws IOException, JSchException { | ||||
|     String command = "scp -f "; | ||||
|     if (isRecursive) { | ||||
|       command += "-r "; | ||||
|     } | ||||
|     if (getCompressed()) { | ||||
|       command += "-C "; | ||||
|     } | ||||
|     command += remoteFile; | ||||
|     log(command); | ||||
|     final Channel channel = openExecChannel(command); | ||||
|     log(localFile + ""); | ||||
|     try { | ||||
|       // get I/O streams for remote scp | ||||
|       final OutputStream out = channel.getOutputStream(); | ||||
|       final InputStream in = channel.getInputStream(); | ||||
|  | ||||
|       channel.connect(); | ||||
|  | ||||
|       sendAck(out); | ||||
|       startRemoteCpProtocol(in, out, localFile); | ||||
|     } finally { | ||||
|       if (channel != null) { | ||||
|         channel.disconnect(); | ||||
|       } | ||||
|     } | ||||
|     log("done\n"); | ||||
|   } | ||||
|  | ||||
|   protected boolean getPreserveLastModified() { | ||||
|     return preserveLastModified; | ||||
|   } | ||||
|  | ||||
|   private void startRemoteCpProtocol(final InputStream in, final OutputStream out, final Path localFile) throws IOException, JSchException { | ||||
|     Path startFile = localFile; | ||||
|     while (true) { | ||||
|       // C0644 filesize filename - header for a regular file | ||||
|       // T time 0 time 0\n - present if perserve time. | ||||
|       // D directory - this is the header for a directory. | ||||
|       final ByteArrayOutputStream stream = new ByteArrayOutputStream(); | ||||
|       while (true) { | ||||
|         final int read = in.read(); | ||||
|         if (read < 0) { | ||||
|           return; | ||||
|         } | ||||
|         if ((byte) read == LINE_FEED) { | ||||
|           break; | ||||
|         } | ||||
|         stream.write(read); | ||||
|       } | ||||
|       final String serverResponse = stream.toString("UTF-8"); | ||||
|       if (serverResponse.charAt(0) == 'C') { | ||||
|         parseAndFetchFile(serverResponse, startFile, out, in); | ||||
|       } else if (serverResponse.charAt(0) == 'D') { | ||||
|         startFile = parseAndCreateDirectory(serverResponse, startFile); | ||||
|         sendAck(out); | ||||
|       } else if (serverResponse.charAt(0) == 'E') { | ||||
|         startFile = startFile.getParent(); | ||||
|         sendAck(out); | ||||
|       } else if (serverResponse.charAt(0) == '\01' || serverResponse.charAt(0) == '\02') { | ||||
|         // this indicates an error. | ||||
|         throw new IOException(serverResponse.substring(1)); | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   private Path parseAndCreateDirectory(final String serverResponse, final Path localFile) throws IOException { | ||||
|     int start = serverResponse.indexOf(' '); | ||||
|     // appears that the next token is not used and it's zero. | ||||
|     start = serverResponse.indexOf(' ', start + 1); | ||||
|     final String directoryName = serverResponse.substring(start + 1); | ||||
|     if (Files.isDirectory(localFile)) { | ||||
|       final Path dir = localFile.resolve(directoryName); | ||||
|       Files.createDirectories(dir); | ||||
|       log("Creating: " + dir); | ||||
|       return dir; | ||||
|     } | ||||
|     return null; | ||||
|   } | ||||
|  | ||||
|   private void parseAndFetchFile(final String serverResponse, final Path localFile, final OutputStream out, final InputStream in) throws IOException, JSchException  { | ||||
|     int start = 0; | ||||
|     int end = serverResponse.indexOf(' ', start + 1); | ||||
|     start = end + 1; | ||||
|     end = serverResponse.indexOf(' ', start + 1); | ||||
|     final long filesize = Long.parseLong(serverResponse.substring(start, end)); | ||||
|     final String filename = serverResponse.substring(end + 1); | ||||
|     log("Receiving: " + filename + " : " + filesize); | ||||
|     final Path transferFile = Files.isDirectory(localFile) ? localFile.resolve(filename) : localFile; | ||||
|     fetchFile(transferFile, filesize, out, in); | ||||
|     waitForAck(in); | ||||
|     sendAck(out); | ||||
|   } | ||||
|  | ||||
|   private void fetchFile(final Path localFile, long filesize, final OutputStream out, final InputStream in) throws IOException, JSchException { | ||||
|     final byte[] buf = new byte[BUFFER_SIZE]; | ||||
|     sendAck(out); | ||||
|  | ||||
|     // read a content of lfile | ||||
|     final OutputStream fos = Files.newOutputStream(localFile); | ||||
|     int length; | ||||
|     long totalLength = 0; | ||||
|     final long startTime = System.currentTimeMillis(); | ||||
|  | ||||
|     // only track progress for files larger than 100kb in verbose mode | ||||
|     final boolean trackProgress = getVerbose() && filesize > HUNDRED_KILOBYTES; | ||||
|     // since filesize keeps on decreasing we have to store the | ||||
|     // initial filesize | ||||
|     final long initFilesize = filesize; | ||||
|     int percentTransmitted = 0; | ||||
|     boolean goodXfer = false; | ||||
|     try { | ||||
|        | ||||
|       while (true) { | ||||
|         length = in.read(buf, 0, BUFFER_SIZE < filesize ? BUFFER_SIZE : (int) filesize); | ||||
|         if (length < 0) { | ||||
|           throw new EOFException("Unexpected end of stream."); | ||||
|         } | ||||
|         fos.write(buf, 0, length); | ||||
|         filesize -= length; | ||||
|         totalLength += length; | ||||
|         if (trackProgress) { | ||||
|           percentTransmitted = trackProgress(initFilesize, totalLength, percentTransmitted); | ||||
|         } | ||||
|         if (filesize == 0) { | ||||
|           goodXfer = true; | ||||
|           break; | ||||
|         } | ||||
|       } | ||||
|     } finally { | ||||
|       final long endTime = System.currentTimeMillis(); | ||||
|       logStats(startTime, endTime, totalLength); | ||||
|       fos.flush(); | ||||
|       fos.close(); | ||||
|     } | ||||
|  | ||||
|     if (getPreserveLastModified()) { | ||||
|       setLastModified(localFile); | ||||
|     } | ||||
|      | ||||
|     if (goodXfer && fileCallback != null) { | ||||
|       fileCallback.accept(localFile); | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   private void setLastModified(final Path localFile) throws JSchException { | ||||
|     SftpATTRS fileAttributes = null; | ||||
|     final ChannelSftp channel = openSftpChannel(); | ||||
|     channel.connect(); | ||||
|     try { | ||||
|       fileAttributes = channel.lstat(remoteDir(remoteFile) + localFile.getFileName().toString()); | ||||
|     } catch (final SftpException e) { | ||||
|       throw new JSchException("failed to stat remote file", e); | ||||
|     } | ||||
|  | ||||
|     try { | ||||
|       Files.setLastModifiedTime(localFile, FileTime.fromMillis(fileAttributes.getMTime() * 1000)); | ||||
|     } catch (IOException e) { | ||||
|       logger.warn(e.getMessage()); | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * returns the directory part of the remote file, if any. | ||||
|    */ | ||||
|   private static String remoteDir(final String remoteFile) { | ||||
|     int index = remoteFile.lastIndexOf('/'); | ||||
|     if (index < 0) { | ||||
|       index = remoteFile.lastIndexOf('\\'); | ||||
|     } | ||||
|     return index < 0 ? "" : remoteFile.substring(0, index + 1); | ||||
|   } | ||||
|  | ||||
|   @Override | ||||
|   public void close() throws Exception { | ||||
|     if (session != null && session.isConnected()) { | ||||
|       session.disconnect(); | ||||
|       session = null; | ||||
|     } | ||||
|   } | ||||
|    | ||||
|   public static void main(String[] args) throws Exception { | ||||
|     JSch.setConfig("StrictHostKeyChecking", "no"); | ||||
|     JSch jsch = new JSch(); | ||||
|     jsch.addIdentity(String.format("%s/.ssh/id_rsa",System.getProperty("user.home"))); | ||||
|     Session session = jsch.getSession("iparenteau", "s3sync.locusworks.net", 22); | ||||
|     session.connect(); | ||||
|     try(ScpFromMessage msg = new ScpFromMessage(true, session, "logs", Paths.get("downloads"), true)) { | ||||
|       msg.fileCallback(p -> { | ||||
|         System.out.println(p); | ||||
|       }); | ||||
|       msg.execute(); | ||||
|     } | ||||
|   } | ||||
| } | ||||
		Reference in New Issue
	
	Block a user