From 72a126e5809acae4e9058c51f4e883dc7c8c6e19 Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Wed, 25 Dec 2024 10:23:55 +0800 Subject: [PATCH] feat: support inline proxy provider --- adapter/provider/parser.go | 35 +-- adapter/provider/provider.go | 337 +++++++++++++++----------- adapter/provider/subscription_info.go | 13 +- component/resource/vehicle.go | 10 +- constant/provider/interface.go | 1 - docs/config.yaml | 11 + 6 files changed, 231 insertions(+), 176 deletions(-) diff --git a/adapter/provider/parser.go b/adapter/provider/parser.go index b305df7f89..cd5c3ff7b6 100644 --- a/adapter/provider/parser.go +++ b/adapter/provider/parser.go @@ -57,16 +57,17 @@ type OverrideSchema struct { } type proxyProviderSchema struct { - Type string `provider:"type"` - Path string `provider:"path,omitempty"` - URL string `provider:"url,omitempty"` - Proxy string `provider:"proxy,omitempty"` - Interval int `provider:"interval,omitempty"` - Filter string `provider:"filter,omitempty"` - ExcludeFilter string `provider:"exclude-filter,omitempty"` - ExcludeType string `provider:"exclude-type,omitempty"` - DialerProxy string `provider:"dialer-proxy,omitempty"` - SizeLimit int64 `provider:"size-limit,omitempty"` + Type string `provider:"type"` + Path string `provider:"path,omitempty"` + URL string `provider:"url,omitempty"` + Proxy string `provider:"proxy,omitempty"` + Interval int `provider:"interval,omitempty"` + Filter string `provider:"filter,omitempty"` + ExcludeFilter string `provider:"exclude-filter,omitempty"` + ExcludeType string `provider:"exclude-type,omitempty"` + DialerProxy string `provider:"dialer-proxy,omitempty"` + SizeLimit int64 `provider:"size-limit,omitempty"` + Payload []map[string]any `provider:"payload,omitempty"` HealthCheck healthCheckSchema `provider:"health-check,omitempty"` Override OverrideSchema `provider:"override,omitempty"` @@ -99,6 +100,11 @@ func ParseProxyProvider(name string, mapping map[string]any) (types.ProxyProvide } hc := NewHealthCheck([]C.Proxy{}, schema.HealthCheck.URL, uint(schema.HealthCheck.TestTimeout), hcInterval, schema.HealthCheck.Lazy, expectedStatus) + parser, err := NewProxiesParser(schema.Filter, schema.ExcludeFilter, schema.ExcludeType, schema.DialerProxy, schema.Override) + if err != nil { + return nil, err + } + var vehicle types.Vehicle switch schema.Type { case "file": @@ -113,16 +119,13 @@ func ParseProxyProvider(name string, mapping map[string]any) (types.ProxyProvide } } vehicle = resource.NewHTTPVehicle(schema.URL, path, schema.Proxy, schema.Header, resource.DefaultHttpTimeout, schema.SizeLimit) + case "inline": + return NewInlineProvider(name, schema.Payload, parser, hc) default: return nil, fmt.Errorf("%w: %s", errVehicleType, schema.Type) } interval := time.Duration(uint(schema.Interval)) * time.Second - filter := schema.Filter - excludeFilter := schema.ExcludeFilter - excludeType := schema.ExcludeType - dialerProxy := schema.DialerProxy - override := schema.Override - return NewProxySetProvider(name, interval, filter, excludeFilter, excludeType, dialerProxy, override, vehicle, hc) + return NewProxySetProvider(name, interval, parser, vehicle, hc) } diff --git a/adapter/provider/provider.go b/adapter/provider/provider.go index 2fe9633c7a..8558c4347d 100644 --- a/adapter/provider/provider.go +++ b/adapter/provider/provider.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "net/http" "reflect" "runtime" "strings" @@ -30,101 +31,116 @@ type ProxySchema struct { Proxies []map[string]any `yaml:"proxies"` } -// ProxySetProvider for auto gc -type ProxySetProvider struct { - *proxySetProvider +type providerForApi struct { + Name string `json:"name"` + Type string `json:"type"` + VehicleType string `json:"vehicleType"` + Proxies []C.Proxy `json:"proxies"` + TestUrl string `json:"testUrl"` + ExpectedStatus string `json:"expectedStatus"` + UpdatedAt time.Time `json:"updatedAt,omitempty"` + SubscriptionInfo *SubscriptionInfo `json:"subscriptionInfo,omitempty"` } -type proxySetProvider struct { - *resource.Fetcher[[]C.Proxy] - proxies []C.Proxy - healthCheck *HealthCheck - version uint32 - subscriptionInfo *SubscriptionInfo +type baseProvider struct { + name string + proxies []C.Proxy + healthCheck *HealthCheck + version uint32 } -func (pp *proxySetProvider) MarshalJSON() ([]byte, error) { - return json.Marshal(map[string]any{ - "name": pp.Name(), - "type": pp.Type().String(), - "vehicleType": pp.VehicleType().String(), - "proxies": pp.Proxies(), - "testUrl": pp.healthCheck.url, - "expectedStatus": pp.healthCheck.expectedStatus.String(), - "updatedAt": pp.UpdatedAt(), - "subscriptionInfo": pp.subscriptionInfo, - }) +func (bp *baseProvider) Name() string { + return bp.name } -func (pp *proxySetProvider) Version() uint32 { - return pp.version +func (bp *baseProvider) Version() uint32 { + return bp.version } -func (pp *proxySetProvider) Name() string { - return pp.Fetcher.Name() +func (bp *baseProvider) HealthCheck() { + bp.healthCheck.check() } -func (pp *proxySetProvider) HealthCheck() { - pp.healthCheck.check() +func (bp *baseProvider) Type() types.ProviderType { + return types.Proxy } -func (pp *proxySetProvider) Update() error { - _, _, err := pp.Fetcher.Update() - return err +func (bp *baseProvider) Proxies() []C.Proxy { + return bp.proxies } -func (pp *proxySetProvider) Initial() error { - _, err := pp.Fetcher.Initial() - if err != nil { - return err - } - if subscriptionInfo := cachefile.Cache().GetSubscriptionInfo(pp.Name()); subscriptionInfo != "" { - pp.SetSubscriptionInfo(subscriptionInfo) - } - pp.closeAllConnections() - return nil +func (bp *baseProvider) Count() int { + return len(bp.proxies) } -func (pp *proxySetProvider) Type() types.ProviderType { - return types.Proxy +func (bp *baseProvider) Touch() { + bp.healthCheck.touch() } -func (pp *proxySetProvider) Proxies() []C.Proxy { - return pp.proxies +func (bp *baseProvider) HealthCheckURL() string { + return bp.healthCheck.url } -func (pp *proxySetProvider) Count() int { - return len(pp.proxies) +func (bp *baseProvider) RegisterHealthCheckTask(url string, expectedStatus utils.IntRanges[uint16], filter string, interval uint) { + bp.healthCheck.registerHealthCheckTask(url, expectedStatus, filter, interval) } -func (pp *proxySetProvider) Touch() { - pp.healthCheck.touch() +func (bp *baseProvider) setProxies(proxies []C.Proxy) { + bp.proxies = proxies + bp.healthCheck.setProxy(proxies) + if bp.healthCheck.auto() { + go bp.healthCheck.check() + } } -func (pp *proxySetProvider) HealthCheckURL() string { - return pp.healthCheck.url +func (bp *baseProvider) Close() error { + bp.healthCheck.close() + return nil } -func (pp *proxySetProvider) RegisterHealthCheckTask(url string, expectedStatus utils.IntRanges[uint16], filter string, interval uint) { - pp.healthCheck.registerHealthCheckTask(url, expectedStatus, filter, interval) +// ProxySetProvider for auto gc +type ProxySetProvider struct { + *proxySetProvider } -func (pp *proxySetProvider) setProxies(proxies []C.Proxy) { - pp.proxies = proxies - pp.healthCheck.setProxy(proxies) - if pp.healthCheck.auto() { - go pp.healthCheck.check() - } +type proxySetProvider struct { + baseProvider + *resource.Fetcher[[]C.Proxy] + subscriptionInfo *SubscriptionInfo +} + +func (pp *proxySetProvider) MarshalJSON() ([]byte, error) { + return json.Marshal(providerForApi{ + Name: pp.Name(), + Type: pp.Type().String(), + VehicleType: pp.VehicleType().String(), + Proxies: pp.Proxies(), + TestUrl: pp.healthCheck.url, + ExpectedStatus: pp.healthCheck.expectedStatus.String(), + UpdatedAt: pp.UpdatedAt(), + SubscriptionInfo: pp.subscriptionInfo, + }) } -func (pp *proxySetProvider) SetSubscriptionInfo(userInfo string) { - pp.subscriptionInfo = NewSubscriptionInfo(userInfo) +func (pp *proxySetProvider) Name() string { + return pp.Fetcher.Name() } -func (pp *proxySetProvider) SetProvider(provider types.ProxyProvider) { - if httpVehicle, ok := pp.Vehicle().(*resource.HTTPVehicle); ok { - httpVehicle.SetProvider(provider) +func (pp *proxySetProvider) Update() error { + _, _, err := pp.Fetcher.Update() + return err +} + +func (pp *proxySetProvider) Initial() error { + _, err := pp.Fetcher.Initial() + if err != nil { + return err + } + if subscriptionInfo := cachefile.Cache().GetSubscriptionInfo(pp.Name()); subscriptionInfo != "" { + pp.subscriptionInfo.Update(subscriptionInfo) } + pp.closeAllConnections() + return nil } func (pp *proxySetProvider) closeAllConnections() { @@ -140,44 +156,37 @@ func (pp *proxySetProvider) closeAllConnections() { } func (pp *proxySetProvider) Close() error { - pp.healthCheck.close() + _ = pp.baseProvider.Close() return pp.Fetcher.Close() } -func NewProxySetProvider(name string, interval time.Duration, filter string, excludeFilter string, excludeType string, dialerProxy string, override OverrideSchema, vehicle types.Vehicle, hc *HealthCheck) (*ProxySetProvider, error) { - excludeFilterReg, err := regexp2.Compile(excludeFilter, regexp2.None) - if err != nil { - return nil, fmt.Errorf("invalid excludeFilter regex: %w", err) - } - var excludeTypeArray []string - if excludeType != "" { - excludeTypeArray = strings.Split(excludeType, "|") - } - - var filterRegs []*regexp2.Regexp - for _, filter := range strings.Split(filter, "`") { - filterReg, err := regexp2.Compile(filter, regexp2.None) - if err != nil { - return nil, fmt.Errorf("invalid filter regex: %w", err) - } - filterRegs = append(filterRegs, filterReg) - } - +func NewProxySetProvider(name string, interval time.Duration, parser resource.Parser[[]C.Proxy], vehicle types.Vehicle, hc *HealthCheck) (*ProxySetProvider, error) { if hc.auto() { go hc.process() } + si := new(SubscriptionInfo) pd := &proxySetProvider{ - proxies: []C.Proxy{}, - healthCheck: hc, + baseProvider: baseProvider{ + name: name, + proxies: []C.Proxy{}, + healthCheck: hc, + }, + subscriptionInfo: si, } - fetcher := resource.NewFetcher[[]C.Proxy](name, interval, vehicle, proxiesParseAndFilter(filter, excludeFilter, excludeTypeArray, filterRegs, excludeFilterReg, dialerProxy, override), proxiesOnUpdate(pd)) + fetcher := resource.NewFetcher[[]C.Proxy](name, interval, vehicle, parser, proxiesOnUpdate(pd)) pd.Fetcher = fetcher - wrapper := &ProxySetProvider{pd} if httpVehicle, ok := vehicle.(*resource.HTTPVehicle); ok { - httpVehicle.SetProvider(wrapper) + httpVehicle.SetInRead(func(resp *http.Response) { + if subscriptionInfo := resp.Header.Get("subscription-userinfo"); subscriptionInfo != "" { + cachefile.Cache().SetSubscriptionInfo(name, subscriptionInfo) + si.Update(subscriptionInfo) + } + }) } + + wrapper := &ProxySetProvider{pd} runtime.SetFinalizer(wrapper, (*ProxySetProvider).Close) return wrapper, nil } @@ -187,92 +196,108 @@ func (pp *ProxySetProvider) Close() error { return pp.proxySetProvider.Close() } -func (pp *ProxySetProvider) SetProvider(provider types.ProxyProvider) { - pp.proxySetProvider.SetProvider(provider) +// InlineProvider for auto gc +type InlineProvider struct { + *inlineProvider } -// CompatibleProvider for auto gc -type CompatibleProvider struct { - *compatibleProvider +type inlineProvider struct { + baseProvider + updateAt time.Time } -type compatibleProvider struct { - name string - healthCheck *HealthCheck - subscriptionInfo *SubscriptionInfo - proxies []C.Proxy - version uint32 -} - -func (cp *compatibleProvider) MarshalJSON() ([]byte, error) { - return json.Marshal(map[string]any{ - "name": cp.Name(), - "type": cp.Type().String(), - "vehicleType": cp.VehicleType().String(), - "proxies": cp.Proxies(), - "testUrl": cp.healthCheck.url, - "expectedStatus": cp.healthCheck.expectedStatus.String(), +func (ip *inlineProvider) MarshalJSON() ([]byte, error) { + return json.Marshal(providerForApi{ + Name: ip.Name(), + Type: ip.Type().String(), + VehicleType: ip.VehicleType().String(), + Proxies: ip.Proxies(), + TestUrl: ip.healthCheck.url, + ExpectedStatus: ip.healthCheck.expectedStatus.String(), + UpdatedAt: ip.updateAt, }) } -func (cp *compatibleProvider) Version() uint32 { - return cp.version -} - -func (cp *compatibleProvider) Name() string { - return cp.name +func (ip *inlineProvider) VehicleType() types.VehicleType { + return types.Inline } -func (cp *compatibleProvider) HealthCheck() { - cp.healthCheck.check() +func (ip *inlineProvider) Initial() error { + return nil } -func (cp *compatibleProvider) Update() error { +func (ip *inlineProvider) Update() error { + // make api update happy + ip.updateAt = time.Now() return nil } -func (cp *compatibleProvider) Initial() error { - if cp.healthCheck.interval != 0 && cp.healthCheck.url != "" { - cp.HealthCheck() +func NewInlineProvider(name string, payload []map[string]any, parser resource.Parser[[]C.Proxy], hc *HealthCheck) (*InlineProvider, error) { + if hc.auto() { + go hc.process() } - return nil -} -func (cp *compatibleProvider) VehicleType() types.VehicleType { - return types.Compatible -} + ps := ProxySchema{Proxies: payload} + buf, err := yaml.Marshal(ps) + if err != nil { + return nil, err + } + proxies, err := parser(buf) + if err != nil { + return nil, err + } -func (cp *compatibleProvider) Type() types.ProviderType { - return types.Proxy + ip := &inlineProvider{ + baseProvider: baseProvider{ + name: name, + proxies: proxies, + healthCheck: hc, + }, + updateAt: time.Now(), + } + wrapper := &InlineProvider{ip} + runtime.SetFinalizer(wrapper, (*InlineProvider).Close) + return wrapper, nil } -func (cp *compatibleProvider) Proxies() []C.Proxy { - return cp.proxies +func (ip *InlineProvider) Close() error { + runtime.SetFinalizer(ip, nil) + return ip.baseProvider.Close() } -func (cp *compatibleProvider) Count() int { - return len(cp.proxies) +// CompatibleProvider for auto gc +type CompatibleProvider struct { + *compatibleProvider } -func (cp *compatibleProvider) Touch() { - cp.healthCheck.touch() +type compatibleProvider struct { + baseProvider } -func (cp *compatibleProvider) HealthCheckURL() string { - return cp.healthCheck.url +func (cp *compatibleProvider) MarshalJSON() ([]byte, error) { + return json.Marshal(providerForApi{ + Name: cp.Name(), + Type: cp.Type().String(), + VehicleType: cp.VehicleType().String(), + Proxies: cp.Proxies(), + TestUrl: cp.healthCheck.url, + ExpectedStatus: cp.healthCheck.expectedStatus.String(), + }) } -func (cp *compatibleProvider) RegisterHealthCheckTask(url string, expectedStatus utils.IntRanges[uint16], filter string, interval uint) { - cp.healthCheck.registerHealthCheckTask(url, expectedStatus, filter, interval) +func (cp *compatibleProvider) Update() error { + return nil } -func (cp *compatibleProvider) Close() error { - cp.healthCheck.close() +func (cp *compatibleProvider) Initial() error { + if cp.healthCheck.interval != 0 && cp.healthCheck.url != "" { + cp.HealthCheck() + } return nil } -func (cp *compatibleProvider) SetSubscriptionInfo(userInfo string) { - cp.subscriptionInfo = NewSubscriptionInfo(userInfo) +func (cp *compatibleProvider) VehicleType() types.VehicleType { + return types.Compatible } func NewCompatibleProvider(name string, proxies []C.Proxy, hc *HealthCheck) (*CompatibleProvider, error) { @@ -285,9 +310,11 @@ func NewCompatibleProvider(name string, proxies []C.Proxy, hc *HealthCheck) (*Co } pd := &compatibleProvider{ - name: name, - proxies: proxies, - healthCheck: hc, + baseProvider: baseProvider{ + name: name, + proxies: proxies, + healthCheck: hc, + }, } wrapper := &CompatibleProvider{pd} @@ -307,7 +334,25 @@ func proxiesOnUpdate(pd *proxySetProvider) func([]C.Proxy) { } } -func proxiesParseAndFilter(filter string, excludeFilter string, excludeTypeArray []string, filterRegs []*regexp2.Regexp, excludeFilterReg *regexp2.Regexp, dialerProxy string, override OverrideSchema) resource.Parser[[]C.Proxy] { +func NewProxiesParser(filter string, excludeFilter string, excludeType string, dialerProxy string, override OverrideSchema) (resource.Parser[[]C.Proxy], error) { + excludeFilterReg, err := regexp2.Compile(excludeFilter, regexp2.None) + if err != nil { + return nil, fmt.Errorf("invalid excludeFilter regex: %w", err) + } + var excludeTypeArray []string + if excludeType != "" { + excludeTypeArray = strings.Split(excludeType, "|") + } + + var filterRegs []*regexp2.Regexp + for _, filter := range strings.Split(filter, "`") { + filterReg, err := regexp2.Compile(filter, regexp2.None) + if err != nil { + return nil, fmt.Errorf("invalid filter regex: %w", err) + } + filterRegs = append(filterRegs, filterReg) + } + return func(buf []byte) ([]C.Proxy, error) { schema := &ProxySchema{} @@ -422,5 +467,5 @@ func proxiesParseAndFilter(filter string, excludeFilter string, excludeTypeArray } return proxies, nil - } + }, nil } diff --git a/adapter/provider/subscription_info.go b/adapter/provider/subscription_info.go index 412b4342fc..194e398464 100644 --- a/adapter/provider/subscription_info.go +++ b/adapter/provider/subscription_info.go @@ -15,9 +15,8 @@ type SubscriptionInfo struct { Expire int64 } -func NewSubscriptionInfo(userinfo string) (si *SubscriptionInfo) { +func (info *SubscriptionInfo) Update(userinfo string) { userinfo = strings.ReplaceAll(strings.ToLower(userinfo), " ", "") - si = new(SubscriptionInfo) for _, field := range strings.Split(userinfo, ";") { name, value, ok := strings.Cut(field, "=") @@ -33,17 +32,15 @@ func NewSubscriptionInfo(userinfo string) (si *SubscriptionInfo) { switch name { case "upload": - si.Upload = intValue + info.Upload = intValue case "download": - si.Download = intValue + info.Download = intValue case "total": - si.Total = intValue + info.Total = intValue case "expire": - si.Expire = intValue + info.Expire = intValue } } - - return si } func parseValue(value string) (int64, error) { diff --git a/component/resource/vehicle.go b/component/resource/vehicle.go index 18cebf002c..00b3170bfd 100644 --- a/component/resource/vehicle.go +++ b/component/resource/vehicle.go @@ -90,6 +90,7 @@ type HTTPVehicle struct { header http.Header timeout time.Duration sizeLimit int64 + inRead func(response *http.Response) provider types.ProxyProvider } @@ -113,8 +114,8 @@ func (h *HTTPVehicle) Write(buf []byte) error { return safeWrite(h.path, buf) } -func (h *HTTPVehicle) SetProvider(provider types.ProxyProvider) { - h.provider = provider +func (h *HTTPVehicle) SetInRead(fn func(response *http.Response)) { + h.inRead = fn } func (h *HTTPVehicle) Read(ctx context.Context, oldHash utils.HashType) (buf []byte, hash utils.HashType, err error) { @@ -140,9 +141,8 @@ func (h *HTTPVehicle) Read(ctx context.Context, oldHash utils.HashType) (buf []b } defer resp.Body.Close() - if subscriptionInfo := resp.Header.Get("subscription-userinfo"); h.provider != nil && subscriptionInfo != "" { - cachefile.Cache().SetSubscriptionInfo(h.provider.Name(), subscriptionInfo) - h.provider.SetSubscriptionInfo(subscriptionInfo) + if h.inRead != nil { + h.inRead(resp) } if resp.StatusCode < 200 || resp.StatusCode > 299 { diff --git a/constant/provider/interface.go b/constant/provider/interface.go index 4309adacbf..8c6ee6f8bd 100644 --- a/constant/provider/interface.go +++ b/constant/provider/interface.go @@ -84,7 +84,6 @@ type ProxyProvider interface { Version() uint32 RegisterHealthCheckTask(url string, expectedStatus utils.IntRanges[uint16], filter string, interval uint) HealthCheckURL() string - SetSubscriptionInfo(userInfo string) } // RuleProvider interface diff --git a/docs/config.yaml b/docs/config.yaml index ca48f0e231..6dfeb5866e 100644 --- a/docs/config.yaml +++ b/docs/config.yaml @@ -975,6 +975,17 @@ proxy-providers: # - pattern: "IPLC-(.*?)倍" # target: "iplc x $1" + provider2: + type: inline + dialer-proxy: proxy + payload: + - name: "ss1" + type: ss + server: server + port: 443 + cipher: chacha20-ietf-poly1305 + password: "password" + test: type: file path: /test.yaml