Skip to content

Commit

Permalink
apacheGH-461: Fix heartbeats with wantReply=true
Browse files Browse the repository at this point in the history
Switch from a timeout model to the OpenSSH model: fail if there
are more than a certain number of heartbeats for which no reply
was received yet.

Bug: apache#461
  • Loading branch information
tomaswolf committed May 29, 2024
1 parent 5a78e6d commit 624b1b2
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 32 deletions.
37 changes: 36 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

* [GH-427](https://github.com/apache/mina-sshd/issues/427) SCP client: fix `DefaultScpClient.upload(InputStream, ...)`
* [GH-455](https://github.com/apache/mina-sshd/issues/455) Fix `BaseCipher`: make sure all bytes are processed
* [GH-461](https://github.com/apache/mina-sshd/issues/461) Fix heartbeats with `wantReply=true`
* [GH-470](https://github.com/apache/mina-sshd/issues/470) MontgomeryCurve: synchronize access to KeyPairGenerator
* [GH-489](https://github.com/apache/mina-sshd/issues/489) SFTP v3 client: better file type determination
* [GH-493](https://github.com/apache/mina-sshd/issues/493) Fix arcfour128 and arcfour256 ciphers
Expand All @@ -58,7 +59,41 @@ NTRU Prime sntrup761 and X25519 with SHA-512: sntrup761x25519-sha512](https://ww

## Behavioral changes and enhancements

* [GH-468](https://github.com/apache/mina-sshd/issues/468) SFTP: validate length of data received: must not be more than requested
### [GH-461](https://github.com/apache/mina-sshd/issues/461) Fix heartbeats with `wantReply=true`

The client-side heartbeat mechanism has been updated. Such heartbeats are configured via the
`CoreModuleProperties.HEARTBEAT_INTERVAL` property. If this interval is > 0, heartbeats are sent to
the server.

Previously these heartbeats could also be configured with a `CoreModuleProperties.HEARTBEAT_REPLY_WAIT`
timeout. If the timeout was <= 0, the client would just send heartbeat requests without expecting any
answers. If the timeout was > 0, the client would send requests with a flag indicating that the server
should reply. The client would then wait for the specified duration for the reply and would terminate
the connection if none was received.

This mechanism could cause trouble if the timeout was fairly long and the server was slow to respond.
A timeout longer than the interval could also delay subsequent heartbeats.

The `CoreModuleProperties.HEARTBEAT_REPLY_WAIT` property is now _deprecated_.

There is a new configuration property `CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX` instead. It defines a
limit for the number of heartbeats sent without receiving a reply before a session is terminated. If
the value is <= 0, the client still sends heartbeats without expecting any reply. If the value is > 0,
the client will request a reply from the server for each heartbeat message, and it will
terminate the connection if the number of unanswered heartbeats reaches
`CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX`.

This new way to configure heartbeats aligns with the OpenSSH configuration options
`ServerAliveInterval` and `ServerAliveCountMax`.

For compatibility with older configurations that explicitly define `CoreModuleProperties.HEARTBEAT_REPLY_WAIT`,
the new code maps this to the new configuration (but only if `CoreModuleProperties.HEARTBEAT_INTERVAL` > 0
and the new property `CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX` has _not_ been set) by setting
`CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX` to
* `CoreModuleProperties.HEARTBEAT_REPLY_WAIT` <= 0: `CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX = 0`
* otherwise: `(CoreModuleProperties.HEARTBEAT_REPLY_WAIT / CoreModuleProperties.HEARTBEAT_INTERVAL) + 1`.

### [GH-468](https://github.com/apache/mina-sshd/issues/468) SFTP: validate length of data received: must not be more than requested

SFTP read operations now check the amount of data they get back. If it's more than
requested an exception is thrown. SFTP servers must never return more data than the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sshd.common.future;

/**
* Something that may have a failure exception.
*
* @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
*/
public interface HasException {

/**
* Returns the cause of the failure.
*
* @return the {@link Throwable} of the failure, or {@code null} if not failed (yet).
*/
Throwable getException();

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,7 @@
*
* @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
*/
public interface WithException {

/**
* Returns the cause of the failure.
*
* @return the {@link Throwable} of the failure, or {@code null} if not failed (yet).
*/
Throwable getException();
public interface WithException extends HasException {

/**
* Sets the exception that caused the operation to fail.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,14 @@
*/
package org.apache.sshd.common.io;

import org.apache.sshd.common.future.HasException;
import org.apache.sshd.common.future.SshFuture;
import org.apache.sshd.common.future.VerifiableFuture;

public interface IoWriteFuture extends SshFuture<IoWriteFuture>, VerifiableFuture<IoWriteFuture> {
public interface IoWriteFuture extends HasException, SshFuture<IoWriteFuture>, VerifiableFuture<IoWriteFuture> {
/**
* @return <tt>true</tt> if the write operation is finished successfully.
*/
boolean isWritten();

/**
* @return the cause of the write failure if and only if the write operation has failed due to an {@link Exception}.
* Otherwise, {@code null} is returned (use {@link #isDone()} to distinguish between the two.
*/
Throwable getException();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.sshd.agent.common.AgentForwardSupport;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.GlobalRequestFuture;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.kex.KexState;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.helpers.AbstractConnectionService;
import org.apache.sshd.common.util.GenericUtils;
Expand All @@ -46,7 +49,10 @@ public class ClientConnectionService
implements ClientSessionHolder {
protected final String heartbeatRequest;
protected final Duration heartbeatInterval;
protected final Duration heartbeatReplyMaxWait;
protected final int heartbeatMaxNoReply;

protected final AtomicInteger outstandingHeartbeats = new AtomicInteger();

/** Non-null only if using the &quot;keep-alive&quot; request mechanism */
protected ScheduledFuture<?> clientHeartbeat;

Expand All @@ -55,7 +61,44 @@ public ClientConnectionService(AbstractClientSession s) throws SshException {

heartbeatRequest = CoreModuleProperties.HEARTBEAT_REQUEST.getRequired(this);
heartbeatInterval = CoreModuleProperties.HEARTBEAT_INTERVAL.getRequired(this);
heartbeatReplyMaxWait = CoreModuleProperties.HEARTBEAT_REPLY_WAIT.getRequired(this);
heartbeatMaxNoReply = configureMaxNoReply();
}

protected int configureMaxNoReply() {
@SuppressWarnings("deprecation")
Duration timeout = CoreModuleProperties.HEARTBEAT_REPLY_WAIT.getOrNull(this);
if (timeout == null || GenericUtils.isNegativeOrNull(heartbeatInterval) || GenericUtils.isEmpty(heartbeatRequest)) {
return CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.getRequired(this).intValue();
}
// The deprecated timeout is configured explicitly. If the new no-reply-max is _not_ explicitly configured,
// set it from the timeout.
Integer noReplyValue = CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.getOrNull(this);
if (noReplyValue != null) {
return noReplyValue.intValue();
}
if (GenericUtils.isNegativeOrNull(timeout)) {
return 0;
}
if (timeout.compareTo(heartbeatInterval) >= 0) {
// Timeout is longer than the interval. With the previous system, that would have killed the session when
// the timeout was reached. A slow server that managed to return the reply just before the timeout expired
// would have delayed subsequent heartbeats. The new system will keep sending heartbeats with the given
// interval. Thus we can have timeout / interval heartbeats without reply if we want to approximate the
// timeout.
double timeoutSec = timeout.getSeconds() + (timeout.getNano() / 1_000_000_000.0);
double intervalSec = heartbeatInterval.getSeconds() + (heartbeatInterval.getNano() / 1_000_000_000.0);
double multiple = timeoutSec / intervalSec;
if (multiple >= Integer.MAX_VALUE - 1) {
return Integer.MAX_VALUE;
} else {
return (int) multiple + 1;
}
}
// Timeout is smaller than the interval. We want to have every heartbeat replied to.
return 1;
// This is an approximation. If no reply is forthcoming, the session will be killed after the interval. In the
// previous system, it would have been killed after the timeout. We _could_ code something to schedule a task
// that kills the session after the timeout and cancel that if we get a reply, but it seems a bit pointless.
}

@Override
Expand All @@ -82,6 +125,7 @@ protected synchronized ScheduledFuture<?> startHeartBeat() {
if (!GenericUtils.isNegativeOrNull(heartbeatInterval) && GenericUtils.isNotEmpty(heartbeatRequest)) {
stopHeartBeat();

outstandingHeartbeats.set(0);
ClientSession session = getClientSession();
FactoryManager manager = session.getFactoryManager();
ScheduledExecutorService service = manager.getScheduledExecutorService();
Expand Down Expand Up @@ -117,26 +161,35 @@ protected boolean sendHeartBeat() {
}

Session session = getSession();
if (session.getKexState() != KexState.DONE) {
// During KEX, global requests are delayed until after the key exchange is over. Don't count during KEX,
// otherwise a slow KEX might cause us to kill the session prematurely.
return false;
}
try {
boolean withReply = !GenericUtils.isNegativeOrNull(heartbeatReplyMaxWait);
heartbeatCount.incrementAndGet();
boolean withReply = heartbeatMaxNoReply > 0;
int outstanding = outstandingHeartbeats.incrementAndGet();
if (withReply && heartbeatMaxNoReply < outstanding) {
throw new SshException("Got " + (outstanding - 1) + " heartbeat requests without reply");
}
Buffer buf = session.createBuffer(
SshConstants.SSH_MSG_GLOBAL_REQUEST, heartbeatRequest.length() + Byte.SIZE);
buf.putString(heartbeatRequest);
buf.putBoolean(withReply);

// Even if we want a reply, we don't wait.
if (withReply) {
Buffer reply = session.request(heartbeatRequest, buf, heartbeatReplyMaxWait);
if (reply != null) {
if (log.isTraceEnabled()) {
log.trace("sendHeartBeat({}) received reply size={} for request={}",
session, reply.available(), heartbeatRequest);
}
}
GlobalRequestFuture future = session.request(buf, heartbeatRequest, (cmd, buffer) -> {
// We got something back. Don't care about success or failure. (In particular we may get here in
// case the server responds SSH_MSG_UNIMPLEMENTED.)
outstandingHeartbeats.set(0);
});
future.addListener(this::futureDone);
} else {
IoWriteFuture future = session.writePacket(buf);
future.addListener(this::futureDone);
}
heartbeatCount.incrementAndGet();
return true;
} catch (IOException | RuntimeException | Error e) {
session.exceptionCaught(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* @see org.apache.sshd.common.session.Session#request(Buffer, String, ReplyHandler)
*/
public class GlobalRequestFuture extends DefaultSshFuture<GlobalRequestFuture>
implements SshFutureListener<IoWriteFuture> {
implements HasException, SshFutureListener<IoWriteFuture> {

/**
* A {@code ReplyHandler} is invoked asynchronously when the reply for a request with {@code want-reply = true} is
Expand Down Expand Up @@ -137,6 +137,7 @@ public Buffer getBuffer() {
*
* @return a failure reason, or {@code null} if there isn't one or if the request did not fail
*/
@Override
public Throwable getException() {
Object value = getValue();
if (value instanceof Throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.sshd.common.forward.ForwarderFactory;
import org.apache.sshd.common.forward.PortForwardingEventListener;
import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
import org.apache.sshd.common.future.HasException;
import org.apache.sshd.common.io.AbstractIoWriteFuture;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.kex.KexState;
Expand Down Expand Up @@ -263,7 +264,7 @@ protected boolean sendHeartBeat() {
}
}

protected void futureDone(IoWriteFuture future) {
protected void futureDone(HasException future) {
Throwable t = future.getException();
if (t != null) {
Session session = getSession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,23 @@ public final class CoreModuleProperties {
/**
* Key used to indicate that the heartbeat request is also expecting a reply - time in <U>milliseconds</U> to wait
* for the reply. If non-positive then no reply is expected (nor requested).
*
* @deprecated since 2.13.0, use {@link #HEARTBEAT_NO_REPLY_MAX} instead
*/
@Deprecated
public static final Property<Duration> HEARTBEAT_REPLY_WAIT
= Property.durationSec("heartbeat-reply-wait", Duration.ofMinutes(5));

/**
* Key to set the maximum number of heartbeat messages to send without having received a reply. If &gt; 0, heartbeat
* messages are sent with a flag that requires the peer to reply. The session will be killed if
* {@code HEARTBEAT_NO_REPLY_MAX} heartbeats have been sent without having received a reply. If &lt;= 0, heartbeat
* messages will be sent, but no reply is requested or expected, and the client will not kill the session.
*
* @since 2.13.0
*/
public static final Property<Integer> HEARTBEAT_NO_REPLY_MAX = Property.integer("heartbeat-no-reply-max", 0);

/**
* Whether to ignore invalid identities files when pre-initializing the client session
*
Expand Down
9 changes: 5 additions & 4 deletions sshd-core/src/test/java/org/apache/sshd/KeepAliveTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void tearDown() {
// Restore default value
CoreModuleProperties.IDLE_TIMEOUT.remove(sshd);
CoreModuleProperties.HEARTBEAT_INTERVAL.remove(client);
CoreModuleProperties.HEARTBEAT_REPLY_WAIT.remove(client);
CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.remove(client);
}

@Test
Expand Down Expand Up @@ -198,7 +198,7 @@ public Result process(
}
}));
CoreModuleProperties.HEARTBEAT_INTERVAL.set(client, HEARTBEAT);
CoreModuleProperties.HEARTBEAT_REPLY_WAIT.set(client, Duration.ofSeconds(5L));
CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.set(client, 1);
try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port)
.verify(7L, TimeUnit.SECONDS)
.getSession()) {
Expand Down Expand Up @@ -229,8 +229,9 @@ public Result process(ConnectionService connectionService, String request, boole
return Result.Replied;
}
}));
CoreModuleProperties.HEARTBEAT_INTERVAL.set(client, HEARTBEAT);
CoreModuleProperties.HEARTBEAT_REPLY_WAIT.set(client, Duration.ofSeconds(1));
CoreModuleProperties.HEARTBEAT_INTERVAL.set(client, Duration.ofSeconds(1));
// CoreModuleProperties.HEARTBEAT_REPLY_WAIT.set(client, Duration.ofSeconds(1));
CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.set(client, 1);
try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(CONNECT_TIMEOUT)
.getSession()) {
session.addPasswordIdentity(getCurrentTestName());
Expand Down

0 comments on commit 624b1b2

Please sign in to comment.