From 9f04adfe161e2a4b94e8b0569e8f7e165fc8fa4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=9E=E6=A5=BD=E5=9D=82=C2=B7=E5=96=B5?= Date: Sun, 8 Sep 2024 22:13:06 +0800 Subject: [PATCH] feat: allow group override global node connectivity check (#623) Co-authored-by: mzz2017 <2017@duck.com> --- .../outbound/dialer/connectivity_check.go | 13 ++++ component/outbound/dialer/dialer.go | 25 ++++++++ config/config.go | 6 ++ config/desc.go | 5 ++ control/control_plane.go | 60 ++++++++++++++----- example.dae | 20 +++++++ 6 files changed, 115 insertions(+), 14 deletions(-) diff --git a/component/outbound/dialer/connectivity_check.go b/component/outbound/dialer/connectivity_check.go index 9e478a666..4d330e38b 100644 --- a/component/outbound/dialer/connectivity_check.go +++ b/component/outbound/dialer/connectivity_check.go @@ -19,6 +19,7 @@ import ( "strings" "sync" "time" + "unsafe" "github.com/daeuniverse/dae/common" @@ -452,6 +453,18 @@ func (d *Dialer) aliveBackground() { } } }() + var unused int + for _, opt := range CheckOpts { + if len(d.mustGetCollection(opt.networkType).AliveDialerSetSet) == 0 { + unused++ + } + } + if unused == len(CheckOpts) { + d.Log.WithField("dialer", d.Property().Name). + WithField("p", unsafe.Pointer(d)). + Traceln("cleaned up due to unused") + return + } var wg sync.WaitGroup for range d.checkCh { for _, opt := range CheckOpts { diff --git a/component/outbound/dialer/dialer.go b/component/outbound/dialer/dialer.go index 5d0870608..5be577cec 100644 --- a/component/outbound/dialer/dialer.go +++ b/component/outbound/dialer/dialer.go @@ -10,7 +10,10 @@ import ( "fmt" "sync" "time" + "unsafe" + "github.com/daeuniverse/dae/common" + "github.com/daeuniverse/dae/config" D "github.com/daeuniverse/outbound/dialer" "github.com/daeuniverse/outbound/netproxy" "github.com/sirupsen/logrus" @@ -60,6 +63,21 @@ type Property struct { type AliveDialerSetSet map[*AliveDialerSet]int +func NewGlobalOption(global *config.Global, log *logrus.Logger) *GlobalOption { + return &GlobalOption{ + ExtraOption: D.ExtraOption{ + AllowInsecure: global.AllowInsecure, + TlsImplementation: global.TlsImplementation, + UtlsImitate: global.UtlsImitate}, + Log: log, + TcpCheckOptionRaw: TcpCheckOptionRaw{Raw: global.TcpCheckUrl, Log: log, ResolverNetwork: common.MagicNetwork("udp", global.SoMarkFromDae, global.Mptcp), Method: global.TcpCheckHttpMethod}, + CheckDnsOptionRaw: CheckDnsOptionRaw{Raw: global.UdpCheckDns, ResolverNetwork: common.MagicNetwork("udp", global.SoMarkFromDae, global.Mptcp), Somark: global.SoMarkFromDae}, + CheckInterval: global.CheckInterval, + CheckTolerance: global.CheckTolerance, + CheckDnsTcp: true, + } +} + // NewDialer is for register in general. func NewDialer(dialer netproxy.Dialer, option *GlobalOption, iOption InstanceOption, property *Property) *Dialer { var collections [6]*collection @@ -80,9 +98,16 @@ func NewDialer(dialer netproxy.Dialer, option *GlobalOption, iOption InstanceOpt ctx: ctx, cancel: cancel, } + option.Log.WithField("dialer", d.Property().Name). + WithField("p", unsafe.Pointer(d)). + Traceln("NewDialer") return d } +func (d *Dialer) Clone() *Dialer { + return NewDialer(d.Dialer, d.GlobalOption, d.InstanceOption, d.property) +} + func (d *Dialer) Close() error { d.cancel() d.tickerMu.Lock() diff --git a/config/config.go b/config/config.go index 873278d70..85c2434bc 100644 --- a/config/config.go +++ b/config/config.go @@ -89,6 +89,12 @@ type Group struct { Filter [][]*config_parser.Function `mapstructure:"filter" repeatable:""` FilterAnnotation [][]*config_parser.Param `mapstructure:"_"` Policy FunctionListOrString `mapstructure:"policy" required:""` + + TcpCheckUrl []string `mapstructure:"tcp_check_url"` + TcpCheckHttpMethod string `mapstructure:"tcp_check_http_method"` + UdpCheckDns []string `mapstructure:"udp_check_dns"` + CheckInterval time.Duration `mapstructure:"check_interval"` + CheckTolerance time.Duration `mapstructure:"check_tolerance"` } type DnsRequestRouting struct { diff --git a/config/desc.go b/config/desc.go index 2440527cb..121bf6cba 100644 --- a/config/desc.go +++ b/config/desc.go @@ -85,4 +85,9 @@ min: Select node by the latency of last check. min_avg10: Select node by the average of latencies of last 10 checks. min_moving_avg: Select node by the moving average of latencies of checks, which means more recent latencies have higher weight. `, + "tcp_check_url": "Override global config.", + "tcp_check_http_method": "Override global config.", + "udp_check_dns": "Override global config.", + "check_interval": "Override global config.", + "check_tolerance": "Override global config.", } diff --git a/control/control_plane.go b/control/control_plane.go index 5cc190853..5da969184 100644 --- a/control/control_plane.go +++ b/control/control_plane.go @@ -33,7 +33,6 @@ import ( "github.com/daeuniverse/dae/config" "github.com/daeuniverse/dae/pkg/config_parser" internal "github.com/daeuniverse/dae/pkg/ebpf_internal" - D "github.com/daeuniverse/outbound/dialer" "github.com/daeuniverse/outbound/pool" "github.com/daeuniverse/outbound/protocol/direct" "github.com/daeuniverse/outbound/transport/grpc" @@ -256,18 +255,7 @@ func NewControlPlane( if global.AllowInsecure { log.Warnln("AllowInsecure is enabled, but it is not recommended. Please make sure you have to turn it on.") } - option := &dialer.GlobalOption{ - ExtraOption: D.ExtraOption{ - AllowInsecure: global.AllowInsecure, - TlsImplementation: global.TlsImplementation, - UtlsImitate: global.UtlsImitate}, - Log: log, - TcpCheckOptionRaw: dialer.TcpCheckOptionRaw{Raw: global.TcpCheckUrl, Log: log, ResolverNetwork: common.MagicNetwork("udp", global.SoMarkFromDae, global.Mptcp), Method: global.TcpCheckHttpMethod}, - CheckDnsOptionRaw: dialer.CheckDnsOptionRaw{Raw: global.UdpCheckDns, ResolverNetwork: common.MagicNetwork("udp", global.SoMarkFromDae, global.Mptcp), Somark: global.SoMarkFromDae}, - CheckInterval: global.CheckInterval, - CheckTolerance: global.CheckTolerance, - CheckDnsTcp: true, - } + option := dialer.NewGlobalOption(global, log) // Dial mode. dialMode, err := consts.ParseDialMode(global.DialMode) @@ -323,8 +311,22 @@ func NewControlPlane( if len(dialers) == 0 { log.Infoln("\t") } + groupOption, err := ParseGroupOverrideOption(group, *global, log) + finalOption := option + if err == nil && groupOption != nil { + newDialers := make([]*dialer.Dialer, 0) + for _, d := range dialers { + newDialer := d.Clone() + deferFuncs = append(deferFuncs, newDialer.Close) + newDialer.GlobalOption = groupOption + newDialers = append(newDialers, newDialer) + } + log.Infof(`Group "%v"'s check option has been override.`, group.Name) + dialers = newDialers + finalOption = groupOption + } // Create dialer group and append it to outbounds. - dialerGroup := outbound.NewDialerGroup(option, group.Name, dialers, annos, *policy, + dialerGroup := outbound.NewDialerGroup(finalOption, group.Name, dialers, annos, *policy, core.outboundAliveChangeCallback(uint8(len(outbounds)), disableKernelAliveCallback)) outbounds = append(outbounds, dialerGroup) } @@ -515,6 +517,36 @@ func ParseFixedDomainTtl(ks []config.KeyableString) (map[string]int, error) { return m, nil } +func ParseGroupOverrideOption(group config.Group, global config.Global, log *logrus.Logger) (*dialer.GlobalOption, error) { + result := global + changed := false + if group.TcpCheckUrl != nil { + result.TcpCheckUrl = group.TcpCheckUrl + changed = true + } + if group.TcpCheckHttpMethod != "" { + result.TcpCheckHttpMethod = group.TcpCheckHttpMethod + changed = true + } + if group.UdpCheckDns != nil { + result.UdpCheckDns = group.UdpCheckDns + changed = true + } + if group.CheckInterval != 0 { + result.CheckInterval = group.CheckInterval + changed = true + } + if group.CheckTolerance != 0 { + result.CheckTolerance = group.CheckTolerance + changed = true + } + if changed { + option := dialer.NewGlobalOption(&result, log) + return option, nil + } + return nil, nil +} + // EjectBpf will resect bpf from destroying life-cycle of control plane. func (c *ControlPlane) EjectBpf() *bpfObjects { return c.core.EjectBpf() diff --git a/example.dae b/example.dae index e1814e2b8..b894c35f8 100644 --- a/example.dae +++ b/example.dae @@ -40,6 +40,7 @@ global { auto_config_kernel_parameter: true ##### Node connectivity check. + # These options, as defaults, are effective when no definition is given in the group. # Host of URL should have both IPv4 and IPv6 if you have double stack in local. # First is URL, others are IP addresses if given. @@ -213,6 +214,24 @@ group { # Select the node with min average of the last 10 latencies from the group for every connection. policy: min_avg10 } + + steam { + # Filter nodes from the global node pool defined by the subscription and node section above. + filter: subtag(my_sub) && !name(keyword: 'ExpireAt:') + # Select the node with min moving average of latencies from the group for every connection. + policy: min_moving_avg + + # Override tcp_check_url in global. + tcp_check_url: 'http://test.steampowered.com' + # Override tcp_check_http_method in global + #tcp_check_http_method: HEAD + # Override udp_check_dns in global + #udp_check_dns: 'dns.google.com:53,8.8.8.8,2001:4860:4860::8888' + # Override check_interval in global + #check_interval: 30s + # Override check_tolerance in global + #check_tolerance: 50ms + } } # See https://github.com/daeuniverse/dae/blob/main/docs/en/configuration/routing.md for full examples. @@ -238,6 +257,7 @@ routing { l4proto(udp) && dport(443) -> block dip(geoip:cn) -> direct domain(geosite:cn) -> direct + fallback: my_group }