Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[patch] Bug fix on StreamGetObject API and DNS cache expiration / refactor net connection #986

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f4e0ad2
refactor net connection
kpango Feb 5, 2021
47cef3f
close unclosed channel
kpango Feb 5, 2021
e659767
fix
kpango Feb 5, 2021
1c68e4e
fix failing test & add insecure option for user
kpango Feb 5, 2021
58d6c07
bugfix
kpango Feb 5, 2021
ccb25cb
add custom connection dialer to k8s
kpango Feb 5, 2021
8abb1a0
fix
kpango Feb 5, 2021
ab422a4
fix e2e test conparator for v1 API
kpango Feb 6, 2021
91c5c20
fix
kpango Feb 6, 2021
f63c78d
fix
kpango Feb 6, 2021
b480b95
fix useless schema annotation for helm charts
kpango Feb 8, 2021
0c0bf84
Merge branch 'master' into refactor/internal-net/more-controllable-tc…
kpango Feb 8, 2021
44f6197
Merge branch 'master' into refactor/internal-net/more-controllable-tc…
kpango Feb 8, 2021
02f575a
change CreateIndex precondition failure log level to Warn from Error …
kpango Feb 8, 2021
7de5261
Merge branch 'refactor/internal-net/more-controllable-tcp-handshake-a…
kpango Feb 8, 2021
5664078
Merge branch 'master' into refactor/internal-net/more-controllable-tc…
kpango Feb 8, 2021
46e67d1
Merge branch 'master' into refactor/internal-net/more-controllable-tc…
kpango Feb 9, 2021
52cb14e
Merge branch 'master' into refactor/internal-net/more-controllable-tc…
kpango Feb 9, 2021
b5ab14e
add generated test & update helm schema
kpango Feb 9, 2021
22987df
update golang version to 1.15.8 inclueds cgo&net/http bugfixes
kpango Feb 9, 2021
2ea7cff
fix incorrect helm resource path
kpango Feb 9, 2021
c2c6b4b
bugfix: add priorityClass namespace to each gateways
kpango Feb 9, 2021
fc4805b
extend wait duration for e2e deploy test
kpango Feb 9, 2021
2d43482
fix pointed out from vankichi
kpango Feb 9, 2021
f37f012
fix failed test
kpango Feb 9, 2021
d0c1eed
format
kpango Feb 9, 2021
f2385ee
Update internal/tls/option_test.go
kpango Feb 9, 2021
791896d
Merge branch 'master' into refactor/internal-net/more-controllable-tc…
kpango Feb 9, 2021
06aa294
fix failed test
kpango Feb 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions internal/config/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
// Package config providers configuration type and load configuration logic
package config

import "fmt"
import (
"strconv"

"github.com/vdaas/vald/internal/net"
)

type Meta struct {
Host string `json:"host" yaml:"host"`
Expand All @@ -39,7 +43,7 @@ func (m *Meta) Bind() *Meta {
m.Client = newGRPCClientConfig()
}
if len(m.Host) != 0 {
m.Client.Addrs = append(m.Client.Addrs, fmt.Sprintf("%s:%d", m.Host, m.Port))
m.Client.Addrs = append(m.Client.Addrs, net.JoinHostPort(m.Host, strconv.FormatInt(int64(m.Port), 10)))
}
return m
}
3 changes: 3 additions & 0 deletions internal/config/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type TCP struct {
type Dialer struct {
Timeout string `yaml:"timeout" json:"timeout"`
KeepAlive string `yaml:"keep_alive" json:"keep_alive"`
FallbackDelay string `yaml:"fallback_delay" json:"fallback_delay"`
DualStackEnabled bool `yaml:"dual_stack_enabled" json:"dual_stack_enabled"`
}

Expand All @@ -50,6 +51,7 @@ func (d *DNS) Bind() *DNS {
func (d *Dialer) Bind() *Dialer {
d.Timeout = GetActualValue(d.Timeout)
d.KeepAlive = GetActualValue(d.KeepAlive)
d.FallbackDelay = GetActualValue(d.FallbackDelay)
return d
}

Expand Down Expand Up @@ -84,6 +86,7 @@ func (t *TCP) Opts() []tcp.DialerOption {
opts = append(opts,
tcp.WithDialerKeepAlive(t.Dialer.KeepAlive),
tcp.WithDialerTimeout(t.Dialer.Timeout),
tcp.WithDialerFallbackDelay(t.Dialer.FallbackDelay),
)
if t.Dialer.DualStackEnabled {
opts = append(opts,
Expand Down
5 changes: 5 additions & 0 deletions internal/db/nosql/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/scylladb/gocqlx/qb"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/tcp"
)

var (
Expand Down Expand Up @@ -156,6 +157,7 @@ type (
connectObserver ConnectObserver
frameHeaderObserver FrameHeaderObserver
defaultIdempotence bool
rawDialer tcp.Dialer
dialer gocql.Dialer
writeCoalesceWaitTime time.Duration

Expand Down Expand Up @@ -305,6 +307,9 @@ func (c *client) Open(ctx context.Context) (err error) {
log.Debugf("failed to create session %#v", c)
return errors.ErrCassandraFailedToCreateSession(err, c.hosts, c.port, c.cqlVersion)
}
if c.rawDialer != nil {
c.rawDialer.StartDialerCache(ctx)
}
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions internal/db/nosql/cassandra/cassandra_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ func (dm *DialerMock) DialContext(ctx context.Context, network, addr string) (ne
return dm.DialContextFunc(ctx, network, addr)
}

func (dm *DialerMock) GetDialer() func(ctx context.Context, network, addr string) (net.Conn, error) {
return dm.DialContextFunc
}
func (dm *DialerMock) StartDialerCache(ctx context.Context) {}

func TestMockClusterConfig_CreateSession(t *testing.T) {
t.Parallel()
type fields struct {
Expand Down
4 changes: 3 additions & 1 deletion internal/db/nosql/cassandra/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/gocql/gocql"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/tcp"
"github.com/vdaas/vald/internal/timeutil"
)

Expand Down Expand Up @@ -79,11 +80,12 @@ func WithHosts(hosts ...string) Option {
}

// WithDialer returns the option to set the dialer.
func WithDialer(der gocql.Dialer) Option {
func WithDialer(der tcp.Dialer) Option {
return func(c *client) error {
if der == nil {
return errors.NewErrInvalidOption("dialer", der)
}
c.rawDialer = der
c.dialer = der
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion internal/db/nosql/cassandra/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/gocql/gocql"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/tcp"
"github.com/vdaas/vald/internal/test/comparator"
"go.uber.org/goleak"
)
Expand Down Expand Up @@ -147,7 +148,7 @@ func TestWithHosts(t *testing.T) {
func TestWithDialer(t *testing.T) {
type T = client
type args struct {
der gocql.Dialer
der tcp.Dialer
}
type want struct {
obj *T
Expand Down
6 changes: 6 additions & 0 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/grpc/pool"
"github.com/vdaas/vald/internal/net/tcp"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/singleflight"
Expand Down Expand Up @@ -93,6 +94,7 @@ type gRPCClient struct {
conns grpcConns
hcDur time.Duration
prDur time.Duration
dialer tcp.Dialer
enablePoolRebalance bool
resolveDNS bool
dopts []DialOption
Expand Down Expand Up @@ -138,6 +140,10 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
return nil, errors.ErrGRPCTargetAddrNotFound
}

if g.dialer != nil {
g.dialer.StartDialerCache(ctx)
}

ech := make(chan error, len(addrs))
for _, addr := range addrs {
if len(addr) != 0 {
Expand Down
1 change: 1 addition & 0 deletions internal/net/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func WithKeepaliveParams(t, to string, permitWithoutStream bool) Option {
func WithDialer(der tcp.Dialer) Option {
return func(g *gRPCClient) {
if der != nil {
g.dialer = der
g.dopts = append(g.dopts,
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return der.GetDialer()(ctx, "tcp", addr)
Expand Down
12 changes: 7 additions & 5 deletions internal/net/grpc/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -97,7 +98,7 @@ func New(ctx context.Context, opts ...Option) (c Conn, err error) {
if err != nil {
return nil, err
}
p.addr = fmt.Sprintf("%s:%d", p.host, p.port)
p.addr = net.JoinHostPort(p.host, strconv.FormatInt(int64(p.port), 10))
}

conn, err := grpc.DialContext(ctx, p.addr, p.dopts...)
Expand All @@ -107,7 +108,7 @@ func New(ctx context.Context, opts ...Option) (c Conn, err error) {
if err != nil {
return nil, err
}
p.addr = fmt.Sprintf("%s:%d", p.host, p.port)
p.addr = net.JoinHostPort(p.host, strconv.FormatInt(int64(p.port), 10))
}
if conn != nil {
err = conn.Close()
Expand Down Expand Up @@ -144,7 +145,7 @@ func (p *pool) Connect(ctx context.Context) (c Conn, err error) {
default:
var (
conn *ClientConn
addr = fmt.Sprintf("%s:%d", ips[i%len(ips)], p.port)
addr = net.JoinHostPort(ips[i%len(ips)], strconv.FormatInt(int64(p.port), 10))
pc, ok = p.load(i)
)
if ok && pc != nil && pc.addr == addr && isHealthy(pc.conn) {
Expand Down Expand Up @@ -387,7 +388,7 @@ func (p *pool) lookupIPAddr(ctx context.Context) (ips []string, err error) {
ipStr = fmt.Sprintf("[%s]", ipStr)
}
var conn net.Conn
addr := fmt.Sprintf("%s:%d", ipStr, p.port)
addr := net.JoinHostPort(ipStr, strconv.FormatInt(int64(p.port), 10))
if net.DefaultResolver.Dial != nil {
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*10)
conn, err = net.DefaultResolver.Dial(ctx, network, addr)
Expand Down Expand Up @@ -470,7 +471,8 @@ func (p *pool) scanGRPCPort(ctx context.Context) (err error) {
func isGRPCPort(ctx context.Context, host string, port uint16) bool {
ctx, cancel := context.WithTimeout(ctx, time.Millisecond*5)
defer cancel()
conn, err := grpc.DialContext(ctx, fmt.Sprintf("%s:%d", host, port),
conn, err := grpc.DialContext(ctx,
net.JoinHostPort(host, strconv.FormatInt(int64(port), 10)),
grpc.WithInsecure(),
grpc.WithBlock(),
)
Expand Down
14 changes: 10 additions & 4 deletions internal/net/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package net

import (
"context"
"fmt"
"net"
"strconv"
"strings"
Expand All @@ -29,6 +28,7 @@ import (
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/safety"
)

const (
Expand Down Expand Up @@ -97,6 +97,11 @@ func IsIPv4(addr string) bool {
return net.ParseIP(addr) != nil && strings.Count(addr, ":") < 2
}

// JoinHostPort joins the host/IP address and the port number,
func JoinHostPort(host, port string) string {
return net.JoinHostPort(host, port)
}

// SplitHostPort splits the address, and return the host/IP address and the port number,
// and any error occurred.
// If it is the loopback address, it will return the loopback address and corresponding port number.
Expand Down Expand Up @@ -136,14 +141,15 @@ func ScanPorts(ctx context.Context, start, end uint16, host string) (ports []uin

var mu sync.Mutex

var der net.Dialer
for i := start; i >= start && i <= end; i++ {
port := i
eg.Go(func() error {
eg.Go(safety.RecoverFunc(func() error {
kpango marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-egctx.Done():
return egctx.Err()
default:
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port))
conn, err := der.DialContext(ctx, "tcp", JoinHostPort(host, strconv.FormatInt(int64(port), 10)))
if err != nil {
log.Warn(err)
return nil
Expand All @@ -158,7 +164,7 @@ func ScanPorts(ctx context.Context, start, end uint16, host string) (ports []uin
}
return nil
}
})
}))
}

if err = eg.Wait(); err != nil {
Expand Down
Loading