Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

golang filter: change register configFactory to filterFactory #32183

Merged
merged 2 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ minor_behavior_changes:
Upstream now excludes hosts set to ``DRAINING`` state via EDS from load balancing and panic routing
threshold calculation. This feature can be disabled by setting
``envoy.reloadable_features.exclude_host_in_eds_status_draining`` to false.
- area: golang
change: |
Change ``RegisterHttpFilterConfigFactoryAndParser`` to ``RegisterHttpFilterFactoryAndConfigParser``.

bug_fixes:
# *Changes expected to improve the state of the world and are unlikely to have negative effects*
Expand Down
9 changes: 7 additions & 2 deletions contrib/golang/common/go/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,17 @@ func (*PassThroughStreamFilter) OnDestroy(DestroyReason) {
}

type StreamFilterConfigParser interface {
// Parse the proto message to any Go value, and return error to reject the config.
// This is called when Envoy receives the config from the control plane.
// Also, you can define Metrics through the callbacks, and the callbacks will be nil when parsing the route config.
Parse(any *anypb.Any, callbacks ConfigCallbackHandler) (interface{}, error)
// Merge the two configs(filter level config or route level config) into one.
// May merge multi-level configurations, i.e. filter level, virtualhost level, router level and weighted cluster level,
// into a single one recursively, by invoking this method multiple times.
Merge(parentConfig interface{}, childConfig interface{}) interface{}
}

type StreamFilterConfigFactory func(config interface{}) StreamFilterFactory
type StreamFilterFactory func(callbacks FilterCallbackHandler) StreamFilter
type StreamFilterFactory func(config interface{}, callbacks FilterCallbackHandler) StreamFilter

// stream info
// refer https://github.com/envoyproxy/envoy/blob/main/envoy/stream_info/stream_info.h
Expand Down
58 changes: 24 additions & 34 deletions contrib/golang/filters/http/source/go/pkg/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,20 @@ func envoyGoFilterNewHttpPluginConfig(c *C.httpConfig) uint64 {

name := utils.BytesToString(uint64(c.plugin_name_ptr), uint64(c.plugin_name_len))
configParser := getHttpFilterConfigParser(name)
if configParser != nil {
var parsedConfig interface{}
var err error
if c.is_route_config == 1 {
parsedConfig, err = configParser.Parse(&any, nil)
} else {
http_config := createConfig(c)
parsedConfig, err = configParser.Parse(&any, http_config)
}
if err != nil {
cAPI.HttpLog(api.Error, fmt.Sprintf("failed to parse golang plugin config: %v", err))
return 0
}
configCache.Store(configNum, parsedConfig)

var parsedConfig interface{}
var err error
if c.is_route_config == 1 {
parsedConfig, err = configParser.Parse(&any, nil)
} else {
configCache.Store(configNum, &any)
config := createConfig(c)
parsedConfig, err = configParser.Parse(&any, config)
}
if err != nil {
cAPI.HttpLog(api.Error, fmt.Sprintf("failed to parse golang plugin config: %v", err))
return 0
}
configCache.Store(configNum, parsedConfig)

return configNum
}
Expand All @@ -121,24 +118,17 @@ func envoyGoFilterMergeHttpPluginConfig(namePtr, nameLen, parentId, childId uint
name := utils.BytesToString(namePtr, nameLen)
configParser := getHttpFilterConfigParser(name)

if configParser != nil {
parent, ok := configCache.Load(parentId)
if !ok {
panic(fmt.Sprintf("merge config: get parentId: %d config failed", parentId))
}
child, ok := configCache.Load(childId)
if !ok {
panic(fmt.Sprintf("merge config: get childId: %d config failed", childId))
}

new := configParser.Merge(parent, child)
configNum := atomic.AddUint64(&configNumGenerator, 1)
configCache.Store(configNum, new)
return configNum

} else {
// child override parent by default.
// It's safe to reuse the childId, since the merged config have the same life time with the child config.
return childId
parent, ok := configCache.Load(parentId)
if !ok {
panic(fmt.Sprintf("merge config: get parentId: %d config failed", parentId))
}
child, ok := configCache.Load(childId)
if !ok {
panic(fmt.Sprintf("merge config: get childId: %d config failed", childId))
}

new := configParser.Merge(parent, child)
configNum := atomic.AddUint64(&configNumGenerator, 1)
configCache.Store(configNum, new)
return configNum
}
58 changes: 41 additions & 17 deletions contrib/golang/filters/http/source/go/pkg/http/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,68 @@ import (
"sync"

"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"google.golang.org/protobuf/types/known/anypb"
)

var httpFilterConfigFactoryAndParser = sync.Map{}
var httpFilterFactoryAndParser = sync.Map{}

type filterConfigFactoryAndParser struct {
configFactory api.StreamFilterConfigFactory
type filterFactoryAndParser struct {
filterFactory api.StreamFilterFactory
configParser api.StreamFilterConfigParser
}

// Register config factory and config parser for the specified plugin.
// The "factory" parameter is required, should not be nil,
// and the "parser" parameter is optional, could be nil.
func RegisterHttpFilterConfigFactoryAndParser(name string, factory api.StreamFilterConfigFactory, parser api.StreamFilterConfigParser) {
// nullParser is a no-op implementation of the StreamFilterConfigParser interface.
type nullParser struct{}

// Parse do nothing, returns the input any as is.
doujiang24 marked this conversation as resolved.
Show resolved Hide resolved
func (p *nullParser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) {
return any, nil
}

// Merge only use the childConfig, ignore the parentConfig.
doujiang24 marked this conversation as resolved.
Show resolved Hide resolved
func (p *nullParser) Merge(parentConfig interface{}, childConfig interface{}) interface{} {
return childConfig
}

var NullParser api.StreamFilterConfigParser = &nullParser{}

// RegisterHttpFilterFactoryAndConfigParser register http filter factory and config parser for the specified plugin.
doujiang24 marked this conversation as resolved.
Show resolved Hide resolved
// The factory and parser should not be nil.
// Use the NullParser if the plugin does not care about config.
func RegisterHttpFilterFactoryAndConfigParser(name string, factory api.StreamFilterFactory, parser api.StreamFilterConfigParser) {
if factory == nil {
panic("config factory should not be nil")
panic("filter factory should not be nil")
}
if parser == nil {
panic("config parser should not be nil")
}
httpFilterConfigFactoryAndParser.Store(name, &filterConfigFactoryAndParser{factory, parser})
httpFilterFactoryAndParser.Store(name, &filterFactoryAndParser{factory, parser})
}

func getOrCreateHttpFilterFactory(name string, configId uint64) api.StreamFilterFactory {
func getHttpFilterFactoryAndConfig(name string, configId uint64) (api.StreamFilterFactory, interface{}) {
config, ok := configCache.Load(configId)
if !ok {
panic(fmt.Sprintf("config not found, plugin: %s, configId: %d", name, configId))
}

if v, ok := httpFilterConfigFactoryAndParser.Load(name); ok {
return (v.(*filterConfigFactoryAndParser)).configFactory(config)
if v, ok := httpFilterFactoryAndParser.Load(name); ok {
return (v.(*filterFactoryAndParser)).filterFactory, config
}

api.LogErrorf("plugin %s not found, pass through by default", name)

// pass through by default
return PassThroughFactory(config)
// return PassThroughFactory when no factory found
return PassThroughFactory, config
}

func getHttpFilterConfigParser(name string) api.StreamFilterConfigParser {
if v, ok := httpFilterConfigFactoryAndParser.Load(name); ok {
return (v.(*filterConfigFactoryAndParser)).configParser
if v, ok := httpFilterFactoryAndParser.Load(name); ok {
parser := (v.(*filterFactoryAndParser)).configParser
if parser == nil {
panic(fmt.Sprintf("config parser not found, plugin: %s", name))
}
return parser
}
return nil
// return NullParser when no parser found
return NullParser
}
8 changes: 3 additions & 5 deletions contrib/golang/filters/http/source/go/pkg/http/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ type passThroughFilter struct {
callbacks api.FilterCallbackHandler
}

func PassThroughFactory(interface{}) api.StreamFilterFactory {
return func(callbacks api.FilterCallbackHandler) api.StreamFilter {
return &passThroughFilter{
callbacks: callbacks,
}
func PassThroughFactory(config interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
return &passThroughFilter{
callbacks: callbacks,
}
}
6 changes: 3 additions & 3 deletions contrib/golang/filters/http/source/go/pkg/http/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var (
// memory per worker thread to avoid locks.
//
// Note: Do not use inside of an `init()` function, the value will not be populated yet. Use within the filters
// `StreamFilterConfigFactory` or `StreamFilterConfigParser`
// `StreamFilterFactory` or `StreamFilterConfigParser`
func EnvoyConcurrency() uint32 {
if !initialized {
panic("concurrency has not yet been initialized, do not access within an init()")
Expand Down Expand Up @@ -119,8 +119,8 @@ func createRequest(r *C.httpRequest) *httpRequest {
}

configId := uint64(r.configId)
filterFactory := getOrCreateHttpFilterFactory(req.pluginName(), configId)
f := filterFactory(req)
filterFactory, config := getHttpFilterFactoryAndConfig(req.pluginName(), configId)
f := filterFactory(config, req)
req.httpFilter = f

return req
Expand Down
12 changes: 5 additions & 7 deletions contrib/golang/filters/http/test/test_data/access_log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
const Name = "access_log"

func init() {
http.RegisterHttpFilterConfigFactoryAndParser(Name, ConfigFactory, &parser{})
http.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{})
}

type config struct {
Expand All @@ -28,16 +28,14 @@ func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
return child
}

func ConfigFactory(c interface{}) api.StreamFilterFactory {
func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
conf, ok := c.(*config)
if !ok {
panic("unexpected config type")
}
return func(callbacks api.FilterCallbackHandler) api.StreamFilter {
return &filter{
callbacks: callbacks,
config: conf,
}
return &filter{
callbacks: callbacks,
config: conf,
}
}

Expand Down
14 changes: 4 additions & 10 deletions contrib/golang/filters/http/test/test_data/action/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,12 @@ import (
const Name = "action"

func init() {
http.RegisterHttpFilterConfigFactoryAndParser(Name, ConfigFactory, nil)
http.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, http.NullParser)
}

type config struct {
decodeHeadersRet api.StatusType
}

func ConfigFactory(c interface{}) api.StreamFilterFactory {
return func(callbacks api.FilterCallbackHandler) api.StreamFilter {
return &filter{
callbacks: callbacks,
}
func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
return &filter{
callbacks: callbacks,
}
}

Expand Down
10 changes: 4 additions & 6 deletions contrib/golang/filters/http/test/test_data/basic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ func init() {
api.LogCritical("init")
api.LogCritical(api.GetLogLevel().String())

http.RegisterHttpFilterConfigFactoryAndParser(Name, ConfigFactory, nil)
http.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, http.NullParser)
}

func ConfigFactory(interface{}) api.StreamFilterFactory {
return func(callbacks api.FilterCallbackHandler) api.StreamFilter {
return &filter{
callbacks: callbacks,
}
func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
return &filter{
callbacks: callbacks,
}
}

Expand Down
12 changes: 5 additions & 7 deletions contrib/golang/filters/http/test/test_data/buffer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
const Name = "buffer"

func init() {
http.RegisterHttpFilterConfigFactoryAndParser(Name, ConfigFactory, &parser{})
http.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{})
}

type config struct {
Expand All @@ -28,16 +28,14 @@ func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
return child
}

func ConfigFactory(c interface{}) api.StreamFilterFactory {
func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
conf, ok := c.(*config)
if !ok {
panic("unexpected config type")
}
return func(callbacks api.FilterCallbackHandler) api.StreamFilter {
return &filter{
callbacks: callbacks,
config: conf,
}
return &filter{
callbacks: callbacks,
config: conf,
}
}

Expand Down
2 changes: 1 addition & 1 deletion contrib/golang/filters/http/test/test_data/dummy/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func init() {
http.RegisterHttpFilterConfigFactoryAndParser("", http.PassThroughFactory, nil)
http.RegisterHttpFilterFactoryAndConfigParser("", http.PassThroughFactory, http.NullParser)
}

func main() {
Expand Down
12 changes: 5 additions & 7 deletions contrib/golang/filters/http/test/test_data/echo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
const Name = "echo"

func init() {
http.RegisterHttpFilterConfigFactoryAndParser(Name, ConfigFactory, &parser{})
http.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{})
}

type config struct {
Expand Down Expand Up @@ -43,16 +43,14 @@ func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
panic("TODO")
}

func ConfigFactory(c interface{}) api.StreamFilterFactory {
func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
conf, ok := c.(*config)
if !ok {
panic("unexpected config type")
}
return func(callbacks api.FilterCallbackHandler) api.StreamFilter {
return &filter{
callbacks: callbacks,
config: conf,
}
return &filter{
callbacks: callbacks,
config: conf,
}
}

Expand Down
12 changes: 5 additions & 7 deletions contrib/golang/filters/http/test/test_data/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func init() {
api.LogCritical("init")
api.LogCritical(api.GetLogLevel().String())

http.RegisterHttpFilterConfigFactoryAndParser(Name, ConfigFactory, &parser{})
http.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{})
}

type config struct {
Expand All @@ -37,16 +37,14 @@ func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
panic("TODO")
}

func ConfigFactory(c interface{}) api.StreamFilterFactory {
func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
conf, ok := c.(*config)
if !ok {
panic("unexpected config type")
}
return func(callbacks api.FilterCallbackHandler) api.StreamFilter {
return &filter{
callbacks: callbacks,
config: conf,
}
return &filter{
callbacks: callbacks,
config: conf,
}
}

Expand Down
Loading
Loading