diff --git a/go.mod b/go.mod index 832f36a50..9a6c87f30 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,6 @@ require ( github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e github.com/google/gopacket v1.1.19 github.com/grafov/m3u8 v0.0.0-20171211212457-6ab8f28ed427 - github.com/juju/ratelimit v1.0.2 github.com/marusama/semaphore v0.0.0-20171214154724-565ffd8e868a github.com/miekg/dns v1.1.44-0.20210804161652-ab67aa642300 github.com/mitchellh/panicwrap v0.0.0-20170106182340-fce601fe5557 @@ -47,6 +46,7 @@ require ( golang.org/x/sync v0.2.0 golang.org/x/sys v0.19.0 golang.org/x/term v0.19.0 + golang.org/x/time v0.5.0 ) require ( diff --git a/go.sum b/go.sum index c1d9d741c..a29224c8b 100644 --- a/go.sum +++ b/go.sum @@ -115,8 +115,6 @@ github.com/jsimonetti/rtnetlink v0.0.0-20210212075122-66c871082f2b/go.mod h1:8w9 github.com/jsimonetti/rtnetlink v0.0.0-20210525051524-4cc836578190/go.mod h1:NmKSdU4VGSiv1bMsdqNALI4RSvvjtz65tTMCnD05qLo= github.com/jsimonetti/rtnetlink v0.0.0-20210721205614-4cc3c1489576 h1:dH/k0qzR1oouF25AoMwH6FXOr16zV4WZFcYnZGpqro0= github.com/jsimonetti/rtnetlink v0.0.0-20210721205614-4cc3c1489576/go.mod h1:qdKhcKUxYn3/QvneOvPWXXMPqktEBHnCW98wUTA3rmA= -github.com/juju/ratelimit v1.0.2 h1:sRxmtRiajbvrcLQT7S+JbqU0ntsb9W2yhSdNN8tWfaI= -github.com/juju/ratelimit v1.0.2/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= @@ -339,6 +337,8 @@ golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= diff --git a/psiphon/common/throttled.go b/psiphon/common/throttled.go index 744b4f9c5..37379374a 100644 --- a/psiphon/common/throttled.go +++ b/psiphon/common/throttled.go @@ -26,7 +26,7 @@ import ( "time" "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors" - "github.com/juju/ratelimit" + "golang.org/x/time/rate" ) // RateLimits specify the rate limits for a ThrottledConn. @@ -72,20 +72,28 @@ type ThrottledConn struct { writeBytesPerSecond int64 closeAfterExhausted int32 readLock sync.Mutex - readRateLimiter *ratelimit.Bucket + readRateLimiter *rate.Limiter readDelayTimer *time.Timer writeLock sync.Mutex - writeRateLimiter *ratelimit.Bucket + writeRateLimiter *rate.Limiter writeDelayTimer *time.Timer isClosed int32 stopBroadcast chan struct{} + isStream bool net.Conn } // NewThrottledConn initializes a new ThrottledConn. -func NewThrottledConn(conn net.Conn, limits RateLimits) *ThrottledConn { +// +// Set isStreamConn to true when conn is stream-oriented, such as TCP, and +// false when the conn is packet-oriented, such as UDP. When conn is a +// stream, reads and writes may be split to accomodate rate limits. +func NewThrottledConn( + conn net.Conn, isStream bool, limits RateLimits) *ThrottledConn { + throttledConn := &ThrottledConn{ Conn: conn, + isStream: isStream, stopBroadcast: make(chan struct{}), } throttledConn.SetLimits(limits) @@ -137,10 +145,8 @@ func (conn *ThrottledConn) Read(buffer []byte) (int, error) { conn.readLock.Lock() defer conn.readLock.Unlock() - select { - case <-conn.stopBroadcast: + if atomic.LoadInt32(&conn.isClosed) == 1 { return 0, errors.TraceNew("throttled conn closed") - default: } // Use the base conn until the unthrottled count is @@ -158,34 +164,68 @@ func (conn *ThrottledConn) Read(buffer []byte) (int, error) { return 0, errors.TraceNew("throttled conn exhausted") } - rate := atomic.SwapInt64(&conn.readBytesPerSecond, -1) + readRate := atomic.SwapInt64(&conn.readBytesPerSecond, -1) - if rate != -1 { + if readRate != -1 { // SetLimits has been called and a new rate limiter // must be initialized. When no limit is specified, // the reader/writer is simply the base conn. // No state is retained from the previous rate limiter, // so a pending I/O throttle sleep may be skipped when // the old and new rate are similar. - if rate == 0 { + if readRate == 0 { conn.readRateLimiter = nil } else { conn.readRateLimiter = - ratelimit.NewBucketWithRate(float64(rate), rate) + rate.NewLimiter(rate.Limit(readRate), int(readRate)) + } + } + + // The number of bytes read cannot exceed the rate limiter burst size, + // which is enforced by rate.Limiter.ReserveN. Reduce any read buffer + // size to be at most the burst size. + // + // Read should still return as soon as read bytes are available; and the + // number of bytes that will be received is unknown; so there is no loop + // here to read more bytes. Reducing the read buffer size minimizes + // latency for the up-to-burst-size bytes read, whereas allowing a full + // read followed by multiple ReserveN calls and sleeps would increase + // latency. + // + // In practise, with Psiphon tunnels, throttling is not applied until + // after the Psiphon API handshake, so read buffer reductions won't + // impact early obfuscation traffic shaping; and reads are on the order + // of one SSH "packet", up to 32K, unlikely to be split for all but the + // most restrictive of rate limits. + + if conn.readRateLimiter != nil { + burst := conn.readRateLimiter.Burst() + if len(buffer) > burst { + if !conn.isStream { + return 0, errors.TraceNew("non-stream read buffer exceeds burst") + } + buffer = buffer[:burst] } } n, err := conn.Conn.Read(buffer) - // Sleep to enforce the rate limit. This is the same logic as implemented in - // ratelimit.Reader, but using a timer and a close signal instead of an - // uninterruptible time.Sleep. - // - // The readDelayTimer is always expired/stopped and drained after this code - // block and is ready to be Reset on the next call. + if n > 0 && conn.readRateLimiter != nil { + + // While rate.Limiter.WaitN would be simpler to use, internally Wait + // creates a new timer for every call which must sleep, which is + // expected to be most calls. Instead, call ReserveN to get the sleep + // time and reuse one timer without allocation. + // + // TODO: avoid allocation: ReserveN allocates a *Reservation; while + // the internal reserveN returns a struct, not a pointer. - if n >= 0 && conn.readRateLimiter != nil { - sleepDuration := conn.readRateLimiter.Take(int64(n)) + reservation := conn.readRateLimiter.ReserveN(time.Now(), n) + if !reservation.OK() { + // This error is not expected, given the buffer size adjustment. + return 0, errors.TraceNew("burst size exceeded") + } + sleepDuration := reservation.Delay() if sleepDuration > 0 { if conn.readDelayTimer == nil { conn.readDelayTimer = time.NewTimer(sleepDuration) @@ -202,7 +242,8 @@ func (conn *ThrottledConn) Read(buffer []byte) (int, error) { } } - return n, errors.Trace(err) + // Don't wrap I/O errors + return n, err } func (conn *ThrottledConn) Write(buffer []byte) (int, error) { @@ -212,10 +253,8 @@ func (conn *ThrottledConn) Write(buffer []byte) (int, error) { conn.writeLock.Lock() defer conn.writeLock.Unlock() - select { - case <-conn.stopBroadcast: + if atomic.LoadInt32(&conn.isClosed) == 1 { return 0, errors.TraceNew("throttled conn closed") - default: } if atomic.LoadInt64(&conn.writeUnthrottledBytes) > 0 { @@ -229,19 +268,58 @@ func (conn *ThrottledConn) Write(buffer []byte) (int, error) { return 0, errors.TraceNew("throttled conn exhausted") } - rate := atomic.SwapInt64(&conn.writeBytesPerSecond, -1) + writeRate := atomic.SwapInt64(&conn.writeBytesPerSecond, -1) - if rate != -1 { - if rate == 0 { + if writeRate != -1 { + if writeRate == 0 { conn.writeRateLimiter = nil } else { conn.writeRateLimiter = - ratelimit.NewBucketWithRate(float64(rate), rate) + rate.NewLimiter(rate.Limit(writeRate), int(writeRate)) } } - if len(buffer) >= 0 && conn.writeRateLimiter != nil { - sleepDuration := conn.writeRateLimiter.Take(int64(len(buffer))) + if conn.writeRateLimiter == nil { + n, err := conn.Conn.Write(buffer) + // Don't wrap I/O errors + return n, err + } + + // The number of bytes written cannot exceed the rate limiter burst size, + // which is enforced by rate.Limiter.ReserveN. Split writes to be at most + // the burst size. + // + // Splitting writes may have some effect on the shape of TCP packets sent + // on the network. + // + // In practise, with Psiphon tunnels, throttling is not applied until + // after the Psiphon API handshake, so write splits won't impact early + // obfuscation traffic shaping; and writes are on the order of one + // SSH "packet", up to 32K, unlikely to be split for all but the most + // restrictive of rate limits. + + burst := conn.writeRateLimiter.Burst() + if !conn.isStream && len(buffer) > burst { + return 0, errors.TraceNew("non-stream write exceeds burst") + } + totalWritten := 0 + for i := 0; i < len(buffer); i += burst { + + j := i + burst + if j > len(buffer) { + j = len(buffer) + } + b := buffer[i:j] + + // See comment in Read regarding rate.Limiter.ReserveN vs. + // rate.Limiter.WaitN. + + reservation := conn.writeRateLimiter.ReserveN(time.Now(), len(b)) + if !reservation.OK() { + // This error is not expected, given the write split adjustments. + return 0, errors.TraceNew("burst size exceeded") + } + sleepDuration := reservation.Delay() if sleepDuration > 0 { if conn.writeDelayTimer == nil { conn.writeDelayTimer = time.NewTimer(sleepDuration) @@ -256,11 +334,16 @@ func (conn *ThrottledConn) Write(buffer []byte) (int, error) { } } } - } - n, err := conn.Conn.Write(buffer) + n, err := conn.Conn.Write(b) + totalWritten += n + if err != nil { + // Don't wrap I/O errors + return totalWritten, err + } + } - return n, errors.Trace(err) + return totalWritten, nil } func (conn *ThrottledConn) Close() error { diff --git a/psiphon/common/throttled_test.go b/psiphon/common/throttled_test.go index c4993f49d..b35299501 100644 --- a/psiphon/common/throttled_test.go +++ b/psiphon/common/throttled_test.go @@ -22,6 +22,7 @@ package common import ( "bytes" "fmt" + "io" "io/ioutil" "math" "net" @@ -113,7 +114,7 @@ func runRateLimitsTest(t *testing.T, rateLimits RateLimits) { if err != nil { return conn, err } - return NewThrottledConn(conn, rateLimits), nil + return NewThrottledConn(conn, true, rateLimits), nil } client := &http.Client{ @@ -204,27 +205,27 @@ func TestThrottledConnClose(t *testing.T) { n := 4 b := make([]byte, n+1) - throttledConn := NewThrottledConn(&testConn{}, rateLimits) + throttledConn := NewThrottledConn(&testConn{}, true, rateLimits) now := time.Now() - _, err := throttledConn.Read(b) + _, err := io.ReadFull(throttledConn, b) elapsed := time.Since(now) if err != nil || elapsed < time.Duration(n)*time.Second { t.Errorf("unexpected interrupted read: %s, %v", elapsed, err) } now = time.Now() - go func() { + go func(conn net.Conn) { time.Sleep(500 * time.Millisecond) - throttledConn.Close() - }() + conn.Close() + }(throttledConn) _, err = throttledConn.Read(b) elapsed = time.Since(now) if elapsed > 1*time.Second { t.Errorf("unexpected uninterrupted read: %s, %v", elapsed, err) } - throttledConn = NewThrottledConn(&testConn{}, rateLimits) + throttledConn = NewThrottledConn(&testConn{}, true, rateLimits) now = time.Now() _, err = throttledConn.Write(b) @@ -234,10 +235,10 @@ func TestThrottledConnClose(t *testing.T) { } now = time.Now() - go func() { + go func(conn net.Conn) { time.Sleep(500 * time.Millisecond) - throttledConn.Close() - }() + conn.Close() + }(throttledConn) _, err = throttledConn.Write(b) elapsed = time.Since(now) if elapsed > 1*time.Second { @@ -245,6 +246,23 @@ func TestThrottledConnClose(t *testing.T) { } } +func TestNonStreamThrottledConn(t *testing.T) { + + MTU := int64(1500) + + rateLimits := RateLimits{ + ReadBytesPerSecond: MTU - 1, + WriteBytesPerSecond: MTU - 1, + } + + throttledConn := NewThrottledConn(&testConn{}, false, rateLimits) + + _, err := throttledConn.Write(make([]byte, MTU)) + if err == nil { + t.Errorf("unexpected split write") + } +} + type testConn struct { } diff --git a/psiphon/server/meek.go b/psiphon/server/meek.go index 5a2d4ba5a..859236fdd 100644 --- a/psiphon/server/meek.go +++ b/psiphon/server/meek.go @@ -50,8 +50,8 @@ import ( "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/transforms" "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/values" lrucache "github.com/cognusion/go-cache-lru" - "github.com/juju/ratelimit" "golang.org/x/crypto/nacl/box" + "golang.org/x/time/rate" ) // MeekServer is based on meek-server.go from Tor and Psiphon: @@ -1026,22 +1026,25 @@ func (server *MeekServer) rateLimit( // (as well as synchronizing access to rateLimitCount). server.rateLimitLock.Lock() - var rateLimiter *ratelimit.Bucket + var rateLimiter *rate.Limiter entry, ok := server.rateLimitHistory.Get(rateLimitIP) if ok { - rateLimiter = entry.(*ratelimit.Bucket) + rateLimiter = entry.(*rate.Limiter) } else { - rateLimiter = ratelimit.NewBucketWithQuantum( - time.Duration(thresholdSeconds)*time.Second, - int64(historySize), - int64(historySize)) + + // Set bursts to 1, which is appropriate for new meek tunnels and + // tactics requests. + + limit := float64(historySize) / float64(thresholdSeconds) + bursts := 1 + rateLimiter = rate.NewLimiter(rate.Limit(limit), bursts) server.rateLimitHistory.Set( rateLimitIP, rateLimiter, time.Duration(thresholdSeconds)*time.Second) } - limit := rateLimiter.TakeAvailable(1) < 1 + limit := !rateLimiter.Allow() triggerGC := false if limit { diff --git a/psiphon/server/tunnelServer.go b/psiphon/server/tunnelServer.go index 03032fe94..cf640589b 100644 --- a/psiphon/server/tunnelServer.go +++ b/psiphon/server/tunnelServer.go @@ -1937,9 +1937,11 @@ func (sshClient *sshClient) run( // Allow garbage collection. p.Close() - // Further wrap the connection in a rate limiting ThrottledConn. + // Further wrap the connection in a rate limiting ThrottledConn. The + // underlying dialConn is always a stream, even when the network conn + // uses UDP. - throttledConn := common.NewThrottledConn(conn, sshClient.rateLimits()) + throttledConn := common.NewThrottledConn(conn, true, sshClient.rateLimits()) conn = throttledConn // Replay of server-side parameters is set or extended after a new tunnel diff --git a/psiphon/tunnel.go b/psiphon/tunnel.go index b78d1d60c..bec851e72 100644 --- a/psiphon/tunnel.go +++ b/psiphon/tunnel.go @@ -991,9 +991,11 @@ func dialTunnel( burstUpstreamTargetBytes, burstUpstreamDeadline, burstDownstreamTargetBytes, burstDownstreamDeadline) - // Apply throttling (if configured) + // Apply throttling (if configured). The underlying dialConn is always a + // stream, even when the network conn uses UDP. throttledConn := common.NewThrottledConn( monitoredConn, + true, rateLimits) // Add obfuscated SSH layer diff --git a/vendor/github.com/juju/ratelimit/LICENSE b/vendor/github.com/juju/ratelimit/LICENSE deleted file mode 100644 index ade9307b3..000000000 --- a/vendor/github.com/juju/ratelimit/LICENSE +++ /dev/null @@ -1,191 +0,0 @@ -All files in this repository are licensed as follows. If you contribute -to this repository, it is assumed that you license your contribution -under the same license unless you state otherwise. - -All files Copyright (C) 2015 Canonical Ltd. unless otherwise specified in the file. - -This software is licensed under the LGPLv3, included below. - -As a special exception to the GNU Lesser General Public License version 3 -("LGPL3"), the copyright holders of this Library give you permission to -convey to a third party a Combined Work that links statically or dynamically -to this Library without providing any Minimal Corresponding Source or -Minimal Application Code as set out in 4d or providing the installation -information set out in section 4e, provided that you comply with the other -provisions of LGPL3 and provided that you meet, for the Application the -terms and conditions of the license(s) which apply to the Application. - -Except as stated in this special exception, the provisions of LGPL3 will -continue to comply in full to this Library. If you modify this Library, you -may apply this exception to your version of this Library, but you are not -obliged to do so. If you do not wish to do so, delete this exception -statement from your version. This exception does not (and cannot) modify any -license terms which apply to the Application, with which you must still -comply. - - - GNU LESSER GENERAL PUBLIC LICENSE - Version 3, 29 June 2007 - - Copyright (C) 2007 Free Software Foundation, Inc. - Everyone is permitted to copy and distribute verbatim copies - of this license document, but changing it is not allowed. - - - This version of the GNU Lesser General Public License incorporates -the terms and conditions of version 3 of the GNU General Public -License, supplemented by the additional permissions listed below. - - 0. Additional Definitions. - - As used herein, "this License" refers to version 3 of the GNU Lesser -General Public License, and the "GNU GPL" refers to version 3 of the GNU -General Public License. - - "The Library" refers to a covered work governed by this License, -other than an Application or a Combined Work as defined below. - - An "Application" is any work that makes use of an interface provided -by the Library, but which is not otherwise based on the Library. -Defining a subclass of a class defined by the Library is deemed a mode -of using an interface provided by the Library. - - A "Combined Work" is a work produced by combining or linking an -Application with the Library. The particular version of the Library -with which the Combined Work was made is also called the "Linked -Version". - - The "Minimal Corresponding Source" for a Combined Work means the -Corresponding Source for the Combined Work, excluding any source code -for portions of the Combined Work that, considered in isolation, are -based on the Application, and not on the Linked Version. - - The "Corresponding Application Code" for a Combined Work means the -object code and/or source code for the Application, including any data -and utility programs needed for reproducing the Combined Work from the -Application, but excluding the System Libraries of the Combined Work. - - 1. Exception to Section 3 of the GNU GPL. - - You may convey a covered work under sections 3 and 4 of this License -without being bound by section 3 of the GNU GPL. - - 2. Conveying Modified Versions. - - If you modify a copy of the Library, and, in your modifications, a -facility refers to a function or data to be supplied by an Application -that uses the facility (other than as an argument passed when the -facility is invoked), then you may convey a copy of the modified -version: - - a) under this License, provided that you make a good faith effort to - ensure that, in the event an Application does not supply the - function or data, the facility still operates, and performs - whatever part of its purpose remains meaningful, or - - b) under the GNU GPL, with none of the additional permissions of - this License applicable to that copy. - - 3. Object Code Incorporating Material from Library Header Files. - - The object code form of an Application may incorporate material from -a header file that is part of the Library. You may convey such object -code under terms of your choice, provided that, if the incorporated -material is not limited to numerical parameters, data structure -layouts and accessors, or small macros, inline functions and templates -(ten or fewer lines in length), you do both of the following: - - a) Give prominent notice with each copy of the object code that the - Library is used in it and that the Library and its use are - covered by this License. - - b) Accompany the object code with a copy of the GNU GPL and this license - document. - - 4. Combined Works. - - You may convey a Combined Work under terms of your choice that, -taken together, effectively do not restrict modification of the -portions of the Library contained in the Combined Work and reverse -engineering for debugging such modifications, if you also do each of -the following: - - a) Give prominent notice with each copy of the Combined Work that - the Library is used in it and that the Library and its use are - covered by this License. - - b) Accompany the Combined Work with a copy of the GNU GPL and this license - document. - - c) For a Combined Work that displays copyright notices during - execution, include the copyright notice for the Library among - these notices, as well as a reference directing the user to the - copies of the GNU GPL and this license document. - - d) Do one of the following: - - 0) Convey the Minimal Corresponding Source under the terms of this - License, and the Corresponding Application Code in a form - suitable for, and under terms that permit, the user to - recombine or relink the Application with a modified version of - the Linked Version to produce a modified Combined Work, in the - manner specified by section 6 of the GNU GPL for conveying - Corresponding Source. - - 1) Use a suitable shared library mechanism for linking with the - Library. A suitable mechanism is one that (a) uses at run time - a copy of the Library already present on the user's computer - system, and (b) will operate properly with a modified version - of the Library that is interface-compatible with the Linked - Version. - - e) Provide Installation Information, but only if you would otherwise - be required to provide such information under section 6 of the - GNU GPL, and only to the extent that such information is - necessary to install and execute a modified version of the - Combined Work produced by recombining or relinking the - Application with a modified version of the Linked Version. (If - you use option 4d0, the Installation Information must accompany - the Minimal Corresponding Source and Corresponding Application - Code. If you use option 4d1, you must provide the Installation - Information in the manner specified by section 6 of the GNU GPL - for conveying Corresponding Source.) - - 5. Combined Libraries. - - You may place library facilities that are a work based on the -Library side by side in a single library together with other library -facilities that are not Applications and are not covered by this -License, and convey such a combined library under terms of your -choice, if you do both of the following: - - a) Accompany the combined library with a copy of the same work based - on the Library, uncombined with any other library facilities, - conveyed under the terms of this License. - - b) Give prominent notice with the combined library that part of it - is a work based on the Library, and explaining where to find the - accompanying uncombined form of the same work. - - 6. Revised Versions of the GNU Lesser General Public License. - - The Free Software Foundation may publish revised and/or new versions -of the GNU Lesser General Public License from time to time. Such new -versions will be similar in spirit to the present version, but may -differ in detail to address new problems or concerns. - - Each version is given a distinguishing version number. If the -Library as you received it specifies that a certain numbered version -of the GNU Lesser General Public License "or any later version" -applies to it, you have the option of following the terms and -conditions either of that published version or of any later version -published by the Free Software Foundation. If the Library as you -received it does not specify a version number of the GNU Lesser -General Public License, you may choose any version of the GNU Lesser -General Public License ever published by the Free Software Foundation. - - If the Library as you received it specifies that a proxy can decide -whether future versions of the GNU Lesser General Public License shall -apply, that proxy's public statement of acceptance of any version is -permanent authorization for you to choose that version for the -Library. diff --git a/vendor/github.com/juju/ratelimit/README.md b/vendor/github.com/juju/ratelimit/README.md deleted file mode 100644 index 47ff69d03..000000000 --- a/vendor/github.com/juju/ratelimit/README.md +++ /dev/null @@ -1,129 +0,0 @@ -# ratelimit --- - import "github.com/juju/ratelimit" - -The ratelimit package provides an efficient token bucket implementation. See -http://en.wikipedia.org/wiki/Token_bucket. - -## Usage - -#### func Reader - -```go -func Reader(r io.Reader, bucket *Bucket) io.Reader -``` -Reader returns a reader that is rate limited by the given token bucket. Each -token in the bucket represents one byte. - -#### func Writer - -```go -func Writer(w io.Writer, bucket *Bucket) io.Writer -``` -Writer returns a writer that is rate limited by the given token bucket. Each -token in the bucket represents one byte. - -#### type Bucket - -```go -type Bucket struct { -} -``` - -Bucket represents a token bucket that fills at a predetermined rate. Methods on -Bucket may be called concurrently. - -#### func NewBucket - -```go -func NewBucket(fillInterval time.Duration, capacity int64) *Bucket -``` -NewBucket returns a new token bucket that fills at the rate of one token every -fillInterval, up to the given maximum capacity. Both arguments must be positive. -The bucket is initially full. - -#### func NewBucketWithQuantum - -```go -func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket -``` -NewBucketWithQuantum is similar to NewBucket, but allows the specification of -the quantum size - quantum tokens are added every fillInterval. - -#### func NewBucketWithRate - -```go -func NewBucketWithRate(rate float64, capacity int64) *Bucket -``` -NewBucketWithRate returns a token bucket that fills the bucket at the rate of -rate tokens per second up to the given maximum capacity. Because of limited -clock resolution, at high rates, the actual rate may be up to 1% different from -the specified rate. - -#### func (*Bucket) Available - -```go -func (tb *Bucket) Available() int64 -``` -Available returns the number of available tokens. It will be negative -when there are consumers waiting for tokens. Note that if this -returns greater than zero, it does not guarantee that calls that take -tokens from the buffer will succeed, as the number of available -tokens could have changed in the meantime. This method is intended -primarily for metrics reporting and debugging. - -#### func (*Bucket) Rate - -```go -func (tb *Bucket) Rate() float64 -``` -Rate returns the fill rate of the bucket, in tokens per second. - -#### func (*Bucket) Take - -```go -func (tb *Bucket) Take(count int64) time.Duration -``` -Take takes count tokens from the bucket without blocking. It returns the time -that the caller should wait until the tokens are actually available. - -Note that if the request is irrevocable - there is no way to return tokens to -the bucket once this method commits us to taking them. - -#### func (*Bucket) TakeAvailable - -```go -func (tb *Bucket) TakeAvailable(count int64) int64 -``` -TakeAvailable takes up to count immediately available tokens from the bucket. It -returns the number of tokens removed, or zero if there are no available tokens. -It does not block. - -#### func (*Bucket) TakeMaxDuration - -```go -func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) -``` -TakeMaxDuration is like Take, except that it will only take tokens from the -bucket if the wait time for the tokens is no greater than maxWait. - -If it would take longer than maxWait for the tokens to become available, it does -nothing and reports false, otherwise it returns the time that the caller should -wait until the tokens are actually available, and reports true. - -#### func (*Bucket) Wait - -```go -func (tb *Bucket) Wait(count int64) -``` -Wait takes count tokens from the bucket, waiting until they are available. - -#### func (*Bucket) WaitMaxDuration - -```go -func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool -``` -WaitMaxDuration is like Wait except that it will only take tokens from the -bucket if it needs to wait for no greater than maxWait. It reports whether any -tokens have been removed from the bucket If no tokens have been removed, it -returns immediately. diff --git a/vendor/github.com/juju/ratelimit/ratelimit.go b/vendor/github.com/juju/ratelimit/ratelimit.go deleted file mode 100644 index 51a6559a1..000000000 --- a/vendor/github.com/juju/ratelimit/ratelimit.go +++ /dev/null @@ -1,345 +0,0 @@ -// Copyright 2014 Canonical Ltd. -// Licensed under the LGPLv3 with static-linking exception. -// See LICENCE file for details. - -// Package ratelimit provides an efficient token bucket implementation -// that can be used to limit the rate of arbitrary things. -// See http://en.wikipedia.org/wiki/Token_bucket. -package ratelimit - -import ( - "math" - "strconv" - "sync" - "time" -) - -// The algorithm that this implementation uses does computational work -// only when tokens are removed from the bucket, and that work completes -// in short, bounded-constant time (Bucket.Wait benchmarks at 175ns on -// my laptop). -// -// Time is measured in equal measured ticks, a given interval -// (fillInterval) apart. On each tick a number of tokens (quantum) are -// added to the bucket. -// -// When any of the methods are called the bucket updates the number of -// tokens that are in the bucket, and it records the current tick -// number too. Note that it doesn't record the current time - by -// keeping things in units of whole ticks, it's easy to dish out tokens -// at exactly the right intervals as measured from the start time. -// -// This allows us to calculate the number of tokens that will be -// available at some time in the future with a few simple arithmetic -// operations. -// -// The main reason for being able to transfer multiple tokens on each tick -// is so that we can represent rates greater than 1e9 (the resolution of the Go -// time package) tokens per second, but it's also useful because -// it means we can easily represent situations like "a person gets -// five tokens an hour, replenished on the hour". - -// Bucket represents a token bucket that fills at a predetermined rate. -// Methods on Bucket may be called concurrently. -type Bucket struct { - clock Clock - - // startTime holds the moment when the bucket was - // first created and ticks began. - startTime time.Time - - // capacity holds the overall capacity of the bucket. - capacity int64 - - // quantum holds how many tokens are added on - // each tick. - quantum int64 - - // fillInterval holds the interval between each tick. - fillInterval time.Duration - - // mu guards the fields below it. - mu sync.Mutex - - // availableTokens holds the number of available - // tokens as of the associated latestTick. - // It will be negative when there are consumers - // waiting for tokens. - availableTokens int64 - - // latestTick holds the latest tick for which - // we know the number of tokens in the bucket. - latestTick int64 -} - -// NewBucket returns a new token bucket that fills at the -// rate of one token every fillInterval, up to the given -// maximum capacity. Both arguments must be -// positive. The bucket is initially full. -func NewBucket(fillInterval time.Duration, capacity int64) *Bucket { - return NewBucketWithClock(fillInterval, capacity, nil) -} - -// NewBucketWithClock is identical to NewBucket but injects a testable clock -// interface. -func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket { - return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock) -} - -// rateMargin specifes the allowed variance of actual -// rate from specified rate. 1% seems reasonable. -const rateMargin = 0.01 - -// NewBucketWithRate returns a token bucket that fills the bucket -// at the rate of rate tokens per second up to the given -// maximum capacity. Because of limited clock resolution, -// at high rates, the actual rate may be up to 1% different from the -// specified rate. -func NewBucketWithRate(rate float64, capacity int64) *Bucket { - return NewBucketWithRateAndClock(rate, capacity, nil) -} - -// NewBucketWithRateAndClock is identical to NewBucketWithRate but injects a -// testable clock interface. -func NewBucketWithRateAndClock(rate float64, capacity int64, clock Clock) *Bucket { - // Use the same bucket each time through the loop - // to save allocations. - tb := NewBucketWithQuantumAndClock(1, capacity, 1, clock) - for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) { - fillInterval := time.Duration(1e9 * float64(quantum) / rate) - if fillInterval <= 0 { - continue - } - tb.fillInterval = fillInterval - tb.quantum = quantum - if diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin { - return tb - } - } - panic("cannot find suitable quantum for " + strconv.FormatFloat(rate, 'g', -1, 64)) -} - -// nextQuantum returns the next quantum to try after q. -// We grow the quantum exponentially, but slowly, so we -// get a good fit in the lower numbers. -func nextQuantum(q int64) int64 { - q1 := q * 11 / 10 - if q1 == q { - q1++ - } - return q1 -} - -// NewBucketWithQuantum is similar to NewBucket, but allows -// the specification of the quantum size - quantum tokens -// are added every fillInterval. -func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket { - return NewBucketWithQuantumAndClock(fillInterval, capacity, quantum, nil) -} - -// NewBucketWithQuantumAndClock is like NewBucketWithQuantum, but -// also has a clock argument that allows clients to fake the passing -// of time. If clock is nil, the system clock will be used. -func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket { - if clock == nil { - clock = realClock{} - } - if fillInterval <= 0 { - panic("token bucket fill interval is not > 0") - } - if capacity <= 0 { - panic("token bucket capacity is not > 0") - } - if quantum <= 0 { - panic("token bucket quantum is not > 0") - } - return &Bucket{ - clock: clock, - startTime: clock.Now(), - latestTick: 0, - fillInterval: fillInterval, - capacity: capacity, - quantum: quantum, - availableTokens: capacity, - } -} - -// Wait takes count tokens from the bucket, waiting until they are -// available. -func (tb *Bucket) Wait(count int64) { - if d := tb.Take(count); d > 0 { - tb.clock.Sleep(d) - } -} - -// WaitMaxDuration is like Wait except that it will -// only take tokens from the bucket if it needs to wait -// for no greater than maxWait. It reports whether -// any tokens have been removed from the bucket -// If no tokens have been removed, it returns immediately. -func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool { - d, ok := tb.TakeMaxDuration(count, maxWait) - if d > 0 { - tb.clock.Sleep(d) - } - return ok -} - -const infinityDuration time.Duration = 0x7fffffffffffffff - -// Take takes count tokens from the bucket without blocking. It returns -// the time that the caller should wait until the tokens are actually -// available. -// -// Note that if the request is irrevocable - there is no way to return -// tokens to the bucket once this method commits us to taking them. -func (tb *Bucket) Take(count int64) time.Duration { - tb.mu.Lock() - defer tb.mu.Unlock() - d, _ := tb.take(tb.clock.Now(), count, infinityDuration) - return d -} - -// TakeMaxDuration is like Take, except that -// it will only take tokens from the bucket if the wait -// time for the tokens is no greater than maxWait. -// -// If it would take longer than maxWait for the tokens -// to become available, it does nothing and reports false, -// otherwise it returns the time that the caller should -// wait until the tokens are actually available, and reports -// true. -func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) { - tb.mu.Lock() - defer tb.mu.Unlock() - return tb.take(tb.clock.Now(), count, maxWait) -} - -// TakeAvailable takes up to count immediately available tokens from the -// bucket. It returns the number of tokens removed, or zero if there are -// no available tokens. It does not block. -func (tb *Bucket) TakeAvailable(count int64) int64 { - tb.mu.Lock() - defer tb.mu.Unlock() - return tb.takeAvailable(tb.clock.Now(), count) -} - -// takeAvailable is the internal version of TakeAvailable - it takes the -// current time as an argument to enable easy testing. -func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 { - if count <= 0 { - return 0 - } - tb.adjustavailableTokens(tb.currentTick(now)) - if tb.availableTokens <= 0 { - return 0 - } - if count > tb.availableTokens { - count = tb.availableTokens - } - tb.availableTokens -= count - return count -} - -// Available returns the number of available tokens. It will be negative -// when there are consumers waiting for tokens. Note that if this -// returns greater than zero, it does not guarantee that calls that take -// tokens from the buffer will succeed, as the number of available -// tokens could have changed in the meantime. This method is intended -// primarily for metrics reporting and debugging. -func (tb *Bucket) Available() int64 { - return tb.available(tb.clock.Now()) -} - -// available is the internal version of available - it takes the current time as -// an argument to enable easy testing. -func (tb *Bucket) available(now time.Time) int64 { - tb.mu.Lock() - defer tb.mu.Unlock() - tb.adjustavailableTokens(tb.currentTick(now)) - return tb.availableTokens -} - -// Capacity returns the capacity that the bucket was created with. -func (tb *Bucket) Capacity() int64 { - return tb.capacity -} - -// Rate returns the fill rate of the bucket, in tokens per second. -func (tb *Bucket) Rate() float64 { - return 1e9 * float64(tb.quantum) / float64(tb.fillInterval) -} - -// take is the internal version of Take - it takes the current time as -// an argument to enable easy testing. -func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) { - if count <= 0 { - return 0, true - } - - tick := tb.currentTick(now) - tb.adjustavailableTokens(tick) - avail := tb.availableTokens - count - if avail >= 0 { - tb.availableTokens = avail - return 0, true - } - // Round up the missing tokens to the nearest multiple - // of quantum - the tokens won't be available until - // that tick. - - // endTick holds the tick when all the requested tokens will - // become available. - endTick := tick + (-avail+tb.quantum-1)/tb.quantum - endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval) - waitTime := endTime.Sub(now) - if waitTime > maxWait { - return 0, false - } - tb.availableTokens = avail - return waitTime, true -} - -// currentTick returns the current time tick, measured -// from tb.startTime. -func (tb *Bucket) currentTick(now time.Time) int64 { - return int64(now.Sub(tb.startTime) / tb.fillInterval) -} - -// adjustavailableTokens adjusts the current number of tokens -// available in the bucket at the given time, which must -// be in the future (positive) with respect to tb.latestTick. -func (tb *Bucket) adjustavailableTokens(tick int64) { - lastTick := tb.latestTick - tb.latestTick = tick - if tb.availableTokens >= tb.capacity { - return - } - tb.availableTokens += (tick - lastTick) * tb.quantum - if tb.availableTokens > tb.capacity { - tb.availableTokens = tb.capacity - } - return -} - -// Clock represents the passage of time in a way that -// can be faked out for tests. -type Clock interface { - // Now returns the current time. - Now() time.Time - // Sleep sleeps for at least the given duration. - Sleep(d time.Duration) -} - -// realClock implements Clock in terms of standard time functions. -type realClock struct{} - -// Now implements Clock.Now by calling time.Now. -func (realClock) Now() time.Time { - return time.Now() -} - -// Now implements Clock.Sleep by calling time.Sleep. -func (realClock) Sleep(d time.Duration) { - time.Sleep(d) -} diff --git a/vendor/github.com/juju/ratelimit/reader.go b/vendor/github.com/juju/ratelimit/reader.go deleted file mode 100644 index 6403bf78d..000000000 --- a/vendor/github.com/juju/ratelimit/reader.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2014 Canonical Ltd. -// Licensed under the LGPLv3 with static-linking exception. -// See LICENCE file for details. - -package ratelimit - -import "io" - -type reader struct { - r io.Reader - bucket *Bucket -} - -// Reader returns a reader that is rate limited by -// the given token bucket. Each token in the bucket -// represents one byte. -func Reader(r io.Reader, bucket *Bucket) io.Reader { - return &reader{ - r: r, - bucket: bucket, - } -} - -func (r *reader) Read(buf []byte) (int, error) { - n, err := r.r.Read(buf) - if n <= 0 { - return n, err - } - r.bucket.Wait(int64(n)) - return n, err -} - -type writer struct { - w io.Writer - bucket *Bucket -} - -// Writer returns a reader that is rate limited by -// the given token bucket. Each token in the bucket -// represents one byte. -func Writer(w io.Writer, bucket *Bucket) io.Writer { - return &writer{ - w: w, - bucket: bucket, - } -} - -func (w *writer) Write(buf []byte) (int, error) { - w.bucket.Wait(int64(len(buf))) - return w.w.Write(buf) -} diff --git a/vendor/golang.org/x/time/LICENSE b/vendor/golang.org/x/time/LICENSE new file mode 100644 index 000000000..6a66aea5e --- /dev/null +++ b/vendor/golang.org/x/time/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/time/PATENTS b/vendor/golang.org/x/time/PATENTS new file mode 100644 index 000000000..733099041 --- /dev/null +++ b/vendor/golang.org/x/time/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/time/rate/rate.go b/vendor/golang.org/x/time/rate/rate.go new file mode 100644 index 000000000..8f6c7f493 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate.go @@ -0,0 +1,430 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package rate provides a rate limiter. +package rate + +import ( + "context" + "fmt" + "math" + "sync" + "time" +) + +// Limit defines the maximum frequency of some events. +// Limit is represented as number of events per second. +// A zero Limit allows no events. +type Limit float64 + +// Inf is the infinite rate limit; it allows all events (even if burst is zero). +const Inf = Limit(math.MaxFloat64) + +// Every converts a minimum time interval between events to a Limit. +func Every(interval time.Duration) Limit { + if interval <= 0 { + return Inf + } + return 1 / Limit(interval.Seconds()) +} + +// A Limiter controls how frequently events are allowed to happen. +// It implements a "token bucket" of size b, initially full and refilled +// at rate r tokens per second. +// Informally, in any large enough time interval, the Limiter limits the +// rate to r tokens per second, with a maximum burst size of b events. +// As a special case, if r == Inf (the infinite rate), b is ignored. +// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. +// +// The zero value is a valid Limiter, but it will reject all events. +// Use NewLimiter to create non-zero Limiters. +// +// Limiter has three main methods, Allow, Reserve, and Wait. +// Most callers should use Wait. +// +// Each of the three methods consumes a single token. +// They differ in their behavior when no token is available. +// If no token is available, Allow returns false. +// If no token is available, Reserve returns a reservation for a future token +// and the amount of time the caller must wait before using it. +// If no token is available, Wait blocks until one can be obtained +// or its associated context.Context is canceled. +// +// The methods AllowN, ReserveN, and WaitN consume n tokens. +// +// Limiter is safe for simultaneous use by multiple goroutines. +type Limiter struct { + mu sync.Mutex + limit Limit + burst int + tokens float64 + // last is the last time the limiter's tokens field was updated + last time.Time + // lastEvent is the latest time of a rate-limited event (past or future) + lastEvent time.Time +} + +// Limit returns the maximum overall event rate. +func (lim *Limiter) Limit() Limit { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.limit +} + +// Burst returns the maximum burst size. Burst is the maximum number of tokens +// that can be consumed in a single call to Allow, Reserve, or Wait, so higher +// Burst values allow more events to happen at once. +// A zero Burst allows no events, unless limit == Inf. +func (lim *Limiter) Burst() int { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.burst +} + +// TokensAt returns the number of tokens available at time t. +func (lim *Limiter) TokensAt(t time.Time) float64 { + lim.mu.Lock() + _, tokens := lim.advance(t) // does not mutate lim + lim.mu.Unlock() + return tokens +} + +// Tokens returns the number of tokens available now. +func (lim *Limiter) Tokens() float64 { + return lim.TokensAt(time.Now()) +} + +// NewLimiter returns a new Limiter that allows events up to rate r and permits +// bursts of at most b tokens. +func NewLimiter(r Limit, b int) *Limiter { + return &Limiter{ + limit: r, + burst: b, + } +} + +// Allow reports whether an event may happen now. +func (lim *Limiter) Allow() bool { + return lim.AllowN(time.Now(), 1) +} + +// AllowN reports whether n events may happen at time t. +// Use this method if you intend to drop / skip events that exceed the rate limit. +// Otherwise use Reserve or Wait. +func (lim *Limiter) AllowN(t time.Time, n int) bool { + return lim.reserveN(t, n, 0).ok +} + +// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. +// A Reservation may be canceled, which may enable the Limiter to permit additional events. +type Reservation struct { + ok bool + lim *Limiter + tokens int + timeToAct time.Time + // This is the Limit at reservation time, it can change later. + limit Limit +} + +// OK returns whether the limiter can provide the requested number of tokens +// within the maximum wait time. If OK is false, Delay returns InfDuration, and +// Cancel does nothing. +func (r *Reservation) OK() bool { + return r.ok +} + +// Delay is shorthand for DelayFrom(time.Now()). +func (r *Reservation) Delay() time.Duration { + return r.DelayFrom(time.Now()) +} + +// InfDuration is the duration returned by Delay when a Reservation is not OK. +const InfDuration = time.Duration(math.MaxInt64) + +// DelayFrom returns the duration for which the reservation holder must wait +// before taking the reserved action. Zero duration means act immediately. +// InfDuration means the limiter cannot grant the tokens requested in this +// Reservation within the maximum wait time. +func (r *Reservation) DelayFrom(t time.Time) time.Duration { + if !r.ok { + return InfDuration + } + delay := r.timeToAct.Sub(t) + if delay < 0 { + return 0 + } + return delay +} + +// Cancel is shorthand for CancelAt(time.Now()). +func (r *Reservation) Cancel() { + r.CancelAt(time.Now()) +} + +// CancelAt indicates that the reservation holder will not perform the reserved action +// and reverses the effects of this Reservation on the rate limit as much as possible, +// considering that other reservations may have already been made. +func (r *Reservation) CancelAt(t time.Time) { + if !r.ok { + return + } + + r.lim.mu.Lock() + defer r.lim.mu.Unlock() + + if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) { + return + } + + // calculate tokens to restore + // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved + // after r was obtained. These tokens should not be restored. + restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) + if restoreTokens <= 0 { + return + } + // advance time to now + t, tokens := r.lim.advance(t) + // calculate new number of tokens + tokens += restoreTokens + if burst := float64(r.lim.burst); tokens > burst { + tokens = burst + } + // update state + r.lim.last = t + r.lim.tokens = tokens + if r.timeToAct == r.lim.lastEvent { + prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) + if !prevEvent.Before(t) { + r.lim.lastEvent = prevEvent + } + } +} + +// Reserve is shorthand for ReserveN(time.Now(), 1). +func (lim *Limiter) Reserve() *Reservation { + return lim.ReserveN(time.Now(), 1) +} + +// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. +// The Limiter takes this Reservation into account when allowing future events. +// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size. +// Usage example: +// +// r := lim.ReserveN(time.Now(), 1) +// if !r.OK() { +// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? +// return +// } +// time.Sleep(r.Delay()) +// Act() +// +// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. +// If you need to respect a deadline or cancel the delay, use Wait instead. +// To drop or skip events exceeding rate limit, use Allow instead. +func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation { + r := lim.reserveN(t, n, InfDuration) + return &r +} + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.WaitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +// The burst limit is ignored if the rate limit is Inf. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + // The test code calls lim.wait with a fake timer generator. + // This is the real timer generator. + newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) { + timer := time.NewTimer(d) + return timer.C, timer.Stop, func() {} + } + + return lim.wait(ctx, n, time.Now(), newTimer) +} + +// wait is the internal implementation of WaitN. +func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error { + lim.mu.Lock() + burst := lim.burst + limit := lim.limit + lim.mu.Unlock() + + if n > burst && limit != Inf { + return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst) + } + // Check if ctx is already cancelled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Determine wait limit + waitLimit := InfDuration + if deadline, ok := ctx.Deadline(); ok { + waitLimit = deadline.Sub(t) + } + // Reserve + r := lim.reserveN(t, n, waitLimit) + if !r.ok { + return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) + } + // Wait if necessary + delay := r.DelayFrom(t) + if delay == 0 { + return nil + } + ch, stop, advance := newTimer(delay) + defer stop() + advance() // only has an effect when testing + select { + case <-ch: + // We can proceed. + return nil + case <-ctx.Done(): + // Context was canceled before we could proceed. Cancel the + // reservation, which may permit other events to proceed sooner. + r.Cancel() + return ctx.Err() + } +} + +// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). +func (lim *Limiter) SetLimit(newLimit Limit) { + lim.SetLimitAt(time.Now(), newLimit) +} + +// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated +// or underutilized by those which reserved (using Reserve or Wait) but did not yet act +// before SetLimitAt was called. +func (lim *Limiter) SetLimitAt(t time.Time, newLimit Limit) { + lim.mu.Lock() + defer lim.mu.Unlock() + + t, tokens := lim.advance(t) + + lim.last = t + lim.tokens = tokens + lim.limit = newLimit +} + +// SetBurst is shorthand for SetBurstAt(time.Now(), newBurst). +func (lim *Limiter) SetBurst(newBurst int) { + lim.SetBurstAt(time.Now(), newBurst) +} + +// SetBurstAt sets a new burst size for the limiter. +func (lim *Limiter) SetBurstAt(t time.Time, newBurst int) { + lim.mu.Lock() + defer lim.mu.Unlock() + + t, tokens := lim.advance(t) + + lim.last = t + lim.tokens = tokens + lim.burst = newBurst +} + +// reserveN is a helper method for AllowN, ReserveN, and WaitN. +// maxFutureReserve specifies the maximum reservation wait duration allowed. +// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. +func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation { + lim.mu.Lock() + defer lim.mu.Unlock() + + if lim.limit == Inf { + return Reservation{ + ok: true, + lim: lim, + tokens: n, + timeToAct: t, + } + } else if lim.limit == 0 { + var ok bool + if lim.burst >= n { + ok = true + lim.burst -= n + } + return Reservation{ + ok: ok, + lim: lim, + tokens: lim.burst, + timeToAct: t, + } + } + + t, tokens := lim.advance(t) + + // Calculate the remaining number of tokens resulting from the request. + tokens -= float64(n) + + // Calculate the wait duration + var waitDuration time.Duration + if tokens < 0 { + waitDuration = lim.limit.durationFromTokens(-tokens) + } + + // Decide result + ok := n <= lim.burst && waitDuration <= maxFutureReserve + + // Prepare reservation + r := Reservation{ + ok: ok, + lim: lim, + limit: lim.limit, + } + if ok { + r.tokens = n + r.timeToAct = t.Add(waitDuration) + + // Update state + lim.last = t + lim.tokens = tokens + lim.lastEvent = r.timeToAct + } + + return r +} + +// advance calculates and returns an updated state for lim resulting from the passage of time. +// lim is not changed. +// advance requires that lim.mu is held. +func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) { + last := lim.last + if t.Before(last) { + last = t + } + + // Calculate the new number of tokens, due to time that passed. + elapsed := t.Sub(last) + delta := lim.limit.tokensFromDuration(elapsed) + tokens := lim.tokens + delta + if burst := float64(lim.burst); tokens > burst { + tokens = burst + } + return t, tokens +} + +// durationFromTokens is a unit conversion function from the number of tokens to the duration +// of time it takes to accumulate them at a rate of limit tokens per second. +func (limit Limit) durationFromTokens(tokens float64) time.Duration { + if limit <= 0 { + return InfDuration + } + seconds := tokens / float64(limit) + return time.Duration(float64(time.Second) * seconds) +} + +// tokensFromDuration is a unit conversion function from a time duration to the number of tokens +// which could be accumulated during that duration at a rate of limit tokens per second. +func (limit Limit) tokensFromDuration(d time.Duration) float64 { + if limit <= 0 { + return 0 + } + return d.Seconds() * float64(limit) +} diff --git a/vendor/golang.org/x/time/rate/sometimes.go b/vendor/golang.org/x/time/rate/sometimes.go new file mode 100644 index 000000000..6ba99ddb6 --- /dev/null +++ b/vendor/golang.org/x/time/rate/sometimes.go @@ -0,0 +1,67 @@ +// Copyright 2022 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package rate + +import ( + "sync" + "time" +) + +// Sometimes will perform an action occasionally. The First, Every, and +// Interval fields govern the behavior of Do, which performs the action. +// A zero Sometimes value will perform an action exactly once. +// +// # Example: logging with rate limiting +// +// var sometimes = rate.Sometimes{First: 3, Interval: 10*time.Second} +// func Spammy() { +// sometimes.Do(func() { log.Info("here I am!") }) +// } +type Sometimes struct { + First int // if non-zero, the first N calls to Do will run f. + Every int // if non-zero, every Nth call to Do will run f. + Interval time.Duration // if non-zero and Interval has elapsed since f's last run, Do will run f. + + mu sync.Mutex + count int // number of Do calls + last time.Time // last time f was run +} + +// Do runs the function f as allowed by First, Every, and Interval. +// +// The model is a union (not intersection) of filters. The first call to Do +// always runs f. Subsequent calls to Do run f if allowed by First or Every or +// Interval. +// +// A non-zero First:N causes the first N Do(f) calls to run f. +// +// A non-zero Every:M causes every Mth Do(f) call, starting with the first, to +// run f. +// +// A non-zero Interval causes Do(f) to run f if Interval has elapsed since +// Do last ran f. +// +// Specifying multiple filters produces the union of these execution streams. +// For example, specifying both First:N and Every:M causes the first N Do(f) +// calls and every Mth Do(f) call, starting with the first, to run f. See +// Examples for more. +// +// If Do is called multiple times simultaneously, the calls will block and run +// serially. Therefore, Do is intended for lightweight operations. +// +// Because a call to Do may block until f returns, if f causes Do to be called, +// it will deadlock. +func (s *Sometimes) Do(f func()) { + s.mu.Lock() + defer s.mu.Unlock() + if s.count == 0 || + (s.First > 0 && s.count < s.First) || + (s.Every > 0 && s.count%s.Every == 0) || + (s.Interval > 0 && time.Since(s.last) >= s.Interval) { + f() + s.last = time.Now() + } + s.count++ +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 01828ceb2..0850af6c4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -138,9 +138,6 @@ github.com/grafov/m3u8 # github.com/josharian/native v1.0.0 ## explicit; go 1.13 github.com/josharian/native -# github.com/juju/ratelimit v1.0.2 -## explicit -github.com/juju/ratelimit # github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 ## explicit github.com/kardianos/osext @@ -381,6 +378,9 @@ golang.org/x/text/secure/bidirule golang.org/x/text/transform golang.org/x/text/unicode/bidi golang.org/x/text/unicode/norm +# golang.org/x/time v0.5.0 +## explicit; go 1.18 +golang.org/x/time/rate # golang.org/x/tools v0.9.1 ## explicit; go 1.18 golang.org/x/tools/go/analysis