diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index e2befe1311af0..1deecdd208a14 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -1593,13 +1593,16 @@ public long newRequestId() { * Called once the channel is closed for instance due to a disconnect or a closed socket etc. */ protected final void onChannelClosed(Channel channel) { - Optional> first = pendingHandshakes.entrySet().stream() - .filter((entry) -> entry.getValue().channel == channel).findFirst(); + final Optional first = pendingHandshakes.entrySet().stream() + .filter((entry) -> entry.getValue().channel == channel).map((e) -> e.getKey()).findFirst(); if(first.isPresent()) { - final Long requestId = first.get().getKey(); - HandshakeResponseHandler handler = first.get().getValue(); - pendingHandshakes.remove(requestId); - handler.handleException(new TransportException("connection reset")); + final Long requestId = first.get(); + final HandshakeResponseHandler handler = pendingHandshakes.remove(requestId); + if (handler != null) { + // there might be a race removing this or this method might be called twice concurrently depending on how + // the channel is closed ie. due to connection reset or broken pipes + handler.handleException(new TransportException("connection reset")); + } } } }