Skip to content

Commit

Permalink
apacheGH-524: SFTP: optimize SftpOutputStreamAsync
Browse files Browse the repository at this point in the history
Give SftpOutPutStreamAsync a transferFrom operation to be able to read
directly into a prepared SSH packet uffer. Previously, we always had
to read into a general byte buffer, and then copy into the SSH packet.
This extra data copy can be avoided. If the copy buffer size is such
that the data fits into a single SSH packet, then this packet buffer
can be encoded and sent directly without any further data copying.

Simplify the implementation: since the stream always uses a PacketBuffer
the code paths for other general byte buffers can simply be removed.

Also enable reading the next buffer while the previous one is being
sent. Use two buffers alternatingly; one being sent, the other being
filled.

Use this implementation also in SftpRemotePathChannel in its
transferFrom() implementation. _Do_ close the stream there, otherwise
the final ACKs may not be checked.

Bug: apache#524
  • Loading branch information
tomaswolf committed Jul 25, 2024
1 parent 2e85df7 commit 4160b6e
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
*/
public interface RawSftpClient {

/**
* @param cmd Command to send - <B>Note:</B> only lower 8-bits are used
* @param buffer The {@link Buffer} containing the command data
Expand All @@ -36,6 +37,8 @@ public interface RawSftpClient {
*/
int send(int cmd, Buffer buffer) throws IOException;

SftpMessage write(int cmd, Buffer buffer) throws IOException;

/**
* @param id The expected request id
* @return The received response {@link Buffer} containing the request id
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sshd.sftp.client;

import java.io.IOException;
import java.time.Duration;
import java.util.Objects;

import org.apache.sshd.common.io.IoWriteFuture;

/**
* A representation of a written SFTP message.
*
* @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
*/
public class SftpMessage {

private final int id;
private final IoWriteFuture future;
private final Duration timeout;

/**
* Creates a new instance.
*
* @param id SFTP message id
* @param future {@link IoWriteFuture} of the SFTP message; can be used to wait until the message has been actually
* sent
* @param timeout the configured SFTP write timeout
*/
public SftpMessage(int id, IoWriteFuture future, Duration timeout) {
this.id = id;
this.future = Objects.requireNonNull(future);
this.timeout = Objects.requireNonNull(timeout);
}

/**
* Retrieves the SFTP message id.
*
* @return the SFTP message id
*/
public int getId() {
return id;
}

/**
* Retrieves the {@link IoWriteFuture} of the message; can be used to wait until the message has been actually sent.
*
* @return the {@link IoWriteFuture}, never {@code null}
*/
public IoWriteFuture getFuture() {
return future;
}

/**
* Retrieves the write timeout configured when the message was sent.
*
* @return the timeout, never {@code null}
*/
public Duration getTimeout() {
return timeout;
}

/**
* Waits with the configured timeout until the message has been sent.
*
* @throws IOException if the message could not be sent, or waiting is interrupted.
*/
public void waitUntilSent() throws IOException {
getFuture().verify(getTimeout());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.sshd.sftp.client.RawSftpClient;
import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.client.SftpClient.Handle;
import org.apache.sshd.sftp.client.SftpMessage;
import org.apache.sshd.sftp.client.extensions.SftpClientExtension;
import org.apache.sshd.sftp.client.impl.SftpResponse;
import org.apache.sshd.sftp.client.impl.SftpStatus;
Expand Down Expand Up @@ -93,6 +94,11 @@ public int send(int cmd, Buffer buffer) throws IOException {
return raw.send(cmd, buffer);
}

@Override
public SftpMessage write(int cmd, Buffer buffer) throws IOException {
return raw.write(cmd, buffer);
}

@Override
public Buffer receive(int id) throws IOException {
return raw.receive(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.client.SftpClientFactory;
import org.apache.sshd.sftp.client.SftpErrorDataHandler;
import org.apache.sshd.sftp.client.SftpMessage;
import org.apache.sshd.sftp.client.SftpVersionSelector;
import org.apache.sshd.sftp.client.impl.AbstractSftpClient;
import org.apache.sshd.sftp.client.impl.SftpPathImpl;
Expand Down Expand Up @@ -606,6 +607,21 @@ public int send(int cmd, Buffer buffer) throws IOException {
}
}

@Override
public SftpMessage write(int cmd, Buffer buffer) throws IOException {
if (!isOpen()) {
throw new IOException("write(cmd=" + SftpConstants.getCommandMessageName(cmd) + ") client is closed");
}

if (delegate instanceof RawSftpClient) {
return ((RawSftpClient) delegate).write(cmd, buffer);
} else {
throw new StreamCorruptedException(
"write(cmd=" + SftpConstants.getCommandMessageName(cmd) + ") delegate is not a "
+ RawSftpClient.class.getSimpleName());
}
}

@Override
public Buffer receive(int id) throws IOException {
if (!isOpen()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1240,10 +1240,7 @@ public InputStream read(String path, int bufferSize, Collection<OpenMode> mode)

@Override
public OutputStream write(String path, int bufferSize, Collection<OpenMode> mode) throws IOException {
if (bufferSize <= 0) {
bufferSize = getWriteBufferSize();
}
if (bufferSize < MIN_WRITE_BUFFER_SIZE) {
if (bufferSize != 0 && bufferSize < MIN_WRITE_BUFFER_SIZE) {
throw new IllegalArgumentException("Insufficient write buffer size: " + bufferSize + ", min.="
+ MIN_WRITE_BUFFER_SIZE);
}
Expand All @@ -1260,6 +1257,7 @@ protected int getReadBufferSize() {
}

protected int getWriteBufferSize() {
// Do not use. -13 is wrong anyway.
return (int) getClientChannel().getRemoteWindow().getPacketSize() - 13;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.sshd.core.CoreModuleProperties;
import org.apache.sshd.sftp.SftpModuleProperties;
import org.apache.sshd.sftp.client.SftpErrorDataHandler;
import org.apache.sshd.sftp.client.SftpMessage;
import org.apache.sshd.sftp.client.SftpVersionSelector;
import org.apache.sshd.sftp.common.SftpConstants;
import org.apache.sshd.sftp.common.extensions.ParserUtils;
Expand Down Expand Up @@ -268,6 +269,13 @@ protected void process(Buffer incoming) throws IOException {

@Override
public int send(int cmd, Buffer buffer) throws IOException {
SftpMessage msg = write(cmd, buffer);
msg.waitUntilSent();
return msg.getId();
}

@Override
public SftpMessage write(int cmd, Buffer buffer) throws IOException {
int id = cmdId.incrementAndGet();
int len = buffer.available();
if (log.isTraceEnabled()) {
Expand Down Expand Up @@ -298,9 +306,8 @@ public int send(int cmd, Buffer buffer) throws IOException {
ClientChannel clientChannel = getClientChannel();
IoOutputStream asyncIn = clientChannel.getAsyncIn();
IoWriteFuture writeFuture = asyncIn.writeBuffer(buf);
Duration cmdTimeout = SFTP_CLIENT_CMD_TIMEOUT.getRequired(clientChannel);
writeFuture.verify(cmdTimeout);
return id;
Duration sendTimeout = SFTP_CLIENT_CMD_TIMEOUT.getRequired(clientChannel);
return new SftpMessage(id, writeFuture, sendTimeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.io.input.InputStreamWithChannel;
import org.apache.sshd.sftp.client.SftpClient;
import org.apache.sshd.sftp.client.SftpClient.Attributes;
import org.apache.sshd.sftp.client.SftpClient.CloseableHandle;
import org.apache.sshd.sftp.client.SftpClient.OpenMode;
import org.apache.sshd.sftp.client.SftpClientHolder;
Expand Down Expand Up @@ -65,25 +64,26 @@ public class SftpInputStreamAsync extends InputStreamWithChannel implements Sftp

private final AbstractSftpClient clientInstance;
private final String path;
private final boolean ownsHandle;

public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize,
String path, Collection<OpenMode> mode)
throws IOException {
this.log = LoggerFactory.getLogger(getClass());
this.clientInstance = Objects.requireNonNull(client, "No SFTP client instance");
this.path = path;
Attributes attrs = client.stat(path);
this.fileSize = attrs.getSize();
this.handle = client.open(path, mode);
this.bufferSize = bufferSize;
this(client, bufferSize, 0, client.stat(path).getSize(), path, client.open(path, mode));
}

public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize, long clientOffset, long fileSize,
String path, CloseableHandle handle) {
this(client, bufferSize, clientOffset, fileSize, path, handle, true);
}

public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize, long clientOffset, long fileSize,
String path, CloseableHandle handle, boolean closeHandle) {
this.log = LoggerFactory.getLogger(getClass());
this.clientInstance = Objects.requireNonNull(client, "No SFTP client instance");
this.path = path;
this.handle = handle;
this.ownsHandle = closeHandle;
this.bufferSize = bufferSize;
this.requestOffset = clientOffset;
this.clientOffset = clientOffset;
Expand Down Expand Up @@ -398,10 +398,12 @@ public void close() throws IOException {
pollBuffer(ack);
}
} finally {
if (debugEnabled) {
log.debug("close({}) closing file handle; {} short reads", this, shortReads);
if (ownsHandle) {
if (debugEnabled) {
log.debug("close({}) closing file handle; {} short reads", this, shortReads);
}
handle.close();
}
handle.close();
}
} finally {
handle = null;
Expand Down
Loading

0 comments on commit 4160b6e

Please sign in to comment.