diff --git a/adapter/provider/healthcheck.go b/adapter/provider/healthcheck.go index 8b5e833851..8737ff96ae 100644 --- a/adapter/provider/healthcheck.go +++ b/adapter/provider/healthcheck.go @@ -27,6 +27,8 @@ type extraOption struct { } type HealthCheck struct { + ctx context.Context + ctxCancel context.CancelFunc url string extra map[string]*extraOption mu sync.Mutex @@ -36,7 +38,6 @@ type HealthCheck struct { lazy bool expectedStatus utils.IntRanges[uint16] lastTouch atomic.TypedValue[time.Time] - done chan struct{} singleDo *singledo.Single[struct{}] timeout time.Duration } @@ -59,7 +60,7 @@ func (hc *HealthCheck) process() { } else { log.Debugln("Skip once health check because we are lazy") } - case <-hc.done: + case <-hc.ctx.Done(): ticker.Stop() hc.stop() return @@ -146,7 +147,7 @@ func (hc *HealthCheck) check() { _, _, _ = hc.singleDo.Do(func() (struct{}, error) { id := utils.NewUUIDV4().String() log.Debugln("Start New Health Checking {%s}", id) - b, _ := batch.New[bool](context.Background(), batch.WithConcurrencyNum[bool](10)) + b, _ := batch.New[bool](hc.ctx, batch.WithConcurrencyNum[bool](10)) // execute default health check option := &extraOption{filters: nil, expectedStatus: hc.expectedStatus} @@ -195,7 +196,7 @@ func (hc *HealthCheck) execute(b *batch.Batch[bool], url, uid string, option *ex p := proxy b.Go(p.Name(), func() (bool, error) { - ctx, cancel := context.WithTimeout(context.Background(), hc.timeout) + ctx, cancel := context.WithTimeout(hc.ctx, hc.timeout) defer cancel() log.Debugln("Health Checking, proxy: %s, url: %s, id: {%s}", p.Name(), url, uid) _, _ = p.URLTest(ctx, url, expectedStatus) @@ -206,7 +207,7 @@ func (hc *HealthCheck) execute(b *batch.Batch[bool], url, uid string, option *ex } func (hc *HealthCheck) close() { - hc.done <- struct{}{} + hc.ctxCancel() } func NewHealthCheck(proxies []C.Proxy, url string, timeout uint, interval uint, lazy bool, expectedStatus utils.IntRanges[uint16]) *HealthCheck { @@ -217,8 +218,11 @@ func NewHealthCheck(proxies []C.Proxy, url string, timeout uint, interval uint, if timeout == 0 { timeout = 5000 } + ctx, cancel := context.WithCancel(context.Background()) return &HealthCheck{ + ctx: ctx, + ctxCancel: cancel, proxies: proxies, url: url, timeout: time.Duration(timeout) * time.Millisecond, @@ -226,7 +230,6 @@ func NewHealthCheck(proxies []C.Proxy, url string, timeout uint, interval uint, interval: time.Duration(interval) * time.Second, lazy: lazy, expectedStatus: expectedStatus, - done: make(chan struct{}, 1), singleDo: singledo.NewSingle[struct{}](time.Second), } } diff --git a/adapter/provider/patch_android.go b/adapter/provider/patch_android.go index e9042bdac0..2a91d7f14a 100644 --- a/adapter/provider/patch_android.go +++ b/adapter/provider/patch_android.go @@ -14,23 +14,6 @@ type UpdatableProvider interface { UpdatedAt() time.Time } -func (pp *proxySetProvider) UpdatedAt() time.Time { - return pp.Fetcher.UpdatedAt -} - -func (pp *proxySetProvider) Close() error { - pp.healthCheck.close() - pp.Fetcher.Destroy() - - return nil -} - -func (cp *compatibleProvider) Close() error { - cp.healthCheck.close() - - return nil -} - func Suspend(s bool) { suspended = s } diff --git a/adapter/provider/provider.go b/adapter/provider/provider.go index 694eae436f..a99c1d9680 100644 --- a/adapter/provider/provider.go +++ b/adapter/provider/provider.go @@ -54,7 +54,7 @@ func (pp *proxySetProvider) MarshalJSON() ([]byte, error) { "proxies": pp.Proxies(), "testUrl": pp.healthCheck.url, "expectedStatus": pp.healthCheck.expectedStatus.String(), - "updatedAt": pp.UpdatedAt, + "updatedAt": pp.UpdatedAt(), "subscriptionInfo": pp.subscriptionInfo, }) } @@ -164,9 +164,9 @@ func (pp *proxySetProvider) closeAllConnections() { }) } -func stopProxyProvider(pd *ProxySetProvider) { - pd.healthCheck.close() - _ = pd.Fetcher.Destroy() +func (pp *proxySetProvider) Close() error { + pp.healthCheck.close() + return pp.Fetcher.Close() } func NewProxySetProvider(name string, interval time.Duration, filter string, excludeFilter string, excludeType string, dialerProxy string, override OverrideSchema, vehicle types.Vehicle, hc *HealthCheck) (*ProxySetProvider, error) { @@ -200,10 +200,15 @@ func NewProxySetProvider(name string, interval time.Duration, filter string, exc fetcher := resource.NewFetcher[[]C.Proxy](name, interval, vehicle, proxiesParseAndFilter(filter, excludeFilter, excludeTypeArray, filterRegs, excludeFilterReg, dialerProxy, override), proxiesOnUpdate(pd)) pd.Fetcher = fetcher wrapper := &ProxySetProvider{pd} - runtime.SetFinalizer(wrapper, stopProxyProvider) + runtime.SetFinalizer(wrapper, (*ProxySetProvider).Close) return wrapper, nil } +func (pp *ProxySetProvider) Close() error { + runtime.SetFinalizer(pp, nil) + return pp.proxySetProvider.Close() +} + // CompatibleProvider for auto gc type CompatibleProvider struct { *compatibleProvider @@ -274,8 +279,9 @@ func (cp *compatibleProvider) RegisterHealthCheckTask(url string, expectedStatus cp.healthCheck.registerHealthCheckTask(url, expectedStatus, filter, interval) } -func stopCompatibleProvider(pd *CompatibleProvider) { - pd.healthCheck.close() +func (cp *compatibleProvider) Close() error { + cp.healthCheck.close() + return nil } func NewCompatibleProvider(name string, proxies []C.Proxy, hc *HealthCheck) (*CompatibleProvider, error) { @@ -294,10 +300,15 @@ func NewCompatibleProvider(name string, proxies []C.Proxy, hc *HealthCheck) (*Co } wrapper := &CompatibleProvider{pd} - runtime.SetFinalizer(wrapper, stopCompatibleProvider) + runtime.SetFinalizer(wrapper, (*CompatibleProvider).Close) return wrapper, nil } +func (cp *CompatibleProvider) Close() error { + runtime.SetFinalizer(cp, nil) + return cp.compatibleProvider.Close() +} + func proxiesOnUpdate(pd *proxySetProvider) func([]C.Proxy) { return func(elm []C.Proxy) { pd.setProxies(elm) diff --git a/component/resource/fetcher.go b/component/resource/fetcher.go index c82a54a3d1..9a4f5a7591 100644 --- a/component/resource/fetcher.go +++ b/component/resource/fetcher.go @@ -2,6 +2,7 @@ package resource import ( "bytes" + "context" "crypto/md5" "os" "path/filepath" @@ -22,11 +23,12 @@ var ( type Parser[V any] func([]byte) (V, error) type Fetcher[V any] struct { + ctx context.Context + ctxCancel context.CancelFunc resourceType string name string vehicle types.Vehicle - UpdatedAt time.Time - done chan struct{} + updatedAt time.Time hash [16]byte parser Parser[V] interval time.Duration @@ -46,6 +48,10 @@ func (f *Fetcher[V]) VehicleType() types.VehicleType { return f.vehicle.Type() } +func (f *Fetcher[V]) UpdatedAt() time.Time { + return f.updatedAt +} + func (f *Fetcher[V]) Initial() (V, error) { var ( buf []byte @@ -57,15 +63,15 @@ func (f *Fetcher[V]) Initial() (V, error) { if stat, fErr := os.Stat(f.vehicle.Path()); fErr == nil { buf, err = os.ReadFile(f.vehicle.Path()) modTime := stat.ModTime() - f.UpdatedAt = modTime + f.updatedAt = modTime isLocal = true if f.interval != 0 && modTime.Add(f.interval).Before(time.Now()) { log.Warnln("[Provider] %s not updated for a long time, force refresh", f.Name()) forceUpdate = true } } else { - buf, err = f.vehicle.Read() - f.UpdatedAt = time.Now() + buf, err = f.vehicle.Read(f.ctx) + f.updatedAt = time.Now() } if err != nil { @@ -75,7 +81,7 @@ func (f *Fetcher[V]) Initial() (V, error) { var contents V if forceUpdate { var forceBuf []byte - if forceBuf, err = f.vehicle.Read(); err == nil { + if forceBuf, err = f.vehicle.Read(f.ctx); err == nil { if contents, err = f.parser(forceBuf); err == nil { isLocal = false buf = forceBuf @@ -93,7 +99,7 @@ func (f *Fetcher[V]) Initial() (V, error) { } // parse local file error, fallback to remote - buf, err = f.vehicle.Read() + buf, err = f.vehicle.Read(f.ctx) if err != nil { return lo.Empty[V](), err } @@ -136,15 +142,18 @@ func (f *Fetcher[V]) Initial() (V, error) { } func (f *Fetcher[V]) Update() (V, bool, error) { - buf, err := f.vehicle.Read() + buf, err := f.vehicle.Read(f.ctx) if err != nil { return lo.Empty[V](), false, err } + return f.SideUpdate(buf) +} +func (f *Fetcher[V]) SideUpdate(buf []byte) (V, bool, error) { now := time.Now() hash := md5.Sum(buf) if bytes.Equal(f.hash[:], hash[:]) { - f.UpdatedAt = now + f.updatedAt = now _ = os.Chtimes(f.vehicle.Path(), now, now) return lo.Empty[V](), true, nil } @@ -160,16 +169,14 @@ func (f *Fetcher[V]) Update() (V, bool, error) { } } - f.UpdatedAt = now + f.updatedAt = now f.hash = hash return contents, false, nil } -func (f *Fetcher[V]) Destroy() error { - if f.interval > 0 { - f.done <- struct{}{} - } +func (f *Fetcher[V]) Close() error { + f.ctxCancel() if f.watcher != nil { _ = f.watcher.Close() } @@ -177,7 +184,7 @@ func (f *Fetcher[V]) Destroy() error { } func (f *Fetcher[V]) pullLoop() { - initialInterval := f.interval - time.Since(f.UpdatedAt) + initialInterval := f.interval - time.Since(f.updatedAt) if initialInterval > f.interval { initialInterval = f.interval } @@ -189,7 +196,7 @@ func (f *Fetcher[V]) pullLoop() { case <-timer.C: timer.Reset(f.interval) f.update(f.vehicle.Path()) - case <-f.done: + case <-f.ctx.Done(): return } } @@ -226,13 +233,14 @@ func safeWrite(path string, buf []byte) error { } func NewFetcher[V any](name string, interval time.Duration, vehicle types.Vehicle, parser Parser[V], onUpdate func(V)) *Fetcher[V] { - + ctx, cancel := context.WithCancel(context.Background()) return &Fetcher[V]{ - name: name, - vehicle: vehicle, - parser: parser, - done: make(chan struct{}, 8), - OnUpdate: onUpdate, - interval: interval, + ctx: ctx, + ctxCancel: cancel, + name: name, + vehicle: vehicle, + parser: parser, + OnUpdate: onUpdate, + interval: interval, } } diff --git a/component/resource/vehicle.go b/component/resource/vehicle.go index b13369d22e..4618ef52f2 100644 --- a/component/resource/vehicle.go +++ b/component/resource/vehicle.go @@ -24,7 +24,7 @@ func (f *FileVehicle) Path() string { return f.path } -func (f *FileVehicle) Read() ([]byte, error) { +func (f *FileVehicle) Read(ctx context.Context) ([]byte, error) { return os.ReadFile(f.path) } @@ -59,8 +59,8 @@ func (h *HTTPVehicle) Proxy() string { return h.proxy } -func (h *HTTPVehicle) Read() ([]byte, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) +func (h *HTTPVehicle) Read(ctx context.Context) ([]byte, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*20) defer cancel() resp, err := mihomoHttp.HttpRequestWithProxy(ctx, h.url, http.MethodGet, h.header, nil, h.proxy) if err != nil { diff --git a/config/config.go b/config/config.go index f1ffbce1fd..48a99e41e3 100644 --- a/config/config.go +++ b/config/config.go @@ -431,9 +431,8 @@ func Parse(buf []byte) (*Config, error) { return ParseRawConfig(rawCfg) } -func UnmarshalRawConfig(buf []byte) (*RawConfig, error) { - // config with default value - rawCfg := &RawConfig{ +func DefaultRawConfig() *RawConfig { + return &RawConfig{ AllowLan: false, BindAddress: "*", LanAllowedIPs: []netip.Prefix{netip.MustParsePrefix("0.0.0.0/0"), netip.MustParsePrefix("::/0")}, @@ -544,6 +543,11 @@ func UnmarshalRawConfig(buf []byte) (*RawConfig, error) { }, ExternalUIURL: "https://github.com/MetaCubeX/metacubexd/archive/refs/heads/gh-pages.zip", } +} + +func UnmarshalRawConfig(buf []byte) (*RawConfig, error) { + // config with default value + rawCfg := DefaultRawConfig() if err := yaml.Unmarshal(buf, rawCfg); err != nil { return nil, err diff --git a/constant/provider/interface.go b/constant/provider/interface.go index bd6b6e9470..911f774a62 100644 --- a/constant/provider/interface.go +++ b/constant/provider/interface.go @@ -1,6 +1,7 @@ package provider import ( + "context" "fmt" "github.com/metacubex/mihomo/common/utils" @@ -31,7 +32,7 @@ func (v VehicleType) String() string { } type Vehicle interface { - Read() ([]byte, error) + Read(ctx context.Context) ([]byte, error) Path() string Proxy() string Type() VehicleType @@ -83,6 +84,7 @@ type ProxyProvider interface { type RuleProvider interface { Provider Behavior() RuleBehavior + Count() int Match(*constant.Metadata) bool ShouldResolveIP() bool ShouldFindProcess() bool diff --git a/rules/provider/patch_android.go b/rules/provider/patch_android.go index 7ef1df1b59..d4b752c3ad 100644 --- a/rules/provider/patch_android.go +++ b/rules/provider/patch_android.go @@ -12,16 +12,6 @@ type UpdatableProvider interface { UpdatedAt() time.Time } -func (rp *ruleSetProvider) UpdatedAt() time.Time { - return rp.Fetcher.UpdatedAt -} - -func (rp *ruleSetProvider) Close() error { - rp.Fetcher.Destroy() - - return nil -} - func Suspend(s bool) { suspended = s } diff --git a/rules/provider/provider.go b/rules/provider/provider.go index b9524c35e6..ad720d477d 100644 --- a/rules/provider/provider.go +++ b/rules/provider/provider.go @@ -89,6 +89,10 @@ func (rp *ruleSetProvider) Behavior() P.RuleBehavior { return rp.behavior } +func (rp *ruleSetProvider) Count() int { + return rp.strategy.Count() +} + func (rp *ruleSetProvider) Match(metadata *C.Metadata) bool { return rp.strategy != nil && rp.strategy.Match(metadata) } @@ -113,11 +117,16 @@ func (rp *ruleSetProvider) MarshalJSON() ([]byte, error) { "name": rp.Name(), "ruleCount": rp.strategy.Count(), "type": rp.Type().String(), - "updatedAt": rp.UpdatedAt, + "updatedAt": rp.UpdatedAt(), "vehicleType": rp.VehicleType().String(), }) } +func (rp *RuleSetProvider) Close() error { + runtime.SetFinalizer(rp, nil) + return rp.ruleSetProvider.Close() +} + func NewRuleSetProvider(name string, behavior P.RuleBehavior, format P.RuleFormat, interval time.Duration, vehicle P.Vehicle, parse func(tp, payload, target string, params []string, subRules map[string][]C.Rule) (parsed C.Rule, parseErr error)) P.RuleProvider { rp := &ruleSetProvider{ @@ -139,8 +148,7 @@ func NewRuleSetProvider(name string, behavior P.RuleBehavior, format P.RuleForma rp, } - final := func(provider *RuleSetProvider) { _ = rp.Fetcher.Destroy() } - runtime.SetFinalizer(wrapper, final) + runtime.SetFinalizer(wrapper, (*RuleSetProvider).Close) return wrapper }