Skip to content

Commit

Permalink
[export] [endpoint] Remove grpc_endpoint_shutdown().
Browse files Browse the repository at this point in the history
This gives grpc_endpoint the same destruction-is-shutdown semantic as
EventEngine::Endpoint, which will make the migration easier.

----
DO NOT SUBMIT. This PR is for testing purposes only. [cl/620101818](http://cl/620101818)

PiperOrigin-RevId: 620101818
  • Loading branch information
markdroth authored and copybara-github committed Mar 29, 2024
1 parent 0331288 commit f4dc0e2
Show file tree
Hide file tree
Showing 38 changed files with 140 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,6 @@ void ChaoticGoodConnector::OnHandshakeDone(void* arg, grpc_error_handle error) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
if (args->endpoint != nullptr) {
grpc_endpoint_shutdown(args->endpoint, error);
grpc_endpoint_destroy(args->endpoint);
}
}
Expand Down
28 changes: 5 additions & 23 deletions src/core/ext/transport/chttp2/client/chttp2_connector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,6 @@ void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure,
}
} // namespace

Chttp2Connector::~Chttp2Connector() {
if (endpoint_ != nullptr) {
grpc_endpoint_destroy(endpoint_);
}
}

void Chttp2Connector::Connect(const Args& args, Result* result,
grpc_closure* notify) {
{
Expand All @@ -109,7 +103,6 @@ void Chttp2Connector::Connect(const Args& args, Result* result,
args_ = args;
result_ = result;
notify_ = notify;
GPR_ASSERT(endpoint_ == nullptr);
event_engine_ = args_.channel_args.GetObject<EventEngine>();
}
absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(args.address);
Expand Down Expand Up @@ -152,11 +145,6 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
if (args->endpoint != nullptr) {
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args->endpoint, error);
grpc_endpoint_destroy(args->endpoint);
grpc_slice_buffer_destroy(args->read_buffer);
gpr_free(args->read_buffer);
Expand All @@ -171,13 +159,12 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
self->result_->socket_node =
grpc_chttp2_transport_get_socket_node(self->result_->transport);
self->result_->channel_args = args->args;
self->endpoint_ = args->endpoint;
self->Ref().release(); // Ref held by OnReceiveSettings()
GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
grpc_schedule_on_exec_ctx);
grpc_chttp2_transport_start_reading(self->result_->transport,
args->read_buffer,
&self->on_receive_settings_, nullptr);
grpc_chttp2_transport_start_reading(
self->result_->transport, args->read_buffer,
&self->on_receive_settings_, self->args_.interested_parties, nullptr);
self->timer_handle_ = self->event_engine_->RunAfter(
self->args_.deadline - Timestamp::Now(),
[self = self->RefAsSubclass<Chttp2Connector>()] {
Expand All @@ -202,8 +189,6 @@ void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) {
{
MutexLock lock(&self->mu_);
if (!self->notify_error_.has_value()) {
grpc_endpoint_delete_from_pollset_set(self->endpoint_,
self->args_.interested_parties);
if (!error.ok()) {
// Transport got an error while waiting on SETTINGS frame.
self->result_->Reset();
Expand Down Expand Up @@ -232,7 +217,6 @@ void Chttp2Connector::OnTimeout() {
if (!notify_error_.has_value()) {
// The transport did not receive the settings frame in time. Destroy the
// transport.
grpc_endpoint_delete_from_pollset_set(endpoint_, args_.interested_parties);
result_->Reset();
MaybeNotify(GRPC_ERROR_CREATE(
"connection attempt timed out before receiving SETTINGS frame"));
Expand All @@ -247,9 +231,6 @@ void Chttp2Connector::MaybeNotify(grpc_error_handle error) {
if (notify_error_.has_value()) {
NullThenSchedClosure(DEBUG_LOCATION, &notify_, notify_error_.value());
// Clear state for a new Connect().
// Clear out the endpoint_, since it is the responsibility of
// the transport to shut it down.
endpoint_ = nullptr;
notify_error_.reset();
} else {
notify_error_ = error;
Expand Down Expand Up @@ -408,7 +389,8 @@ grpc_channel* grpc_channel_create_from_fd(const char* target, int fd,
auto channel = grpc_core::ChannelCreate(
target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
if (channel.ok()) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
grpc_core::ExecCtx::Get()->Flush();
return channel->release()->c_ptr();
} else {
Expand Down
5 changes: 0 additions & 5 deletions src/core/ext/transport/chttp2/client/chttp2_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ namespace grpc_core {

class Chttp2Connector : public SubchannelConnector {
public:
~Chttp2Connector() override;

void Connect(const Args& args, Result* result, grpc_closure* notify) override;
void Shutdown(grpc_error_handle error) override;

Expand All @@ -64,9 +62,6 @@ class Chttp2Connector : public SubchannelConnector {
Result* result_ = nullptr;
grpc_closure* notify_ = nullptr;
bool shutdown_ = false;
// Holds the endpoint when first created before being handed off to
// the handshake manager, and then again after handshake is done.
grpc_endpoint* endpoint_ = nullptr;
grpc_closure on_receive_settings_;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
timer_handle_ ABSL_GUARDED_BY(mu_);
Expand Down
12 changes: 3 additions & 9 deletions src/core/ext/transport/chttp2/server/chttp2_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,6 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
if (error.ok() && args->endpoint != nullptr) {
// We were shut down or stopped serving after handshaking completed
// successfully, so destroy the endpoint here.
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args->endpoint, absl::OkStatus());
grpc_endpoint_destroy(args->endpoint);
grpc_slice_buffer_destroy(args->read_buffer);
gpr_free(args->read_buffer);
Expand Down Expand Up @@ -524,7 +519,7 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
}
grpc_chttp2_transport_start_reading(transport, args->read_buffer,
&self->on_receive_settings_,
on_close);
nullptr, on_close);
self->timer_handle_ = self->connection_->event_engine_->RunAfter(
self->deadline_ - Timestamp::Now(),
[self = self->Ref()]() mutable {
Expand Down Expand Up @@ -645,7 +640,6 @@ void Chttp2ServerListener::ActiveConnection::Start(
// owning Chttp2ServerListener and all associated ActiveConnections have
// been orphaned. The generated endpoints need to be shutdown here to
// ensure the tcp connections are closed appropriately.
grpc_endpoint_shutdown(endpoint, absl::OkStatus());
grpc_endpoint_destroy(endpoint);
return;
}
Expand Down Expand Up @@ -846,7 +840,6 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
connection_manager = self->connection_manager_;
}
auto endpoint_cleanup = [&](grpc_error_handle error) {
grpc_endpoint_shutdown(tcp, error);
grpc_endpoint_destroy(tcp);
gpr_free(acceptor);
};
Expand Down Expand Up @@ -1128,7 +1121,8 @@ void grpc_server_add_channel_from_fd(grpc_server* server, int fd,
for (grpc_pollset* pollset : core_server->pollsets()) {
grpc_endpoint_add_to_pollset(server_endpoint, pollset);
}
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
} else {
gpr_log(GPR_ERROR, "Failed to create channel: %s",
grpc_core::StatusToString(error).c_str());
Expand Down
26 changes: 23 additions & 3 deletions src/core/ext/transport/chttp2/transport/chttp2_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
channelz_socket.reset();
}

grpc_endpoint_destroy(ep);
if (ep != nullptr) grpc_endpoint_destroy(ep);

grpc_slice_buffer_destroy(&qbuf);

Expand Down Expand Up @@ -759,9 +759,20 @@ static void close_transport_locked(grpc_chttp2_transport* t,
GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
}
GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
grpc_endpoint_shutdown(t->ep, error);
if (t->interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, t->interested_parties_until_recv_settings);
t->interested_parties_until_recv_settings = nullptr;
}
grpc_endpoint_destroy(t->ep);
t->ep = nullptr;
}
if (t->notify_on_receive_settings != nullptr) {
if (t->interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, t->interested_parties_until_recv_settings);
t->interested_parties_until_recv_settings = nullptr;
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
error);
t->notify_on_receive_settings = nullptr;
Expand Down Expand Up @@ -3206,7 +3217,9 @@ grpc_core::Transport* grpc_create_chttp2_transport(

void grpc_chttp2_transport_start_reading(
grpc_core::Transport* transport, grpc_slice_buffer* read_buffer,
grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) {
grpc_closure* notify_on_receive_settings,
grpc_pollset_set* interested_parties_until_recv_settings,
grpc_closure* notify_on_close) {
auto t = reinterpret_cast<grpc_chttp2_transport*>(transport)->Ref();
if (read_buffer != nullptr) {
grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
Expand All @@ -3215,9 +3228,14 @@ void grpc_chttp2_transport_start_reading(
auto* tp = t.get();
tp->combiner->Run(
grpc_core::NewClosure([t = std::move(t), notify_on_receive_settings,
interested_parties_until_recv_settings,
notify_on_close](grpc_error_handle) mutable {
if (!t->closed_with_error.ok()) {
if (notify_on_receive_settings != nullptr) {
if (interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, interested_parties_until_recv_settings);
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_receive_settings,
t->closed_with_error);
}
Expand All @@ -3227,6 +3245,8 @@ void grpc_chttp2_transport_start_reading(
}
return;
}
t->interested_parties_until_recv_settings =
interested_parties_until_recv_settings;
t->notify_on_receive_settings = notify_on_receive_settings;
t->notify_on_close = notify_on_close;
read_action_locked(std::move(t), absl::OkStatus());
Expand Down
7 changes: 6 additions & 1 deletion src/core/ext/transport/chttp2/transport/chttp2_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,14 @@ grpc_chttp2_transport_get_socket_node(grpc_core::Transport* transport);
/// leftover bytes previously read from the endpoint (e.g., by handshakers).
/// If non-null, \a notify_on_receive_settings will be scheduled when
/// HTTP/2 settings are received from the peer.
/// If non-null, the endpoint will be removed from
/// interested_parties_until_recv_settings before
/// notify_on_receive_settings is invoked.
void grpc_chttp2_transport_start_reading(
grpc_core::Transport* transport, grpc_slice_buffer* read_buffer,
grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close);
grpc_closure* notify_on_receive_settings,
grpc_pollset_set* interested_parties_until_recv_settings,
grpc_closure* notify_on_close);

namespace grpc_core {
typedef void (*TestOnlyGlobalHttp2TransportInitCallback)();
Expand Down
5 changes: 5 additions & 0 deletions src/core/ext/transport/chttp2/transport/frame_settings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ grpc_error_handle grpc_chttp2_settings_parser_parse(void* p,
grpc_chttp2_initiate_write(t,
GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK);
if (t->notify_on_receive_settings != nullptr) {
if (t->interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, t->interested_parties_until_recv_settings);
t->interested_parties_until_recv_settings = nullptr;
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
t->notify_on_receive_settings,
absl::OkStatus());
Expand Down
8 changes: 8 additions & 0 deletions src/core/ext/transport/chttp2/transport/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,14 @@ struct grpc_chttp2_transport final
grpc_core::Combiner* combiner;
absl::BitGen bitgen;

// On the client side, when the transport is first created, the
// endpoint will already have been added to this pollset_set, and it
// needs to stay there until the notify_on_receive_settings callback
// is invoked. After that, the polling will be coordinated via the
// bind_pollset_set transport op, sent by the subchannel when it
// starts a connectivity watch.
grpc_pollset_set* interested_parties_until_recv_settings = nullptr;

grpc_closure* notify_on_receive_settings = nullptr;
grpc_closure* notify_on_close = nullptr;

Expand Down
3 changes: 2 additions & 1 deletion src/core/lib/http/httpcli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ void HttpRequest::Orphan() {
GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
}
if (own_endpoint_ && ep_ != nullptr) {
grpc_endpoint_shutdown(ep_, GRPC_ERROR_CREATE("HTTP request cancelled"));
grpc_endpoint_destroy(ep_);
ep_ = nullptr;
}
}
Unref();
Expand Down
4 changes: 0 additions & 4 deletions src/core/lib/iomgr/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ void grpc_endpoint_delete_from_pollset_set(grpc_endpoint* ep,
ep->vtable->delete_from_pollset_set(ep, pollset_set);
}

void grpc_endpoint_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
ep->vtable->shutdown(ep, why);
}

void grpc_endpoint_destroy(grpc_endpoint* ep) { ep->vtable->destroy(ep); }

absl::string_view grpc_endpoint_get_peer(grpc_endpoint* ep) {
Expand Down
2 changes: 0 additions & 2 deletions src/core/lib/iomgr/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ struct grpc_endpoint_vtable {
void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset);
void (*add_to_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
void (*delete_from_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
void (*shutdown)(grpc_endpoint* ep, grpc_error_handle why);
void (*destroy)(grpc_endpoint* ep);
absl::string_view (*get_peer)(grpc_endpoint* ep);
absl::string_view (*get_local_address)(grpc_endpoint* ep);
Expand Down Expand Up @@ -87,7 +86,6 @@ void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,

// Causes any pending and future read/write callbacks to run immediately with
// success==0
void grpc_endpoint_shutdown(grpc_endpoint* ep, grpc_error_handle why);
void grpc_endpoint_destroy(grpc_endpoint* ep);

// Add an endpoint to a pollset or pollset_set, so that when the pollset is
Expand Down
18 changes: 3 additions & 15 deletions src/core/lib/iomgr/event_engine_shims/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -345,29 +345,18 @@ void EndpointAddToPollsetSet(grpc_endpoint* /* ep */,
grpc_pollset_set* /* pollset */) {}
void EndpointDeleteFromPollsetSet(grpc_endpoint* /* ep */,
grpc_pollset_set* /* pollset */) {}

/// After shutdown, all endpoint operations except destroy are no-op,
/// and will return some kind of sane default (empty strings, nullptrs, etc).
/// It is the caller's responsibility to ensure that calls to EndpointShutdown
/// are synchronized.
void EndpointShutdown(grpc_endpoint* ep, grpc_error_handle why) {
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "TCP Endpoint %p shutdown why=%s", eeep->wrapper,
why.ToString().c_str());
}
GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Shutdown:%s", eeep->wrapper,
why.ToString().c_str());
eeep->wrapper->TriggerShutdown(nullptr);
}

// Attempts to free the underlying data structures.
/// Attempts to free the underlying data structures.
void EndpointDestroy(grpc_endpoint* ep) {
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Destroy", eeep->wrapper);
eeep->wrapper->TriggerShutdown(nullptr);
eeep->wrapper->Unref();
}

Expand Down Expand Up @@ -405,7 +394,6 @@ grpc_endpoint_vtable grpc_event_engine_endpoint_vtable = {
EndpointAddToPollset,
EndpointAddToPollsetSet,
EndpointDeleteFromPollsetSet,
EndpointShutdown,
EndpointDestroy,
EndpointGetPeerAddress,
EndpointGetLocalAddress,
Expand Down
13 changes: 2 additions & 11 deletions src/core/lib/iomgr/tcp_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -773,15 +773,6 @@ static grpc_error_handle tcp_annotate_error(grpc_error_handle src_error,
static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error);
static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error_handle error);

static void tcp_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
ZerocopyDisableAndWaitForRemaining(tcp);
grpc_fd_shutdown(tcp->em_fd, why);
tcp->read_mu.Lock();
tcp->memory_owner.Reset();
tcp->read_mu.Unlock();
}

static void tcp_free(grpc_tcp* tcp) {
grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
"tcp_unref_orphan");
Expand Down Expand Up @@ -820,9 +811,10 @@ static void tcp_ref(grpc_tcp* tcp) { tcp->refcount.Ref(); }

static void tcp_destroy(grpc_endpoint* ep) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
ZerocopyDisableAndWaitForRemaining(tcp);
grpc_fd_shutdown(tcp->em_fd, absl::UnavailableError("endpoint shutdown"));
grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
if (grpc_event_engine_can_track_errors()) {
ZerocopyDisableAndWaitForRemaining(tcp);
gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
grpc_fd_set_error(tcp->em_fd);
}
Expand Down Expand Up @@ -1975,7 +1967,6 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_add_to_pollset,
tcp_add_to_pollset_set,
tcp_delete_from_pollset_set,
tcp_shutdown,
tcp_destroy,
tcp_get_peer,
tcp_get_local_address,
Expand Down
Loading

0 comments on commit f4dc0e2

Please sign in to comment.