From e7500e8bf9aee2df27a1929016935f63e008a8b0 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 21 Jun 2024 11:32:19 -0700 Subject: [PATCH] [LB policy API] change metadata mutation API to handle discarded picks (#36968) Previously, metadata mutations were made by the picker directly, which meant that they would be applied even if the channel winds up discarding the pick due to the returned subchannel having been disconnected by the time the pick result is returned. This changes the API such that pickers return metadata mutations along with the pick result, so that the mutations won't get applied unless the pick result is actually used. Closes #36968 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/36968 from markdroth:lb_metadata_api 2765da61214f3ae63da3fdb8e362fac625b51c5a PiperOrigin-RevId: 645451869 --- BUILD | 1 + CMakeLists.txt | 2 + Makefile | 1 + Package.swift | 4 +- build_autogenerated.yaml | 7 +- config.m4 | 1 + config.w32 | 1 + gRPC-C++.podspec | 6 +- gRPC-Core.podspec | 7 +- grpc.gemspec | 4 +- package.xml | 4 +- src/core/BUILD | 20 +++- .../client_channel/client_channel_filter.cc | 81 ++------------- src/core/client_channel/lb_metadata.cc | 99 +++++++++++++++++++ src/core/client_channel/lb_metadata.h | 50 ++++++++++ .../load_balanced_call_destination.cc | 72 +------------- src/core/load_balancing/grpclb/grpclb.cc | 41 ++++---- src/core/load_balancing/lb_policy.h | 56 ++++++----- src/core/load_balancing/rls/rls.cc | 53 +++++----- src/core/{xds/grpc => util}/upb_utils.h | 8 +- src/core/xds/grpc/xds_client_grpc.cc | 2 +- src/core/xds/grpc/xds_cluster.cc | 2 +- src/core/xds/grpc/xds_common_types.cc | 2 +- src/core/xds/grpc/xds_endpoint.cc | 2 +- src/core/xds/grpc/xds_http_rbac_filter.cc | 2 +- .../grpc/xds_http_stateful_session_filter.cc | 2 +- src/core/xds/grpc/xds_listener.cc | 2 +- src/core/xds/grpc/xds_route_config.cc | 2 +- src/core/xds/xds_client/xds_api.cc | 2 +- src/core/xds/xds_client/xds_client.cc | 2 +- src/python/grpcio/grpc_core_dependencies.py | 1 + test/core/load_balancing/lb_policy_test_lib.h | 13 --- test/core/test_util/BUILD | 2 + test/core/test_util/test_lb_policies.cc | 8 +- test/core/transport/metadata_map_test.cc | 22 ++++- test/core/xds/xds_common_types_test.cc | 2 +- test/cpp/interop/rpc_behavior_lb_policy.cc | 15 ++- tools/doxygen/Doxyfile.c++.internal | 4 +- tools/doxygen/Doxyfile.core.internal | 4 +- .../run_tests/sanity/core_banned_functions.py | 5 +- 40 files changed, 355 insertions(+), 259 deletions(-) create mode 100644 src/core/client_channel/lb_metadata.cc create mode 100644 src/core/client_channel/lb_metadata.h rename src/core/{xds/grpc => util}/upb_utils.h (87%) diff --git a/BUILD b/BUILD index 4f9181a6d1c86e..b33418d118bbd8 100644 --- a/BUILD +++ b/BUILD @@ -3818,6 +3818,7 @@ grpc_cc_library( "//src/core:iomgr_fwd", "//src/core:json", "//src/core:latch", + "//src/core:lb_metadata", "//src/core:lb_policy", "//src/core:lb_policy_registry", "//src/core:loop", diff --git a/CMakeLists.txt b/CMakeLists.txt index 838ae79c140b6a..0272420d75222c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1876,6 +1876,7 @@ add_library(grpc src/core/client_channel/direct_channel.cc src/core/client_channel/dynamic_filters.cc src/core/client_channel/global_subchannel_pool.cc + src/core/client_channel/lb_metadata.cc src/core/client_channel/load_balanced_call_destination.cc src/core/client_channel/local_subchannel_pool.cc src/core/client_channel/retry_filter.cc @@ -2969,6 +2970,7 @@ add_library(grpc_unsecure src/core/client_channel/direct_channel.cc src/core/client_channel/dynamic_filters.cc src/core/client_channel/global_subchannel_pool.cc + src/core/client_channel/lb_metadata.cc src/core/client_channel/load_balanced_call_destination.cc src/core/client_channel/local_subchannel_pool.cc src/core/client_channel/retry_filter.cc diff --git a/Makefile b/Makefile index d4e982f903c651..652e940ec9e27b 100644 --- a/Makefile +++ b/Makefile @@ -678,6 +678,7 @@ LIBGRPC_SRC = \ src/core/client_channel/direct_channel.cc \ src/core/client_channel/dynamic_filters.cc \ src/core/client_channel/global_subchannel_pool.cc \ + src/core/client_channel/lb_metadata.cc \ src/core/client_channel/load_balanced_call_destination.cc \ src/core/client_channel/local_subchannel_pool.cc \ src/core/client_channel/retry_filter.cc \ diff --git a/Package.swift b/Package.swift index 260c070857e67e..bbdc95a5037262 100644 --- a/Package.swift +++ b/Package.swift @@ -144,6 +144,8 @@ let package = Package( "src/core/client_channel/dynamic_filters.h", "src/core/client_channel/global_subchannel_pool.cc", "src/core/client_channel/global_subchannel_pool.h", + "src/core/client_channel/lb_metadata.cc", + "src/core/client_channel/lb_metadata.h", "src/core/client_channel/load_balanced_call_destination.cc", "src/core/client_channel/load_balanced_call_destination.h", "src/core/client_channel/local_subchannel_pool.cc", @@ -1946,6 +1948,7 @@ let package = Package( "src/core/util/time_precise.cc", "src/core/util/time_precise.h", "src/core/util/tmpfile.h", + "src/core/util/upb_utils.h", "src/core/util/useful.h", "src/core/util/windows/cpu.cc", "src/core/util/windows/log.cc", @@ -1958,7 +1961,6 @@ let package = Package( "src/core/xds/grpc/certificate_provider_store.h", "src/core/xds/grpc/file_watcher_certificate_provider_factory.cc", "src/core/xds/grpc/file_watcher_certificate_provider_factory.h", - "src/core/xds/grpc/upb_utils.h", "src/core/xds/grpc/xds_audit_logger_registry.cc", "src/core/xds/grpc/xds_audit_logger_registry.h", "src/core/xds/grpc/xds_bootstrap_grpc.cc", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 3f93c7d81998a4..7184929424d1f9 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -237,6 +237,7 @@ libs: - src/core/client_channel/direct_channel.h - src/core/client_channel/dynamic_filters.h - src/core/client_channel/global_subchannel_pool.h + - src/core/client_channel/lb_metadata.h - src/core/client_channel/load_balanced_call_destination.h - src/core/client_channel/local_subchannel_pool.h - src/core/client_channel/retry_filter.h @@ -1216,9 +1217,9 @@ libs: - src/core/util/json/json_util.h - src/core/util/json/json_writer.h - src/core/util/spinlock.h + - src/core/util/upb_utils.h - src/core/xds/grpc/certificate_provider_store.h - src/core/xds/grpc/file_watcher_certificate_provider_factory.h - - src/core/xds/grpc/upb_utils.h - src/core/xds/grpc/xds_audit_logger_registry.h - src/core/xds/grpc/xds_bootstrap_grpc.h - src/core/xds/grpc/xds_certificate_provider.h @@ -1261,6 +1262,7 @@ libs: - src/core/client_channel/direct_channel.cc - src/core/client_channel/dynamic_filters.cc - src/core/client_channel/global_subchannel_pool.cc + - src/core/client_channel/lb_metadata.cc - src/core/client_channel/load_balanced_call_destination.cc - src/core/client_channel/local_subchannel_pool.cc - src/core/client_channel/retry_filter.cc @@ -2223,6 +2225,7 @@ libs: - src/core/client_channel/direct_channel.h - src/core/client_channel/dynamic_filters.h - src/core/client_channel/global_subchannel_pool.h + - src/core/client_channel/lb_metadata.h - src/core/client_channel/load_balanced_call_destination.h - src/core/client_channel/local_subchannel_pool.h - src/core/client_channel/retry_filter.h @@ -2685,6 +2688,7 @@ libs: - src/core/util/json/json_reader.h - src/core/util/json/json_writer.h - src/core/util/spinlock.h + - src/core/util/upb_utils.h - third_party/upb/upb/generated_code_support.h - third_party/upb/upb/mini_descriptor/build_enum.h - third_party/upb/upb/mini_descriptor/decode.h @@ -2717,6 +2721,7 @@ libs: - src/core/client_channel/direct_channel.cc - src/core/client_channel/dynamic_filters.cc - src/core/client_channel/global_subchannel_pool.cc + - src/core/client_channel/lb_metadata.cc - src/core/client_channel/load_balanced_call_destination.cc - src/core/client_channel/local_subchannel_pool.cc - src/core/client_channel/retry_filter.cc diff --git a/config.m4 b/config.m4 index d3c633249d3819..6435dff6bcc5f8 100644 --- a/config.m4 +++ b/config.m4 @@ -53,6 +53,7 @@ if test "$PHP_GRPC" != "no"; then src/core/client_channel/direct_channel.cc \ src/core/client_channel/dynamic_filters.cc \ src/core/client_channel/global_subchannel_pool.cc \ + src/core/client_channel/lb_metadata.cc \ src/core/client_channel/load_balanced_call_destination.cc \ src/core/client_channel/local_subchannel_pool.cc \ src/core/client_channel/retry_filter.cc \ diff --git a/config.w32 b/config.w32 index ade25276f75e73..1c871b2300b3c6 100644 --- a/config.w32 +++ b/config.w32 @@ -18,6 +18,7 @@ if (PHP_GRPC != "no") { "src\\core\\client_channel\\direct_channel.cc " + "src\\core\\client_channel\\dynamic_filters.cc " + "src\\core\\client_channel\\global_subchannel_pool.cc " + + "src\\core\\client_channel\\lb_metadata.cc " + "src\\core\\client_channel\\load_balanced_call_destination.cc " + "src\\core\\client_channel\\local_subchannel_pool.cc " + "src\\core\\client_channel\\retry_filter.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 45d20d5aa5d81c..4901373eb91da2 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -283,6 +283,7 @@ Pod::Spec.new do |s| 'src/core/client_channel/direct_channel.h', 'src/core/client_channel/dynamic_filters.h', 'src/core/client_channel/global_subchannel_pool.h', + 'src/core/client_channel/lb_metadata.h', 'src/core/client_channel/load_balanced_call_destination.h', 'src/core/client_channel/local_subchannel_pool.h', 'src/core/client_channel/retry_filter.h', @@ -1323,10 +1324,10 @@ Pod::Spec.new do |s| 'src/core/util/string.h', 'src/core/util/time_precise.h', 'src/core/util/tmpfile.h', + 'src/core/util/upb_utils.h', 'src/core/util/useful.h', 'src/core/xds/grpc/certificate_provider_store.h', 'src/core/xds/grpc/file_watcher_certificate_provider_factory.h', - 'src/core/xds/grpc/upb_utils.h', 'src/core/xds/grpc/xds_audit_logger_registry.h', 'src/core/xds/grpc/xds_bootstrap_grpc.h', 'src/core/xds/grpc/xds_certificate_provider.h', @@ -1574,6 +1575,7 @@ Pod::Spec.new do |s| 'src/core/client_channel/direct_channel.h', 'src/core/client_channel/dynamic_filters.h', 'src/core/client_channel/global_subchannel_pool.h', + 'src/core/client_channel/lb_metadata.h', 'src/core/client_channel/load_balanced_call_destination.h', 'src/core/client_channel/local_subchannel_pool.h', 'src/core/client_channel/retry_filter.h', @@ -2596,10 +2598,10 @@ Pod::Spec.new do |s| 'src/core/util/string.h', 'src/core/util/time_precise.h', 'src/core/util/tmpfile.h', + 'src/core/util/upb_utils.h', 'src/core/util/useful.h', 'src/core/xds/grpc/certificate_provider_store.h', 'src/core/xds/grpc/file_watcher_certificate_provider_factory.h', - 'src/core/xds/grpc/upb_utils.h', 'src/core/xds/grpc/xds_audit_logger_registry.h', 'src/core/xds/grpc/xds_bootstrap_grpc.h', 'src/core/xds/grpc/xds_certificate_provider.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index e4e3f69602daf3..d812ea38794ecb 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -263,6 +263,8 @@ Pod::Spec.new do |s| 'src/core/client_channel/dynamic_filters.h', 'src/core/client_channel/global_subchannel_pool.cc', 'src/core/client_channel/global_subchannel_pool.h', + 'src/core/client_channel/lb_metadata.cc', + 'src/core/client_channel/lb_metadata.h', 'src/core/client_channel/load_balanced_call_destination.cc', 'src/core/client_channel/load_balanced_call_destination.h', 'src/core/client_channel/local_subchannel_pool.cc', @@ -2061,6 +2063,7 @@ Pod::Spec.new do |s| 'src/core/util/time_precise.cc', 'src/core/util/time_precise.h', 'src/core/util/tmpfile.h', + 'src/core/util/upb_utils.h', 'src/core/util/useful.h', 'src/core/util/windows/cpu.cc', 'src/core/util/windows/log.cc', @@ -2073,7 +2076,6 @@ Pod::Spec.new do |s| 'src/core/xds/grpc/certificate_provider_store.h', 'src/core/xds/grpc/file_watcher_certificate_provider_factory.cc', 'src/core/xds/grpc/file_watcher_certificate_provider_factory.h', - 'src/core/xds/grpc/upb_utils.h', 'src/core/xds/grpc/xds_audit_logger_registry.cc', 'src/core/xds/grpc/xds_audit_logger_registry.h', 'src/core/xds/grpc/xds_bootstrap_grpc.cc', @@ -2365,6 +2367,7 @@ Pod::Spec.new do |s| 'src/core/client_channel/direct_channel.h', 'src/core/client_channel/dynamic_filters.h', 'src/core/client_channel/global_subchannel_pool.h', + 'src/core/client_channel/lb_metadata.h', 'src/core/client_channel/load_balanced_call_destination.h', 'src/core/client_channel/local_subchannel_pool.h', 'src/core/client_channel/retry_filter.h', @@ -3367,10 +3370,10 @@ Pod::Spec.new do |s| 'src/core/util/string.h', 'src/core/util/time_precise.h', 'src/core/util/tmpfile.h', + 'src/core/util/upb_utils.h', 'src/core/util/useful.h', 'src/core/xds/grpc/certificate_provider_store.h', 'src/core/xds/grpc/file_watcher_certificate_provider_factory.h', - 'src/core/xds/grpc/upb_utils.h', 'src/core/xds/grpc/xds_audit_logger_registry.h', 'src/core/xds/grpc/xds_bootstrap_grpc.h', 'src/core/xds/grpc/xds_certificate_provider.h', diff --git a/grpc.gemspec b/grpc.gemspec index 423a5aa0140914..213e157d0eb4b9 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -150,6 +150,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/client_channel/dynamic_filters.h ) s.files += %w( src/core/client_channel/global_subchannel_pool.cc ) s.files += %w( src/core/client_channel/global_subchannel_pool.h ) + s.files += %w( src/core/client_channel/lb_metadata.cc ) + s.files += %w( src/core/client_channel/lb_metadata.h ) s.files += %w( src/core/client_channel/load_balanced_call_destination.cc ) s.files += %w( src/core/client_channel/load_balanced_call_destination.h ) s.files += %w( src/core/client_channel/local_subchannel_pool.cc ) @@ -1948,6 +1950,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/util/time_precise.cc ) s.files += %w( src/core/util/time_precise.h ) s.files += %w( src/core/util/tmpfile.h ) + s.files += %w( src/core/util/upb_utils.h ) s.files += %w( src/core/util/useful.h ) s.files += %w( src/core/util/windows/cpu.cc ) s.files += %w( src/core/util/windows/log.cc ) @@ -1960,7 +1963,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/xds/grpc/certificate_provider_store.h ) s.files += %w( src/core/xds/grpc/file_watcher_certificate_provider_factory.cc ) s.files += %w( src/core/xds/grpc/file_watcher_certificate_provider_factory.h ) - s.files += %w( src/core/xds/grpc/upb_utils.h ) s.files += %w( src/core/xds/grpc/xds_audit_logger_registry.cc ) s.files += %w( src/core/xds/grpc/xds_audit_logger_registry.h ) s.files += %w( src/core/xds/grpc/xds_bootstrap_grpc.cc ) diff --git a/package.xml b/package.xml index b3c2b6e941b792..3fcb69ed9813e5 100644 --- a/package.xml +++ b/package.xml @@ -132,6 +132,8 @@ + + @@ -1930,6 +1932,7 @@ + @@ -1942,7 +1945,6 @@ - diff --git a/src/core/BUILD b/src/core/BUILD index d5bb1efd7259ac..b9b1f80cf30828 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -3456,6 +3456,7 @@ grpc_cc_library( hdrs = ["load_balancing/lb_policy.h"], external_deps = [ "absl/base:core_headers", + "absl/container:inlined_vector", "absl/status", "absl/status:statusor", "absl/strings", @@ -3525,6 +3526,21 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "lb_metadata", + srcs = ["client_channel/lb_metadata.cc"], + hdrs = ["client_channel/lb_metadata.h"], + external_deps = [ + "absl/log:log", + "absl/strings", + "absl/types:optional", + ], + deps = [ + "lb_policy", + "metadata_batch", + ], +) + grpc_cc_library( name = "subchannel_interface", hdrs = ["load_balancing/subchannel_interface.h"], @@ -5009,6 +5025,7 @@ grpc_cc_library( "slice_refcount", "status_helper", "time", + "upb_utils", "uuid_v4", "validation_errors", "//:backoff", @@ -5039,14 +5056,13 @@ grpc_cc_library( grpc_cc_library( name = "upb_utils", hdrs = [ - "xds/grpc/upb_utils.h", + "util/upb_utils.h", ], external_deps = [ "absl/strings", "@com_google_protobuf//upb:base", ], language = "c++", - deps = ["//:gpr_platform"], ) grpc_cc_library( diff --git a/src/core/client_channel/client_channel_filter.cc b/src/core/client_channel/client_channel_filter.cc index 0ad970b549229b..35cbb04e6553c5 100644 --- a/src/core/client_channel/client_channel_filter.cc +++ b/src/core/client_channel/client_channel_filter.cc @@ -57,6 +57,7 @@ #include "src/core/client_channel/config_selector.h" #include "src/core/client_channel/dynamic_filters.h" #include "src/core/client_channel/global_subchannel_pool.h" +#include "src/core/client_channel/lb_metadata.h" #include "src/core/client_channel/local_subchannel_pool.h" #include "src/core/client_channel/retry_filter.h" #include "src/core/client_channel/subchannel.h" @@ -2353,79 +2354,6 @@ class ClientChannelFilter::LoadBalancedCall::LbCallState final LoadBalancedCall* lb_call_; }; -// -// ClientChannelFilter::LoadBalancedCall::Metadata -// - -class ClientChannelFilter::LoadBalancedCall::Metadata final - : public LoadBalancingPolicy::MetadataInterface { - public: - explicit Metadata(grpc_metadata_batch* batch) : batch_(batch) {} - - void Add(absl::string_view key, absl::string_view value) override { - if (batch_ == nullptr) return; - // Gross, egregious hack to support legacy grpclb behavior. - // TODO(ctiller): Use a promise context for this once that plumbing is done. - if (key == GrpcLbClientStatsMetadata::key()) { - batch_->Set( - GrpcLbClientStatsMetadata(), - const_cast( - reinterpret_cast(value.data()))); - return; - } - batch_->Append(key, Slice::FromStaticString(value), - [key](absl::string_view error, const Slice& value) { - LOG(ERROR) << error << " key:" << key - << " value:" << value.as_string_view(); - }); - } - - std::vector> TestOnlyCopyToVector() - override { - if (batch_ == nullptr) return {}; - Encoder encoder; - batch_->Encode(&encoder); - return encoder.Take(); - } - - absl::optional Lookup(absl::string_view key, - std::string* buffer) const override { - if (batch_ == nullptr) return absl::nullopt; - return batch_->GetStringValue(key, buffer); - } - - private: - class Encoder final { - public: - void Encode(const Slice& key, const Slice& value) { - out_.emplace_back(std::string(key.as_string_view()), - std::string(value.as_string_view())); - } - - template - void Encode(Which, const typename Which::ValueType& value) { - auto value_slice = Which::Encode(value); - out_.emplace_back(std::string(Which::key()), - std::string(value_slice.as_string_view())); - } - - void Encode(GrpcTimeoutMetadata, - const typename GrpcTimeoutMetadata::ValueType&) {} - void Encode(HttpPathMetadata, const Slice&) {} - void Encode(HttpMethodMetadata, - const typename HttpMethodMetadata::ValueType&) {} - - std::vector> Take() { - return std::move(out_); - } - - private: - std::vector> out_; - }; - - grpc_metadata_batch* batch_; -}; - // // ClientChannelFilter::LoadBalancedCall::LbCallState // @@ -2536,7 +2464,7 @@ void ClientChannelFilter::LoadBalancedCall::RecordCallCompletion( // If the LB policy requested a callback for trailing metadata, invoke // the callback. if (lb_subchannel_call_tracker_ != nullptr) { - Metadata trailing_metadata(recv_trailing_metadata); + LbMetadata trailing_metadata(recv_trailing_metadata); BackendMetricAccessor backend_metric_accessor(this, recv_trailing_metadata); LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = { peer_address, status, &trailing_metadata, &backend_metric_accessor}; @@ -2683,7 +2611,7 @@ bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl( pick_args.path = path->as_string_view(); LbCallState lb_call_state(this); pick_args.call_state = &lb_call_state; - Metadata initial_metadata(send_initial_metadata()); + LbMetadata initial_metadata(send_initial_metadata()); pick_args.initial_metadata = &initial_metadata; auto result = picker->Pick(pick_args); return HandlePickResult( @@ -2716,6 +2644,9 @@ bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl( if (lb_subchannel_call_tracker_ != nullptr) { lb_subchannel_call_tracker_->Start(); } + // Handle metadata mutations. + MetadataMutationHandler::Apply(complete_pick->metadata_mutations, + send_initial_metadata()); return true; }, // QueuePick diff --git a/src/core/client_channel/lb_metadata.cc b/src/core/client_channel/lb_metadata.cc new file mode 100644 index 00000000000000..d66552d053ad00 --- /dev/null +++ b/src/core/client_channel/lb_metadata.cc @@ -0,0 +1,99 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/client_channel/lb_metadata.h" + +#include "absl/log/log.h" + +namespace grpc_core { + +// +// LbMetadata +// + +namespace { + +class Encoder { + public: + void Encode(const Slice& key, const Slice& value) { + out_.emplace_back(std::string(key.as_string_view()), + std::string(value.as_string_view())); + } + + template + void Encode(Which, const typename Which::ValueType& value) { + auto value_slice = Which::Encode(value); + out_.emplace_back(std::string(Which::key()), + std::string(value_slice.as_string_view())); + } + + void Encode(GrpcTimeoutMetadata, + const typename GrpcTimeoutMetadata::ValueType&) {} + void Encode(HttpPathMetadata, const Slice&) {} + void Encode(HttpMethodMetadata, + const typename HttpMethodMetadata::ValueType&) {} + + std::vector> Take() { + return std::move(out_); + } + + private: + std::vector> out_; +}; + +} // namespace + +absl::optional LbMetadata::Lookup( + absl::string_view key, std::string* buffer) const { + if (batch_ == nullptr) return absl::nullopt; + return batch_->GetStringValue(key, buffer); +} + +std::vector> +LbMetadata::TestOnlyCopyToVector() const { + if (batch_ == nullptr) return {}; + Encoder encoder; + batch_->Encode(&encoder); + return encoder.Take(); +} + +// +// MetadataMutationHandler +// + +void MetadataMutationHandler::Apply( + LoadBalancingPolicy::MetadataMutations& metadata_mutations, + grpc_metadata_batch* metadata) { + for (auto& p : metadata_mutations.additions_) { + absl::string_view key = p.first; + Slice& value = + grpc_event_engine::experimental::internal::SliceCast(p.second); + // Gross, egregious hack to support legacy grpclb behavior. + // TODO(ctiller): Use a promise context for this once that plumbing is done. + if (key == GrpcLbClientStatsMetadata::key()) { + metadata->Set( + GrpcLbClientStatsMetadata(), + const_cast( + reinterpret_cast(value.data()))); + continue; + } + metadata->Append(key, std::move(value), + [key](absl::string_view error, const Slice& value) { + LOG(ERROR) << error << " key:" << key + << " value:" << value.as_string_view(); + }); + } +} + +} // namespace grpc_core diff --git a/src/core/client_channel/lb_metadata.h b/src/core/client_channel/lb_metadata.h new file mode 100644 index 00000000000000..6c1fe7eeb7b64b --- /dev/null +++ b/src/core/client_channel/lb_metadata.h @@ -0,0 +1,50 @@ +// Copyright 2024 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_LB_METADATA_H +#define GRPC_SRC_CORE_CLIENT_CHANNEL_LB_METADATA_H + +#include +#include + +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" + +#include "src/core/lib/transport/metadata_batch.h" +#include "src/core/load_balancing/lb_policy.h" + +namespace grpc_core { + +class LbMetadata : public LoadBalancingPolicy::MetadataInterface { + public: + explicit LbMetadata(grpc_metadata_batch* batch) : batch_(batch) {} + + absl::optional Lookup(absl::string_view key, + std::string* buffer) const override; + + std::vector> TestOnlyCopyToVector() const; + + private: + grpc_metadata_batch* batch_; +}; + +class MetadataMutationHandler { + public: + static void Apply(LoadBalancingPolicy::MetadataMutations& metadata_mutations, + grpc_metadata_batch* metadata); +}; + +} // namespace grpc_core + +#endif // GRPC_SRC_CORE_CLIENT_CHANNEL_LB_METADATA_H diff --git a/src/core/client_channel/load_balanced_call_destination.cc b/src/core/client_channel/load_balanced_call_destination.cc index 352113fe0f28ad..c13369a09fe7d6 100644 --- a/src/core/client_channel/load_balanced_call_destination.cc +++ b/src/core/client_channel/load_balanced_call_destination.cc @@ -18,6 +18,7 @@ #include "src/core/client_channel/client_channel.h" #include "src/core/client_channel/client_channel_internal.h" +#include "src/core/client_channel/lb_metadata.h" #include "src/core/client_channel/subchannel.h" #include "src/core/lib/channel/status_util.h" #include "src/core/lib/config/core_configuration.h" @@ -28,74 +29,6 @@ namespace grpc_core { namespace { -class LbMetadata : public LoadBalancingPolicy::MetadataInterface { - public: - explicit LbMetadata(grpc_metadata_batch* batch) : batch_(batch) {} - - void Add(absl::string_view key, absl::string_view value) override { - if (batch_ == nullptr) return; - // Gross, egregious hack to support legacy grpclb behavior. - // TODO(ctiller): Use a promise context for this once that plumbing is done. - if (key == GrpcLbClientStatsMetadata::key()) { - batch_->Set( - GrpcLbClientStatsMetadata(), - const_cast( - reinterpret_cast(value.data()))); - return; - } - batch_->Append(key, Slice::FromStaticString(value), - [key](absl::string_view error, const Slice& value) { - LOG(ERROR) << error << " key:" << key - << " value:" << value.as_string_view(); - }); - } - - std::vector> TestOnlyCopyToVector() - override { - if (batch_ == nullptr) return {}; - Encoder encoder; - batch_->Encode(&encoder); - return encoder.Take(); - } - - absl::optional Lookup(absl::string_view key, - std::string* buffer) const override { - if (batch_ == nullptr) return absl::nullopt; - return batch_->GetStringValue(key, buffer); - } - - private: - class Encoder { - public: - void Encode(const Slice& key, const Slice& value) { - out_.emplace_back(std::string(key.as_string_view()), - std::string(value.as_string_view())); - } - - template - void Encode(Which, const typename Which::ValueType& value) { - auto value_slice = Which::Encode(value); - out_.emplace_back(std::string(Which::key()), - std::string(value_slice.as_string_view())); - } - - void Encode(GrpcTimeoutMetadata, - const typename GrpcTimeoutMetadata::ValueType&) {} - void Encode(HttpPathMetadata, const Slice&) {} - void Encode(HttpMethodMetadata, - const typename HttpMethodMetadata::ValueType&) {} - - std::vector> Take() { - return std::move(out_); - } - - private: - std::vector> out_; - }; - - grpc_metadata_batch* batch_; -}; - void MaybeCreateCallAttemptTracer(bool is_transparent_retry) { auto* call_tracer = MaybeGetContext(); if (call_tracer == nullptr) return; @@ -208,6 +141,9 @@ LoopCtl>> PickSubchannel( complete_pick->subchannel_call_tracker->Start(); SetContext(complete_pick->subchannel_call_tracker.release()); } + // Apply metadata mutations, if any. + MetadataMutationHandler::Apply(complete_pick->metadata_mutations, + &client_initial_metadata); // Return the connected subchannel. return call_destination; }, diff --git a/src/core/load_balancing/grpclb/grpclb.cc b/src/core/load_balancing/grpclb/grpclb.cc index 2258e7ebaaf509..6d928c0dfa28f8 100644 --- a/src/core/load_balancing/grpclb/grpclb.cc +++ b/src/core/load_balancing/grpclb/grpclb.cc @@ -311,14 +311,17 @@ class GrpcLb final : public LoadBalancingPolicy { class SubchannelWrapper final : public DelegatingSubchannel { public: SubchannelWrapper(RefCountedPtr subchannel, - RefCountedPtr lb_policy, std::string lb_token, + RefCountedPtr lb_policy, + grpc_event_engine::experimental::Slice lb_token, RefCountedPtr client_stats) : DelegatingSubchannel(std::move(subchannel)), lb_policy_(std::move(lb_policy)), lb_token_(std::move(lb_token)), client_stats_(std::move(client_stats)) {} - const std::string& lb_token() const { return lb_token_; } + const grpc_event_engine::experimental::Slice& lb_token() const { + return lb_token_; + } GrpcLbClientStats* client_stats() const { return client_stats_.get(); } private: @@ -340,14 +343,14 @@ class GrpcLb final : public LoadBalancingPolicy { } RefCountedPtr lb_policy_; - std::string lb_token_; + grpc_event_engine::experimental::Slice lb_token_; RefCountedPtr client_stats_; }; class TokenAndClientStatsArg final : public RefCounted { public: - TokenAndClientStatsArg(std::string lb_token, + TokenAndClientStatsArg(grpc_event_engine::experimental::Slice lb_token, RefCountedPtr client_stats) : lb_token_(std::move(lb_token)), client_stats_(std::move(client_stats)) {} @@ -358,18 +361,21 @@ class GrpcLb final : public LoadBalancingPolicy { static int ChannelArgsCompare(const TokenAndClientStatsArg* a, const TokenAndClientStatsArg* b) { - int r = a->lb_token_.compare(b->lb_token_); + int r = + a->lb_token_.as_string_view().compare(b->lb_token_.as_string_view()); if (r != 0) return r; return QsortCompare(a->client_stats_.get(), b->client_stats_.get()); } - const std::string& lb_token() const { return lb_token_; } + const grpc_event_engine::experimental::Slice& lb_token() const { + return lb_token_; + } RefCountedPtr client_stats() const { return client_stats_; } private: - std::string lb_token_; + grpc_event_engine::experimental::Slice lb_token_; RefCountedPtr client_stats_; }; @@ -665,7 +671,8 @@ class GrpcLb::Serverlist::AddressIterator final // LB token processing. const size_t lb_token_length = strnlen( server.load_balance_token, GPR_ARRAY_SIZE(server.load_balance_token)); - std::string lb_token(server.load_balance_token, lb_token_length); + auto lb_token = grpc_event_engine::experimental::Slice::FromCopiedBuffer( + server.load_balance_token, lb_token_length); if (lb_token.empty()) { auto addr_uri = grpc_sockaddr_to_uri(&addr); LOG(INFO) << "Missing LB token for backend address '" @@ -773,9 +780,10 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) { // a string and rely on the client_load_reporting filter to know // how to interpret it. // NOLINTBEGIN(bugprone-string-constructor) - args.initial_metadata->Add( + complete_pick->metadata_mutations.Add( GrpcLbClientStatsMetadata::key(), - absl::string_view(reinterpret_cast(client_stats), 0)); + grpc_event_engine::experimental::Slice(grpc_slice_from_static_buffer( + reinterpret_cast(client_stats), 0))); // NOLINTEND(bugprone-string-constructor) // Update calls-started. client_stats->AddCallStarted(); @@ -785,10 +793,8 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) { // may get refreshed between when we return this pick and when the // initial metadata goes out on the wire. if (!subchannel_wrapper->lb_token().empty()) { - char* lb_token = static_cast( - args.call_state->Alloc(subchannel_wrapper->lb_token().size() + 1)); - strcpy(lb_token, subchannel_wrapper->lb_token().c_str()); - args.initial_metadata->Add(LbTokenMetadata::key(), lb_token); + complete_pick->metadata_mutations.Add( + LbTokenMetadata::key(), subchannel_wrapper->lb_token().Ref()); } // Unwrap subchannel to pass up to the channel. complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel(); @@ -811,13 +817,11 @@ RefCountedPtr GrpcLb::Helper::CreateSubchannel( absl::StrFormat("[grpclb %p] no TokenAndClientStatsArg for address %s", parent(), addr_str.value_or("N/A").c_str())); } - std::string lb_token = arg->lb_token(); - RefCountedPtr client_stats = arg->client_stats(); return MakeRefCounted( parent()->channel_control_helper()->CreateSubchannel( address, per_address_args, args), parent()->RefAsSubclass(DEBUG_LOCATION, "SubchannelWrapper"), - std::move(lb_token), std::move(client_stats)); + arg->lb_token().Ref(), arg->client_stats()); } void GrpcLb::Helper::UpdateState(grpc_connectivity_state state, @@ -1542,7 +1546,8 @@ class GrpcLb::NullLbTokenEndpointIterator final private: std::shared_ptr parent_it_; RefCountedPtr empty_token_ = - MakeRefCounted("", nullptr); + MakeRefCounted( + grpc_event_engine::experimental::Slice(), nullptr); }; absl::Status GrpcLb::UpdateLocked(UpdateArgs args) { diff --git a/src/core/load_balancing/lb_policy.h b/src/core/load_balancing/lb_policy.h index cfdab1ba1a6249..600ef9f4498d2d 100644 --- a/src/core/load_balancing/lb_policy.h +++ b/src/core/load_balancing/lb_policy.h @@ -23,9 +23,9 @@ #include #include #include -#include #include "absl/base/thread_annotations.h" +#include "absl/container/inlined_vector.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/string_view.h" @@ -33,6 +33,7 @@ #include "absl/types/variant.h" #include +#include #include #include #include @@ -116,29 +117,35 @@ class LoadBalancingPolicy : public InternallyRefCounted { public: virtual ~MetadataInterface() = default; - ////////////////////////////////////////////////////////////////////////// - // TODO(ctiller): DO NOT MAKE THIS A PUBLIC API YET - // This needs some API design to ensure we can add/remove/replace metadata - // keys... we're deliberately not doing so to save some time whilst - // cleaning up the internal metadata representation, but we should add - // something back before making this a public API. - ////////////////////////////////////////////////////////////////////////// - - /// Adds a key/value pair. - /// Does NOT take ownership of \a key or \a value. - /// Implementations must ensure that the key and value remain alive - /// until the call ends. If desired, they may be allocated via - /// CallState::Alloc(). - virtual void Add(absl::string_view key, absl::string_view value) = 0; - - /// Produce a vector of metadata key/value strings for tests. - virtual std::vector> - TestOnlyCopyToVector() = 0; - virtual absl::optional Lookup( absl::string_view key, std::string* buffer) const = 0; }; + /// A list of metadata mutations to be returned along with a PickResult. + class MetadataMutations { + public: + /// Adds a key/value pair. If the key is already present, the new + /// value will be appended with a comma delimiter. + void Add(absl::string_view key, absl::string_view value) { + Add(key, grpc_event_engine::experimental::Slice::FromCopiedString(value)); + } + void Add(absl::string_view key, + grpc_event_engine::experimental::Slice value) { + additions_.push_back({key, std::move(value)}); + } + + private: + friend class MetadataMutationHandler; + + // Avoid allocation if up to 3 additions per LB pick. Most expected + // use cases should be no more than 2, so this gives us a bit of slack. + // But it should be cheap to increase this value if we start seeing use + // cases with more than 3 additions. + absl::InlinedVector< + std::pair, 3> + additions_; + }; + /// Arguments used when picking a subchannel for a call. struct PickArgs { /// The path of the call. Indicates the RPC service and method name. @@ -210,11 +217,16 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// be used. std::unique_ptr subchannel_call_tracker; + /// Metadata mutations to be applied to the call. + MetadataMutations metadata_mutations; + explicit Complete( RefCountedPtr sc, - std::unique_ptr tracker = nullptr) + std::unique_ptr tracker = nullptr, + MetadataMutations md = MetadataMutations()) : subchannel(std::move(sc)), - subchannel_call_tracker(std::move(tracker)) {} + subchannel_call_tracker(std::move(tracker)), + metadata_mutations(std::move(md)) {} }; /// Pick cannot be completed until something changes on the control diff --git a/src/core/load_balancing/rls/rls.cc b/src/core/load_balancing/rls/rls.cc index 7d518d2c869657..0af7cad20a0ff9 100644 --- a/src/core/load_balancing/rls/rls.cc +++ b/src/core/load_balancing/rls/rls.cc @@ -108,6 +108,7 @@ #include "src/core/util/json/json_args.h" #include "src/core/util/json/json_object_loader.h" #include "src/core/util/json/json_writer.h" +#include "src/core/util/upb_utils.h" #include "src/proto/grpc/lookup/v1/rls.upb.h" using ::grpc_event_engine::experimental::EventEngine; @@ -315,12 +316,12 @@ class RlsLb final : public LoadBalancingPolicy { struct ResponseInfo { absl::Status status; std::vector targets; - std::string header_data; + grpc_event_engine::experimental::Slice header_data; std::string ToString() const { return absl::StrFormat("{status=%s, targets=[%s], header_data=\"%s\"}", status.ToString(), absl::StrJoin(targets, ","), - header_data); + header_data.as_string_view()); } }; @@ -464,7 +465,7 @@ class RlsLb final : public LoadBalancingPolicy { ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) { return data_expiration_time_; } - const std::string& header_data() const + const grpc_event_engine::experimental::Slice& header_data() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) { return header_data_; } @@ -543,7 +544,8 @@ class RlsLb final : public LoadBalancingPolicy { // RLS response states std::vector> child_policy_wrappers_ ABSL_GUARDED_BY(&RlsLb::mu_); - std::string header_data_ ABSL_GUARDED_BY(&RlsLb::mu_); + grpc_event_engine::experimental::Slice header_data_ + ABSL_GUARDED_BY(&RlsLb::mu_); Timestamp data_expiration_time_ ABSL_GUARDED_BY(&RlsLb::mu_) = Timestamp::InfPast(); Timestamp stale_time_ ABSL_GUARDED_BY(&RlsLb::mu_) = Timestamp::InfPast(); @@ -702,7 +704,7 @@ class RlsLb final : public LoadBalancingPolicy { RefCountedPtr rls_channel, std::unique_ptr backoff_state, grpc_lookup_v1_RouteLookupRequest_Reason reason, - std::string stale_header_data); + grpc_event_engine::experimental::Slice stale_header_data); ~RlsRequest() override; // Shuts down the request. If the request is still in flight, it is @@ -730,7 +732,7 @@ class RlsLb final : public LoadBalancingPolicy { RefCountedPtr rls_channel_; std::unique_ptr backoff_state_; grpc_lookup_v1_RouteLookupRequest_Reason reason_; - std::string stale_header_data_; + grpc_event_engine::experimental::Slice stale_header_data_; // RLS call state. Timestamp deadline_; @@ -1259,19 +1261,17 @@ LoadBalancingPolicy::PickResult RlsLb::Cache::Entry::Pick(PickArgs args) { child_policy_wrappers_.size(), ConnectivityStateName(child_policy_wrapper->connectivity_state())); } - // Add header data. - // Note that even if the target we're using is in TRANSIENT_FAILURE, - // the pick might still succeed (e.g., if the child is ring_hash), so - // we need to pass the right header info down in all cases. - if (!header_data_.empty()) { - char* copied_header_data = - static_cast(args.call_state->Alloc(header_data_.length() + 1)); - strcpy(copied_header_data, header_data_.c_str()); - args.initial_metadata->Add(kRlsHeaderKey, copied_header_data); - } auto pick_result = child_policy_wrapper->Pick(args); lb_policy_->MaybeExportPickCount(kMetricTargetPicks, child_policy_wrapper->target(), pick_result); + // Add header data. + if (!header_data_.empty()) { + auto* complete_pick = + absl::get_if(&pick_result.result); + if (complete_pick != nullptr) { + complete_pick->metadata_mutations.Add(kRlsHeaderKey, header_data_.Ref()); + } + } return pick_result; } @@ -1682,11 +1682,11 @@ void RlsLb::RlsChannel::StartRlsCall(const RequestKey& key, std::unique_ptr backoff_state; grpc_lookup_v1_RouteLookupRequest_Reason reason = grpc_lookup_v1_RouteLookupRequest_REASON_MISS; - std::string stale_header_data; + grpc_event_engine::experimental::Slice stale_header_data; if (stale_entry != nullptr) { backoff_state = stale_entry->TakeBackoffState(); reason = grpc_lookup_v1_RouteLookupRequest_REASON_STALE; - stale_header_data = stale_entry->header_data(); + stale_header_data = stale_entry->header_data().Ref(); } lb_policy_->request_map_.emplace( key, MakeOrphanable( @@ -1708,11 +1708,12 @@ void RlsLb::RlsChannel::ResetBackoff() { // RlsLb::RlsRequest // -RlsLb::RlsRequest::RlsRequest(RefCountedPtr lb_policy, RequestKey key, - RefCountedPtr rls_channel, - std::unique_ptr backoff_state, - grpc_lookup_v1_RouteLookupRequest_Reason reason, - std::string stale_header_data) +RlsLb::RlsRequest::RlsRequest( + RefCountedPtr lb_policy, RequestKey key, + RefCountedPtr rls_channel, + std::unique_ptr backoff_state, + grpc_lookup_v1_RouteLookupRequest_Reason reason, + grpc_event_engine::experimental::Slice stale_header_data) : InternallyRefCounted( GRPC_TRACE_FLAG_ENABLED(rls_lb) ? "RlsRequest" : nullptr), lb_policy_(std::move(lb_policy)), @@ -1890,8 +1891,7 @@ grpc_byte_buffer* RlsLb::RlsRequest::MakeRequestProto() { grpc_lookup_v1_RouteLookupRequest_set_reason(req, reason_); if (!stale_header_data_.empty()) { grpc_lookup_v1_RouteLookupRequest_set_stale_header_data( - req, upb_StringView_FromDataAndSize(stale_header_data_.data(), - stale_header_data_.size())); + req, StdStringToUpbString(stale_header_data_.as_string_view())); } size_t len; char* buf = @@ -1934,7 +1934,8 @@ RlsLb::ResponseInfo RlsLb::RlsRequest::ParseResponseProto() { upb_StringView header_data_strview = grpc_lookup_v1_RouteLookupResponse_header_data(response); response_info.header_data = - std::string(header_data_strview.data, header_data_strview.size); + grpc_event_engine::experimental::Slice::FromCopiedBuffer( + header_data_strview.data, header_data_strview.size); return response_info; } diff --git a/src/core/xds/grpc/upb_utils.h b/src/core/util/upb_utils.h similarity index 87% rename from src/core/xds/grpc/upb_utils.h rename to src/core/util/upb_utils.h index e2123f39cbc42f..a5b1ac897fc9c3 100644 --- a/src/core/xds/grpc/upb_utils.h +++ b/src/core/util/upb_utils.h @@ -14,16 +14,14 @@ // limitations under the License. // -#ifndef GRPC_SRC_CORE_XDS_GRPC_UPB_UTILS_H -#define GRPC_SRC_CORE_XDS_GRPC_UPB_UTILS_H +#ifndef GRPC_SRC_CORE_UTIL_UPB_UTILS_H +#define GRPC_SRC_CORE_UTIL_UPB_UTILS_H #include #include "absl/strings/string_view.h" #include "upb/base/string_view.h" -#include - namespace grpc_core { // Works for both std::string and absl::string_view. @@ -42,4 +40,4 @@ inline std::string UpbStringToStdString(const upb_StringView& str) { } // namespace grpc_core -#endif // GRPC_SRC_CORE_XDS_GRPC_UPB_UTILS_H +#endif // GRPC_SRC_CORE_UTIL_UPB_UTILS_H diff --git a/src/core/xds/grpc/xds_client_grpc.cc b/src/core/xds/grpc/xds_client_grpc.cc index 58771564a293b6..a1899f47673f42 100644 --- a/src/core/xds/grpc/xds_client_grpc.cc +++ b/src/core/xds/grpc/xds_client_grpc.cc @@ -55,7 +55,7 @@ #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/telemetry/metrics.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_bootstrap_grpc.h" #include "src/core/xds/grpc/xds_transport_grpc.h" #include "src/core/xds/xds_client/xds_api.h" diff --git a/src/core/xds/grpc/xds_cluster.cc b/src/core/xds/grpc/xds_cluster.cc index 2d2f4e6c7282b1..7b232fa0ef8ab4 100644 --- a/src/core/xds/grpc/xds_cluster.cc +++ b/src/core/xds/grpc/xds_cluster.cc @@ -66,7 +66,7 @@ #include "src/core/lib/matchers/matchers.h" #include "src/core/load_balancing/lb_policy_registry.h" #include "src/core/util/json/json_writer.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_common_types.h" #include "src/core/xds/grpc/xds_lb_policy_registry.h" #include "src/core/xds/xds_client/xds_client.h" diff --git a/src/core/xds/grpc/xds_common_types.cc b/src/core/xds/grpc/xds_common_types.cc index ff0a630a992731..2e8fafda5883a5 100644 --- a/src/core/xds/grpc/xds_common_types.cc +++ b/src/core/xds/grpc/xds_common_types.cc @@ -45,7 +45,7 @@ #include #include "src/core/util/json/json_reader.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_bootstrap_grpc.h" #include "src/core/xds/xds_client/xds_client.h" diff --git a/src/core/xds/grpc/xds_endpoint.cc b/src/core/xds/grpc/xds_endpoint.cc index 46fb1f81887781..dc761931d66e6d 100644 --- a/src/core/xds/grpc/xds_endpoint.cc +++ b/src/core/xds/grpc/xds_endpoint.cc @@ -51,7 +51,7 @@ #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/iomgr/resolved_address.h" #include "src/core/util/string.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_health_status.h" #include "src/core/xds/xds_client/xds_resource_type.h" diff --git a/src/core/xds/grpc/xds_http_rbac_filter.cc b/src/core/xds/grpc/xds_http_rbac_filter.cc index 6ba09042c88aa3..3fadf5d2ebf90e 100644 --- a/src/core/xds/grpc/xds_http_rbac_filter.cc +++ b/src/core/xds/grpc/xds_http_rbac_filter.cc @@ -50,7 +50,7 @@ #include "src/core/util/json/json.h" #include "src/core/util/json/json_writer.h" #include "src/core/util/string.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_audit_logger_registry.h" #include "src/core/xds/grpc/xds_bootstrap_grpc.h" #include "src/core/xds/xds_client/xds_client.h" diff --git a/src/core/xds/grpc/xds_http_stateful_session_filter.cc b/src/core/xds/grpc/xds_http_stateful_session_filter.cc index 7b0b05219cd1fd..0c4203dd14067a 100644 --- a/src/core/xds/grpc/xds_http_stateful_session_filter.cc +++ b/src/core/xds/grpc/xds_http_stateful_session_filter.cc @@ -39,7 +39,7 @@ #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/util/json/json.h" #include "src/core/util/json/json_writer.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_common_types.h" #include "src/core/xds/grpc/xds_http_filters.h" diff --git a/src/core/xds/grpc/xds_listener.cc b/src/core/xds/grpc/xds_listener.cc index 7ea5822bf633ac..02379bfe5a7fb8 100644 --- a/src/core/xds/grpc/xds_listener.cc +++ b/src/core/xds/grpc/xds_listener.cc @@ -55,7 +55,7 @@ #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/matchers/matchers.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_common_types.h" #include "src/core/xds/xds_client/xds_resource_type.h" diff --git a/src/core/xds/grpc/xds_route_config.cc b/src/core/xds/grpc/xds_route_config.cc index d2933d8413a46d..c2f8a406207633 100644 --- a/src/core/xds/grpc/xds_route_config.cc +++ b/src/core/xds/grpc/xds_route_config.cc @@ -70,7 +70,7 @@ #include "src/core/util/json/json.h" #include "src/core/util/json/json_writer.h" #include "src/core/util/string.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_cluster_specifier_plugin.h" #include "src/core/xds/grpc/xds_common_types.h" #include "src/core/xds/grpc/xds_http_filters.h" diff --git a/src/core/xds/xds_client/xds_api.cc b/src/core/xds/xds_client/xds_api.cc index 6964d9b993135d..8ae5e438abf7d5 100644 --- a/src/core/xds/xds_client/xds_api.cc +++ b/src/core/xds/xds_client/xds_api.cc @@ -48,7 +48,7 @@ #include #include "src/core/util/json/json.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/xds_client/xds_client.h" // IWYU pragma: no_include "upb/msg_internal.h" diff --git a/src/core/xds/xds_client/xds_client.cc b/src/core/xds/xds_client/xds_client.cc index 1f3c9b1926f4ac..8a9456cdad2f4a 100644 --- a/src/core/xds/xds_client/xds_client.cc +++ b/src/core/xds/xds_client/xds_client.cc @@ -53,7 +53,7 @@ #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/uri/uri_parser.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/xds_client/xds_api.h" #include "src/core/xds/xds_client/xds_bootstrap.h" #include "src/core/xds/xds_client/xds_client_stats.h" diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 4d4b22f2225dba..70c1bbfa63ddcb 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -27,6 +27,7 @@ 'src/core/client_channel/direct_channel.cc', 'src/core/client_channel/dynamic_filters.cc', 'src/core/client_channel/global_subchannel_pool.cc', + 'src/core/client_channel/lb_metadata.cc', 'src/core/client_channel/load_balanced_call_destination.cc', 'src/core/client_channel/local_subchannel_pool.cc', 'src/core/client_channel/retry_filter.cc', diff --git a/test/core/load_balancing/lb_policy_test_lib.h b/test/core/load_balancing/lb_policy_test_lib.h index aa7ea8c9ae8b17..fcd85c8928eeec 100644 --- a/test/core/load_balancing/lb_policy_test_lib.h +++ b/test/core/load_balancing/lb_policy_test_lib.h @@ -620,20 +620,7 @@ class LoadBalancingPolicyTest : public ::testing::Test { explicit FakeMetadata(std::map metadata) : metadata_(std::move(metadata)) {} - const std::map& metadata() const { - return metadata_; - } - private: - void Add(absl::string_view key, absl::string_view value) override { - metadata_[std::string(key)] = std::string(value); - } - - std::vector> TestOnlyCopyToVector() - override { - return {}; // Not used. - } - absl::optional Lookup( absl::string_view key, std::string* /*buffer*/) const override { auto it = metadata_.find(std::string(key)); diff --git a/test/core/test_util/BUILD b/test/core/test_util/BUILD index d2e935f75f4caf..e0962df86f3703 100644 --- a/test/core/test_util/BUILD +++ b/test/core/test_util/BUILD @@ -333,10 +333,12 @@ grpc_cc_library( "//:uri_parser", "//src/core:channel_args", "//src/core:delegating_helper", + "//src/core:down_cast", "//src/core:error", "//src/core:grpc_backend_metric_data", "//src/core:json", "//src/core:json_util", + "//src/core:lb_metadata", "//src/core:lb_policy", "//src/core:lb_policy_factory", "//src/core:lb_policy_registry", diff --git a/test/core/test_util/test_lb_policies.cc b/test/core/test_util/test_lb_policies.cc index aea9e9194140a4..2d674333b12b84 100644 --- a/test/core/test_util/test_lb_policies.cc +++ b/test/core/test_util/test_lb_policies.cc @@ -30,9 +30,11 @@ #include #include +#include "src/core/client_channel/lb_metadata.h" #include "src/core/lib/address_utils/parse_address.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/gprpp/down_cast.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/status_helper.h" @@ -129,7 +131,8 @@ class TestPickArgsLb : public ForwardingLoadBalancingPolicy { // Report args seen. PickArgsSeen args_seen; args_seen.path = std::string(args.path); - args_seen.metadata = args.initial_metadata->TestOnlyCopyToVector(); + args_seen.metadata = + DownCast(args.initial_metadata)->TestOnlyCopyToVector(); cb_(args_seen); // Do pick. return delegate_picker_->Pick(args); @@ -269,7 +272,8 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy args_seen.status = args.status; args_seen.backend_metric_data = args.backend_metric_accessor->GetBackendMetricData(); - args_seen.metadata = args.trailing_metadata->TestOnlyCopyToVector(); + args_seen.metadata = + DownCast(args.trailing_metadata)->TestOnlyCopyToVector(); cb_(args_seen); } diff --git a/test/core/transport/metadata_map_test.cc b/test/core/transport/metadata_map_test.cc index 12e42c7167a352..cc977cd3c4e9a7 100644 --- a/test/core/transport/metadata_map_test.cc +++ b/test/core/transport/metadata_map_test.cc @@ -81,7 +81,7 @@ TEST(MetadataMapTest, SimpleOps) { // EXPECT_EQ it later. class FakeEncoder { public: - std::string output() { return output_; } + const std::string& output() { return output_; } void Encode(const Slice& key, const Slice& value) { output_ += absl::StrCat("UNKNOWN METADATUM: key=", key.as_string_view(), @@ -128,6 +128,26 @@ TEST(MetadataMapTest, NonEncodableTrait) { EXPECT_EQ(map.DebugString(), "GrpcStreamNetworkState: not sent on wire"); } +TEST(MetadataMapTest, NonTraitKeyWithMultipleValues) { + FakeEncoder encoder; + TimeoutOnlyMetadataMap map; + const absl::string_view kKey = "key"; + map.Append(kKey, Slice::FromStaticString("value1"), + [](absl::string_view error, const Slice& value) { + LOG(ERROR) << error << " value:" << value.as_string_view(); + }); + map.Append(kKey, Slice::FromStaticString("value2"), + [](absl::string_view error, const Slice& value) { + LOG(ERROR) << error << " value:" << value.as_string_view(); + }); + map.Encode(&encoder); + EXPECT_EQ(encoder.output(), + "UNKNOWN METADATUM: key=key value=value1\n" + "UNKNOWN METADATUM: key=key value=value2\n"); + std::string buffer; + EXPECT_EQ(map.GetStringValue(kKey, &buffer), "value1,value2"); +} + TEST(DebugStringBuilderTest, OneAddAfterRedaction) { metadata_detail::DebugStringBuilder b; b.AddAfterRedaction(ContentTypeMetadata::key(), "AddValue01"); diff --git a/test/core/xds/xds_common_types_test.cc b/test/core/xds/xds_common_types_test.cc index b934a977cb6221..6202407862f5ab 100644 --- a/test/core/xds/xds_common_types_test.cc +++ b/test/core/xds/xds_common_types_test.cc @@ -45,7 +45,7 @@ #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/matchers/matchers.h" #include "src/core/util/json/json_writer.h" -#include "src/core/xds/grpc/upb_utils.h" +#include "src/core/util/upb_utils.h" #include "src/core/xds/grpc/xds_bootstrap_grpc.h" #include "src/core/xds/xds_client/xds_bootstrap.h" #include "src/core/xds/xds_client/xds_client.h" diff --git a/test/cpp/interop/rpc_behavior_lb_policy.cc b/test/cpp/interop/rpc_behavior_lb_policy.cc index b065af99ad9699..1e6ab254fd2a77 100644 --- a/test/cpp/interop/rpc_behavior_lb_policy.cc +++ b/test/cpp/interop/rpc_behavior_lb_policy.cc @@ -111,12 +111,17 @@ class RpcBehaviorLbPolicy : public LoadBalancingPolicy { rpc_behavior_(rpc_behavior) {} PickResult Pick(PickArgs args) override { - char* rpc_behavior_copy = static_cast( - args.call_state->Alloc(rpc_behavior_.length() + 1)); - strcpy(rpc_behavior_copy, rpc_behavior_.c_str()); - args.initial_metadata->Add(kRpcBehaviorMetadataKey, rpc_behavior_copy); // Do pick. - return delegate_picker_->Pick(args); + auto pick_result = delegate_picker_->Pick(args); + // Add metadata. + auto* complete_pick = + absl::get_if(&pick_result.result); + if (complete_pick != nullptr) { + complete_pick->metadata_mutations.Add(kRpcBehaviorMetadataKey, + rpc_behavior_); + } + // Return result. + return pick_result; } private: diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index cf6eb657b4f445..7065c3262d76b0 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1115,6 +1115,8 @@ src/core/client_channel/dynamic_filters.cc \ src/core/client_channel/dynamic_filters.h \ src/core/client_channel/global_subchannel_pool.cc \ src/core/client_channel/global_subchannel_pool.h \ +src/core/client_channel/lb_metadata.cc \ +src/core/client_channel/lb_metadata.h \ src/core/client_channel/load_balanced_call_destination.cc \ src/core/client_channel/load_balanced_call_destination.h \ src/core/client_channel/local_subchannel_pool.cc \ @@ -2951,6 +2953,7 @@ src/core/util/time.cc \ src/core/util/time_precise.cc \ src/core/util/time_precise.h \ src/core/util/tmpfile.h \ +src/core/util/upb_utils.h \ src/core/util/useful.h \ src/core/util/windows/cpu.cc \ src/core/util/windows/log.cc \ @@ -2963,7 +2966,6 @@ src/core/xds/grpc/certificate_provider_store.cc \ src/core/xds/grpc/certificate_provider_store.h \ src/core/xds/grpc/file_watcher_certificate_provider_factory.cc \ src/core/xds/grpc/file_watcher_certificate_provider_factory.h \ -src/core/xds/grpc/upb_utils.h \ src/core/xds/grpc/xds_audit_logger_registry.cc \ src/core/xds/grpc/xds_audit_logger_registry.h \ src/core/xds/grpc/xds_bootstrap_grpc.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 6b3601d8359357..de0d4e57c86a59 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -915,6 +915,8 @@ src/core/client_channel/dynamic_filters.cc \ src/core/client_channel/dynamic_filters.h \ src/core/client_channel/global_subchannel_pool.cc \ src/core/client_channel/global_subchannel_pool.h \ +src/core/client_channel/lb_metadata.cc \ +src/core/client_channel/lb_metadata.h \ src/core/client_channel/load_balanced_call_destination.cc \ src/core/client_channel/load_balanced_call_destination.h \ src/core/client_channel/local_subchannel_pool.cc \ @@ -2730,6 +2732,7 @@ src/core/util/time.cc \ src/core/util/time_precise.cc \ src/core/util/time_precise.h \ src/core/util/tmpfile.h \ +src/core/util/upb_utils.h \ src/core/util/useful.h \ src/core/util/windows/cpu.cc \ src/core/util/windows/log.cc \ @@ -2742,7 +2745,6 @@ src/core/xds/grpc/certificate_provider_store.cc \ src/core/xds/grpc/certificate_provider_store.h \ src/core/xds/grpc/file_watcher_certificate_provider_factory.cc \ src/core/xds/grpc/file_watcher_certificate_provider_factory.h \ -src/core/xds/grpc/upb_utils.h \ src/core/xds/grpc/xds_audit_logger_registry.cc \ src/core/xds/grpc/xds_audit_logger_registry.h \ src/core/xds/grpc/xds_bootstrap_grpc.cc \ diff --git a/tools/run_tests/sanity/core_banned_functions.py b/tools/run_tests/sanity/core_banned_functions.py index 7a4270ecf0df4e..24b38f3142169c 100755 --- a/tools/run_tests/sanity/core_banned_functions.py +++ b/tools/run_tests/sanity/core_banned_functions.py @@ -24,7 +24,10 @@ # map of banned function signature to allowlist BANNED_EXCEPT = { - "grpc_slice_from_static_buffer(": ["src/core/lib/slice/slice.cc"], + "grpc_slice_from_static_buffer(": [ + "src/core/lib/slice/slice.cc", + "src/core/load_balancing/grpclb/grpclb.cc", + ], "grpc_resource_quota_ref(": ["src/core/lib/resource_quota/api.cc"], "grpc_resource_quota_unref(": [ "src/core/lib/resource_quota/api.cc",