Skip to content

Commit

Permalink
Fix pending requests failed with ResultConnectError when disconnecting (
Browse files Browse the repository at this point in the history
#322)

### Motivation

When there are multiple pending requests in the same `ClientConnection`,
if one request failed with a retryable error, e.g. the
`ServiceUnitNotReady` error when finding the owner broker of a topic,
the socket will be closed in `checkServerError` and `close()` will be
called subsequently in `handleRead` (`err` is `eof` or
`operation_failed`). However, the default value of 1st parameter is
`ResultConnectError` for `close`, which is not retryable.

### Modifications

If the connection is disconnected by the client, pass
`ResultDisconnected` to `close` and treat it as retryable.

closeSocket is replaced with close(ResultDisconnected) to avoid the connection being the status that socket is closed but TLS stream is not closed.
  • Loading branch information
BewareMyPower authored Oct 6, 2023
1 parent af45a54 commit f2c580b
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 36 deletions.
57 changes: 27 additions & 30 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "OpSendMsg.h"
#include "ProducerImpl.h"
#include "PulsarApi.pb.h"
#include "ResultUtils.h"
#include "Url.h"
#include "auth/InitialAuthData.h"
#include "checksum/ChecksumProvider.h"
Expand Down Expand Up @@ -205,7 +206,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
ctx.load_verify_file(trustCertFilePath);
} else {
LOG_ERROR(trustCertFilePath << ": No such trustCertFile");
close();
close(ResultAuthenticationError, false);
return;
}
} else {
Expand All @@ -215,7 +216,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:

if (!authentication_) {
LOG_ERROR("Invalid authentication plugin");
close();
close(ResultAuthenticationError, false);
return;
}

Expand All @@ -229,12 +230,12 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
tlsPrivateKey = authData->getTlsPrivateKey();
if (!file_exists(tlsCertificates)) {
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
close();
close(ResultAuthenticationError, false);
return;
}
if (!file_exists(tlsCertificates)) {
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
close();
close(ResultAuthenticationError, false);
return;
}
ctx.use_private_key_file(tlsPrivateKey, boost::asio::ssl::context::pem);
Expand Down Expand Up @@ -660,7 +661,7 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b
} else {
LOG_ERROR(cnxString_ << "Read operation failed: " << err.message());
}
close();
close(ResultDisconnected);
} else if (bytesTransferred < minReadSize) {
// Read the remaining part, use a slice of buffer to write on the next
// region
Expand Down Expand Up @@ -718,7 +719,7 @@ void ClientConnection::processIncomingBuffer() {
proto::BaseCommand incomingCmd;
if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
LOG_ERROR(cnxString_ << "Error parsing protocol buffer command");
close();
close(ResultDisconnected);
return;
}

Expand All @@ -742,7 +743,7 @@ void ClientConnection::processIncomingBuffer() {
<< incomingCmd.message().message_id().ledgerid() << ", entry id "
<< incomingCmd.message().message_id().entryid()
<< "] Error parsing broker entry metadata");
close();
close(ResultDisconnected);
return;
}
incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + brokerEntryMetadataSize);
Expand All @@ -760,7 +761,7 @@ void ClientConnection::processIncomingBuffer() {
<< incomingCmd.message().message_id().ledgerid() //
<< ", entry id " << incomingCmd.message().message_id().entryid()
<< "] Error parsing message metadata");
close();
close(ResultDisconnected);
return;
}

Expand Down Expand Up @@ -991,7 +992,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {

default:
LOG_WARN(cnxString_ << "Received invalid message from server");
close();
close(ResultDisconnected);
break;
}
}
Expand Down Expand Up @@ -1133,7 +1134,7 @@ void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) {
if (err) {
LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message());
close();
close(ResultDisconnected);
} else {
sendPendingCommands();
}
Expand All @@ -1142,7 +1143,7 @@ void ClientConnection::handleSend(const boost::system::error_code& err, const Sh
void ClientConnection::handleSendPair(const boost::system::error_code& err) {
if (err) {
LOG_WARN(cnxString_ << "Could not send pair message on connection: " << err << " " << err.message());
close();
close(ResultDisconnected);
} else {
sendPendingCommands();
}
Expand Down Expand Up @@ -1247,7 +1248,7 @@ void ClientConnection::handleKeepAliveTimeout() {

if (havePendingPingRequest_) {
LOG_WARN(cnxString_ << "Forcing connection to close after keep-alive timeout");
close();
close(ResultDisconnected);
} else {
// Send keep alive probe to peer
LOG_DEBUG(cnxString_ << "Sending ping message");
Expand Down Expand Up @@ -1287,7 +1288,14 @@ void ClientConnection::close(Result result, bool detach) {
}
state_ = Disconnected;

closeSocket();
if (socket_) {
boost::system::error_code err;
socket_->shutdown(boost::asio::socket_base::shutdown_both, err);
socket_->close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
}
if (tlsSocket_) {
boost::system::error_code err;
tlsSocket_->lowest_layer().close(err);
Expand Down Expand Up @@ -1326,7 +1334,7 @@ void ClientConnection::close(Result result, bool detach) {
}

lock.unlock();
if (result != ResultDisconnected && result != ResultRetryable) {
if (!isResultRetryable(result)) {
LOG_ERROR(cnxString_ << "Connection closed with " << result);
} else {
LOG_INFO(cnxString_ << "Connection disconnected");
Expand Down Expand Up @@ -1473,26 +1481,15 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
return promise.getFuture();
}

void ClientConnection::closeSocket() {
boost::system::error_code err;
if (socket_) {
socket_->shutdown(boost::asio::socket_base::shutdown_both, err);
socket_->close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
}
}

void ClientConnection::checkServerError(ServerError error) {
switch (error) {
case proto::ServerError::ServiceNotReady:
closeSocket();
close(ResultDisconnected);
break;
case proto::ServerError::TooManyRequests:
// TODO: Implement maxNumberOfRejectedRequestPerConnection like
// https://github.com/apache/pulsar/pull/274
closeSocket();
close(ResultDisconnected);
break;
default:
break;
Expand All @@ -1518,7 +1515,7 @@ void ClientConnection::handleSendReceipt(const proto::CommandSendReceipt& sendRe
if (!producer->ackReceived(sequenceId, messageId)) {
// If the producer fails to process the ack, we need to close the connection
// to give it a chance to recover from there
close();
close(ResultDisconnected);
}
}
} else {
Expand All @@ -1542,12 +1539,12 @@ void ClientConnection::handleSendError(const proto::CommandSendError& error) {
if (!producer->removeCorruptMessage(sequenceId)) {
// If the producer fails to remove corrupt msg, we need to close the
// connection to give it a chance to recover from there
close();
close(ResultDisconnected);
}
}
}
} else {
close();
close(ResultDisconnected);
}
}

Expand Down
11 changes: 10 additions & 1 deletion lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
*/
void tcpConnectAsync();

/**
* Close the connection.
*
* @param result all pending futures will complete with this result
* @param detach remove it from the pool if it's true
*
* `detach` should only be false when:
* 1. Before the connection is put into the pool, i.e. during the construction.
* 2. When the connection pool is closed
*/
void close(Result result = ResultConnectError, bool detach = true);

bool isClosed() const;
Expand Down Expand Up @@ -392,7 +402,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
ConnectionPool& pool_;
friend class PulsarFriend;

void closeSocket();
void checkServerError(ServerError error);

void handleSendReceipt(const proto::CommandSendReceipt&);
Expand Down
3 changes: 2 additions & 1 deletion lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "MessagesImpl.h"
#include "ProducerConfigurationImpl.h"
#include "PulsarApi.pb.h"
#include "ResultUtils.h"
#include "TimeUtils.h"
#include "TopicName.h"
#include "UnAckedMessageTrackerDisabled.h"
Expand Down Expand Up @@ -319,7 +320,7 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
} else {
// Consumer was not yet created, retry to connect to broker if it's possible
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (result == ResultRetryable) {
if (isResultRetryable(result)) {
LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result));
scheduleReconnection();
} else {
Expand Down
5 changes: 3 additions & 2 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ClientImpl.h"
#include "ExecutorService.h"
#include "LogUtils.h"
#include "ResultUtils.h"
#include "TimeUtils.h"

DECLARE_LOG_OBJECT()
Expand Down Expand Up @@ -110,7 +111,7 @@ void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr&

resetCnx();

if (result == ResultRetryable) {
if (isResultRetryable(result)) {
scheduleReconnection();
return;
}
Expand Down Expand Up @@ -165,7 +166,7 @@ void HandlerBase::handleTimeout(const boost::system::error_code& ec) {
}

Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const {
if (result == ResultRetryable && (TimeUtils::now() - startTimestamp >= operationTimeut_)) {
if (isResultRetryable(result) && (TimeUtils::now() - startTimestamp >= operationTimeut_)) {
return ResultTimeout;
} else {
return result;
Expand Down
3 changes: 2 additions & 1 deletion lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "OpSendMsg.h"
#include "ProducerConfigurationImpl.h"
#include "PulsarApi.pb.h"
#include "ResultUtils.h"
#include "Semaphore.h"
#include "TimeUtils.h"
#include "TopicName.h"
Expand Down Expand Up @@ -272,7 +273,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
} else {
// Producer was not yet created, retry to connect to broker if it's possible
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (result == ResultRetryable) {
if (isResultRetryable(result)) {
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result));
scheduleReconnection();
} else {
Expand Down
29 changes: 29 additions & 0 deletions lib/ResultUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once

#include <pulsar/Result.h>

namespace pulsar {

inline bool isResultRetryable(Result result) {
return result == ResultRetryable || result == ResultDisconnected;
}

} // namespace pulsar
3 changes: 2 additions & 1 deletion lib/RetryableOperation.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "ExecutorService.h"
#include "Future.h"
#include "LogUtils.h"
#include "ResultUtils.h"

namespace pulsar {

Expand Down Expand Up @@ -95,7 +96,7 @@ class RetryableOperation : public std::enable_shared_from_this<RetryableOperatio
promise_.setValue(value);
return;
}
if (result != ResultRetryable) {
if (!isResultRetryable(result)) {
promise_.setFailed(result);
return;
}
Expand Down

0 comments on commit f2c580b

Please sign in to comment.