Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: global filter #376

Merged
merged 15 commits into from
Nov 26, 2021
Merged
249 changes: 249 additions & 0 deletions pkg/object/globalfilter/globalfilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
/*
* Copyright (c) 2017, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package globalfilter
brzyangg marked this conversation as resolved.
Show resolved Hide resolved

import (
"fmt"
"sync/atomic"

"github.com/megaease/easegress/pkg/protocol"

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/object/httppipeline"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/yamltool"
)

const (
// Category is the category of GlobalFilter.
Category = supervisor.CategoryBusinessController

// Kind is the kind of GlobalFilter.
Kind = "GlobalFilter"
)

type (
// GlobalFilter is a business controller
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
// provide handler before and after pipeline in HTTPServer
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
GlobalFilter struct {
super *supervisor.Supervisor
superSpec *supervisor.Spec
spec *Spec

beforePipeline atomic.Value
afterPipeline atomic.Value
}

// Spec describes the GlobalFilter.
Spec struct {
BeforePipeline httppipeline.Spec `yaml:"beforePipeline" jsonschema:"omitempty"`
AfterPipeline httppipeline.Spec `yaml:"afterPipeline" jsonschema:"omitempty"`
}

// pipelineSpec define httppipeline spec to create a httppipeline entity
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
pipelineSpec struct {
Kind string `yaml:"kind" jsonschema:"omitempty"`
Name string `yaml:"name" jsonschema:"omitempty"`
httppipeline.Spec `yaml:",inline"`
}
)

func init() {
supervisor.Register(&GlobalFilter{})
}

// CreateAndUpdateBeforePipelineForSpec ...
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
func (gf *GlobalFilter) CreateAndUpdateBeforePipelineForSpec(spec *Spec, previousGeneration *httppipeline.HTTPPipeline) error {
beforePipeline := &pipelineSpec{
Kind: httppipeline.Kind,
Name: "before",
Spec: spec.BeforePipeline,
}
pipeline, err := gf.CreateAndUpdatePipeline(beforePipeline, previousGeneration)
if err != nil {
return err
}
if pipeline == nil {
return fmt.Errorf("before pipeline is nil, spec: %v", beforePipeline)
}
gf.beforePipeline.Store(pipeline)
return nil
}

// CreateAndUpdateAfterPipelineForSpec ...
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
func (gf *GlobalFilter) CreateAndUpdateAfterPipelineForSpec(spec *Spec, previousGeneration *httppipeline.HTTPPipeline) error {
afterPipeline := &pipelineSpec{
Kind: httppipeline.Kind,
Name: "after",
Spec: spec.AfterPipeline,
}
pipeline, err := gf.CreateAndUpdatePipeline(afterPipeline, previousGeneration)
if err != nil {
return err
}
if pipeline == nil {
return fmt.Errorf("after pipeline is nil, spec: %v", afterPipeline)
}
gf.afterPipeline.Store(pipeline)
return nil
}

// CreateAndUpdatePipeline create and update globalFilter`s pipelines from pipeline spec
func (gf *GlobalFilter) CreateAndUpdatePipeline(spec *pipelineSpec, previousGeneration *httppipeline.HTTPPipeline) (*httppipeline.HTTPPipeline, error) {
// init config
config := yamltool.Marshal(spec)
specS, err := supervisor.NewSpec(string(config))
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

// init or update pipeline
var pipeline = new(httppipeline.HTTPPipeline)
if previousGeneration != nil {
pipeline.Inherit(specS, previousGeneration, nil)
} else {
pipeline.Init(specS, nil)
}
return pipeline, nil
}

// Category returns the object category of itself.
func (gf *GlobalFilter) Category() supervisor.ObjectCategory {
return Category
}

// Kind returns the unique kind name to represent itself.
func (gf *GlobalFilter) Kind() string {
return Kind
}

// DefaultSpec returns the default spec.
// It must return a pointer to point a struct.
func (gf *GlobalFilter) DefaultSpec() interface{} {
return &Spec{}
}

// Status returns its runtime status.
func (gf *GlobalFilter) Status() *supervisor.Status {
return &supervisor.Status{
ObjectStatus: struct{}{},
}
}

// Init initializes GlobalFilter.
func (gf *GlobalFilter) Init(superSpec *supervisor.Spec) {
gf.superSpec, gf.spec = superSpec, superSpec.ObjectSpec().(*Spec)
gf.reload(nil)
}

// Inherit inherits previous generation of GlobalFilter.
func (gf *GlobalFilter) Inherit(superSpec *supervisor.Spec, previousGeneration supervisor.Object) {
gf.superSpec, gf.spec = superSpec, superSpec.ObjectSpec().(*Spec)
gf.reload(previousGeneration.(*GlobalFilter))
}

// Handle `beforePipeline` and `afterPipeline` before and after the httpHandler is executed
func (gf *GlobalFilter) Handle(ctx context.HTTPContext, httpHandle protocol.HTTPHandler) {
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
result := gf.beforeHandle(ctx)
if result == httppipeline.LabelEND {
return
}
result = httpHandle.Handle(ctx)
if result == httppipeline.LabelEND {
return
}
gf.afterHandle(ctx)
return
}

// BeforeHandle before handler logic for beforePipeline spec
func (gf *GlobalFilter) beforeHandle(ctx context.HTTPContext) string {
value := gf.beforePipeline.Load()
if value == nil {
return ""
}
handler, ok := value.(*httppipeline.HTTPPipeline)
if !ok {
return ""
}
return handler.Handle(ctx)
}

// AfterHandle after handler logic for afterPipeline spec
func (gf *GlobalFilter) afterHandle(ctx context.HTTPContext) string {
value := gf.afterPipeline.Load()
if value == nil {
return ""
}
handler, ok := value.(*httppipeline.HTTPPipeline)
if !ok {
return ""
}
return handler.Handle(ctx)
}

// Close closes itself. It is called by deleting.
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
// Supervisor won't call Close for previous generation in Update.
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
func (gf *GlobalFilter) Close() {

}

// Validate validates Spec.
func (s *Spec) Validate() (err error) {

err = s.BeforePipeline.Validate()
if err != nil {
return fmt.Errorf("before pipeline is invalidate err: %v", err)
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
}
err = s.AfterPipeline.Validate()
if err != nil {
return fmt.Errorf("after pipeline is invalidate err: %v", err)
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
}

func (gf *GlobalFilter) reload(previousGeneration *GlobalFilter) {
var beforePreviousPipeline, afterPreviousPipeline *httppipeline.HTTPPipeline
// create and update beforePipeline entity
if len(gf.spec.BeforePipeline.Flow) != 0 {
if previousGeneration != nil {
previous := previousGeneration.beforePipeline.Load()
if previous != nil {
beforePreviousPipeline = previous.(*httppipeline.HTTPPipeline)
}
}
err := gf.CreateAndUpdateBeforePipelineForSpec(gf.spec, beforePreviousPipeline)
if err != nil {
panic(fmt.Sprintf("create before pipeline error %v", err))
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
}
}
//create and update afterPipeline entity
if len(gf.spec.AfterPipeline.Flow) != 0 {
if previousGeneration != nil {
previous := previousGeneration.afterPipeline.Load()
if previous != nil {
afterPreviousPipeline = previous.(*httppipeline.HTTPPipeline)
}
}
err := gf.CreateAndUpdateAfterPipelineForSpec(gf.spec, afterPreviousPipeline)
if err != nil {
panic(fmt.Sprintf("create after pipeline error %v", err))
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
26 changes: 16 additions & 10 deletions pkg/object/httppipeline/httppipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,11 @@ func (hp *HTTPPipeline) reload(previousGeneration *HTTPPipeline) {
hp.runningFilters = runningFilters
}

func (hp *HTTPPipeline) getNextFilterIndex(index int, result string) int {
// getNextFilterIndex return index and whether end
brzyangg marked this conversation as resolved.
Show resolved Hide resolved
func (hp *HTTPPipeline) getNextFilterIndex(index int, result string) (int, bool) {
// return index + 1 if last filter succeeded
if result == "" {
return index + 1
return index + 1, false
}

// check the jumpIf table of current filter, return its index if the jump
Expand All @@ -417,31 +418,32 @@ func (hp *HTTPPipeline) getNextFilterIndex(index int, result string) int {
}

if len(filter.jumpIf) == 0 {
return -1
return -1, false
}
name, ok := filter.jumpIf[result]
if !ok {
return -1
return -1, false
}
if name == LabelEND {
return len(hp.runningFilters)
return len(hp.runningFilters), true
}

for index++; index < len(hp.runningFilters); index++ {
if hp.runningFilters[index].spec.Name() == name {
return index
return index, false
}
}

return -1
return -1, false
}

// Handle is the handler to deal with HTTP
func (hp *HTTPPipeline) Handle(ctx context.HTTPContext) {
func (hp *HTTPPipeline) Handle(ctx context.HTTPContext) string {
ctx.SetTemplate(hp.ht)

filterIndex := -1
filterStat := newFilterStat()
isEnd := false
brzyangg marked this conversation as resolved.
Show resolved Hide resolved

handle := func(lastResult string) string {
// For saving the `filterIndex`'s filter generated HTTP Response.
Expand All @@ -467,7 +469,10 @@ func (hp *HTTPPipeline) Handle(ctx context.HTTPContext) {
filterStat = lastStat
}()

filterIndex = hp.getNextFilterIndex(filterIndex, lastResult)
filterIndex, isEnd = hp.getNextFilterIndex(filterIndex, lastResult)
samutamm marked this conversation as resolved.
Show resolved Hide resolved
if isEnd {
return LabelEND // jumpIf end of pipeline
}
if filterIndex == len(hp.runningFilters) {
return "" // reach the end of pipeline
} else if filterIndex == -1 {
Expand Down Expand Up @@ -500,9 +505,10 @@ func (hp *HTTPPipeline) Handle(ctx context.HTTPContext) {
}

ctx.SetHandlerCaller(handle)
handle("")
result := handle("")

ctx.AddTag(filterStat.marshalAndRelease())
return result
}

func (hp *HTTPPipeline) getRunningFilter(name string) *runningFilter {
Expand Down
13 changes: 7 additions & 6 deletions pkg/object/httppipeline/httppipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package httppipeline

import (
"reflect"
"testing"

"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/context/contexttest"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/supervisor"
"reflect"
"testing"
)

func CreateObjectMock(kind string) Filter {
Expand Down Expand Up @@ -275,16 +276,16 @@ filters:
httpPipeline.Inherit(superSpec, &httpPipeline, nil)

t.Run("test getNextFilterIndex", func(t *testing.T) {
if ind := httpPipeline.getNextFilterIndex(0, ""); ind != 1 {
if ind, end := httpPipeline.getNextFilterIndex(0, ""); ind != 1 && end != false {
t.Errorf("next index should be 1, was %d", ind)
}
if ind := httpPipeline.getNextFilterIndex(0, "invalid"); ind != 3 {
if ind, end := httpPipeline.getNextFilterIndex(0, "invalid"); ind != 3 && end != true {
t.Errorf("next index should be 3, was %d", ind)
}
if ind := httpPipeline.getNextFilterIndex(0, "unknown"); ind != -1 {
if ind, end := httpPipeline.getNextFilterIndex(0, "unknown"); ind != -1 && end != false {
t.Errorf("next index should be -1, was %d", ind)
}
if ind := httpPipeline.getNextFilterIndex(1, "specialCase"); ind != 2 {
if ind, end := httpPipeline.getNextFilterIndex(1, "specialCase"); ind != 2 && end != false {
t.Errorf("next index should be 2, was %d", ind)
}
})
Expand Down
Loading