Skip to content

Commit

Permalink
[export] [endpoint] Remove grpc_endpoint_shutdown().
Browse files Browse the repository at this point in the history
This gives grpc_endpoint the same destruction-is-shutdown semantic as
EventEngine::Endpoint, which will make the migration easier.

----
DO NOT SUBMIT. This PR is for testing purposes only. [cl/620101818](http://cl/620101818)

PiperOrigin-RevId: 620101818
  • Loading branch information
markdroth authored and copybara-github committed Apr 20, 2024
1 parent 91f0ead commit 3bb70cc
Show file tree
Hide file tree
Showing 47 changed files with 271 additions and 349 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4648,6 +4648,7 @@ grpc_cc_library(
"chttp2_context_list_entry",
"chttp2_legacy_frame",
"chttp2_varint",
"config_vars",
"debug_location",
"exec_ctx",
"gpr",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ void ChaoticGoodConnector::OnHandshakeDone(void* arg, grpc_error_handle error) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
if (args->endpoint != nullptr) {
grpc_endpoint_shutdown(args->endpoint, error);
grpc_endpoint_destroy(args->endpoint);
}
}
Expand Down
28 changes: 5 additions & 23 deletions src/core/ext/transport/chttp2/client/chttp2_connector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,6 @@ void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure,
}
} // namespace

Chttp2Connector::~Chttp2Connector() {
if (endpoint_ != nullptr) {
grpc_endpoint_destroy(endpoint_);
}
}

void Chttp2Connector::Connect(const Args& args, Result* result,
grpc_closure* notify) {
{
Expand All @@ -108,7 +102,6 @@ void Chttp2Connector::Connect(const Args& args, Result* result,
args_ = args;
result_ = result;
notify_ = notify;
GPR_ASSERT(endpoint_ == nullptr);
event_engine_ = args_.channel_args.GetObject<EventEngine>();
}
absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(args.address);
Expand Down Expand Up @@ -151,11 +144,6 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
if (args->endpoint != nullptr) {
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args->endpoint, error);
grpc_endpoint_destroy(args->endpoint);
grpc_slice_buffer_destroy(args->read_buffer);
gpr_free(args->read_buffer);
Expand All @@ -170,13 +158,12 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
self->result_->socket_node =
grpc_chttp2_transport_get_socket_node(self->result_->transport);
self->result_->channel_args = args->args;
self->endpoint_ = args->endpoint;
self->Ref().release(); // Ref held by OnReceiveSettings()
GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
grpc_schedule_on_exec_ctx);
grpc_chttp2_transport_start_reading(self->result_->transport,
args->read_buffer,
&self->on_receive_settings_, nullptr);
grpc_chttp2_transport_start_reading(
self->result_->transport, args->read_buffer,
&self->on_receive_settings_, self->args_.interested_parties, nullptr);
self->timer_handle_ = self->event_engine_->RunAfter(
self->args_.deadline - Timestamp::Now(),
[self = self->RefAsSubclass<Chttp2Connector>()] {
Expand All @@ -201,8 +188,6 @@ void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) {
{
MutexLock lock(&self->mu_);
if (!self->notify_error_.has_value()) {
grpc_endpoint_delete_from_pollset_set(self->endpoint_,
self->args_.interested_parties);
if (!error.ok()) {
// Transport got an error while waiting on SETTINGS frame.
self->result_->Reset();
Expand Down Expand Up @@ -231,7 +216,6 @@ void Chttp2Connector::OnTimeout() {
if (!notify_error_.has_value()) {
// The transport did not receive the settings frame in time. Destroy the
// transport.
grpc_endpoint_delete_from_pollset_set(endpoint_, args_.interested_parties);
result_->Reset();
MaybeNotify(GRPC_ERROR_CREATE(
"connection attempt timed out before receiving SETTINGS frame"));
Expand All @@ -246,9 +230,6 @@ void Chttp2Connector::MaybeNotify(grpc_error_handle error) {
if (notify_error_.has_value()) {
NullThenSchedClosure(DEBUG_LOCATION, &notify_, notify_error_.value());
// Clear state for a new Connect().
// Clear out the endpoint_, since it is the responsibility of
// the transport to shut it down.
endpoint_ = nullptr;
notify_error_.reset();
} else {
notify_error_ = error;
Expand Down Expand Up @@ -407,7 +388,8 @@ grpc_channel* grpc_channel_create_from_fd(const char* target, int fd,
auto channel = grpc_core::ChannelCreate(
target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
if (channel.ok()) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
grpc_core::ExecCtx::Get()->Flush();
return channel->release()->c_ptr();
} else {
Expand Down
5 changes: 0 additions & 5 deletions src/core/ext/transport/chttp2/client/chttp2_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ namespace grpc_core {

class Chttp2Connector : public SubchannelConnector {
public:
~Chttp2Connector() override;

void Connect(const Args& args, Result* result, grpc_closure* notify) override;
void Shutdown(grpc_error_handle error) override;

Expand All @@ -63,9 +61,6 @@ class Chttp2Connector : public SubchannelConnector {
Result* result_ = nullptr;
grpc_closure* notify_ = nullptr;
bool shutdown_ = false;
// Holds the endpoint when first created before being handed off to
// the handshake manager, and then again after handshake is done.
grpc_endpoint* endpoint_ = nullptr;
grpc_closure on_receive_settings_;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
timer_handle_ ABSL_GUARDED_BY(mu_);
Expand Down
29 changes: 9 additions & 20 deletions src/core/ext/transport/chttp2/server/chttp2_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -461,11 +461,6 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
if (error.ok() && args->endpoint != nullptr) {
// We were shut down or stopped serving after handshaking completed
// successfully, so destroy the endpoint here.
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args->endpoint, absl::OkStatus());
grpc_endpoint_destroy(args->endpoint);
grpc_slice_buffer_destroy(args->read_buffer);
gpr_free(args->read_buffer);
Expand Down Expand Up @@ -523,7 +518,7 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
}
grpc_chttp2_transport_start_reading(transport, args->read_buffer,
&self->on_receive_settings_,
on_close);
nullptr, on_close);
self->timer_handle_ = self->connection_->event_engine_->RunAfter(
self->deadline_ - Timestamp::Now(),
[self = self->Ref()]() mutable {
Expand Down Expand Up @@ -644,7 +639,6 @@ void Chttp2ServerListener::ActiveConnection::Start(
// owning Chttp2ServerListener and all associated ActiveConnections have
// been orphaned. The generated endpoints need to be shutdown here to
// ensure the tcp connections are closed appropriately.
grpc_endpoint_shutdown(endpoint, absl::OkStatus());
grpc_endpoint_destroy(endpoint);
return;
}
Expand Down Expand Up @@ -844,36 +838,30 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
MutexLock lock(&self->mu_);
connection_manager = self->connection_manager_;
}
auto endpoint_cleanup = [&](grpc_error_handle error) {
grpc_endpoint_shutdown(tcp, error);
auto endpoint_cleanup = [&]() {
grpc_endpoint_destroy(tcp);
gpr_free(acceptor);
};
if (!self->connection_quota_->AllowIncomingConnection(
self->memory_quota_, grpc_endpoint_get_peer(tcp))) {
grpc_error_handle error = GRPC_ERROR_CREATE(
"Rejected incoming connection because configured connection quota "
"limits have been exceeded.");
endpoint_cleanup(error);
endpoint_cleanup();
return;
}
if (self->server_->config_fetcher() != nullptr) {
if (connection_manager == nullptr) {
grpc_error_handle error = GRPC_ERROR_CREATE(
"No ConnectionManager configured. Closing connection.");
endpoint_cleanup(error);
endpoint_cleanup();
return;
}
absl::StatusOr<ChannelArgs> args_result =
connection_manager->UpdateChannelArgsForConnection(args, tcp);
if (!args_result.ok()) {
endpoint_cleanup(GRPC_ERROR_CREATE(args_result.status().ToString()));
endpoint_cleanup();
return;
}
grpc_error_handle error;
args = self->args_modifier_(*args_result, &error);
if (!error.ok()) {
endpoint_cleanup(error);
endpoint_cleanup();
return;
}
}
Expand Down Expand Up @@ -903,7 +891,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
}
}
if (connection != nullptr) {
endpoint_cleanup(absl::OkStatus());
endpoint_cleanup();
} else {
connection_ref->Start(std::move(listener_ref), tcp, args);
}
Expand Down Expand Up @@ -1127,7 +1115,8 @@ void grpc_server_add_channel_from_fd(grpc_server* server, int fd,
for (grpc_pollset* pollset : core_server->pollsets()) {
grpc_endpoint_add_to_pollset(server_endpoint, pollset);
}
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
} else {
gpr_log(GPR_ERROR, "Failed to create channel: %s",
grpc_core::StatusToString(error).c_str());
Expand Down
48 changes: 39 additions & 9 deletions src/core/ext/transport/chttp2/transport/chttp2_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/tcp_tracer.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/experiments/experiments.h"
Expand All @@ -90,6 +91,7 @@
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/port.h"
Expand All @@ -110,10 +112,6 @@
#include "src/core/lib/transport/status_conversion.h"
#include "src/core/lib/transport/transport.h"

#ifdef GRPC_POSIX_SOCKET_TCP
#include "src/core/lib/iomgr/ev_posix.h"
#endif

#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
Expand Down Expand Up @@ -385,7 +383,7 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
channelz_socket.reset();
}

grpc_endpoint_destroy(ep);
if (ep != nullptr) grpc_endpoint_destroy(ep);

grpc_slice_buffer_destroy(&qbuf);

Expand Down Expand Up @@ -759,9 +757,21 @@ static void close_transport_locked(grpc_chttp2_transport* t,
GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
}
GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
grpc_endpoint_shutdown(t->ep, error);
if (t->interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, t->interested_parties_until_recv_settings);
t->interested_parties_until_recv_settings = nullptr;
}
grpc_core::MutexLock lock(&t->ep_destroy_mu);
grpc_endpoint_destroy(t->ep);
t->ep = nullptr;
}
if (t->notify_on_receive_settings != nullptr) {
if (t->interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, t->interested_parties_until_recv_settings);
t->interested_parties_until_recv_settings = nullptr;
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
error);
t->notify_on_receive_settings = nullptr;
Expand Down Expand Up @@ -3010,12 +3020,22 @@ static void connectivity_state_set(grpc_chttp2_transport* t,

void grpc_chttp2_transport::SetPollset(grpc_stream* /*gs*/,
grpc_pollset* pollset) {
grpc_endpoint_add_to_pollset(ep, pollset);
// We don't want the overhead of acquiring the mutex unless we're
// using the "poll" polling engine, which is the only one that
// actually uses pollsets.
if (strcmp(grpc_get_poll_strategy_name(), "poll") != 0) return;
grpc_core::MutexLock lock(&ep_destroy_mu);
if (ep != nullptr) grpc_endpoint_add_to_pollset(ep, pollset);
}

void grpc_chttp2_transport::SetPollsetSet(grpc_stream* /*gs*/,
grpc_pollset_set* pollset_set) {
grpc_endpoint_add_to_pollset_set(ep, pollset_set);
// We don't want the overhead of acquiring the mutex unless we're
// using the "poll" polling engine, which is the only one that
// actually uses pollsets.
if (strcmp(grpc_get_poll_strategy_name(), "poll") != 0) return;
grpc_core::MutexLock lock(&ep_destroy_mu);
if (ep != nullptr) grpc_endpoint_add_to_pollset_set(ep, pollset_set);
}

//
Expand Down Expand Up @@ -3203,7 +3223,9 @@ grpc_core::Transport* grpc_create_chttp2_transport(

void grpc_chttp2_transport_start_reading(
grpc_core::Transport* transport, grpc_slice_buffer* read_buffer,
grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) {
grpc_closure* notify_on_receive_settings,
grpc_pollset_set* interested_parties_until_recv_settings,
grpc_closure* notify_on_close) {
auto t = reinterpret_cast<grpc_chttp2_transport*>(transport)->Ref();
if (read_buffer != nullptr) {
grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
Expand All @@ -3212,9 +3234,15 @@ void grpc_chttp2_transport_start_reading(
auto* tp = t.get();
tp->combiner->Run(
grpc_core::NewClosure([t = std::move(t), notify_on_receive_settings,
interested_parties_until_recv_settings,
notify_on_close](grpc_error_handle) mutable {
if (!t->closed_with_error.ok()) {
if (notify_on_receive_settings != nullptr) {
if (t->ep != nullptr &&
interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, interested_parties_until_recv_settings);
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_receive_settings,
t->closed_with_error);
}
Expand All @@ -3224,6 +3252,8 @@ void grpc_chttp2_transport_start_reading(
}
return;
}
t->interested_parties_until_recv_settings =
interested_parties_until_recv_settings;
t->notify_on_receive_settings = notify_on_receive_settings;
t->notify_on_close = notify_on_close;
read_action_locked(std::move(t), absl::OkStatus());
Expand Down
7 changes: 6 additions & 1 deletion src/core/ext/transport/chttp2/transport/chttp2_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,14 @@ grpc_chttp2_transport_get_socket_node(grpc_core::Transport* transport);
/// leftover bytes previously read from the endpoint (e.g., by handshakers).
/// If non-null, \a notify_on_receive_settings will be scheduled when
/// HTTP/2 settings are received from the peer.
/// If non-null, the endpoint will be removed from
/// interested_parties_until_recv_settings before
/// notify_on_receive_settings is invoked.
void grpc_chttp2_transport_start_reading(
grpc_core::Transport* transport, grpc_slice_buffer* read_buffer,
grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close);
grpc_closure* notify_on_receive_settings,
grpc_pollset_set* interested_parties_until_recv_settings,
grpc_closure* notify_on_close);

namespace grpc_core {
typedef void (*TestOnlyGlobalHttp2TransportInitCallback)();
Expand Down
5 changes: 5 additions & 0 deletions src/core/ext/transport/chttp2/transport/frame_settings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ grpc_error_handle grpc_chttp2_settings_parser_parse(void* p,
grpc_chttp2_initiate_write(t,
GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK);
if (t->notify_on_receive_settings != nullptr) {
if (t->interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, t->interested_parties_until_recv_settings);
t->interested_parties_until_recv_settings = nullptr;
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
t->notify_on_receive_settings,
absl::OkStatus());
Expand Down
10 changes: 10 additions & 0 deletions src/core/ext/transport/chttp2/transport/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ struct grpc_chttp2_transport final
grpc_endpoint* GetEndpoint() override;

grpc_endpoint* ep;
grpc_core::Mutex ep_destroy_mu; // Guards endpoint destruction only.

grpc_core::Slice peer_string;

grpc_core::MemoryOwner memory_owner;
Expand All @@ -269,6 +271,14 @@ struct grpc_chttp2_transport final
grpc_core::Combiner* combiner;
absl::BitGen bitgen;

// On the client side, when the transport is first created, the
// endpoint will already have been added to this pollset_set, and it
// needs to stay there until the notify_on_receive_settings callback
// is invoked. After that, the polling will be coordinated via the
// bind_pollset_set transport op, sent by the subchannel when it
// starts a connectivity watch.
grpc_pollset_set* interested_parties_until_recv_settings = nullptr;

grpc_closure* notify_on_receive_settings = nullptr;
grpc_closure* notify_on_close = nullptr;

Expand Down
3 changes: 2 additions & 1 deletion src/core/lib/http/httpcli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ void HttpRequest::Orphan() {
GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
}
if (own_endpoint_ && ep_ != nullptr) {
grpc_endpoint_shutdown(ep_, GRPC_ERROR_CREATE("HTTP request cancelled"));
grpc_endpoint_destroy(ep_);
ep_ = nullptr;
}
}
Unref();
Expand Down
Loading

0 comments on commit 3bb70cc

Please sign in to comment.