Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support controller as etcd server #1803

Merged
merged 51 commits into from
Aug 30, 2023
Merged
Changes from 1 commit
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
878158f
feat: support controller as etcd server
AlinsRan Apr 17, 2023
dc2aa6b
fix e2e test
AlinsRan Apr 21, 2023
c21e944
add store
AlinsRan Apr 23, 2023
461f158
fix not found
AlinsRan Apr 23, 2023
2bfe7ce
fix
AlinsRan Apr 24, 2023
bb1aca7
fix upstream
AlinsRan Apr 24, 2023
494a128
add compare and delete verify
AlinsRan Apr 25, 2023
fcb839e
fix test
AlinsRan Apr 25, 2023
f1b1ff3
fix httprou
AlinsRan Apr 25, 2023
6250c13
hmac
AlinsRan Apr 26, 2023
d90b760
upgrade etcd-adapter
AlinsRan May 4, 2023
4012456
fix configmap
AlinsRan May 4, 2023
28b1aec
Merge branch 'apache:master' into feat/etcdserver
AlinsRan May 4, 2023
90ebc83
skip cases
AlinsRan May 4, 2023
395b256
update go mod
AlinsRan May 5, 2023
3ca9565
test ci
AlinsRan May 5, 2023
9c99f51
fix sd e2e
AlinsRan May 5, 2023
47da0cd
Merge branch 'apache:master' into feat/etcdserver
AlinsRan May 5, 2023
91270e7
fix ssl
AlinsRan May 5, 2023
9f767a0
skip some case
AlinsRan May 6, 2023
3dfa3e8
Merge branch 'master' into feat/etcdserver
AlinsRan May 31, 2023
af3b638
Merge branch 'master' into feat/etcdserver
AlinsRan Aug 14, 2023
1bbb974
support schema validator
AlinsRan Aug 22, 2023
6789ec2
fix test cases
AlinsRan Aug 23, 2023
3db6dd5
support high availability
AlinsRan Aug 23, 2023
13b9d73
fix ci
AlinsRan Aug 24, 2023
d8366e1
fix(ci): udp forward failed and missing pigz
AlinsRan Aug 24, 2023
13852f1
update ci
AlinsRan Aug 24, 2023
f4e4f6a
fix test case
AlinsRan Aug 24, 2023
8c3ebc2
update PR to ready
AlinsRan Aug 24, 2023
6ea956b
Merge branch 'fix/ci-udp' into feat/etcdserver
AlinsRan Aug 24, 2023
f8e9dd4
update ci
AlinsRan Aug 25, 2023
ea6891f
fix test case
AlinsRan Aug 25, 2023
c18d765
Merge branch 'master' into feat/etcdserver
AlinsRan Aug 25, 2023
27d47ac
update dep
AlinsRan Aug 25, 2023
e822da1
update e2e dep
AlinsRan Aug 25, 2023
7b8cacd
support composite deployment
AlinsRan Aug 27, 2023
058a272
Merge branch 'master' into test/etcdserver
AlinsRan Aug 27, 2023
157382d
perf test cases
AlinsRan Aug 27, 2023
ca60618
perf test cases
AlinsRan Aug 27, 2023
4a94967
fix test cases
AlinsRan Aug 28, 2023
cd56477
update yaml file
AlinsRan Aug 28, 2023
fba0596
update yaml file
AlinsRan Aug 28, 2023
1fefa4e
remove debug opt
AlinsRan Aug 28, 2023
c2a9b9e
shutdown apisix tunnel
AlinsRan Aug 28, 2023
818e7d3
fix status
AlinsRan Aug 28, 2023
38d0eab
perf leader election
AlinsRan Aug 28, 2023
29e4531
Merge branch 'master' into feat/etcdserver
AlinsRan Aug 28, 2023
fcdcf27
fix typo
AlinsRan Aug 29, 2023
c49c7ea
resolve conflict
AlinsRan Aug 29, 2023
5433e2a
update doc
AlinsRan Aug 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: support controller as etcd server
  • Loading branch information
AlinsRan committed Apr 17, 2023
commit 878158fbb0e639d85c84509f5c85c1e0cb26a9c1
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@ RUN if [ "$ENABLE_PROXY" = "true" ] ; then go env -w GOPROXY=https://goproxy.cn,
&& go mod download

COPY . .
RUN --mount=type=cache,target=/root/.cache/go-build make build
RUN make build

FROM centos:centos7
LABEL maintainer="gxthrj@163.com"
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -75,7 +75,7 @@ clean-image: ## Removes local image
.PHONY: build-image
build-image:
ifeq ($(E2E_SKIP_BUILD), 0)
DOCKER_BUILDKIT=1 docker build -t apache/apisix-ingress-controller:$(IMAGE_TAG) --build-arg ENABLE_PROXY=$(ENABLE_PROXY) .
DOCKER_BUILDKIT=0 docker build -t apache/apisix-ingress-controller:$(IMAGE_TAG) --build-arg ENABLE_PROXY=$(ENABLE_PROXY) .
docker tag apache/apisix-ingress-controller:$(IMAGE_TAG) $(REGISTRY)/apisix-ingress-controller:$(IMAGE_TAG)
endif

18 changes: 17 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
module github.com/apache/apisix-ingress-controller

go 1.19
go 1.20

require (
github.com/api7/etcd-adapter v0.1.1
github.com/gin-gonic/gin v1.9.0
github.com/hashicorp/go-memdb v1.3.4
github.com/hashicorp/go-multierror v1.1.1
@@ -31,6 +32,7 @@ require (
)

require (
github.com/api7/gopkg v0.1.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.8.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
@@ -51,16 +53,20 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/gnostic v0.6.9 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/k3s-io/kine v0.8.1 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
@@ -73,11 +79,17 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.9 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
go.etcd.io/etcd v3.3.27+incompatible // indirect
go.etcd.io/etcd/api/v3 v3.5.5 // indirect
go.etcd.io/etcd/client/v3 v3.5.5 // indirect
go.uber.org/atomic v1.10.0 // indirect
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
golang.org/x/crypto v0.5.0 // indirect
@@ -89,6 +101,8 @@ require (
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.6.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect
google.golang.org/grpc v1.49.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
@@ -101,3 +115,5 @@ require (
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)

replace github.com/api7/etcd-adapter => github.com/alinsran/etcd-adapter v0.2.1
723 changes: 722 additions & 1 deletion go.sum

Large diffs are not rendered by default.

140 changes: 97 additions & 43 deletions pkg/apisix/cluster.go
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/metrics"
"github.com/apache/apisix-ingress-controller/pkg/types"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
adapter "github.com/api7/etcd-adapter/pkg/adapter"
)

const (
@@ -84,6 +85,7 @@ type ClusterOptions struct {
SyncInterval types.TimeDuration
SyncComparison bool
MetricsCollector metrics.Collector
EnableEtcdServer bool
}

type cluster struct {
@@ -111,6 +113,7 @@ type cluster struct {
metricsCollector metrics.Collector
upstreamServiceRelation UpstreamServiceRelation
pluginMetadata PluginMetadata
adapter adapter.Adapter
}

func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
@@ -135,6 +138,10 @@ func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
if adminVersion != "v3" {
adminVersion = "v2"
}
var etcdserver adapter.Adapter
if o.EnableEtcdServer {
etcdserver = adapter.NewEtcdAdapter(nil)
}
c := &cluster{
adminVersion: adminVersion,
name: o.Name,
@@ -149,6 +156,7 @@ func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
cacheSynced: make(chan struct{}),
syncComparison: o.SyncComparison,
metricsCollector: o.MetricsCollector,
adapter: etcdserver,
}
c.route = newRouteClient(c)
c.upstream = newUpstreamClient(c)
@@ -176,58 +184,72 @@ func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
return nil, err
}

if o.EnableEtcdServer {
fmt.Println("start etcd server")
ln, err := net.Listen("tcp", "0.0.0.0:12379")
if err != nil {
return nil, err
}
go c.adapter.Serve(ctx, ln)
}

go c.syncCache(ctx)
go c.syncSchema(ctx, o.SyncInterval.Duration)
//go c.syncSchema(ctx, o.SyncInterval.Duration)

return c, nil
}

func (c *cluster) syncCache(ctx context.Context) {
log.Infow("syncing cache", zap.String("cluster", c.name))
now := time.Now()
defer func() {
if c.cacheSyncErr == nil {
log.Infow("cache synced",
zap.String("cost_time", time.Since(now).String()),
zap.String("cluster", c.name),
)
c.metricsCollector.IncrCacheSyncOperation("success")
} else {
log.Errorw("failed to sync cache",
zap.String("cost_time", time.Since(now).String()),
zap.String("cluster", c.name),
)
c.metricsCollector.IncrCacheSyncOperation("failure")
}
}()
time.Sleep(2 * time.Second)
close(c.cacheSynced)
return
/*
log.Infow("syncing cache", zap.String("cluster", c.name))
now := time.Now()
defer func() {
if c.cacheSyncErr == nil {
log.Infow("cache synced",
zap.String("cost_time", time.Since(now).String()),
zap.String("cluster", c.name),
)
c.metricsCollector.IncrCacheSyncOperation("success")
} else {
log.Errorw("failed to sync cache",
zap.String("cost_time", time.Since(now).String()),
zap.String("cluster", c.name),
)
c.metricsCollector.IncrCacheSyncOperation("failure")
}
}()

backoff := wait.Backoff{
Duration: 2 * time.Second,
Factor: 1,
Steps: 5,
}
var lastSyncErr error
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
// impossibly return: false, nil
// so can safe used
done, lastSyncErr = c.syncCacheOnce(ctx)
select {
case <-ctx.Done():
err = context.Canceled
default:
break
backoff := wait.Backoff{
Duration: 2 * time.Second,
Factor: 1,
Steps: 5,
}
return
})
if err != nil {
// if ErrWaitTimeout then set lastSyncErr
c.cacheSyncErr = lastSyncErr
}
close(c.cacheSynced)
var lastSyncErr error
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
// impossibly return: false, nil
// so can safe used
done, lastSyncErr = c.syncCacheOnce(ctx)
select {
case <-ctx.Done():
err = context.Canceled
default:
break
}
return
})
if err != nil {
// if ErrWaitTimeout then set lastSyncErr
c.cacheSyncErr = lastSyncErr
}
close(c.cacheSynced)

if !atomic.CompareAndSwapInt32(&c.cacheState, _cacheSyncing, _cacheSynced) {
panic("dubious state when sync cache")
}
if !atomic.CompareAndSwapInt32(&c.cacheState, _cacheSyncing, _cacheSynced) {
panic("dubious state when sync cache")
}
*/
}

func (c *cluster) syncCacheOnce(ctx context.Context) (bool, error) {
@@ -1146,3 +1168,35 @@ func (c *cluster) GetSSL(ctx context.Context, baseUrl, id string) (*v1.Ssl, erro
}
return ssl, nil
}

func (c *cluster) pushEvent(eventType string, key string, value []byte) {
var et types.EventType
switch eventType {
case "create":
et = types.EventAdd
case "update":
et = types.EventUpdate
case "delete":
et = types.EventDelete
}
events := []*adapter.Event{
{
Type: adapter.EventType(et),
Key: key,
Value: value,
},
}
c.adapter.EventCh() <- events
}

func (c *cluster) CreateResource(resource string, id string, value []byte) {
c.pushEvent("create", "/apisix/"+resource+"/"+id, value)
}

func (c *cluster) UpdateResource(resource string, id string, value []byte) {
c.pushEvent("update", "/apisix/"+resource+"/"+id, value)
}

func (c *cluster) DeleteResource(resource string, id string, value []byte) {
c.pushEvent("delete", "/apisix/"+resource+"/"+id, value)
}
24 changes: 24 additions & 0 deletions pkg/apisix/route.go
Original file line number Diff line number Diff line change
@@ -112,6 +112,14 @@ func (r *routeClient) List(ctx context.Context) ([]*v1.Route, error) {
}

func (r *routeClient) Create(ctx context.Context, obj *v1.Route, shouldCompare bool) (*v1.Route, error) {
if r.cluster.adapter != nil {
data, err := json.Marshal(obj)
if err != nil {
return nil, err
}
r.cluster.CreateResource("routes", obj.ID, data)
return obj, nil
}
if v, skip := skipRequest(r.cluster, shouldCompare, r.url, obj.ID, obj); skip {
return v, nil
}
@@ -155,6 +163,14 @@ func (r *routeClient) Create(ctx context.Context, obj *v1.Route, shouldCompare b
}

func (r *routeClient) Delete(ctx context.Context, obj *v1.Route) error {
if r.cluster.adapter != nil {
data, err := json.Marshal(obj)
if err != nil {
return err
}
r.cluster.DeleteResource("routes", obj.ID, data)
return nil
}
log.Debugw("try to delete route",
zap.String("id", obj.ID),
zap.String("name", obj.Name),
@@ -184,6 +200,14 @@ func (r *routeClient) Delete(ctx context.Context, obj *v1.Route) error {
}

func (r *routeClient) Update(ctx context.Context, obj *v1.Route, shouldCompare bool) (*v1.Route, error) {
if r.cluster.adapter != nil {
data, err := json.Marshal(obj)
if err != nil {
return nil, err
}
r.cluster.UpdateResource("routes", obj.ID, data)
return obj, nil
}
if v, skip := skipRequest(r.cluster, shouldCompare, r.url, obj.ID, obj); skip {
return v, nil
}
24 changes: 24 additions & 0 deletions pkg/apisix/upstream.go
Original file line number Diff line number Diff line change
@@ -106,6 +106,14 @@ func (u *upstreamClient) List(ctx context.Context) ([]*v1.Upstream, error) {
}

func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream, shouldCompare bool) (*v1.Upstream, error) {
if u.cluster.adapter != nil {
data, err := json.Marshal(obj)
if err != nil {
return nil, err
}
u.cluster.CreateResource("upstreams", obj.ID, data)
return obj, nil
}
if v, skip := skipRequest(u.cluster, shouldCompare, u.url, obj.ID, obj); skip {
return v, nil
}
@@ -151,6 +159,14 @@ func (u *upstreamClient) Create(ctx context.Context, obj *v1.Upstream, shouldCom
}

func (u *upstreamClient) Delete(ctx context.Context, obj *v1.Upstream) error {
if u.cluster.adapter != nil {
data, err := json.Marshal(obj)
if err != nil {
return nil
}
u.cluster.CreateResource("upstreams", obj.ID, data)
return nil
}
log.Debugw("try to delete upstream",
zap.String("id", obj.ID),
zap.String("name", obj.Name),
@@ -181,6 +197,14 @@ func (u *upstreamClient) Delete(ctx context.Context, obj *v1.Upstream) error {
}

func (u *upstreamClient) Update(ctx context.Context, obj *v1.Upstream, shouldCompare bool) (*v1.Upstream, error) {
if u.cluster.adapter != nil {
data, err := json.Marshal(obj)
if err != nil {
return nil, err
}
u.cluster.CreateResource("upstreams", obj.ID, data)
return obj, nil
}
if v, skip := skipRequest(u.cluster, shouldCompare, u.url, obj.ID, obj); skip {
return v, nil
}
1 change: 1 addition & 0 deletions pkg/providers/apisix/provider_init.go
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ import (
// This func is NOT concurrency safe.
// cc https://github.com/apache/apisix-ingress-controller/pull/742#discussion_r757197791
func (p *apisixProvider) Init(ctx context.Context) error {
return nil
var (
wg sync.WaitGroup
routeMapK8S = new(sync.Map)
1 change: 1 addition & 0 deletions pkg/providers/controller.go
Original file line number Diff line number Diff line change
@@ -384,6 +384,7 @@ func (c *Controller) run(ctx context.Context) {
BaseURL: c.cfg.APISIX.DefaultClusterBaseURL,
MetricsCollector: c.MetricsCollector,
SyncComparison: c.cfg.ApisixResourceSyncComparison,
EnableEtcdServer: true,
}
err := c.apisix.AddCluster(ctx, clusterOpts)
if err != nil && err != apisix.ErrDuplicatedCluster {
1 change: 1 addition & 0 deletions pkg/providers/types/types.go
Original file line number Diff line number Diff line change
@@ -208,6 +208,7 @@ func (c *Common) SyncUpstreamNodesChangeToCluster(ctx context.Context, cluster a
zap.String("upstream_name", upsName),
zap.Any("nodes", nodes),
)
return nil
upstream, err := cluster.Upstream().Get(ctx, upsName)
if err != nil {
if err == apisixcache.ErrNotFound {