From 34a0318dbf2407caf5f3cdd7857b487aba3f5e7f Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 21 Jun 2024 11:32:19 -0700 Subject: [PATCH] [LB policy API] change metadata mutation API to handle discarded picks (#36968) Previously, metadata mutations were made by the picker directly, which meant that they would be applied even if the channel winds up discarding the pick due to the returned subchannel having been disconnected by the time the pick result is returned. This changes the API such that pickers return metadata mutations along with the pick result, so that the mutations won't get applied unless the pick result is actually used. Closes #36968 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36968 from markdroth:lb_metadata_api 2765da61214f3ae63da3fdb8e362fac625b51c5a PiperOrigin-RevId: 645451869 --- BUILD | 1 + CMakeLists.txt | 2 + Makefile | 1 + Package.swift | 4 +- build_autogenerated.yaml | 7 +- config.m4 | 1 + config.w32 | 1 + gRPC-C++.podspec | 6 +- gRPC-Core.podspec | 7 +- grpc.gemspec | 4 +- package.xml | 4 +- src/core/BUILD | 20 +++- .../client_channel/client_channel_filter.cc | 81 ++------------- src/core/client_channel/lb_metadata.cc | 99 +++++++++++++++++++ src/core/client_channel/lb_metadata.h | 50 ++++++++++ .../load_balanced_call_destination.cc | 72 +------------- src/core/load_balancing/grpclb/grpclb.cc | 41 ++++---- src/core/load_balancing/lb_policy.h | 56 ++++++----- src/core/load_balancing/rls/rls.cc | 53 +++++----- src/core/{xds/grpc => util}/upb_utils.h | 8 +- src/core/xds/grpc/xds_client_grpc.cc | 2 +- src/core/xds/grpc/xds_cluster.cc | 2 +- src/core/xds/grpc/xds_common_types.cc | 2 +- src/core/xds/grpc/xds_endpoint.cc | 2 +- src/core/xds/grpc/xds_http_rbac_filter.cc | 2 +- .../grpc/xds_http_stateful_session_filter.cc | 2 +- src/core/xds/grpc/xds_listener.cc | 2 +- src/core/xds/grpc/xds_route_config.cc | 2 +- src/core/xds/xds_client/xds_api.cc | 2 +- src/core/xds/xds_client/xds_client.cc | 2 +- src/python/grpcio/grpc_core_dependencies.py | 1 + test/core/load_balancing/lb_policy_test_lib.h | 13 --- test/core/test_util/BUILD | 2 + test/core/test_util/test_lb_policies.cc | 8 +- test/core/transport/metadata_map_test.cc | 22 ++++- test/core/xds/xds_common_types_test.cc | 2 +- test/cpp/interop/rpc_behavior_lb_policy.cc | 15 ++- tools/doxygen/Doxyfile.c++.internal | 4 +- tools/doxygen/Doxyfile.core.internal | 4 +- .../run_tests/sanity/core_banned_functions.py | 5 +- 40 files changed, 355 insertions(+), 259 deletions(-) create mode 100644 src/core/client_channel/lb_metadata.cc create mode 100644 src/core/client_channel/lb_metadata.h rename src/core/{xds/grpc => util}/upb_utils.h (87%) diff --git a/BUILD b/BUILD index 4f9181a6d1c86..b33418d118bbd 100644 --- a/BUILD +++ b/BUILD @@ -3818,6 +3818,7 @@ grpc_cc_library( "//src/core:iomgr_fwd", "//src/core:json", "//src/core:latch", + "//src/core:lb_metadata", "//src/core:lb_policy", "//src/core:lb_policy_registry", "//src/core:loop", diff --git a/CMakeLists.txt b/CMakeLists.txt index 838ae79c140b6..0272420d75222 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1876,6 +1876,7 @@ add_library(grpc src/core/client_channel/direct_channel.cc src/core/client_channel/dynamic_filters.cc src/core/client_channel/global_subchannel_pool.cc + src/core/client_channel/lb_metadata.cc src/core/client_channel/load_balanced_call_destination.cc src/core/client_channel/local_subchannel_pool.cc src/core/client_channel/retry_filter.cc @@ -2969,6 +2970,7 @@ add_library(grpc_unsecure src/core/client_channel/direct_channel.cc src/core/client_channel/dynamic_filters.cc src/core/client_channel/global_subchannel_pool.cc + src/core/client_channel/lb_metadata.cc src/core/client_channel/load_balanced_call_destination.cc src/core/client_channel/local_subchannel_pool.cc src/core/client_channel/retry_filter.cc diff --git a/Makefile b/Makefile index d4e982f903c65..652e940ec9e27 100644 --- a/Makefile +++ b/Makefile @@ -678,6 +678,7 @@ LIBGRPC_SRC = \ src/core/client_channel/direct_channel.cc \ src/core/client_channel/dynamic_filters.cc \ src/core/client_channel/global_subchannel_pool.cc \ + src/core/client_channel/lb_metadata.cc \ src/core/client_channel/load_balanced_call_destination.cc \ src/core/client_channel/local_subchannel_pool.cc \ src/core/client_channel/retry_filter.cc \ diff --git a/Package.swift b/Package.swift index 260c070857e67..bbdc95a503726 100644 --- a/Package.swift +++ b/Package.swift @@ -144,6 +144,8 @@ let package = Package( "src/core/client_channel/dynamic_filters.h", "src/core/client_channel/global_subchannel_pool.cc", "src/core/client_channel/global_subchannel_pool.h", + "src/core/client_channel/lb_metadata.cc", + "src/core/client_channel/lb_metadata.h", "src/core/client_channel/load_balanced_call_destination.cc", "src/core/client_channel/load_balanced_call_destination.h", "src/core/client_channel/local_subchannel_pool.cc", @@ -1946,6 +1948,7 @@ let package = Package( "src/core/util/time_precise.cc", "src/core/util/time_precise.h", "src/core/util/tmpfile.h", + "src/core/util/upb_utils.h", "src/core/util/useful.h", "src/core/util/windows/cpu.cc", "src/core/util/windows/log.cc", @@ -1958,7 +1961,6 @@ let package = Package( "src/core/xds/grpc/certificate_provider_store.h", "src/core/xds/grpc/file_watcher_certificate_provider_factory.cc", "src/core/xds/grpc/file_watcher_certificate_provider_factory.h", - "src/core/xds/grpc/upb_utils.h", "src/core/xds/grpc/xds_audit_logger_registry.cc", "src/core/xds/grpc/xds_audit_logger_registry.h", "src/core/xds/grpc/xds_bootstrap_grpc.cc", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 3f93c7d81998a..7184929424d1f 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -237,6 +237,7 @@ libs: - src/core/client_channel/direct_channel.h - src/core/client_channel/dynamic_filters.h - src/core/client_channel/global_subchannel_pool.h + - src/core/client_channel/lb_metadata.h - src/core/client_channel/load_balanced_call_destination.h - src/core/client_channel/local_subchannel_pool.h - src/core/client_channel/retry_filter.h @@ -1216,9 +1217,9 @@ libs: - src/core/util/json/json_util.h - src/core/util/json/json_writer.h - src/core/util/spinlock.h + - src/core/util/upb_utils.h - src/core/xds/grpc/certificate_provider_store.h - src/core/xds/grpc/file_watcher_certificate_provider_factory.h - - src/core/xds/grpc/upb_utils.h - src/core/xds/grpc/xds_audit_logger_registry.h - src/core/xds/grpc/xds_bootstrap_grpc.h - src/core/xds/grpc/xds_certificate_provider.h @@ -1261,6 +1262,7 @@ libs: - src/core/client_channel/direct_channel.cc - src/core/client_channel/dynamic_filters.cc - src/core/client_channel/global_subchannel_pool.cc + - src/core/client_channel/lb_metadata.cc - src/core/client_channel/load_balanced_call_destination.cc - src/core/client_channel/local_subchannel_pool.cc - src/core/client_channel/retry_filter.cc @@ -2223,6 +2225,7 @@ libs: - src/core/client_channel/direct_channel.h - src/core/client_channel/dynamic_filters.h - src/core/client_channel/global_subchannel_pool.h + - src/core/client_channel/lb_metadata.h - src/core/client_channel/load_balanced_call_destination.h - src/core/client_channel/local_subchannel_pool.h - src/core/client_channel/retry_filter.h @@ -2685,6 +2688,7 @@ libs: - src/core/util/json/json_reader.h - src/core/util/json/json_writer.h - src/core/util/spinlock.h + - src/core/util/upb_utils.h - third_party/upb/upb/generated_code_support.h - third_party/upb/upb/mini_descriptor/build_enum.h - third_party/upb/upb/mini_descriptor/decode.h @@ -2717,6 +2721,7 @@ libs: - src/core/client_channel/direct_channel.cc - src/core/client_channel/dynamic_filters.cc - src/core/client_channel/global_subchannel_pool.cc + - src/core/client_channel/lb_metadata.cc - src/core/client_channel/load_balanced_call_destination.cc - src/core/client_channel/local_subchannel_pool.cc - src/core/client_channel/retry_filter.cc diff --git a/config.m4 b/config.m4 index d3c633249d381..6435dff6bcc5f 100644 --- a/config.m4 +++ b/config.m4 @@ -53,6 +53,7 @@ if test "$PHP_GRPC" != "no"; then src/core/client_channel/direct_channel.cc \ src/core/client_channel/dynamic_filters.cc \ src/core/client_channel/global_subchannel_pool.cc \ + src/core/client_channel/lb_metadata.cc \ src/core/client_channel/load_balanced_call_destination.cc \ src/core/client_channel/local_subchannel_pool.cc \ src/core/client_channel/retry_filter.cc \ diff --git a/config.w32 b/config.w32 index ade25276f75e7..1c871b2300b3c 100644 --- a/config.w32 +++ b/config.w32 @@ -18,6 +18,7 @@ if (PHP_GRPC != "no") { "src\\core\\client_channel\\direct_channel.cc " + "src\\core\\client_channel\\dynamic_filters.cc " + "src\\core\\client_channel\\global_subchannel_pool.cc " + + "src\\core\\client_channel\\lb_metadata.cc " + "src\\core\\client_channel\\load_balanced_call_destination.cc " + "src\\core\\client_channel\\local_subchannel_pool.cc " + "src\\core\\client_channel\\retry_filter.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 45d20d5aa5d81..4901373eb91da 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -283,6 +283,7 @@ Pod::Spec.new do |s| 'src/core/client_channel/direct_channel.h', 'src/core/client_channel/dynamic_filters.h', 'src/core/client_channel/global_subchannel_pool.h', + 'src/core/client_channel/lb_metadata.h', 'src/core/client_channel/load_balanced_call_destination.h', 'src/core/client_channel/local_subchannel_pool.h', 'src/core/client_channel/retry_filter.h', @@ -1323,10 +1324,10 @@ Pod::Spec.new do |s| 'src/core/util/string.h', 'src/core/util/time_precise.h', 'src/core/util/tmpfile.h', + 'src/core/util/upb_utils.h', 'src/core/util/useful.h', 'src/core/xds/grpc/certificate_provider_store.h', 'src/core/xds/grpc/file_watcher_certificate_provider_factory.h', - 'src/core/xds/grpc/upb_utils.h', 'src/core/xds/grpc/xds_audit_logger_registry.h', 'src/core/xds/grpc/xds_bootstrap_grpc.h', 'src/core/xds/grpc/xds_certificate_provider.h', @@ -1574,6 +1575,7 @@ Pod::Spec.new do |s| 'src/core/client_channel/direct_channel.h', 'src/core/client_channel/dynamic_filters.h', 'src/core/client_channel/global_subchannel_pool.h', + 'src/core/client_channel/lb_metadata.h', 'src/core/client_channel/load_balanced_call_destination.h', 'src/core/client_channel/local_subchannel_pool.h', 'src/core/client_channel/retry_filter.h', @@ -2596,10 +2598,10 @@ Pod::Spec.new do |s| 'src/core/util/string.h', 'src/core/util/time_precise.h', 'src/core/util/tmpfile.h', + 'src/core/util/upb_utils.h', 'src/core/util/useful.h', 'src/core/xds/grpc/certificate_provider_store.h', 'src/core/xds/grpc/file_watcher_certificate_provider_factory.h', - 'src/core/xds/grpc/upb_utils.h', 'src/core/xds/grpc/xds_audit_logger_registry.h', 'src/core/xds/grpc/xds_bootstrap_grpc.h', 'src/core/xds/grpc/xds_certificate_provider.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index e4e3f69602daf..d812ea38794ec 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -263,6 +263,8 @@ Pod::Spec.new do |s| 'src/core/client_channel/dynamic_filters.h', 'src/core/client_channel/global_subchannel_pool.cc', 'src/core/client_channel/global_subchannel_pool.h', + 'src/core/client_channel/lb_metadata.cc', + 'src/core/client_channel/lb_metadata.h', 'src/core/client_channel/load_balanced_call_destination.cc', 'src/core/client_channel/load_balanced_call_destination.h', 'src/core/client_channel/local_subchannel_pool.cc', @@ -2061,6 +2063,7 @@ Pod::Spec.new do |s| 'src/core/util/time_precise.cc', 'src/core/util/time_precise.h', 'src/core/util/tmpfile.h', + 'src/core/util/upb_utils.h', 'src/core/util/useful.h', 'src/core/util/windows/cpu.cc', 'src/core/util/windows/log.cc', @@ -2073,7 +2076,6 @@ Pod::Spec.new do |s| 'src/core/xds/grpc/certificate_provider_store.h', 'src/core/xds/grpc/file_watcher_certificate_provider_factory.cc', 'src/core/xds/grpc/file_watcher_certificate_provider_factory.h', - 'src/core/xds/grpc/upb_utils.h', 'src/core/xds/grpc/xds_audit_logger_registry.cc', 'src/core/xds/grpc/xds_audit_logger_registry.h', 'src/core/xds/grpc/xds_bootstrap_grpc.cc', @@ -2365,6 +2367,7 @@ Pod::Spec.new do |s| 'src/core/client_channel/direct_channel.h', 'src/core/client_channel/dynamic_filters.h', 'src/core/client_channel/global_subchannel_pool.h', + 'src/core/client_channel/lb_metadata.h', 'src/core/client_channel/load_balanced_call_destination.h', 'src/core/client_channel/local_subchannel_pool.h', 'src/core/client_channel/retry_filter.h', @@ -3367,10 +3370,10 @@ Pod::Spec.new do |s| 'src/core/util/string.h', 'src/core/util/time_precise.h', 'src/core/util/tmpfile.h', + 'src/core/util/upb_utils.h', 'src/core/util/useful.h', 'src/core/xds/grpc/certificate_provider_store.h', 'src/core/xds/grpc/file_watcher_certificate_provider_factory.h', - 'src/core/xds/grpc/upb_utils.h', 'src/core/xds/grpc/xds_audit_logger_registry.h', 'src/core/xds/grpc/xds_bootstrap_grpc.h', 'src/core/xds/grpc/xds_certificate_provider.h', diff --git a/grpc.gemspec b/grpc.gemspec index 423a5aa014091..213e157d0eb4b 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -150,6 +150,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/client_channel/dynamic_filters.h ) s.files += %w( src/core/client_channel/global_subchannel_pool.cc ) s.files += %w( src/core/client_channel/global_subchannel_pool.h ) + s.files += %w( src/core/client_channel/lb_metadata.cc ) + s.files += %w( src/core/client_channel/lb_metadata.h ) s.files += %w( src/core/client_channel/load_balanced_call_destination.cc ) s.files += %w( src/core/client_channel/load_balanced_call_destination.h ) s.files += %w( src/core/client_channel/local_subchannel_pool.cc ) @@ -1948,6 +1950,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/util/time_precise.cc ) s.files += %w( src/core/util/time_precise.h ) s.files += %w( src/core/util/tmpfile.h ) + s.files += %w( src/core/util/upb_utils.h ) s.files += %w( src/core/util/useful.h ) s.files += %w( src/core/util/windows/cpu.cc ) s.files += %w( src/core/util/windows/log.cc ) @@ -1960,7 +1963,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/xds/grpc/certificate_provider_store.h ) s.files += %w( src/core/xds/grpc/file_watcher_certificate_provider_factory.cc ) s.files += %w( src/core/xds/grpc/file_watcher_certificate_provider_factory.h ) - s.files += %w( src/core/xds/grpc/upb_utils.h ) s.files += %w( src/core/xds/grpc/xds_audit_logger_registry.cc ) s.files += %w( src/core/xds/grpc/xds_audit_logger_registry.h ) s.files += %w( src/core/xds/grpc/xds_bootstrap_grpc.cc ) diff --git a/package.xml b/package.xml index b3c2b6e941b79..3fcb69ed9813e 100644 --- a/package.xml +++ b/package.xml @@ -132,6 +132,8 @@ + + @@ -1930,6 +1932,7 @@ + @@ -1942,7 +1945,6 @@ - diff --git a/src/core/BUILD b/src/core/BUILD index d5bb1efd7259a..b9b1f80cf3082 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -3456,6 +3456,7 @@ grpc_cc_library( hdrs = ["load_balancing/lb_policy.h"], external_deps = [ "absl/base:core_headers", + "absl/container:inlined_vector", "absl/status", "absl/status:statusor", "absl/strings", @@ -3525,6 +3526,21 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "lb_metadata", + srcs = ["client_channel/lb_metadata.cc"], + hdrs = ["client_channel/lb_metadata.h"], + external_deps = [ + "absl/log:log", + "absl/strings", + "absl/types:optional", + ], + deps = [ + "lb_policy", + "metadata_batch", + ], +) + grpc_cc_library( name = "subchannel_interface", hdrs = ["load_balancing/subchannel_interface.h"], @@ -5009,6 +5025,7 @@ grpc_cc_library( "slice_refcount", "status_helper", "time", + "upb_utils", "uuid_v4", "validation_errors", "//:backoff", @@ -5039,14 +5056,13 @@ grpc_cc_library( grpc_cc_library( name = "upb_utils", hdrs = [ - "xds/grpc/upb_utils.h", + "util/upb_utils.h", ], external_deps = [ "absl/strings", "@com_google_protobuf//upb:base", ], language = "c++", - deps = ["//:gpr_platform"], ) grpc_cc_library( diff --git a/src/core/client_channel/client_channel_filter.cc b/src/core/client_channel/client_channel_filter.cc index 0ad970b549229..35cbb04e6553c 100644 --- a/src/core/client_channel/client_channel_filter.cc +++ b/src/core/client_channel/client_channel_filter.cc @@ -57,6 +57,7 @@ #include "src/core/client_channel/config_selector.h" #include "src/core/client_channel/dynamic_filters.h" #include "src/core/client_channel/global_subchannel_pool.h" +#include "src/core/client_channel/lb_metadata.h" #include "src/core/client_channel/local_subchannel_pool.h" #include "src/core/client_channel/retry_filter.h" #include "src/core/client_channel/subchannel.h" @@ -2353,79 +2354,6 @@ class ClientChannelFilter::LoadBalancedCall::LbCallState final LoadBalancedCall* lb_call_; }; -// -// ClientChannelFilter::LoadBalancedCall::Metadata -// - -class ClientChannelFilter::LoadBalancedCall::Metadata final - : public LoadBalancingPolicy::MetadataInterface { - public: - explicit Metadata(grpc_metadata_batch* batch) : batch_(batch) {} - - void Add(absl::string_view key, absl::string_view value) override { - if (batch_ == nullptr) return; - // Gross, egregious hack to support legacy grpclb behavior. - // TODO(ctiller): Use a promise context for this once that plumbing is done. - if (key == GrpcLbClientStatsMetadata::key()) { - batch_->Set( - GrpcLbClientStatsMetadata(), - const_cast( - reinterpret_cast(value.data()))); - return; - } - batch_->Append(key, Slice::FromStaticString(value), - [key](absl::string_view error, const Slice& value) { - LOG(ERROR) << error << " key:" << key - << " value:" << value.as_string_view(); - }); - } - - std::vector> TestOnlyCopyToVector() - override { - if (batch_ == nullptr) return {}; - Encoder encoder; - batch_->Encode(&encoder); - return encoder.Take(); - } - - absl::optional Lookup(absl::string_view key, - std::string* buffer) const override { - if (batch_ == nullptr) return absl::nullopt; - return batch_->GetStringValue(key, buffer); - } - - private: - class Encoder final { - public: - void Encode(const Slice& key, const Slice& value) { - out_.emplace_back(std::string(key.as_string_view()), - std::string(value.as_string_view())); - } - - template - void Encode(Which, const typename Which::ValueType& value) { - auto value_slice = Which::Encode(value); - out_.emplace_back(std::string(Which::key()), - std::string(value_slice.as_string_view())); - } - - void Encode(GrpcTimeoutMetadata, - const typename GrpcTimeoutMetadata::ValueType&) {} - void Encode(HttpPathMetadata, const Slice&) {} - void Encode(HttpMethodMetadata, - const typename HttpMethodMetadata::ValueType&) {} - - std::vector> Take() { - return std::move(out_); - } - - private: - std::vector> out_; - }; - - grpc_metadata_batch* batch_; -}; - // // ClientChannelFilter::LoadBalancedCall::LbCallState // @@ -2536,7 +2464,7 @@ void ClientChannelFilter::LoadBalancedCall::RecordCallCompletion( // If the LB policy requested a callback for trailing metadata, invoke // the callback. if (lb_subchannel_call_tracker_ != nullptr) { - Metadata trailing_metadata(recv_trailing_metadata); + LbMetadata trailing_metadata(recv_trailing_metadata); BackendMetricAccessor backend_metric_accessor(this, recv_trailing_metadata); LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = { peer_address, status, &trailing_metadata, &backend_metric_accessor}; @@ -2683,7 +2611,7 @@ bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl( pick_args.path = path->as_string_view(); LbCallState lb_call_state(this); pick_args.call_state = &lb_call_state; - Metadata initial_metadata(send_initial_metadata()); + LbMetadata initial_metadata(send_initial_metadata()); pick_args.initial_metadata = &initial_metadata; auto result = picker->Pick(pick_args); return HandlePickResult( @@ -2716,6 +2644,9 @@ bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl( if (lb_subchannel_call_tracker_ != nullptr) { lb_subchannel_call_tracker_->Start(); } + // Handle metadata mutations. + MetadataMutationHandler::Apply(complete_pick->metadata_mutations, + send_initial_metadata()); return true; }, // QueuePick diff --git a/src/core/client_channel/lb_metadata.cc b/src/core/client_channel/lb_metadata.cc new file mode 100644 index 0000000000000..d66552d053ad0 --- /dev/null +++ b/src/core/client_channel/lb_metadata.cc @@ -0,0 +1,99 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/client_channel/lb_metadata.h" + +#include "absl/log/log.h" + +namespace grpc_core { + +// +// LbMetadata +// + +namespace { + +class Encoder { + public: + void Encode(const Slice& key, const Slice& value) { + out_.emplace_back(std::string(key.as_string_view()), + std::string(value.as_string_view())); + } + + template + void Encode(Which, const typename Which::ValueType& value) { + auto value_slice = Which::Encode(value); + out_.emplace_back(std::string(Which::key()), + std::string(value_slice.as_string_view())); + } + + void Encode(GrpcTimeoutMetadata, + const typename GrpcTimeoutMetadata::ValueType&) {} + void Encode(HttpPathMetadata, const Slice&) {} + void Encode(HttpMethodMetadata, + const typename HttpMethodMetadata::ValueType&) {} + + std::vector> Take() { + return std::move(out_); + } + + private: + std::vector> out_; +}; + +} // namespace + +absl::optional LbMetadata::Lookup( + absl::string_view key, std::string* buffer) const { + if (batch_ == nullptr) return absl::nullopt; + return batch_->GetStringValue(key, buffer); +} + +std::vector> +LbMetadata::TestOnlyCopyToVector() const { + if (batch_ == nullptr) return {}; + Encoder encoder; + batch_->Encode(&encoder); + return encoder.Take(); +} + +// +// MetadataMutationHandler +// + +void MetadataMutationHandler::Apply( + LoadBalancingPolicy::MetadataMutations& metadata_mutations, + grpc_metadata_batch* metadata) { + for (auto& p : metadata_mutations.additions_) { + absl::string_view key = p.first; + Slice& value = + grpc_event_engine::experimental::internal::SliceCast(p.second); + // Gross, egregious hack to support legacy grpclb behavior. + // TODO(ctiller): Use a promise context for this once that plumbing is done. + if (key == GrpcLbClientStatsMetadata::key()) { + metadata->Set( + GrpcLbClientStatsMetadata(), + const_cast( + reinterpret_cast(value.data()))); + continue; + } + metadata->Append(key, std::move(value), + [key](absl::string_view error, const Slice& value) { + LOG(ERROR) << error << " key:" << key + << " value:" << value.as_string_view(); + }); + } +} + +} // namespace grpc_core diff --git a/src/core/client_channel/lb_metadata.h b/src/core/client_channel/lb_metadata.h new file mode 100644 index 0000000000000..6c1fe7eeb7b64 --- /dev/null +++ b/src/core/client_channel/lb_metadata.h @@ -0,0 +1,50 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_LB_METADATA_H +#define GRPC_SRC_CORE_CLIENT_CHANNEL_LB_METADATA_H + +#include +#include + +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" + +#include "src/core/lib/transport/metadata_batch.h" +#include "src/core/load_balancing/lb_policy.h" + +namespace grpc_core { + +class LbMetadata : public LoadBalancingPolicy::MetadataInterface { + public: + explicit LbMetadata(grpc_metadata_batch* batch) : batch_(batch) {} + + absl::optional Lookup(absl::string_view key, + std::string* buffer) const override; + + std::vector> TestOnlyCopyToVector() const; + + private: + grpc_metadata_batch* batch_; +}; + +class MetadataMutationHandler { + public: + static void Apply(LoadBalancingPolicy::MetadataMutations& metadata_mutations, + grpc_metadata_batch* metadata); +}; + +} // namespace grpc_core + +#endif // GRPC_SRC_CORE_CLIENT_CHANNEL_LB_METADATA_H diff --git a/src/core/client_channel/load_balanced_call_destination.cc b/src/core/client_channel/load_balanced_call_destination.cc index 352113fe0f28a..c13369a09fe7d 100644 --- a/src/core/client_channel/load_balanced_call_destination.cc +++ b/src/core/client_channel/load_balanced_call_destination.cc @@ -18,6 +18,7 @@ #include "src/core/client_channel/client_channel.h" #include "src/core/client_channel/client_channel_internal.h" +#include "src/core/client_channel/lb_metadata.h" #include "src/core/client_channel/subchannel.h" #include "src/core/lib/channel/status_util.h" #include "src/core/lib/config/core_configuration.h" @@ -28,74 +29,6 @@ namespace grpc_core { namespace { -class LbMetadata : public LoadBalancingPolicy::MetadataInterface { - public: - explicit LbMetadata(grpc_metadata_batch* batch) : batch_(batch) {} - - void Add(absl::string_view key, absl::string_view value) override { - if (batch_ == nullptr) return; - // Gross, egregious hack to support legacy grpclb behavior. - // TODO(ctiller): Use a promise context for this once that plumbing is done. - if (key == GrpcLbClientStatsMetadata::key()) { - batch_->Set( - GrpcLbClientStatsMetadata(), - const_cast( - reinterpret_cast(value.data()))); - return; - } - batch_->Append(key, Slice::FromStaticString(value), - [key](absl::string_view error, const Slice& value) { - LOG(ERROR) << error << " key:" << key - << " value:" << value.as_string_view(); - }); - } - - std::vector> TestOnlyCopyToVector() - override { - if (batch_ == nullptr) return {}; - Encoder encoder; - batch_->Encode(&encoder); - return encoder.Take(); - } - - absl::optional Lookup(absl::string_view key, - std::string* buffer) const override { - if (batch_ == nullptr) return absl::nullopt; - return batch_->GetStringValue(key, buffer); - } - - private: - class Encoder { - public: - void Encode(const Slice& key, const Slice& value) { - out_.emplace_back(std::string(key.as_string_view()), - std::string(value.as_string_view())); - } - - template - void Encode(Which, const typename Which::ValueType& value) { - auto value_slice = Which::Encode(value); - out_.emplace_back(std::string(Which::key()), - std::string(value_slice.as_string_view())); - } - - void Encode(GrpcTimeoutMetadata, - const typename GrpcTimeoutMetadata::ValueType&) {} - void Encode(HttpPathMetadata, const Slice&) {} - void Encode(HttpMethodMetadata, - const typename HttpMethodMetadata::ValueType&) {} - - std::vector> Take() { - return std::move(out_); - } - - private: - std::vector> out_; - }; - - grpc_metadata_batch* batch_; -}; - void MaybeCreateCallAttemptTracer(bool is_transparent_retry) { auto* call_tracer = MaybeGetContext(); if (call_tracer == nullptr) return; @@ -208,6 +141,9 @@ LoopCtl>> PickSubchannel( complete_pick->subchannel_call_tracker->Start(); SetContext(complete_pick->subchannel_call_tracker.release()); } + // Apply metadata mutations, if any. + MetadataMutationHandler::Apply(complete_pick->metadata_mutations, + &client_initial_metadata); // Return the connected subchannel. return call_destination; }, diff --git a/src/core/load_balancing/grpclb/grpclb.cc b/src/core/load_balancing/grpclb/grpclb.cc index 2258e7ebaaf50..6d928c0dfa28f 100644 --- a/src/core/load_balancing/grpclb/grpclb.cc +++ b/src/core/load_balancing/grpclb/grpclb.cc @@ -311,14 +311,17 @@ class GrpcLb final : public LoadBalancingPolicy { class SubchannelWrapper final : public DelegatingSubchannel { public: SubchannelWrapper(RefCountedPtr subchannel, - RefCountedPtr lb_policy, std::string lb_token, + RefCountedPtr lb_policy, + grpc_event_engine::experimental::Slice lb_token, RefCountedPtr client_stats) : DelegatingSubchannel(std::move(subchannel)), lb_policy_(std::move(lb_policy)), lb_token_(std::move(lb_token)), client_stats_(std::move(client_stats)) {} - const std::string& lb_token() const { return lb_token_; } + const grpc_event_engine::experimental::Slice& lb_token() const { + return lb_token_; + } GrpcLbClientStats* client_stats() const { return client_stats_.get(); } private: @@ -340,14 +343,14 @@ class GrpcLb final : public LoadBalancingPolicy { } RefCountedPtr lb_policy_; - std::string lb_token_; + grpc_event_engine::experimental::Slice lb_token_; RefCountedPtr client_stats_; }; class TokenAndClientStatsArg final : public RefCounted { public: - TokenAndClientStatsArg(std::string lb_token, + TokenAndClientStatsArg(grpc_event_engine::experimental::Slice lb_token, RefCountedPtr client_stats) : lb_token_(std::move(lb_token)), client_stats_(std::move(client_stats)) {} @@ -358,18 +361,21 @@ class GrpcLb final : public LoadBalancingPolicy { static int ChannelArgsCompare(const TokenAndClientStatsArg* a, const TokenAndClientStatsArg* b) { - int r = a->lb_token_.compare(b->lb_token_); + int r = + a->lb_token_.as_string_view().compare(b->lb_token_.as_string_view()); if (r != 0) return r; return QsortCompare(a->client_stats_.get(), b->client_stats_.get()); } - const std::string& lb_token() const { return lb_token_; } + const grpc_event_engine::experimental::Slice& lb_token() const { + return lb_token_; + } RefCountedPtr client_stats() const { return client_stats_; } private: - std::string lb_token_; + grpc_event_engine::experimental::Slice lb_token_; RefCountedPtr client_stats_; }; @@ -665,7 +671,8 @@ class GrpcLb::Serverlist::AddressIterator final // LB token processing. const size_t lb_token_length = strnlen( server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token)); - std::string lb_token(server.load_balance_token, lb_token_length); + auto lb_token = grpc_event_engine::experimental::Slice::FromCopiedBuffer( + server.load_balance_token, lb_token_length); if (lb_token.empty()) { auto addr_uri = grpc_sockaddr_to_uri(&addr); LOG(INFO) << "Missing LB token for backend address '" @@ -773,9 +780,10 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) { // a string and rely on the client_load_reporting filter to know // how to interpret it. // NOLINTBEGIN(bugprone-string-constructor) - args.initial_metadata->Add( + complete_pick->metadata_mutations.Add( GrpcLbClientStatsMetadata::key(), - absl::string_view(reinterpret_cast(client_stats), 0)); + grpc_event_engine::experimental::Slice(grpc_slice_from_static_buffer( + reinterpret_cast(client_stats), 0))); // NOLINTEND(bugprone-string-constructor) // Update calls-started. client_stats->AddCallStarted(); @@ -785,10 +793,8 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) { // may get refreshed between when we return this pick and when the // initial metadata goes out on the wire. if (!subchannel_wrapper->lb_token().empty()) { - char* lb_token = static_cast( - args.call_state->Alloc(subchannel_wrapper->lb_token().size() + 1)); - strcpy(lb_token, subchannel_wrapper->lb_token().c_str()); - args.initial_metadata->Add(LbTokenMetadata::key(), lb_token); + complete_pick->metadata_mutations.Add( + LbTokenMetadata::key(), subchannel_wrapper->lb_token().Ref()); } // Unwrap subchannel to pass up to the channel. complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel(); @@ -811,13 +817,11 @@ RefCountedPtr GrpcLb::Helper::CreateSubchannel( absl::StrFormat("[grpclb %p] no TokenAndClientStatsArg for address %s", parent(), addr_str.value_or("N/A").c_str())); } - std::string lb_token = arg->lb_token(); - RefCountedPtr client_stats = arg->client_stats(); return MakeRefCounted( parent()->channel_control_helper()->CreateSubchannel( address, per_address_args, args), parent()->RefAsSubclass(DEBUG_LOCATION, "SubchannelWrapper"), - std::move(lb_token), std::move(client_stats)); + arg->lb_token().Ref(), arg->client_stats()); } void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, @@ -1542,7 +1546,8 @@ class GrpcLb::NullLbTokenEndpointIterator final private: std::shared_ptr parent_it_; RefCountedPtr empty_token_ = - MakeRefCounted("", nullptr); + MakeRefCounted( + grpc_event_engine::experimental::Slice(), nullptr); }; absl::Status GrpcLb::UpdateLocked(UpdateArgs args) { diff --git a/src/core/load_balancing/lb_policy.h b/src/core/load_balancing/lb_policy.h index cfdab1ba1a624..600ef9f4498d2 100644 --- a/src/core/load_balancing/lb_policy.h +++ b/src/core/load_balancing/lb_policy.h @@ -23,9 +23,9 @@ #include #include #include -#include #include "absl/base/thread_annotations.h" +#include "absl/container/inlined_vector.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/string_view.h" @@ -33,6 +33,7 @@ #include "absl/types/variant.h" #include +#include #include #include #include @@ -116,29 +117,35 @@ class LoadBalancingPolicy : public InternallyRefCounted { public: virtual ~MetadataInterface() = default; - ////////////////////////////////////////////////////////////////////////// - // TODO(ctiller): DO NOT MAKE THIS A PUBLIC API YET - // This needs some API design to ensure we can add/remove/replace metadata - // keys... we're deliberately not doing so to save some time whilst - // cleaning up the internal metadata representation, but we should add - // something back before making this a public API. - ////////////////////////////////////////////////////////////////////////// - - /// Adds a key/value pair. - /// Does NOT take ownership of \a key or \a value. - /// Implementations must ensure that the key and value remain alive - /// until the call ends. If desired, they may be allocated via - /// CallState::Alloc(). - virtual void Add(absl::string_view key, absl::string_view value) = 0; - - /// Produce a vector of metadata key/value strings for tests. - virtual std::vector> - TestOnlyCopyToVector() = 0; - virtual absl::optional Lookup( absl::string_view key, std::string* buffer) const = 0; }; + /// A list of metadata mutations to be returned along with a PickResult. + class MetadataMutations { + public: + /// Adds a key/value pair. If the key is already present, the new + /// value will be appended with a comma delimiter. + void Add(absl::string_view key, absl::string_view value) { + Add(key, grpc_event_engine::experimental::Slice::FromCopiedString(value)); + } + void Add(absl::string_view key, + grpc_event_engine::experimental::Slice value) { + additions_.push_back({key, std::move(value)}); + } + + private: + friend class MetadataMutationHandler; + + // Avoid allocation if up to 3 additions per LB pick. Most expected + // use cases should be no more than 2, so this gives us a bit of slack. + // But it should be cheap to increase this value if we start seeing use + // cases with more than 3 additions. + absl::InlinedVector< + std::pair, 3> + additions_; + }; + /// Arguments used when picking a subchannel for a call. struct PickArgs { /// The path of the call. Indicates the RPC service and method name. @@ -210,11 +217,16 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// be used. std::unique_ptr subchannel_call_tracker; + /// Metadata mutations to be applied to the call. + MetadataMutations metadata_mutations; + explicit Complete( RefCountedPtr sc, - std::unique_ptr tracker = nullptr) + std::unique_ptr tracker = nullptr, + MetadataMutations md = MetadataMutations()) : subchannel(std::move(sc)), - subchannel_call_tracker(std::move(tracker)) {} + subchannel_call_tracker(std::move(tracker)), + metadata_mutations(std::move(md)) {} }; /// Pick cannot be completed until something changes on the control diff --git a/src/core/load_balancing/rls/rls.cc b/src/core/load_balancing/rls/rls.cc index 7d518d2c86965..0af7cad20a0ff 100644 --- a/src/core/load_balancing/rls/rls.cc +++ b/src/core/load_balancing/rls/rls.cc @@ -108,6 +108,7 @@ #include "src/core/util/json/json_args.h" #include "src/core/util/json/json_object_loader.h" #include "src/core/util/json/json_writer.h" +#include "src/core/util/upb_utils.h" #include "src/proto/grpc/lookup/v1/rls.upb.h" using ::grpc_event_engine::experimental::EventEngine; @@ -315,12 +316,12 @@ class RlsLb final : public LoadBalancingPolicy { struct ResponseInfo { absl::Status status; std::vector targets; - std::string header_data; + grpc_event_engine::experimental::Slice header_data; std::string ToString() const { return absl::StrFormat("{status=%s, targets=[%s], header_data=\"%s\"}", status.ToString(), absl::StrJoin(targets, ","), - header_data); + header_data.as_string_view()); } }; @@ -464,7 +465,7 @@ class RlsLb final : public LoadBalancingPolicy { ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) { return data_expiration_time_; } - const std::string& header_data() const + const grpc_event_engine::experimental::Slice& header_data() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) { return header_data_; } @@ -543,7 +544,8 @@ class RlsLb final : public LoadBalancingPolicy { // RLS response states std::vector> child_policy_wrappers_ ABSL_GUARDED_BY(&RlsLb::mu_); - std::string header_data_ ABSL_GUARDED_BY(&RlsLb::mu_); + grpc_event_engine::experimental::Slice header_data_ + ABSL_GUARDED_BY(&RlsLb::mu_); Timestamp data_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) = Timestamp::InfPast(); Timestamp stale_time_ ABSL_GUARDED_BY(&RlsLb::mu_) = Timestamp::InfPast(); @@ -702,7 +704,7 @@ class RlsLb final : public LoadBalancingPolicy { RefCountedPtr rls_channel, std::unique_ptr backoff_state, grpc_lookup_v1_RouteLookupRequest_Reason reason, - std::string stale_header_data); + grpc_event_engine::experimental::Slice stale_header_data); ~RlsRequest() override; // Shuts down the request. If the request is still in flight, it is @@ -730,7 +732,7 @@ class RlsLb final : public LoadBalancingPolicy { RefCountedPtr rls_channel_; std::unique_ptr backoff_state_; grpc_lookup_v1_RouteLookupRequest_Reason reason_; - std::string stale_header_data_; + grpc_event_engine::experimental::Slice stale_header_data_; // RLS call state. Timestamp deadline_; @@ -1259,19 +1261,17 @@ LoadBalancingPolicy::PickResult RlsLb::Cache::Entry::Pick(PickArgs args) { child_policy_wrappers_.size(), ConnectivityStateName(child_policy_wrapper->connectivity_state())); } - // Add header data. - // Note that even if the target we're using is in TRANSIENT_FAILURE, - // the pick might still succeed (e.g., if the child is ring_hash), so - // we need to pass the right header info down in all cases. - if (!header_data_.empty()) { - char* copied_header_data = - static_cast(args.call_state->Alloc(header_data_.length() + 1)); - strcpy(copied_header_data, header_data_.c_str()); - args.initial_metadata->Add(kRlsHeaderKey, copied_header_data); - } auto pick_result = child_policy_wrapper->Pick(args); lb_policy_->MaybeExportPickCount(kMetricTargetPicks, child_policy_wrapper->target(), pick_result); + // Add header data. + if (!header_data_.empty()) { + auto* complete_pick = + absl::get_if(&pick_result.result); + if (complete_pick != nullptr) { + complete_pick->metadata_mutations.Add(kRlsHeaderKey, header_data_.Ref()); + } + } return pick_result; } @@ -1682,11 +1682,11 @@ void RlsLb::RlsChannel::StartRlsCall(const RequestKey& key, std::unique_ptr backoff_state; grpc_lookup_v1_RouteLookupRequest_Reason reason = grpc_lookup_v1_RouteLookupRequest_REASON_MISS; - std::string stale_header_data; + grpc_event_engine::experimental::Slice stale_header_data; if (stale_entry != nullptr) { backoff_state = stale_entry->TakeBackoffState(); reason = grpc_lookup_v1_RouteLookupRequest_REASON_STALE; - stale_header_data = stale_entry->header_data(); + stale_header_data = stale_entry->header_data().Ref(); } lb_policy_->request_map_.emplace( key, MakeOrphanable( @@ -1708,11 +1708,12 @@ void RlsLb::RlsChannel::ResetBackoff() { // RlsLb::RlsRequest // -RlsLb::RlsRequest::RlsRequest(RefCountedPtr lb_policy, RequestKey key, - RefCountedPtr rls_channel, - std::unique_ptr backoff_state, - grpc_lookup_v1_RouteLookupRequest_Reason reason, - std::string stale_header_data) +RlsLb::RlsRequest::RlsRequest( + RefCountedPtr lb_policy, RequestKey key, + RefCountedPtr rls_channel, + std::unique_ptr backoff_state, + grpc_lookup_v1_RouteLookupRequest_Reason reason, + grpc_event_engine::experimental::Slice stale_header_data) : InternallyRefCounted( GRPC_TRACE_FLAG_ENABLED(rls_lb) ? "RlsRequest" : nullptr), lb_policy_(std::move(lb_policy)), @@ -1890,8 +1891,7 @@ grpc_byte_buffer* RlsLb::RlsRequest::MakeRequestProto() { grpc_lookup_v1_RouteLookupRequest_set_reason(req, reason_); if (!stale_header_data_.empty()) { grpc_lookup_v1_RouteLookupRequest_set_stale_header_data( - req, upb_StringView_FromDataAndSize(stale_header_data_.data(), - stale_header_data_.size())); + req, StdStringToUpbString(stale_header_data_.as_string_view())); } size_t len; char* buf = @@ -1934,7 +1934,8 @@ RlsLb::ResponseInfo RlsLb::RlsRequest::ParseResponseProto() { upb_StringView header_data_strview = grpc_lookup_v1_RouteLookupResponse_header_data(response); response_info.header_data = - std::string(header_data_strview.data, header_data_strview.size); + grpc_event_engine::experimental::Slice::FromCopiedBuffer( + header_data_strview.data, header_data_strview.size); return response_info; } diff --git a/src/core/xds/grpc/upb_utils.h b/src/core/util/upb_utils.h similarity index 87% rename from src/core/xds/grpc/upb_utils.h rename to src/core/util/upb_utils.h index e2123f39cbc42..a5b1ac897fc9c 100644 --- a/src/core/xds/grpc/upb_utils.h +++ b/src/core/util/upb_utils.h @@ -14,16 +14,14 @@ // limitations under the License. // -#ifndef GRPC_SRC_CORE_XDS_GRPC_UPB_UTILS_H -#define GRPC_SRC_CORE_XDS_GRPC_UPB_UTILS_H +#ifndef GRPC_SRC_CORE_UTIL_UPB_UTILS_H +#define GRPC_SRC_CORE_UTIL_UPB_UTILS_H #include #include "absl/strings/string_view.h" #include "upb/base/string_view.h" -#include - namespace grpc_core { // Works for both std::string and absl::string_view. @@ -42,4 +40,4 @@ inline std::string UpbStringToStdString(const upb_StringView& str) { } // namespace grpc_core -#endif // GRPC_SRC_CORE_XDS_GRPC_UPB_UTILS_H +#endif // GRPC_SRC_CORE_UTIL_UPB_UTILS_H diff --git a/src/core/xds/grpc/xds_client_grpc.cc b/src/core/xds/grpc/xds_client_grpc.cc index 58771564a293b..a1899f47673f4 100644 --- a/src/core/xds/grpc/xds_client_grpc.cc +++ b/src/core/xds/grpc/xds_client_grpc.cc @@ -55,7 +55,7 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/telemetry/metrics.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_bootstrap_grpc.h" #include "src/core/xds/grpc/xds_transport_grpc.h" #include "src/core/xds/xds_client/xds_api.h" diff --git a/src/core/xds/grpc/xds_cluster.cc b/src/core/xds/grpc/xds_cluster.cc index 2d2f4e6c7282b..7b232fa0ef8ab 100644 --- a/src/core/xds/grpc/xds_cluster.cc +++ b/src/core/xds/grpc/xds_cluster.cc @@ -66,7 +66,7 @@ #include "src/core/lib/matchers/matchers.h" #include "src/core/load_balancing/lb_policy_registry.h" #include "src/core/util/json/json_writer.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_common_types.h" #include "src/core/xds/grpc/xds_lb_policy_registry.h" #include "src/core/xds/xds_client/xds_client.h" diff --git a/src/core/xds/grpc/xds_common_types.cc b/src/core/xds/grpc/xds_common_types.cc index ff0a630a99273..2e8fafda5883a 100644 --- a/src/core/xds/grpc/xds_common_types.cc +++ b/src/core/xds/grpc/xds_common_types.cc @@ -45,7 +45,7 @@ #include #include "src/core/util/json/json_reader.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_bootstrap_grpc.h" #include "src/core/xds/xds_client/xds_client.h" diff --git a/src/core/xds/grpc/xds_endpoint.cc b/src/core/xds/grpc/xds_endpoint.cc index 46fb1f8188778..dc761931d66e6 100644 --- a/src/core/xds/grpc/xds_endpoint.cc +++ b/src/core/xds/grpc/xds_endpoint.cc @@ -51,7 +51,7 @@ #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/iomgr/resolved_address.h" #include "src/core/util/string.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_health_status.h" #include "src/core/xds/xds_client/xds_resource_type.h" diff --git a/src/core/xds/grpc/xds_http_rbac_filter.cc b/src/core/xds/grpc/xds_http_rbac_filter.cc index 6ba09042c88aa..3fadf5d2ebf90 100644 --- a/src/core/xds/grpc/xds_http_rbac_filter.cc +++ b/src/core/xds/grpc/xds_http_rbac_filter.cc @@ -50,7 +50,7 @@ #include "src/core/util/json/json.h" #include "src/core/util/json/json_writer.h" #include "src/core/util/string.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_audit_logger_registry.h" #include "src/core/xds/grpc/xds_bootstrap_grpc.h" #include "src/core/xds/xds_client/xds_client.h" diff --git a/src/core/xds/grpc/xds_http_stateful_session_filter.cc b/src/core/xds/grpc/xds_http_stateful_session_filter.cc index 7b0b05219cd1f..0c4203dd14067 100644 --- a/src/core/xds/grpc/xds_http_stateful_session_filter.cc +++ b/src/core/xds/grpc/xds_http_stateful_session_filter.cc @@ -39,7 +39,7 @@ #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/util/json/json.h" #include "src/core/util/json/json_writer.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_common_types.h" #include "src/core/xds/grpc/xds_http_filters.h" diff --git a/src/core/xds/grpc/xds_listener.cc b/src/core/xds/grpc/xds_listener.cc index 7ea5822bf633a..02379bfe5a7fb 100644 --- a/src/core/xds/grpc/xds_listener.cc +++ b/src/core/xds/grpc/xds_listener.cc @@ -55,7 +55,7 @@ #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/matchers/matchers.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_common_types.h" #include "src/core/xds/xds_client/xds_resource_type.h" diff --git a/src/core/xds/grpc/xds_route_config.cc b/src/core/xds/grpc/xds_route_config.cc index d2933d8413a46..c2f8a40620763 100644 --- a/src/core/xds/grpc/xds_route_config.cc +++ b/src/core/xds/grpc/xds_route_config.cc @@ -70,7 +70,7 @@ #include "src/core/util/json/json.h" #include "src/core/util/json/json_writer.h" #include "src/core/util/string.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_cluster_specifier_plugin.h" #include "src/core/xds/grpc/xds_common_types.h" #include "src/core/xds/grpc/xds_http_filters.h" diff --git a/src/core/xds/xds_client/xds_api.cc b/src/core/xds/xds_client/xds_api.cc index 6964d9b993135..8ae5e438abf7d 100644 --- a/src/core/xds/xds_client/xds_api.cc +++ b/src/core/xds/xds_client/xds_api.cc @@ -48,7 +48,7 @@ #include #include "src/core/util/json/json.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/xds_client/xds_client.h" // IWYU pragma: no_include "upb/msg_internal.h" diff --git a/src/core/xds/xds_client/xds_client.cc b/src/core/xds/xds_client/xds_client.cc index 1f3c9b1926f4a..8a9456cdad2f4 100644 --- a/src/core/xds/xds_client/xds_client.cc +++ b/src/core/xds/xds_client/xds_client.cc @@ -53,7 +53,7 @@ #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/uri/uri_parser.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/xds_client/xds_api.h" #include "src/core/xds/xds_client/xds_bootstrap.h" #include "src/core/xds/xds_client/xds_client_stats.h" diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 4d4b22f2225db..70c1bbfa63ddc 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -27,6 +27,7 @@ 'src/core/client_channel/direct_channel.cc', 'src/core/client_channel/dynamic_filters.cc', 'src/core/client_channel/global_subchannel_pool.cc', + 'src/core/client_channel/lb_metadata.cc', 'src/core/client_channel/load_balanced_call_destination.cc', 'src/core/client_channel/local_subchannel_pool.cc', 'src/core/client_channel/retry_filter.cc', diff --git a/test/core/load_balancing/lb_policy_test_lib.h b/test/core/load_balancing/lb_policy_test_lib.h index aa7ea8c9ae8b1..fcd85c8928eee 100644 --- a/test/core/load_balancing/lb_policy_test_lib.h +++ b/test/core/load_balancing/lb_policy_test_lib.h @@ -620,20 +620,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { explicit FakeMetadata(std::map metadata) : metadata_(std::move(metadata)) {} - const std::map& metadata() const { - return metadata_; - } - private: - void Add(absl::string_view key, absl::string_view value) override { - metadata_[std::string(key)] = std::string(value); - } - - std::vector> TestOnlyCopyToVector() - override { - return {}; // Not used. - } - absl::optional Lookup( absl::string_view key, std::string* /*buffer*/) const override { auto it = metadata_.find(std::string(key)); diff --git a/test/core/test_util/BUILD b/test/core/test_util/BUILD index d2e935f75f4ca..e0962df86f370 100644 --- a/test/core/test_util/BUILD +++ b/test/core/test_util/BUILD @@ -333,10 +333,12 @@ grpc_cc_library( "//:uri_parser", "//src/core:channel_args", "//src/core:delegating_helper", + "//src/core:down_cast", "//src/core:error", "//src/core:grpc_backend_metric_data", "//src/core:json", "//src/core:json_util", + "//src/core:lb_metadata", "//src/core:lb_policy", "//src/core:lb_policy_factory", "//src/core:lb_policy_registry", diff --git a/test/core/test_util/test_lb_policies.cc b/test/core/test_util/test_lb_policies.cc index aea9e9194140a..2d674333b12b8 100644 --- a/test/core/test_util/test_lb_policies.cc +++ b/test/core/test_util/test_lb_policies.cc @@ -30,9 +30,11 @@ #include #include +#include "src/core/client_channel/lb_metadata.h" #include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/gprpp/down_cast.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/status_helper.h" @@ -129,7 +131,8 @@ class TestPickArgsLb : public ForwardingLoadBalancingPolicy { // Report args seen. PickArgsSeen args_seen; args_seen.path = std::string(args.path); - args_seen.metadata = args.initial_metadata->TestOnlyCopyToVector(); + args_seen.metadata = + DownCast(args.initial_metadata)->TestOnlyCopyToVector(); cb_(args_seen); // Do pick. return delegate_picker_->Pick(args); @@ -269,7 +272,8 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy args_seen.status = args.status; args_seen.backend_metric_data = args.backend_metric_accessor->GetBackendMetricData(); - args_seen.metadata = args.trailing_metadata->TestOnlyCopyToVector(); + args_seen.metadata = + DownCast(args.trailing_metadata)->TestOnlyCopyToVector(); cb_(args_seen); } diff --git a/test/core/transport/metadata_map_test.cc b/test/core/transport/metadata_map_test.cc index 12e42c7167a35..cc977cd3c4e9a 100644 --- a/test/core/transport/metadata_map_test.cc +++ b/test/core/transport/metadata_map_test.cc @@ -81,7 +81,7 @@ TEST(MetadataMapTest, SimpleOps) { // EXPECT_EQ it later. class FakeEncoder { public: - std::string output() { return output_; } + const std::string& output() { return output_; } void Encode(const Slice& key, const Slice& value) { output_ += absl::StrCat("UNKNOWN METADATUM: key=", key.as_string_view(), @@ -128,6 +128,26 @@ TEST(MetadataMapTest, NonEncodableTrait) { EXPECT_EQ(map.DebugString(), "GrpcStreamNetworkState: not sent on wire"); } +TEST(MetadataMapTest, NonTraitKeyWithMultipleValues) { + FakeEncoder encoder; + TimeoutOnlyMetadataMap map; + const absl::string_view kKey = "key"; + map.Append(kKey, Slice::FromStaticString("value1"), + [](absl::string_view error, const Slice& value) { + LOG(ERROR) << error << " value:" << value.as_string_view(); + }); + map.Append(kKey, Slice::FromStaticString("value2"), + [](absl::string_view error, const Slice& value) { + LOG(ERROR) << error << " value:" << value.as_string_view(); + }); + map.Encode(&encoder); + EXPECT_EQ(encoder.output(), + "UNKNOWN METADATUM: key=key value=value1\n" + "UNKNOWN METADATUM: key=key value=value2\n"); + std::string buffer; + EXPECT_EQ(map.GetStringValue(kKey, &buffer), "value1,value2"); +} + TEST(DebugStringBuilderTest, OneAddAfterRedaction) { metadata_detail::DebugStringBuilder b; b.AddAfterRedaction(ContentTypeMetadata::key(), "AddValue01"); diff --git a/test/core/xds/xds_common_types_test.cc b/test/core/xds/xds_common_types_test.cc index b934a977cb622..6202407862f5a 100644 --- a/test/core/xds/xds_common_types_test.cc +++ b/test/core/xds/xds_common_types_test.cc @@ -45,7 +45,7 @@ #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/matchers/matchers.h" #include "src/core/util/json/json_writer.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_bootstrap_grpc.h" #include "src/core/xds/xds_client/xds_bootstrap.h" #include "src/core/xds/xds_client/xds_client.h" diff --git a/test/cpp/interop/rpc_behavior_lb_policy.cc b/test/cpp/interop/rpc_behavior_lb_policy.cc index b065af99ad969..1e6ab254fd2a7 100644 --- a/test/cpp/interop/rpc_behavior_lb_policy.cc +++ b/test/cpp/interop/rpc_behavior_lb_policy.cc @@ -111,12 +111,17 @@ class RpcBehaviorLbPolicy : public LoadBalancingPolicy { rpc_behavior_(rpc_behavior) {} PickResult Pick(PickArgs args) override { - char* rpc_behavior_copy = static_cast( - args.call_state->Alloc(rpc_behavior_.length() + 1)); - strcpy(rpc_behavior_copy, rpc_behavior_.c_str()); - args.initial_metadata->Add(kRpcBehaviorMetadataKey, rpc_behavior_copy); // Do pick. - return delegate_picker_->Pick(args); + auto pick_result = delegate_picker_->Pick(args); + // Add metadata. + auto* complete_pick = + absl::get_if(&pick_result.result); + if (complete_pick != nullptr) { + complete_pick->metadata_mutations.Add(kRpcBehaviorMetadataKey, + rpc_behavior_); + } + // Return result. + return pick_result; } private: diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index cf6eb657b4f44..7065c3262d76b 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1115,6 +1115,8 @@ src/core/client_channel/dynamic_filters.cc \ src/core/client_channel/dynamic_filters.h \ src/core/client_channel/global_subchannel_pool.cc \ src/core/client_channel/global_subchannel_pool.h \ +src/core/client_channel/lb_metadata.cc \ +src/core/client_channel/lb_metadata.h \ src/core/client_channel/load_balanced_call_destination.cc \ src/core/client_channel/load_balanced_call_destination.h \ src/core/client_channel/local_subchannel_pool.cc \ @@ -2951,6 +2953,7 @@ src/core/util/time.cc \ src/core/util/time_precise.cc \ src/core/util/time_precise.h \ src/core/util/tmpfile.h \ +src/core/util/upb_utils.h \ src/core/util/useful.h \ src/core/util/windows/cpu.cc \ src/core/util/windows/log.cc \ @@ -2963,7 +2966,6 @@ src/core/xds/grpc/certificate_provider_store.cc \ src/core/xds/grpc/certificate_provider_store.h \ src/core/xds/grpc/file_watcher_certificate_provider_factory.cc \ src/core/xds/grpc/file_watcher_certificate_provider_factory.h \ -src/core/xds/grpc/upb_utils.h \ src/core/xds/grpc/xds_audit_logger_registry.cc \ src/core/xds/grpc/xds_audit_logger_registry.h \ src/core/xds/grpc/xds_bootstrap_grpc.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 6b3601d835935..de0d4e57c86a5 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -915,6 +915,8 @@ src/core/client_channel/dynamic_filters.cc \ src/core/client_channel/dynamic_filters.h \ src/core/client_channel/global_subchannel_pool.cc \ src/core/client_channel/global_subchannel_pool.h \ +src/core/client_channel/lb_metadata.cc \ +src/core/client_channel/lb_metadata.h \ src/core/client_channel/load_balanced_call_destination.cc \ src/core/client_channel/load_balanced_call_destination.h \ src/core/client_channel/local_subchannel_pool.cc \ @@ -2730,6 +2732,7 @@ src/core/util/time.cc \ src/core/util/time_precise.cc \ src/core/util/time_precise.h \ src/core/util/tmpfile.h \ +src/core/util/upb_utils.h \ src/core/util/useful.h \ src/core/util/windows/cpu.cc \ src/core/util/windows/log.cc \ @@ -2742,7 +2745,6 @@ src/core/xds/grpc/certificate_provider_store.cc \ src/core/xds/grpc/certificate_provider_store.h \ src/core/xds/grpc/file_watcher_certificate_provider_factory.cc \ src/core/xds/grpc/file_watcher_certificate_provider_factory.h \ -src/core/xds/grpc/upb_utils.h \ src/core/xds/grpc/xds_audit_logger_registry.cc \ src/core/xds/grpc/xds_audit_logger_registry.h \ src/core/xds/grpc/xds_bootstrap_grpc.cc \ diff --git a/tools/run_tests/sanity/core_banned_functions.py b/tools/run_tests/sanity/core_banned_functions.py index 7a4270ecf0df4..24b38f3142169 100755 --- a/tools/run_tests/sanity/core_banned_functions.py +++ b/tools/run_tests/sanity/core_banned_functions.py @@ -24,7 +24,10 @@ # map of banned function signature to allowlist BANNED_EXCEPT = { - "grpc_slice_from_static_buffer(": ["src/core/lib/slice/slice.cc"], + "grpc_slice_from_static_buffer(": [ + "src/core/lib/slice/slice.cc", + "src/core/load_balancing/grpclb/grpclb.cc", + ], "grpc_resource_quota_ref(": ["src/core/lib/resource_quota/api.cc"], "grpc_resource_quota_unref(": [ "src/core/lib/resource_quota/api.cc",