Skip to content

Commit

Permalink
Merge branch 'main' into feature/observability/grafana/agent-memory-d…
Browse files Browse the repository at this point in the history
…ashboard
  • Loading branch information
kpango authored Dec 27, 2023
2 parents 7b96943 + 78bd472 commit 7d40ddf
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/actions/docker-build/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ runs:
DOCKER="docker" \
BUILDKIT_INLINE_CACHE=0 \
DOCKER_OPTS="--platform ${PLATFORMS} --builder ${BUILDER} ${LABEL_OPTS} --label org.opencontainers.image.version=${PRIMARY_TAG} --label org.opencontainers.image.title=${TARGET}" \
EXTRA_TAGS="${EXTRA_TAGS}" \
EXTRA_ARGS="${EXTRA_TAGS}" \
TAG="${PRIMARY_TAG}" \
docker/build/${TARGET}
env:
Expand Down
2 changes: 1 addition & 1 deletion Makefile.d/docker.mk
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ docker/name/operator/helm:
docker/build/operator/helm:
@make DOCKERFILE="$(ROOTDIR)/dockers/operator/helm/Dockerfile" \
IMAGE=$(HELM_OPERATOR_IMAGE) \
EXTRA_ARGS="--build-arg OPERATOR_SDK_VERSION=$(OPERATOR_SDK_VERSION) --build-arg UPX_OPTIONS=$(UPX_OPTIONS)" \
EXTRA_ARGS="--build-arg OPERATOR_SDK_VERSION=$(OPERATOR_SDK_VERSION) --build-arg UPX_OPTIONS=$(UPX_OPTIONS) $(EXTRA_ARGS)" \
docker/build/image

.PHONY: docker/name/loadtest
Expand Down
5 changes: 3 additions & 2 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,11 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
disconnectTargets = append(disconnectTargets, addr)
return true
}
// for health check we don't need to reconnect when pool is healthy
if p.IsHealthy(ctx) {
// for health check we don't need to reconnect when ip connection pool is healthy
if p.IsHealthy(ctx) && p.IsIPConn() {
return true
}
// if connection is not ip direct or unhealthy let's re-connect
var err error
// if not healthy we should try reconnect
p, err = p.Reconnect(ctx, false)
Expand Down
48 changes: 42 additions & 6 deletions internal/net/grpc/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,10 @@ func (p *pool) Connect(ctx context.Context) (c Conn, err error) {
if err != nil {
return p.singleTargetConnect(ctx)
}
return p.connect(ctx, ips...)
}

func (p *pool) connect(ctx context.Context, ips ...string) (c Conn, err error) {
if uint64(len(ips)) > p.Size() {
p.grow(uint64(len(ips)))
}
Expand Down Expand Up @@ -385,6 +388,15 @@ func (p *pool) Reconnect(ctx context.Context, force bool) (c Conn, err error) {

healthy := p.IsHealthy(ctx)
if healthy {
if !p.isIP && p.resolveDNS && hash != nil && *hash != "" {
ips, err := p.lookupIPAddr(ctx)
if err != nil {
return p, nil
}
if *hash != strings.Join(ips, "-") {
return p.connect(ctx, ips...)
}
}
return p, nil
}

Expand Down Expand Up @@ -499,24 +511,42 @@ func (p *pool) IsHealthy(ctx context.Context) (healthy bool) {
if p == nil || p.closing.Load() {
return false
}
var cnt int
var cnt, unhealthy int
pl := p.len()
unhealthy := pl
err := p.loop(ctx, func(ctx context.Context, idx int, pc *poolConn) bool {
if pc == nil || !isHealthy(pc.conn) {
if p.isIP {
if pc != nil && pc.addr != "" {
err := p.refreshConn(ctx, idx, pc, pc.addr)
if err != nil {
// target addr cannot re-connect so, connection is unhealthy
unhealthy++
return false
}
return true
}
return false
}
addr := p.addr
if pc != nil {
addr = pc.addr
}
// re-connect to last connected addr
err := p.refreshConn(ctx, idx, pc, addr)
if err != nil {
return true
if addr == p.addr {
unhealthy++
return true
}
// last connect addr is not dns and cannot connect then try dns
err = p.refreshConn(ctx, idx, pc, p.addr)
// dns addr cannot connect so, connection is unhealthy
if err != nil {
unhealthy = pl - cnt
return false
}
}
}
unhealthy--
cnt++
return true
})
Expand All @@ -525,9 +555,15 @@ func (p *pool) IsHealthy(ctx context.Context) (healthy bool) {
}
if cnt == 0 {
log.Debugf("no connection pool %d/%d found for %s,\thealthy %d/%d", cnt, pl, p.addr, pl-unhealthy, pl)
return cnt != 0 && unhealthy == 0
return false
}
return unhealthy == 0
if p.isIP {
// if ip pool connection, each connection target should be healthy
return unhealthy == 0
}

// some pool target may unhealthy but pool client is healthy when unhealthy is less than pool length
return unhealthy < pl
}

func (p *pool) Do(ctx context.Context, f func(conn *ClientConn) error) error {
Expand Down
25 changes: 18 additions & 7 deletions pkg/index/job/readreplica/rotate/service/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ func (r *rotator) rotate(ctx context.Context) error {
return err
}

newPvc, oldPvc, err := r.createPVC(ctx, newSnap.Name)
newPvc, oldPvc, err := r.createPVC(ctx, newSnap.GetName())
if err != nil {
log.Infof("failed to create PVC. removing the new snapshot(%v)...", newSnap.Name)
log.Errorf("failed to create PVC. removing the new snapshot(%s)...", newSnap.GetName())
if dserr := r.deleteSnapshot(ctx, newSnap); dserr != nil {
errors.Join(err, dserr)
}
Expand All @@ -125,7 +125,7 @@ func (r *rotator) rotate(ctx context.Context) error {

err = r.updateDeployment(ctx, newPvc.GetName())
if err != nil {
log.Infof("failed to update Deployment. removing the new snapshot(%v) and pvc(%v)...", newSnap.Name, newPvc.Name)
log.Errorf("failed to update Deployment. removing the new snapshot(%s) and pvc(%s)...", newSnap.GetName(), newPvc.GetName())
if dperr := r.deletePVC(ctx, newPvc); dperr != nil {
errors.Join(err, dperr)
}
Expand Down Expand Up @@ -172,6 +172,9 @@ func (r *rotator) createSnapshot(ctx context.Context) (newSnap, oldSnap *client.
Spec: cur.Spec,
}

log.Infof("creating new snapshot(%s)...", newSnap.GetName())
log.Debugf("snapshot detail: %#v", newSnap)

err = r.client.Create(ctx, newSnap)
if err != nil {
return nil, nil, fmt.Errorf("failed to create snapshot: %w", err)
Expand Down Expand Up @@ -214,6 +217,9 @@ func (r *rotator) createPVC(ctx context.Context, newSnapShot string) (newPvc, ol
},
}

log.Infof("creating new pvc(%s)...", newPvc.GetName())
log.Debugf("pvc detail: %#v", newPvc)

if err := r.client.Create(ctx, newPvc); err != nil {
return nil, nil, fmt.Errorf("failed to create PVC(%s): %w", newPvc.GetName(), err)
}
Expand Down Expand Up @@ -242,6 +248,9 @@ func (r *rotator) updateDeployment(ctx context.Context, newPVC string) error {
}
}

log.Infof("updating deployment(%s)...", deployment.GetName())
log.Debugf("deployment detail: %#v", deployment)

if err := r.client.Update(ctx, &deployment); err != nil {
return fmt.Errorf("failed to update deployment: %w", err)
}
Expand All @@ -263,17 +272,18 @@ func (r *rotator) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.Volum

eg, egctx := errgroup.New(ctx)
eg.Go(func() error {
log.Infof("deleting volume snapshot(%v)...", snapshot.GetName())
log.Infof("deleting volume snapshot(%s)...", snapshot.GetName())
log.Debugf("volume snapshot detail: %#v", snapshot)
for {
select {
case <-egctx.Done():
return egctx.Err()
case event := <-watcher.ResultChan():
if event.Type == client.WatchDeletedEvent {
log.Infof("volume snapshot(%v) deleted", snapshot.GetName())
log.Infof("volume snapshot(%s) deleted", snapshot.GetName())
return nil
} else {
log.Debugf("waching volume snapshot(%s) events. event: ", snapshot.GetName(), event.Type)
log.Debugf("watching volume snapshot(%s) events. event: %v", snapshot.GetName(), event.Type)
}
}
}
Expand All @@ -300,6 +310,7 @@ func (r *rotator) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim)
eg, egctx := errgroup.New(ctx)
eg.Go(func() error {
log.Infof("deleting PVC(%s)...", pvc.GetName())
log.Debugf("PVC detail: %#v", pvc)
for {
select {
case <-egctx.Done():
Expand All @@ -309,7 +320,7 @@ func (r *rotator) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim)
log.Infof("PVC(%s) deleted", pvc.GetName())
return nil
} else {
log.Debugf("waching PVC(%s) events. event: %v", pvc.GetName(), event.Type)
log.Debugf("watching PVC(%s) events. event: %v", pvc.GetName(), event.Type)
}
}
}
Expand Down

0 comments on commit 7d40ddf

Please sign in to comment.