diff --git a/.github/actions/docker-build/action.yaml b/.github/actions/docker-build/action.yaml
index 2001b6108a..de68fa6500 100644
--- a/.github/actions/docker-build/action.yaml
+++ b/.github/actions/docker-build/action.yaml
@@ -106,7 +106,7 @@ runs:
           DOCKER="docker" \
           BUILDKIT_INLINE_CACHE=0 \
           DOCKER_OPTS="--platform ${PLATFORMS} --builder ${BUILDER} ${LABEL_OPTS} --label org.opencontainers.image.version=${PRIMARY_TAG} --label org.opencontainers.image.title=${TARGET}" \
-          EXTRA_TAGS="${EXTRA_TAGS}" \
+          EXTRA_ARGS="${EXTRA_TAGS}" \
           TAG="${PRIMARY_TAG}" \
           docker/build/${TARGET}
       env:
diff --git a/Makefile.d/docker.mk b/Makefile.d/docker.mk
index 2749e4f0f8..e9e924eb2f 100644
--- a/Makefile.d/docker.mk
+++ b/Makefile.d/docker.mk
@@ -169,7 +169,7 @@ docker/name/operator/helm:
 docker/build/operator/helm:
 	@make DOCKERFILE="$(ROOTDIR)/dockers/operator/helm/Dockerfile" \
 		IMAGE=$(HELM_OPERATOR_IMAGE) \
-		EXTRA_ARGS="--build-arg OPERATOR_SDK_VERSION=$(OPERATOR_SDK_VERSION) --build-arg UPX_OPTIONS=$(UPX_OPTIONS)" \
+		EXTRA_ARGS="--build-arg OPERATOR_SDK_VERSION=$(OPERATOR_SDK_VERSION) --build-arg UPX_OPTIONS=$(UPX_OPTIONS) $(EXTRA_ARGS)" \
 		docker/build/image

 .PHONY: docker/name/loadtest
diff --git a/internal/net/grpc/client.go b/internal/net/grpc/client.go
index df9f84ddd0..dc0da88de5 100644
--- a/internal/net/grpc/client.go
+++ b/internal/net/grpc/client.go
@@ -259,10 +259,11 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
 				disconnectTargets = append(disconnectTargets, addr)
 				return true
 			}
-			// for health check we don't need to reconnect when pool is healthy
-			if p.IsHealthy(ctx) {
+			// for health check, we don't need to reconnect when the ip connection pool is healthy
+			if p.IsHealthy(ctx) && p.IsIPConn() {
 				return true
 			}
+			// if the connection is not a direct ip connection or is unhealthy, try to re-connect
 			var err error
 			// if not healthy we should try reconnect
 			p, err = p.Reconnect(ctx, false)
diff --git a/internal/net/grpc/pool/pool.go b/internal/net/grpc/pool/pool.go
index 16980f4341..e1576aaf65 100644
--- a/internal/net/grpc/pool/pool.go
+++ b/internal/net/grpc/pool/pool.go
@@ -343,7 +343,10 @@ func (p *pool) Connect(ctx context.Context) (c Conn, err error) {
 	if err != nil {
 		return p.singleTargetConnect(ctx)
 	}
+	return p.connect(ctx, ips...)
+}

+func (p *pool) connect(ctx context.Context, ips ...string) (c Conn, err error) {
 	if uint64(len(ips)) > p.Size() {
 		p.grow(uint64(len(ips)))
 	}
@@ -385,6 +388,15 @@ func (p *pool) Reconnect(ctx context.Context, force bool) (c Conn, err error) {
 	healthy := p.IsHealthy(ctx)

 	if healthy {
+		if !p.isIP && p.resolveDNS && hash != nil && *hash != "" {
+			ips, err := p.lookupIPAddr(ctx)
+			if err != nil {
+				return p, nil
+			}
+			if *hash != strings.Join(ips, "-") {
+				return p.connect(ctx, ips...)
+			}
+		}
 		return p, nil
 	}

@@ -499,24 +511,42 @@ func (p *pool) IsHealthy(ctx context.Context) (healthy bool) {
 	if p == nil || p.closing.Load() {
 		return false
 	}
-	var cnt int
+	var cnt, unhealthy int
 	pl := p.len()
-	unhealthy := pl
 	err := p.loop(ctx, func(ctx context.Context, idx int, pc *poolConn) bool {
 		if pc == nil || !isHealthy(pc.conn) {
 			if p.isIP {
+				if pc != nil && pc.addr != "" {
+					err := p.refreshConn(ctx, idx, pc, pc.addr)
+					if err != nil {
+						// target addr cannot re-connect, so the connection is unhealthy
+						unhealthy++
+						return false
+					}
+					return true
+				}
 				return false
 			}
 			addr := p.addr
 			if pc != nil {
 				addr = pc.addr
 			}
+			// re-connect to the last connected addr
 			err := p.refreshConn(ctx, idx, pc, addr)
 			if err != nil {
-				return true
+				if addr == p.addr {
+					unhealthy++
+					return true
+				}
+				// the last connected addr is not the dns addr and cannot connect, so try the dns addr
+				err = p.refreshConn(ctx, idx, pc, p.addr)
+				// the dns addr cannot connect, so the connection is unhealthy
+				if err != nil {
+					unhealthy = pl - cnt
+					return false
+				}
 			}
 		}
-		unhealthy--
 		cnt++
 		return true
 	})
@@ -525,9 +555,15 @@ func (p *pool) IsHealthy(ctx context.Context) (healthy bool) {
 	if cnt == 0 {
 		log.Debugf("no connection pool %d/%d found for %s,\thealthy %d/%d", cnt, pl, p.addr, pl-unhealthy, pl)
-		return cnt != 0 && unhealthy == 0
+		return false
 	}
-	return unhealthy == 0
+	if p.isIP {
+		// for an ip pool connection, each connection target should be healthy
+		return unhealthy == 0
+	}
+
+	// some pool targets may be unhealthy, but the pool client is healthy as long as unhealthy is less than the pool length
+	return unhealthy < pl
 }

 func (p *pool) Do(ctx context.Context, f func(conn *ClientConn) error) error {
diff --git a/pkg/index/job/readreplica/rotate/service/rotator.go b/pkg/index/job/readreplica/rotate/service/rotator.go
index 07fa72d84d..da9578ded7 100644
--- a/pkg/index/job/readreplica/rotate/service/rotator.go
+++ b/pkg/index/job/readreplica/rotate/service/rotator.go
@@ -114,9 +114,9 @@ func (r *rotator) rotate(ctx context.Context) error {
 		return err
 	}

-	newPvc, oldPvc, err := r.createPVC(ctx, newSnap.Name)
+	newPvc, oldPvc, err := r.createPVC(ctx, newSnap.GetName())
 	if err != nil {
-		log.Infof("failed to create PVC. removing the new snapshot(%v)...", newSnap.Name)
+		log.Errorf("failed to create PVC. removing the new snapshot(%s)...", newSnap.GetName())
 		if dserr := r.deleteSnapshot(ctx, newSnap); dserr != nil {
 			errors.Join(err, dserr)
 		}
@@ -125,7 +125,7 @@

 	err = r.updateDeployment(ctx, newPvc.GetName())
 	if err != nil {
-		log.Infof("failed to update Deployment. removing the new snapshot(%v) and pvc(%v)...", newSnap.Name, newPvc.Name)
+		log.Errorf("failed to update Deployment. removing the new snapshot(%s) and pvc(%s)...", newSnap.GetName(), newPvc.GetName())
 		if dperr := r.deletePVC(ctx, newPvc); dperr != nil {
 			errors.Join(err, dperr)
 		}
@@ -172,6 +172,9 @@ func (r *rotator) createSnapshot(ctx context.Context) (newSnap, oldSnap *client.
 		Spec: cur.Spec,
 	}
+	log.Infof("creating new snapshot(%s)...", newSnap.GetName())
+	log.Debugf("snapshot detail: %#v", newSnap)
+
 	err = r.client.Create(ctx, newSnap)
 	if err != nil {
 		return nil, nil, fmt.Errorf("failed to create snapshot: %w", err)
 	}
@@ -214,6 +217,9 @@ func (r *rotator) createPVC(ctx context.Context, newSnapShot string) (newPvc, ol
 		},
 	}

+	log.Infof("creating new pvc(%s)...", newPvc.GetName())
+	log.Debugf("pvc detail: %#v", newPvc)
+
 	if err := r.client.Create(ctx, newPvc); err != nil {
 		return nil, nil, fmt.Errorf("failed to create PVC(%s): %w", newPvc.GetName(), err)
 	}
@@ -242,6 +248,9 @@ func (r *rotator) updateDeployment(ctx context.Context, newPVC string) error {
 		}
 	}

+	log.Infof("updating deployment(%s)...", deployment.GetName())
+	log.Debugf("deployment detail: %#v", deployment)
+
 	if err := r.client.Update(ctx, &deployment); err != nil {
 		return fmt.Errorf("failed to update deployment: %w", err)
 	}
@@ -263,17 +272,18 @@ func (r *rotator) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.Volum

 	eg, egctx := errgroup.New(ctx)
 	eg.Go(func() error {
-		log.Infof("deleting volume snapshot(%v)...", snapshot.GetName())
+		log.Infof("deleting volume snapshot(%s)...", snapshot.GetName())
+		log.Debugf("volume snapshot detail: %#v", snapshot)
 		for {
 			select {
 			case <-egctx.Done():
 				return egctx.Err()
 			case event := <-watcher.ResultChan():
 				if event.Type == client.WatchDeletedEvent {
-					log.Infof("volume snapshot(%v) deleted", snapshot.GetName())
+					log.Infof("volume snapshot(%s) deleted", snapshot.GetName())
 					return nil
 				} else {
-					log.Debugf("waching volume snapshot(%s) events. event: ", snapshot.GetName(), event.Type)
+					log.Debugf("watching volume snapshot(%s) events. event: %v", snapshot.GetName(), event.Type)
 				}
 			}
 		}
@@ -300,6 +310,7 @@ func (r *rotator) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim)
 	eg, egctx := errgroup.New(ctx)
 	eg.Go(func() error {
 		log.Infof("deleting PVC(%s)...", pvc.GetName())
+		log.Debugf("PVC detail: %#v", pvc)
 		for {
 			select {
 			case <-egctx.Done():
@@ -309,7 +320,7 @@ func (r *rotator) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim)
 				log.Infof("PVC(%s) deleted", pvc.GetName())
 				return nil
 			} else {
-				log.Debugf("waching PVC(%s) events. event: %v", pvc.GetName(), event.Type)
+				log.Debugf("watching PVC(%s) events. event: %v", pvc.GetName(), event.Type)
 			}
 		}
 	}