Skip to content

Commit

Permalink
[export] [endpoint] Remove grpc_endpoint_shutdown().
Browse files Browse the repository at this point in the history
This gives grpc_endpoint the same destruction-is-shutdown semantic as
EventEngine::Endpoint, which will make the migration easier.

----
DO NOT SUBMIT. This PR is for testing purposes only. [cl/620101818](http://cl/620101818) [cl/624063459](http://cl/624063459)

PiperOrigin-RevId: 620101818
  • Loading branch information
markdroth authored and copybara-github committed Apr 12, 2024
1 parent fadf1bb commit b2721a6
Show file tree
Hide file tree
Showing 40 changed files with 199 additions and 303 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ void ChaoticGoodConnector::OnHandshakeDone(void* arg, grpc_error_handle error) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
if (args->endpoint != nullptr) {
grpc_endpoint_shutdown(args->endpoint, error);
grpc_endpoint_destroy(args->endpoint);
}
}
Expand Down
28 changes: 5 additions & 23 deletions src/core/ext/transport/chttp2/client/chttp2_connector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,6 @@ void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure,
}
} // namespace

Chttp2Connector::~Chttp2Connector() {
if (endpoint_ != nullptr) {
grpc_endpoint_destroy(endpoint_);
}
}

void Chttp2Connector::Connect(const Args& args, Result* result,
grpc_closure* notify) {
{
Expand All @@ -108,7 +102,6 @@ void Chttp2Connector::Connect(const Args& args, Result* result,
args_ = args;
result_ = result;
notify_ = notify;
GPR_ASSERT(endpoint_ == nullptr);
event_engine_ = args_.channel_args.GetObject<EventEngine>();
}
absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(args.address);
Expand Down Expand Up @@ -151,11 +144,6 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
if (args->endpoint != nullptr) {
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args->endpoint, error);
grpc_endpoint_destroy(args->endpoint);
grpc_slice_buffer_destroy(args->read_buffer);
gpr_free(args->read_buffer);
Expand All @@ -170,13 +158,12 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
self->result_->socket_node =
grpc_chttp2_transport_get_socket_node(self->result_->transport);
self->result_->channel_args = args->args;
self->endpoint_ = args->endpoint;
self->Ref().release(); // Ref held by OnReceiveSettings()
GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
grpc_schedule_on_exec_ctx);
grpc_chttp2_transport_start_reading(self->result_->transport,
args->read_buffer,
&self->on_receive_settings_, nullptr);
grpc_chttp2_transport_start_reading(
self->result_->transport, args->read_buffer,
&self->on_receive_settings_, self->args_.interested_parties, nullptr);
self->timer_handle_ = self->event_engine_->RunAfter(
self->args_.deadline - Timestamp::Now(),
[self = self->RefAsSubclass<Chttp2Connector>()] {
Expand All @@ -201,8 +188,6 @@ void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) {
{
MutexLock lock(&self->mu_);
if (!self->notify_error_.has_value()) {
grpc_endpoint_delete_from_pollset_set(self->endpoint_,
self->args_.interested_parties);
if (!error.ok()) {
// Transport got an error while waiting on SETTINGS frame.
self->result_->Reset();
Expand Down Expand Up @@ -231,7 +216,6 @@ void Chttp2Connector::OnTimeout() {
if (!notify_error_.has_value()) {
// The transport did not receive the settings frame in time. Destroy the
// transport.
grpc_endpoint_delete_from_pollset_set(endpoint_, args_.interested_parties);
result_->Reset();
MaybeNotify(GRPC_ERROR_CREATE(
"connection attempt timed out before receiving SETTINGS frame"));
Expand All @@ -246,9 +230,6 @@ void Chttp2Connector::MaybeNotify(grpc_error_handle error) {
if (notify_error_.has_value()) {
NullThenSchedClosure(DEBUG_LOCATION, &notify_, notify_error_.value());
// Clear state for a new Connect().
// Clear out the endpoint_, since it is the responsibility of
// the transport to shut it down.
endpoint_ = nullptr;
notify_error_.reset();
} else {
notify_error_ = error;
Expand Down Expand Up @@ -407,7 +388,8 @@ grpc_channel* grpc_channel_create_from_fd(const char* target, int fd,
auto channel = grpc_core::ChannelCreate(
target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
if (channel.ok()) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
grpc_core::ExecCtx::Get()->Flush();
return channel->release()->c_ptr();
} else {
Expand Down
5 changes: 0 additions & 5 deletions src/core/ext/transport/chttp2/client/chttp2_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ namespace grpc_core {

class Chttp2Connector : public SubchannelConnector {
public:
~Chttp2Connector() override;

void Connect(const Args& args, Result* result, grpc_closure* notify) override;
void Shutdown(grpc_error_handle error) override;

Expand All @@ -63,9 +61,6 @@ class Chttp2Connector : public SubchannelConnector {
Result* result_ = nullptr;
grpc_closure* notify_ = nullptr;
bool shutdown_ = false;
// Holds the endpoint when first created before being handed off to
// the handshake manager, and then again after handshake is done.
grpc_endpoint* endpoint_ = nullptr;
grpc_closure on_receive_settings_;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
timer_handle_ ABSL_GUARDED_BY(mu_);
Expand Down
12 changes: 3 additions & 9 deletions src/core/ext/transport/chttp2/server/chttp2_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -461,11 +461,6 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
if (error.ok() && args->endpoint != nullptr) {
// We were shut down or stopped serving after handshaking completed
// successfully, so destroy the endpoint here.
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args->endpoint, absl::OkStatus());
grpc_endpoint_destroy(args->endpoint);
grpc_slice_buffer_destroy(args->read_buffer);
gpr_free(args->read_buffer);
Expand Down Expand Up @@ -523,7 +518,7 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
}
grpc_chttp2_transport_start_reading(transport, args->read_buffer,
&self->on_receive_settings_,
on_close);
nullptr, on_close);
self->timer_handle_ = self->connection_->event_engine_->RunAfter(
self->deadline_ - Timestamp::Now(),
[self = self->Ref()]() mutable {
Expand Down Expand Up @@ -644,7 +639,6 @@ void Chttp2ServerListener::ActiveConnection::Start(
// owning Chttp2ServerListener and all associated ActiveConnections have
// been orphaned. The generated endpoints need to be shutdown here to
// ensure the tcp connections are closed appropriately.
grpc_endpoint_shutdown(endpoint, absl::OkStatus());
grpc_endpoint_destroy(endpoint);
return;
}
Expand Down Expand Up @@ -845,7 +839,6 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
connection_manager = self->connection_manager_;
}
auto endpoint_cleanup = [&](grpc_error_handle error) {
grpc_endpoint_shutdown(tcp, error);
grpc_endpoint_destroy(tcp);
gpr_free(acceptor);
};
Expand Down Expand Up @@ -1127,7 +1120,8 @@ void grpc_server_add_channel_from_fd(grpc_server* server, int fd,
for (grpc_pollset* pollset : core_server->pollsets()) {
grpc_endpoint_add_to_pollset(server_endpoint, pollset);
}
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
} else {
gpr_log(GPR_ERROR, "Failed to create channel: %s",
grpc_core::StatusToString(error).c_str());
Expand Down
37 changes: 32 additions & 5 deletions src/core/ext/transport/chttp2/transport/chttp2_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/channel/tcp_tracer.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/experiments/experiments.h"
Expand Down Expand Up @@ -385,7 +386,7 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
channelz_socket.reset();
}

grpc_endpoint_destroy(ep);
if (ep != nullptr) grpc_endpoint_destroy(ep);

grpc_slice_buffer_destroy(&qbuf);

Expand Down Expand Up @@ -759,9 +760,21 @@ static void close_transport_locked(grpc_chttp2_transport* t,
GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
}
GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
grpc_endpoint_shutdown(t->ep, error);
if (t->interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, t->interested_parties_until_recv_settings);
t->interested_parties_until_recv_settings = nullptr;
}
grpc_core::MutexLock lock(&t->ep_destroy_mu);
grpc_endpoint_destroy(t->ep);
t->ep = nullptr;
}
if (t->notify_on_receive_settings != nullptr) {
if (t->interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, t->interested_parties_until_recv_settings);
t->interested_parties_until_recv_settings = nullptr;
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
error);
t->notify_on_receive_settings = nullptr;
Expand Down Expand Up @@ -3010,12 +3023,16 @@ static void connectivity_state_set(grpc_chttp2_transport* t,

void grpc_chttp2_transport::SetPollset(grpc_stream* /*gs*/,
grpc_pollset* pollset) {
grpc_endpoint_add_to_pollset(ep, pollset);
if (grpc_core::ConfigVars::Get().PollStrategy() != "poll") return;
grpc_core::MutexLock lock(&ep_destroy_mu);
if (ep != nullptr) grpc_endpoint_add_to_pollset(ep, pollset);
}

void grpc_chttp2_transport::SetPollsetSet(grpc_stream* /*gs*/,
grpc_pollset_set* pollset_set) {
grpc_endpoint_add_to_pollset_set(ep, pollset_set);
if (grpc_core::ConfigVars::Get().PollStrategy() != "poll") return;
grpc_core::MutexLock lock(&ep_destroy_mu);
if (ep != nullptr) grpc_endpoint_add_to_pollset_set(ep, pollset_set);
}

//
Expand Down Expand Up @@ -3203,7 +3220,9 @@ grpc_core::Transport* grpc_create_chttp2_transport(

void grpc_chttp2_transport_start_reading(
grpc_core::Transport* transport, grpc_slice_buffer* read_buffer,
grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) {
grpc_closure* notify_on_receive_settings,
grpc_pollset_set* interested_parties_until_recv_settings,
grpc_closure* notify_on_close) {
auto t = reinterpret_cast<grpc_chttp2_transport*>(transport)->Ref();
if (read_buffer != nullptr) {
grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
Expand All @@ -3212,9 +3231,15 @@ void grpc_chttp2_transport_start_reading(
auto* tp = t.get();
tp->combiner->Run(
grpc_core::NewClosure([t = std::move(t), notify_on_receive_settings,
interested_parties_until_recv_settings,
notify_on_close](grpc_error_handle) mutable {
if (!t->closed_with_error.ok()) {
if (notify_on_receive_settings != nullptr) {
if (t->ep != nullptr &&
interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, interested_parties_until_recv_settings);
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_receive_settings,
t->closed_with_error);
}
Expand All @@ -3224,6 +3249,8 @@ void grpc_chttp2_transport_start_reading(
}
return;
}
t->interested_parties_until_recv_settings =
interested_parties_until_recv_settings;
t->notify_on_receive_settings = notify_on_receive_settings;
t->notify_on_close = notify_on_close;
read_action_locked(std::move(t), absl::OkStatus());
Expand Down
7 changes: 6 additions & 1 deletion src/core/ext/transport/chttp2/transport/chttp2_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,14 @@ grpc_chttp2_transport_get_socket_node(grpc_core::Transport* transport);
/// leftover bytes previously read from the endpoint (e.g., by handshakers).
/// If non-null, \a notify_on_receive_settings will be scheduled when
/// HTTP/2 settings are received from the peer.
/// If non-null, the endpoint will be removed from
/// interested_parties_until_recv_settings before
/// notify_on_receive_settings is invoked.
void grpc_chttp2_transport_start_reading(
grpc_core::Transport* transport, grpc_slice_buffer* read_buffer,
grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close);
grpc_closure* notify_on_receive_settings,
grpc_pollset_set* interested_parties_until_recv_settings,
grpc_closure* notify_on_close);

namespace grpc_core {
typedef void (*TestOnlyGlobalHttp2TransportInitCallback)();
Expand Down
5 changes: 5 additions & 0 deletions src/core/ext/transport/chttp2/transport/frame_settings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ grpc_error_handle grpc_chttp2_settings_parser_parse(void* p,
grpc_chttp2_initiate_write(t,
GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK);
if (t->notify_on_receive_settings != nullptr) {
if (t->interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, t->interested_parties_until_recv_settings);
t->interested_parties_until_recv_settings = nullptr;
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
t->notify_on_receive_settings,
absl::OkStatus());
Expand Down
10 changes: 10 additions & 0 deletions src/core/ext/transport/chttp2/transport/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ struct grpc_chttp2_transport final
grpc_endpoint* GetEndpoint() override;

grpc_endpoint* ep;
grpc_core::Mutex ep_destroy_mu; // Guards endpoint destruction only.

grpc_core::Slice peer_string;

grpc_core::MemoryOwner memory_owner;
Expand All @@ -269,6 +271,14 @@ struct grpc_chttp2_transport final
grpc_core::Combiner* combiner;
absl::BitGen bitgen;

// On the client side, when the transport is first created, the
// endpoint will already have been added to this pollset_set, and it
// needs to stay there until the notify_on_receive_settings callback
// is invoked. After that, the polling will be coordinated via the
// bind_pollset_set transport op, sent by the subchannel when it
// starts a connectivity watch.
grpc_pollset_set* interested_parties_until_recv_settings = nullptr;

grpc_closure* notify_on_receive_settings = nullptr;
grpc_closure* notify_on_close = nullptr;

Expand Down
3 changes: 2 additions & 1 deletion src/core/lib/http/httpcli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ void HttpRequest::Orphan() {
GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
}
if (own_endpoint_ && ep_ != nullptr) {
grpc_endpoint_shutdown(ep_, GRPC_ERROR_CREATE("HTTP request cancelled"));
grpc_endpoint_destroy(ep_);
ep_ = nullptr;
}
}
Unref();
Expand Down
4 changes: 0 additions & 4 deletions src/core/lib/iomgr/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ void grpc_endpoint_delete_from_pollset_set(grpc_endpoint* ep,
ep->vtable->delete_from_pollset_set(ep, pollset_set);
}

void grpc_endpoint_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
ep->vtable->shutdown(ep, why);
}

void grpc_endpoint_destroy(grpc_endpoint* ep) { ep->vtable->destroy(ep); }

absl::string_view grpc_endpoint_get_peer(grpc_endpoint* ep) {
Expand Down
2 changes: 0 additions & 2 deletions src/core/lib/iomgr/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ struct grpc_endpoint_vtable {
void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset);
void (*add_to_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
void (*delete_from_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
void (*shutdown)(grpc_endpoint* ep, grpc_error_handle why);
void (*destroy)(grpc_endpoint* ep);
absl::string_view (*get_peer)(grpc_endpoint* ep);
absl::string_view (*get_local_address)(grpc_endpoint* ep);
Expand Down Expand Up @@ -86,7 +85,6 @@ void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,

// Causes any pending and future read/write callbacks to run immediately with
// success==0
void grpc_endpoint_shutdown(grpc_endpoint* ep, grpc_error_handle why);
void grpc_endpoint_destroy(grpc_endpoint* ep);

// Add an endpoint to a pollset or pollset_set, so that when the pollset is
Expand Down
18 changes: 4 additions & 14 deletions src/core/lib/iomgr/endpoint_cfstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,25 +272,16 @@ static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action);
}

void CFStreamShutdown(grpc_endpoint* ep, grpc_error_handle why) {
void CFStreamDestroy(grpc_endpoint* ep) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%s)", ep_impl,
grpc_core::StatusToString(why).c_str());
gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl);
}
CFReadStreamClose(ep_impl->read_stream);
CFWriteStreamClose(ep_impl->write_stream);
ep_impl->stream_sync->Shutdown(why);
ep_impl->stream_sync->Shutdown(absl::UnavailableError("endpoint shutdown"));
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%s)", ep_impl,
grpc_core::StatusToString(why).c_str());
}
}

void CFStreamDestroy(grpc_endpoint* ep) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl);
gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy DONE", ep_impl);
}
EP_UNREF(ep_impl, "destroy");
}
Expand Down Expand Up @@ -320,7 +311,6 @@ static const grpc_endpoint_vtable vtable = {CFStreamRead,
CFStreamAddToPollset,
CFStreamAddToPollsetSet,
CFStreamDeleteFromPollsetSet,
CFStreamShutdown,
CFStreamDestroy,
CFStreamGetPeer,
CFStreamGetLocalAddress,
Expand Down
Loading

0 comments on commit b2721a6

Please sign in to comment.