Skip to content

Commit

Permalink
add method to control if a phase can be run synchronously
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander committed Nov 18, 2024
1 parent 886be64 commit c922220
Show file tree
Hide file tree
Showing 9 changed files with 487 additions and 242 deletions.
5 changes: 4 additions & 1 deletion api/internal/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ type Consumer struct {
FilterConfigs map[string]*fmModel.ParsedFilterConfig

// fields that generated from the configuration
CanSkipMethod map[string]bool
FilterNames []string
InitOnce sync.Once
CanSkipMethod map[string]bool
CanSkipMethodOnce sync.Once
CanSyncRunMethod map[string]bool
// CanSyncRunMethod share the same sync.Once with CanSkipMethodOnce
}

func (c *Consumer) Unmarshal(s string) error {
Expand Down Expand Up @@ -95,6 +97,7 @@ func (c *Consumer) InitConfigs() error {
Name: name,
ParsedConfig: conf,
Factory: p.Factory,
CanSyncRun: p.ConfigParser.IsNonBlocking(),
}
}

Expand Down
1 change: 1 addition & 0 deletions api/pkg/filtermanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
Name: proto.Name,
ParsedConfig: config,
Factory: plugin.Factory,
CanSyncRun: plugin.ConfigParser.IsNonBlocking(),
})

_, ok := pkgPlugins.LoadPlugin(name).(pkgPlugins.ConsumerPlugin)
Expand Down
Loading

0 comments on commit c922220

Please sign in to comment.