Skip to content

Commit

Permalink
Merge branch 'main' into refactor/main/for-release-v1.7.14
Browse files Browse the repository at this point in the history
  • Loading branch information
kpango authored Sep 24, 2024
2 parents bf09110 + 686c16a commit 0d3d09a
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 61 deletions.
193 changes: 162 additions & 31 deletions charts/vald/templates/gateway/ing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
{{- $filter := .Values.gateway.filter -}}
{{- $filterIngEnabled := and $filter.enabled $filter.ingress.enabled -}}
{{- $mirror := .Values.gateway.mirror -}}
{{- $mirrorIngEnabled := and $mirror.enabled $mirror.ingress.enabled -}}
{{- $lb := .Values.gateway.lb -}}
{{- $lbIngEnabled := and $lb.enabled $lb.ingress.enabled -}}
{{- $gateway := "" -}}
{{- $gatewayName := "" -}}
{{- $reflectionEnabled := .Values.defaults.server_config.servers.grpc.server.grpc.enable_reflection -}}
{{- $filter := .Values.gateway.filter -}}
{{- $filterIngEnabled := and $filter.enabled $filter.ingress.enabled -}}
{{- $filterReflectionEnabled := and $filterIngEnabled (default $reflectionEnabled $filter.server_config.servers.grpc.enable_reflection) -}}
{{- $mirror := .Values.gateway.mirror -}}
{{- $mirrorIngEnabled := and $mirror.enabled $mirror.ingress.enabled -}}
{{- $lb := .Values.gateway.lb -}}
{{- $lbIngEnabled := and $lb.enabled $lb.ingress.enabled -}}
{{- $lbReflectionEnabled := and $lbIngEnabled (default $reflectionEnabled $lb.server_config.servers.grpc.enable_reflection) -}}
{{- $gateway := "" -}}
{{- $gatewayName := "" -}}
{{- if or $filterIngEnabled $mirrorIngEnabled $lbIngEnabled }}
{{- if $filterIngEnabled }}
{{- $gateway = $filter -}}
Expand Down Expand Up @@ -62,125 +65,253 @@ spec:
- host: {{ $gateway.ingress.host }}
http:
paths:
{{- if and $mirrorIngEnabled $filterIngEnabled $lb.enabled }}
- path: "/vald.v1.Search"
{{- if and $mirrorIngEnabled $filterIngEnabled $lb.enabled }}
- path: "/vald.v1.Search/"
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
- path: "/vald.v1.Insert"
- path: "/vald.v1.Insert/"
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
- path: "/vald.v1.Update"
# NOTE: Change backend service to mirror after UpdateTimestamp is implemented in mirror.
- path: "/vald.v1.Update/UpdateTimestamp"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- path: "/vald.v1.Update/"
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
- path: "/vald.v1.Upsert"
- path: "/vald.v1.Upsert/"
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
- path: "/vald.v1.Index"
- path: "/vald.v1.Remove/"
backend:
service:
name: {{ $mirror.name }}
{{- include "vald.ingressPort" (dict "Values" $mirror.ingress) | nindent 12 }}
pathType: {{ $mirror.ingress.pathType }}
- path: "/vald.v1.Object/Exists"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- path: "/vald.v1.Object.Exists"
- path: "/vald.v1.Object/GetTimestamp"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- path: "/vald.v1.Object.GetTimestamp"
- path: "/vald.v1.Object/"
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
- path: "/vald.v1.Index/"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- path: "/vald.v1.Object"
# NOTE: Change backend service to mirror after Flush is implemented in mirror.
- path: "/vald.v1.Flush/"
backend:
service:
name: {{ $filter.name }}
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
- backend:
pathType: {{ $lb.ingress.pathType }}
- path: "/mirror.v1.Mirror/Register"
backend:
service:
name: {{ $mirror.name }}
{{- include "vald.ingressPort" (dict "Values" $mirror.ingress) | nindent 12 }}
pathType: {{ $mirror.ingress.pathType }}
- path: "/vald.v1.Filter/"
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
{{- else if and $filterIngEnabled $lb.enabled }}
- path: "/vald.v1.Index"
- path: "/vald.v1.Search/"
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
- path: "/vald.v1.Insert/"
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
- path: "/vald.v1.Update/"
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
- path: "/vald.v1.Upsert/"
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
- path: "/vald.v1.Remove/"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- path: "/vald.v1.Index/"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- path: "/vald.v1.Object/Exists"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- path: "/vald.v1.Object.Exists"
- path: "/vald.v1.Object/GetTimestamp"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- path: "/vald.v1.Object.GetTimestamp"
- path: "/vald.v1.Object/"
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
- path: "/vald.v1.Flush/"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- backend:
- path: "/vald.v1.Filter/"
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
{{- else if and $mirrorIngEnabled $lb.enabled }}
- path: "/vald.v1.Search"
- path: "/vald.v1.Search/"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- path: "/vald.v1.Index"
- path: "/vald.v1.Insert/"
backend:
service:
name: {{ $mirror.name }}
{{- include "vald.ingressPort" (dict "Values" $mirror.ingress) | nindent 12 }}
pathType: {{ $mirror.ingress.pathType }}
# NOTE: Change backend service to mirror after UpdateTimestamp is implemented in mirror.
- path: "/vald.v1.Update/UpdateTimestamp"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- path: "/vald.v1.Object.Exists"
- path: "/vald.v1.Update/"
backend:
service:
name: {{ $mirror.name }}
{{- include "vald.ingressPort" (dict "Values" $mirror.ingress) | nindent 12 }}
pathType: {{ $mirror.ingress.pathType }}
- path: "/vald.v1.Upsert/"
backend:
service:
name: {{ $mirror.name }}
{{- include "vald.ingressPort" (dict "Values" $mirror.ingress) | nindent 12 }}
pathType: {{ $mirror.ingress.pathType }}
- path: "/vald.v1.Remove/"
backend:
service:
name: {{ $mirror.name }}
{{- include "vald.ingressPort" (dict "Values" $mirror.ingress) | nindent 12 }}
pathType: {{ $mirror.ingress.pathType }}
- path: "/vald.v1.Object/"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- path: "/vald.v1.Object.GetTimestamp"
- path: "/vald.v1.Index/"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- path: "/vald.v1.Object"
# NOTE: Change backend service to mirror after Flush is implemented in mirror.
- path: "/vald.v1.Flush/"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
- backend:
- path: "/mirror.v1.Mirror/Register"
backend:
service:
name: {{ $mirror.name }}
{{- include "vald.ingressPort" (dict "Values" $mirror.ingress) | nindent 12 }}
pathType: {{ $mirror.ingress.pathType }}
{{- else if $lbIngEnabled }}
- backend:
- path: "/"
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
{{- end }}
{{- if or $filterReflectionEnabled $lbReflectionEnabled }}
- path: "/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo"
{{- if $filterReflectionEnabled }}
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
{{- else }}
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
{{- end }}
- path: "/grpc.reflection.v1.ServerReflection/ServerReflectionInfo"
{{- if $filterReflectionEnabled }}
backend:
service:
name: {{ $filter.name }}
{{- include "vald.ingressPort" (dict "Values" $filter.ingress) | nindent 12 }}
pathType: {{ $filter.ingress.pathType }}
{{- else }}
backend:
service:
name: {{ $lb.name }}
{{- include "vald.ingressPort" (dict "Values" $lb.ingress) | nindent 12 }}
pathType: {{ $lb.ingress.pathType }}
{{- end }}
{{- end }}
{{- end }}
54 changes: 34 additions & 20 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,30 +87,32 @@ type Client interface {
GetDialOption() []DialOption
GetCallOption() []CallOption
GetBackoff() backoff.Backoff
SetDisableResolveDNSAddr(addr string, disabled bool)
ConnectedAddrs() []string
Close(ctx context.Context) error
}

type gRPCClient struct {
addrs map[string]struct{}
poolSize uint64
clientCount uint64
conns sync.Map[string, pool.Conn]
hcDur time.Duration
prDur time.Duration
dialer net.Dialer
enablePoolRebalance bool
resolveDNS bool
dopts []DialOption
copts []CallOption
roccd string // reconnection old connection closing duration
eg errgroup.Group
bo backoff.Backoff
cb circuitbreaker.CircuitBreaker
gbo gbackoff.Config // grpc's original backoff configuration
mcd time.Duration // minimum connection timeout duration
group singleflight.Group[pool.Conn]
crl sync.Map[string, bool] // connection request list
addrs map[string]struct{}
poolSize uint64
clientCount uint64
conns sync.Map[string, pool.Conn]
hcDur time.Duration
prDur time.Duration
dialer net.Dialer
enablePoolRebalance bool
disableResolveDNSAddrs sync.Map[string, bool]
resolveDNS bool
dopts []DialOption
copts []CallOption
roccd string // reconnection old connection closing duration
eg errgroup.Group
bo backoff.Backoff
cb circuitbreaker.CircuitBreaker
gbo gbackoff.Config // grpc's original backoff configuration
mcd time.Duration // minimum connection timeout duration
group singleflight.Group[pool.Conn]
crl sync.Map[string, bool] // connection request list

ech <-chan error
monitorRunning atomic.Bool
Expand Down Expand Up @@ -946,6 +948,12 @@ func (g *gRPCClient) GetBackoff() backoff.Backoff {
return g.bo
}

func (g *gRPCClient) SetDisableResolveDNSAddr(addr string, disabled bool) {
// NOTE: When connecting to multiple locations, it was necessary to switch dynamically, so implementation was added.
// There is no setting for disable on the helm chart side, so I used this implementation.
g.disableResolveDNSAddrs.Store(addr, disabled)
}

func (g *gRPCClient) Connect(
ctx context.Context, addr string, dopts ...DialOption,
) (conn pool.Conn, err error) {
Expand Down Expand Up @@ -975,7 +983,13 @@ func (g *gRPCClient) Connect(
pool.WithAddr(addr),
pool.WithSize(g.poolSize),
pool.WithDialOptions(append(g.dopts, dopts...)...),
pool.WithResolveDNS(g.resolveDNS),
pool.WithResolveDNS(func() bool {
disabled, ok := g.disableResolveDNSAddrs.Load(addr)
if ok && disabled {
return false
}
return g.resolveDNS
}()),
}
if g.bo != nil {
opts = append(opts, pool.WithBackoff(g.bo))
Expand Down
Loading

0 comments on commit 0d3d09a

Please sign in to comment.