Skip to content

Commit

Permalink
feat: support resource consumer group (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
lingsamuel authored Sep 12, 2023
1 parent 197a8d7 commit c2c6ae0
Show file tree
Hide file tree
Showing 23 changed files with 632 additions and 69 deletions.
18 changes: 12 additions & 6 deletions cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,19 @@ func dumpConfiguration(cmd *cobra.Command) error {
return err
}

consumerGroups, err := cluster.ConsumerGroup().List(context.Background())
if err != nil {
return err
}

conf := &types.Configuration{
Routes: routes,
Services: svcs,
Consumers: consumers,
SSLs: ssls,
GlobalRules: globalRules,
PluginConfigs: pluginConfigs,
Routes: routes,
Services: svcs,
Consumers: consumers,
SSLs: ssls,
GlobalRules: globalRules,
PluginConfigs: pluginConfigs,
ConsumerGroups: consumerGroups,
}

if save {
Expand Down
4 changes: 4 additions & 0 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func newValidateCmd() *cobra.Command {
msg += fmt.Sprintf(", plugin_configs: %v", len(d.PluginConfigs))
changed = true
}
if len(d.ConsumerGroups) > 0 {
msg += fmt.Sprintf(", consumer_groups: %v", len(d.ConsumerGroups))
changed = true
}
if !changed {
msg += "nothing changed"
}
Expand Down
25 changes: 20 additions & 5 deletions internal/pkg/db/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,15 @@ var schema = &memdb.DBSchema{
Unique: true,
Indexer: &memdb.StringFieldIndex{Field: "ID"},
},
"name": {
Name: "name",
Unique: true,
Indexer: &memdb.StringFieldIndex{Field: "Name"},
AllowMissing: true,
},
},
"consumer_groups": {
Name: "consumer_groups",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
Unique: true,
Indexer: &memdb.StringFieldIndex{Field: "ID"},
},
},
},
Expand Down Expand Up @@ -140,6 +144,13 @@ func NewMemDB(config *types.Configuration) (*DB, error) {
}
}

for _, consumerGroup := range config.ConsumerGroups {
err = txn.Insert("consumer_groups", consumerGroup)
if err != nil {
return nil, err
}
}

txn.Commit()

return &DB{memDB: db}, nil
Expand Down Expand Up @@ -181,3 +192,7 @@ func (db *DB) GetGlobalRuleByID(id string) (*types.GlobalRule, error) {
func (db *DB) GetPluginConfigByID(id string) (*types.PluginConfig, error) {
return getByID[types.PluginConfig](db, "plugin_configs", id)
}

func (db *DB) GetConsumerGroupByID(id string) (*types.ConsumerGroup, error) {
return getByID[types.ConsumerGroup](db, "consumer_groups", id)
}
108 changes: 89 additions & 19 deletions internal/pkg/differ/differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,36 @@ func _key(typ data.ResourceType, option int) string {
return fmt.Sprintf("%s:%d", typ, option)
}

// order is the events order to ensure the data dependency
// order is the events order to ensure the data dependency. Higher takes priority
// route requires: service, plugin config, consumer (soft require)
// consumer requires: consumer group
// The dependent resources should be created/updated first but deleted later
var order = map[string]int{
_key(data.ConsumerResourceType, data.DeleteOption): _order(),
_key(data.ServiceResourceType, data.DeleteOption): _order(),
_key(data.PluginConfigResourceType, data.DeleteOption): _order(),
_key(data.RouteResourceType, data.DeleteOption): _order(),
_key(data.RouteResourceType, data.UpdateOption): _order(),
_key(data.ServiceResourceType, data.UpdateOption): _order(),
_key(data.PluginConfigResourceType, data.UpdateOption): _order(),
_key(data.RouteResourceType, data.CreateOption): _order(),
_key(data.ServiceResourceType, data.CreateOption): _order(),
_key(data.PluginConfigResourceType, data.CreateOption): _order(),
_key(data.ConsumerResourceType, data.CreateOption): _order(),
_key(data.ConsumerResourceType, data.UpdateOption): _order(),
_key(data.SSLResourceType, data.DeleteOption): _order(),
_key(data.SSLResourceType, data.CreateOption): _order(),
_key(data.SSLResourceType, data.UpdateOption): _order(),
_key(data.GlobalRuleResourceType, data.DeleteOption): _order(),
_key(data.GlobalRuleResourceType, data.CreateOption): _order(),
_key(data.GlobalRuleResourceType, data.UpdateOption): _order(),
_key(data.ServiceResourceType, data.DeleteOption): _order(),
_key(data.PluginConfigResourceType, data.DeleteOption): _order(),
_key(data.ConsumerGroupResourceType, data.DeleteOption): _order(),
_key(data.ConsumerResourceType, data.DeleteOption): _order(),
_key(data.RouteResourceType, data.DeleteOption): _order(),

_key(data.RouteResourceType, data.UpdateOption): _order(),
_key(data.ServiceResourceType, data.UpdateOption): _order(),
_key(data.PluginConfigResourceType, data.UpdateOption): _order(),
_key(data.ConsumerResourceType, data.UpdateOption): _order(),
_key(data.ConsumerGroupResourceType, data.UpdateOption): _order(),

_key(data.RouteResourceType, data.CreateOption): _order(),
_key(data.ServiceResourceType, data.CreateOption): _order(),
_key(data.PluginConfigResourceType, data.CreateOption): _order(),
_key(data.ConsumerResourceType, data.CreateOption): _order(),
_key(data.ConsumerGroupResourceType, data.CreateOption): _order(),

// no dependency
_key(data.SSLResourceType, data.DeleteOption): _order(),
_key(data.SSLResourceType, data.CreateOption): _order(),
_key(data.SSLResourceType, data.UpdateOption): _order(),
_key(data.GlobalRuleResourceType, data.DeleteOption): _order(),
_key(data.GlobalRuleResourceType, data.CreateOption): _order(),
_key(data.GlobalRuleResourceType, data.UpdateOption): _order(),
}

// Differ is the object of comparing two configurations.
Expand All @@ -66,6 +76,7 @@ func NewDiffer(local, remote *types.Configuration) (*Differ, error) {
}, nil
}

// sortEvents sorts events descending, higher priority events will be executed first
func sortEvents(events []*data.Event) {
sort.Slice(events, func(i, j int) bool {
return order[_key(events[i].ResourceType, events[i].Option)] > order[_key(events[j].ResourceType, events[j].Option)]
Expand Down Expand Up @@ -107,12 +118,18 @@ func (d *Differ) Diff() ([]*data.Event, error) {
return nil, err
}

consumerGroupEvents, err := d.diffConsumerGroup()
if err != nil {
return nil, err
}

events = append(events, serviceEvents...)
events = append(events, routeEvents...)
events = append(events, consumerEvents...)
events = append(events, sslEvents...)
events = append(events, globalRuleEvents...)
events = append(events, pluginConfigEvents...)
events = append(events, consumerGroupEvents...)

sortEvents(events)

Expand Down Expand Up @@ -437,3 +454,56 @@ func (d *Differ) diffPluginConfig() ([]*data.Event, error) {

return events, nil
}

// diffConsumerGroup compares the global_rules between local and remote.
func (d *Differ) diffConsumerGroup() ([]*data.Event, error) {
var events []*data.Event
var mark = make(map[string]bool)

for _, remoteConsumerGroup := range d.remoteConfig.ConsumerGroups {
localConsumerGroup, err := d.localDB.GetConsumerGroupByID(remoteConsumerGroup.ID)
if err != nil {
// we can't find in local config, should delete it
if err == db.NotFound {
e := data.Event{
ResourceType: data.ConsumerGroupResourceType,
Option: data.DeleteOption,
OldValue: remoteConsumerGroup,
}
events = append(events, &e)
continue
}

return nil, err
}

mark[localConsumerGroup.ID] = true
// skip when equals
if equal := reflect.DeepEqual(localConsumerGroup, remoteConsumerGroup); equal {
continue
}

// otherwise update
events = append(events, &data.Event{
ResourceType: data.ConsumerGroupResourceType,
Option: data.UpdateOption,
OldValue: remoteConsumerGroup,
Value: localConsumerGroup,
})
}

// only in local, create
for _, consumerGroup := range d.localConfig.ConsumerGroups {
if mark[consumerGroup.ID] {
continue
}

events = append(events, &data.Event{
ResourceType: data.ConsumerGroupResourceType,
Option: data.CreateOption,
Value: consumerGroup,
})
}

return events, nil
}
8 changes: 8 additions & 0 deletions internal/pkg/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,13 @@ func (v *Validator) Validate() []error {
}
}

for _, consumerGroup := range v.localConfig.ConsumerGroups {
consumerGroup := consumerGroup
err := v.cluster.ConsumerGroup().Validate(context.Background(), consumerGroup)
if err != nil {
allErr = append(allErr, err)
}
}

return allErr
}
5 changes: 5 additions & 0 deletions pkg/api/apisix/apisix.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Cluster interface {
SSL() SSL
GlobalRule() GlobalRule
PluginConfig() PluginConfig
ConsumerGroup() ConsumerGroup
}

type ResourceClient[T any] interface {
Expand Down Expand Up @@ -47,3 +48,7 @@ type GlobalRule interface {
type PluginConfig interface {
ResourceClient[types.PluginConfig]
}

type ConsumerGroup interface {
ResourceClient[types.ConsumerGroup]
}
19 changes: 13 additions & 6 deletions pkg/api/apisix/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ type cluster struct {

cli *Client

route Route
service Service
consumer Consumer
ssl SSL
globalRule GlobalRule
pluginConfig PluginConfig
route Route
service Service
consumer Consumer
ssl SSL
globalRule GlobalRule
pluginConfig PluginConfig
consumerGroup ConsumerGroup
}

func NewCluster(ctx context.Context, url, adminKey string) Cluster {
Expand All @@ -33,6 +34,7 @@ func NewCluster(ctx context.Context, url, adminKey string) Cluster {
c.ssl = newSSL(cli)
c.globalRule = newGlobalRule(cli)
c.pluginConfig = newPluginConfig(cli)
c.consumerGroup = newConsumerGroup(cli)

return c
}
Expand Down Expand Up @@ -66,3 +68,8 @@ func (c *cluster) GlobalRule() GlobalRule {
func (c *cluster) PluginConfig() PluginConfig {
return c.pluginConfig
}

// ConsumerGroup implements Cluster.ConsumerGroup method.
func (c *cluster) ConsumerGroup() ConsumerGroup {
return c.consumerGroup
}
26 changes: 26 additions & 0 deletions pkg/api/apisix/consumer_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package apisix

import (
"context"

"github.com/api7/adc/pkg/api/apisix/types"
)

type consumerGroupClient struct {
*resourceClient[types.ConsumerGroup]
}

func newConsumerGroup(c *Client) ConsumerGroup {
cli := newResourceClient[types.ConsumerGroup](c, "consumer_groups")
return &consumerGroupClient{
resourceClient: cli,
}
}

func (u *consumerGroupClient) Create(ctx context.Context, obj *types.ConsumerGroup) (*types.ConsumerGroup, error) {
return u.resourceClient.Create(ctx, obj.ID, obj)
}

func (u *consumerGroupClient) Update(ctx context.Context, obj *types.ConsumerGroup) (*types.ConsumerGroup, error) {
return u.resourceClient.Update(ctx, obj.ID, obj)
}
29 changes: 20 additions & 9 deletions pkg/api/apisix/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import (

// Configuration is the configuration of services
type Configuration struct {
Name string `yaml:"name" json:"name"`
Version string `yaml:"version" json:"version"`
Services []*Service `yaml:"services,omitempty" json:"services,omitempty"`
Routes []*Route `yaml:"routes,omitempty" json:"routes,omitempty"`
Consumers []*Consumer `yaml:"consumers,omitempty" json:"consumers,omitempty"`
SSLs []*SSL `yaml:"ssls,omitempty" json:"ssls,omitempty"`
GlobalRules []*GlobalRule `yaml:"global_rules,omitempty" json:"global_rules,omitempty"`
PluginConfigs []*PluginConfig `yaml:"plugin_configs,omitempty" json:"plugin_configs,omitempty"`
Name string `yaml:"name" json:"name"`
Version string `yaml:"version" json:"version"`
Services []*Service `yaml:"services,omitempty" json:"services,omitempty"`
Routes []*Route `yaml:"routes,omitempty" json:"routes,omitempty"`
Consumers []*Consumer `yaml:"consumers,omitempty" json:"consumers,omitempty"`
SSLs []*SSL `yaml:"ssls,omitempty" json:"ssls,omitempty"`
GlobalRules []*GlobalRule `yaml:"global_rules,omitempty" json:"global_rules,omitempty"`
PluginConfigs []*PluginConfig `yaml:"plugin_configs,omitempty" json:"plugin_configs,omitempty"`
ConsumerGroups []*ConsumerGroup `yaml:"consumer_groups,omitempty" json:"consumer_groups,omitempty"`
}

// Labels is the APISIX resource labels
Expand Down Expand Up @@ -303,6 +304,7 @@ type Consumer struct {
Labels Labels `json:"labels,omitempty" yaml:"labels,omitempty"`

Plugins Plugins `json:"plugins,omitempty" yaml:"plugins,omitempty"`
GroupID string `json:"group_id,omitempty" yaml:"group_id,omitempty"`
}

// SSL represents the ssl object in APISIX.
Expand All @@ -327,7 +329,16 @@ type GlobalRule struct {
// +k8s:deepcopy-gen=true
type PluginConfig struct {
ID string `json:"id,omitempty" yaml:"id,omitempty"`
Name string `json:"name,omitempty" yaml:"name,omitempty"`
Desc string `json:"desc,omitempty" yaml:"desc,omitempty"`
Labels Labels `json:"labels,omitempty" yaml:"labels,omitempty"`

Plugins Plugins `json:"plugins" yaml:"plugins"`
}

// ConsumerGroup apisix consumer group object
// +k8s:deepcopy-gen=true
type ConsumerGroup struct {
ID string `json:"id,omitempty" yaml:"id,omitempty"`
Desc string `json:"desc,omitempty" yaml:"desc,omitempty"`
Labels Labels `json:"labels,omitempty" yaml:"labels,omitempty"`

Expand Down
Loading

0 comments on commit c2c6ae0

Please sign in to comment.