Skip to content

Commit

Permalink
Merge branch 'main' into feature/internal-net/support-quic
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed Oct 31, 2024
2 parents 2c01e8a + 467ab10 commit a713ffe
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 29 deletions.
63 changes: 42 additions & 21 deletions internal/client/v1/client/discoverer/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,28 +98,48 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) {
}
}

addrs, err := c.dnsDiscovery(ctx)
if err != nil {
return nil, err
addrs, err :=c.updateDiscoveryInfo(ctx)
if err != nil || len(addrs) == 0 {
addrs, err = c.dnsDiscovery(ctx)
if err != nil {
return nil, err
}
}
c.addrs.Store(&addrs)

var aech <-chan error
if c.autoconn {
c.client = grpc.New(
append(
c.opts,
grpc.WithAddrs(addrs...),
grpc.WithErrGroup(c.eg),
)...,
)
if c.client != nil {
aech, err = c.client.StartConnectionMonitor(ctx)
if c.client == nil {
c.client = grpc.New(
append(
c.opts,
grpc.WithAddrs(addrs...),
grpc.WithErrGroup(c.eg),
)...,
)
aech, err = c.client.StartConnectionMonitor(ctx)
if err != nil {
return nil, err
}
for _, addr := range addrs {
if c.onConnect != nil {
err = c.onConnect(ctx, c, addr)
if err != nil {
return nil, err
}
}
}
} else {
for _, addr := range addrs {
err = c.connect(ctx, addr)
if err != nil {
return nil, err
}
}
aech, err = c.client.StartConnectionMonitor(ctx)
}
if err != nil {
return nil, err
}
c.addrs.Store(&addrs)

err = c.discover(ctx)
if err != nil {
Expand Down Expand Up @@ -211,7 +231,7 @@ func (c *client) GetReadClient() grpc.Client {

func (c *client) connect(ctx context.Context, addr string) (err error) {
if c.autoconn && c.client != nil {
_, err = c.client.Connect(ctx, addr)
_, err = c.client.Connect(ctx, addr, c.client.GetDialOption()...)
if err != nil {
return err
}
Expand Down Expand Up @@ -379,12 +399,13 @@ func (c *client) discoverAddrs(
len(node.GetPods().GetPods()) > i &&
len(node.GetPods().GetPods()[i].GetIp()) != 0 {
addr := net.JoinHostPort(node.GetPods().GetPods()[i].GetIp(), uint16(c.port))
if err = c.connect(ctx, addr); err != nil {
log.Error(errors.ErrAddrCouldNotDiscover(err, addr))
err = nil
} else {
addrs = append(addrs, addr)
}
if err = c.connect(ctx, addr); err != nil {
log.Debugf("resource based discovery connect from discoverer API for addr = %s failed %v", addr, errors.ErrAddrCouldNotDiscover(err, addr))
err = nil
} else {
addrs = append(addrs, addr)
}

}
}
}
Expand Down
14 changes: 6 additions & 8 deletions pkg/gateway/lb/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1904,13 +1904,12 @@ func (s *server) MultiUpdate(
}

if errs != nil {
st, _ := status.FromError(err)
st, _ := status.FromError(errs)
if st != nil && span != nil {
span.RecordError(err)
span.RecordError(errs)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
span.SetStatus(trace.StatusError, err.Error())
span.SetStatus(trace.StatusError, errs.Error())
}
errs = err
}

return locs, errs
Expand Down Expand Up @@ -2735,13 +2734,12 @@ func (s *server) MultiRemove(
}

if errs != nil {
st, _ := status.FromError(err)
st, _ := status.FromError(errs)
if st != nil && span != nil {
span.RecordError(err)
span.RecordError(errs)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
span.SetStatus(trace.StatusError, err.Error())
span.SetStatus(trace.StatusError, errs.Error())
}
errs = err
}

return locs, errs
Expand Down

0 comments on commit a713ffe

Please sign in to comment.