Skip to content

Commit

Permalink
Fix telemetry timeout (#1500)
Browse files Browse the repository at this point in the history
  • Loading branch information
AstroProfundis authored Aug 4, 2021
1 parent 35abe88 commit ef79639
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 33 deletions.
5 changes: 3 additions & 2 deletions cmd/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package cmd

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -106,7 +107,7 @@ func newMirrorSignCmd() *cobra.Command {

if strings.HasPrefix(args[0], "http") {
client := utils.NewHTTPClient(time.Duration(timeout)*time.Second, nil)
data, err := client.Get(args[0])
data, err := client.Get(context.TODO(), args[0])
if err != nil {
return err
}
Expand All @@ -115,7 +116,7 @@ func newMirrorSignCmd() *cobra.Command {
return err
}

if _, err = client.Post(args[0], bytes.NewBuffer(data)); err != nil {
if _, err = client.Post(context.TODO(), args[0], bytes.NewBuffer(data)); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ func Execute() {

tiupReport.ExitCode = int32(code)
tiupReport.TakeMilliseconds = uint64(time.Since(start).Milliseconds())
tele := telemetry.NewTelemetry()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
tele := telemetry.NewTelemetry()
err := tele.Report(ctx, teleReport)
if environment.DebugMode {
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion components/cluster/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ func Execute() {
}
clusterReport.TakeMilliseconds = uint64(time.Since(start).Milliseconds())
clusterReport.Command = strings.Join(teleCommand, " ")
tele := telemetry.NewTelemetry()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
tele := telemetry.NewTelemetry()
err := tele.Report(ctx, teleReport)
if environment.DebugMode {
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,8 @@ func main() {
}
}
playgroundReport.TakeMilliseconds = uint64(time.Since(start).Milliseconds())
tele := telemetry.NewTelemetry()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
tele := telemetry.NewTelemetry()
err := tele.Report(ctx, teleReport)
if environment.DebugMode {
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/cluster/api/dmapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package api

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"strings"
Expand Down Expand Up @@ -78,7 +79,7 @@ func (dm *DMMasterClient) getEndpoints(cmd string) (endpoints []string) {
func (dm *DMMasterClient) getMember(endpoints []string) (*dmpb.ListMemberResponse, error) {
resp := &dmpb.ListMemberResponse{}
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := dm.httpClient.Get(endpoint)
body, err := dm.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand All @@ -101,7 +102,7 @@ func (dm *DMMasterClient) getMember(endpoints []string) (*dmpb.ListMemberRespons
func (dm *DMMasterClient) deleteMember(endpoints []string) (*dmpb.OfflineMemberResponse, error) {
resp := &dmpb.OfflineMemberResponse{}
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, statusCode, err := dm.httpClient.Delete(endpoint, nil)
body, statusCode, err := dm.httpClient.Delete(context.TODO(), endpoint, nil)

if statusCode == 404 || bytes.Contains(body, []byte("not exists")) {
zap.L().Debug("member to offline does not exist, ignore.")
Expand Down
29 changes: 15 additions & 14 deletions pkg/cluster/api/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package api

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (pc *PDClient) tryIdentifyVersion() {
endpoints := pc.getEndpoints(pdVersionURI)
response := map[string]string{}
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand Down Expand Up @@ -150,7 +151,7 @@ func (pc *PDClient) CheckHealth() error {
endpoints := pc.getEndpoints(pdPingURI)

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand All @@ -174,7 +175,7 @@ func (pc *PDClient) GetStores() (*StoresInfo, error) {
storesInfo := StoresInfo{}

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand Down Expand Up @@ -267,7 +268,7 @@ func (pc *PDClient) GetLeader() (*pdpb.Member, error) {
leader := pdpb.Member{}

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand All @@ -288,7 +289,7 @@ func (pc *PDClient) GetMembers() (*pdpb.GetMembersResponse, error) {
members := pdpb.GetMembersResponse{}

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand All @@ -312,7 +313,7 @@ func (pc *PDClient) GetConfig() (map[string]interface{}, error) {
pdConfig := map[string]interface{}{}

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand Down Expand Up @@ -358,7 +359,7 @@ func (pc *PDClient) EvictPDLeader(retryOpt *utils.RetryOption) error {
endpoints := pc.getEndpoints(cmd)

_, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Post(endpoint, nil)
body, err := pc.httpClient.Post(context.TODO(), endpoint, nil)
if err != nil {
return body, err
}
Expand Down Expand Up @@ -438,7 +439,7 @@ func (pc *PDClient) EvictStoreLeader(host string, retryOpt *utils.RetryOption, c
endpoints := pc.getEndpoints(pdSchedulersURI)

_, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) {
return pc.httpClient.Post(endpoint, bytes.NewBuffer(scheduler))
return pc.httpClient.Post(context.TODO(), endpoint, bytes.NewBuffer(scheduler))
})
if err != nil {
return err
Expand Down Expand Up @@ -498,7 +499,7 @@ func (pc *PDClient) RemoveStoreEvict(host string) error {
endpoints := pc.getEndpoints(cmd)

_, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, statusCode, err := pc.httpClient.Delete(endpoint, nil)
body, statusCode, err := pc.httpClient.Delete(context.TODO(), endpoint, nil)
if err != nil {
if statusCode == http.StatusNotFound || bytes.Contains(body, []byte("scheduler not found")) {
log.Debugf("Store leader evicting scheduler does not exist, ignore.")
Expand Down Expand Up @@ -533,7 +534,7 @@ func (pc *PDClient) DelPD(name string, retryOpt *utils.RetryOption) error {
endpoints := pc.getEndpoints(cmd)

_, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, statusCode, err := pc.httpClient.Delete(endpoint, nil)
body, statusCode, err := pc.httpClient.Delete(context.TODO(), endpoint, nil)
if err != nil {
if statusCode == http.StatusNotFound || bytes.Contains(body, []byte("not found, pd")) {
log.Debugf("PD node does not exist, ignore: %s", body)
Expand Down Expand Up @@ -620,7 +621,7 @@ func (pc *PDClient) DelStore(host string, retryOpt *utils.RetryOption) error {
endpoints := pc.getEndpoints(cmd)

_, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, statusCode, err := pc.httpClient.Delete(endpoint, nil)
body, statusCode, err := pc.httpClient.Delete(context.TODO(), endpoint, nil)
if err != nil {
if statusCode == http.StatusNotFound || bytes.Contains(body, []byte("not found")) {
log.Debugf("store %d %s does not exist, ignore: %s", storeID, host, body)
Expand Down Expand Up @@ -673,7 +674,7 @@ func (pc *PDClient) DelStore(host string, retryOpt *utils.RetryOption) error {
func (pc *PDClient) updateConfig(url string, body io.Reader) error {
endpoints := pc.getEndpoints(url)
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
return pc.httpClient.Post(endpoint, body)
return pc.httpClient.Post(context.TODO(), endpoint, body)
})
return err
}
Expand All @@ -687,7 +688,7 @@ func (pc *PDClient) UpdateReplicateConfig(body io.Reader) error {
func (pc *PDClient) GetReplicateConfig() ([]byte, error) {
endpoints := pc.getEndpoints(pdConfigReplicate)
return tryURLs(endpoints, func(endpoint string) ([]byte, error) {
return pc.httpClient.Get(endpoint)
return pc.httpClient.Get(context.TODO(), endpoint)
})
}

Expand Down Expand Up @@ -744,7 +745,7 @@ func (pc *PDClient) CheckRegion(state string) (*RegionsInfo, error) {
regionsInfo := RegionsInfo{}

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/cluster/spec/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package spec

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"path/filepath"
Expand Down Expand Up @@ -136,7 +137,7 @@ func statusByHost(host string, port int, path string, tlsCfg *tls.Config) string
url := fmt.Sprintf("%s://%s:%d%s", scheme, host, port, path)

// body doesn't have any status section needed
body, err := client.Get(url)
body, err := client.Get(context.TODO(), url)
if err != nil || body == nil {
return "Down"
}
Expand All @@ -153,7 +154,7 @@ func UptimeByHost(host string, port int, tlsCfg *tls.Config) time.Duration {

client := utils.NewHTTPClient(statusQueryTimeout, tlsCfg)

body, err := client.Get(url)
body, err := client.Get(context.TODO(), url)
if err != nil || body == nil {
return 0
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/cluster/template/install/local_install.sh.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ fi
chmod 755 "$bin_dir/tiup"
# telemetry is not needed for offline installations
"$bin_dir/tiup" telemetry disable
# set mirror to the local path
"$bin_dir/tiup" mirror set ${script_dir}
bold=$(tput bold 2>/dev/null)
Expand Down
3 changes: 2 additions & 1 deletion pkg/repository/store/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package store

import (
"bytes"
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -161,7 +162,7 @@ func (t *localTxn) ReadManifest(filename string, role v1manifest.ValidManifest)
case os.IsNotExist(err) && t.store.upstream != "":
url := fmt.Sprintf("%s/%s", t.store.upstream, filename)
client := utils.NewHTTPClient(time.Minute, nil)
body, err := client.Get(url)
body, err := client.Get(context.TODO(), url)
if err != nil {
return nil, errors.Annotatef(err, "fetch %s", url)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (t *Telemetry) Report(ctx context.Context, msg *Report) error {
return errors.AddStack(err)
}

if _, err = t.cli.Post(t.url, bytes.NewReader(dst)); err != nil {
if _, err = t.cli.Post(ctx, t.url, bytes.NewReader(dst)); err != nil {
return errors.AddStack(err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ func (s *TelemetrySuite) TestReport(c *check.C) {

msg := new(Report)

err := tele.Report(context.Background(), msg)
err := tele.Report(context.TODO(), msg)
c.Assert(err, check.NotNil)

msg.EventUUID = "dfdfdf"
err = tele.Report(context.Background(), msg)
err = tele.Report(context.TODO(), msg)
c.Assert(err, check.IsNil)
}
33 changes: 27 additions & 6 deletions pkg/utils/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package utils

import (
"context"
"crypto/tls"
"fmt"
"io"
Expand All @@ -36,7 +37,7 @@ func NewHTTPClient(timeout time.Duration, tlsConfig *tls.Config) *HTTPClient {
}
tr := &http.Transport{
TLSClientConfig: tlsConfig,
Dial: (&net.Dialer{Timeout: 5 * time.Second}).Dial,
Dial: (&net.Dialer{Timeout: 3 * time.Second}).Dial,
}
// prefer to use the inner http proxy
httpProxy := os.Getenv("TIUP_INNER_HTTP_PROXY")
Expand All @@ -57,8 +58,16 @@ func NewHTTPClient(timeout time.Duration, tlsConfig *tls.Config) *HTTPClient {
}

// Get fetch an URL with GET method and returns the response
func (c *HTTPClient) Get(url string) ([]byte, error) {
res, err := c.client.Get(url)
func (c *HTTPClient) Get(ctx context.Context, url string) ([]byte, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}

if ctx != nil {
req = req.WithContext(ctx)
}
res, err := c.client.Do(req)
if err != nil {
return nil, err
}
Expand All @@ -68,8 +77,17 @@ func (c *HTTPClient) Get(url string) ([]byte, error) {
}

// Post send a POST request to the url and returns the response
func (c *HTTPClient) Post(url string, body io.Reader) ([]byte, error) {
res, err := c.client.Post(url, "application/json", body)
func (c *HTTPClient) Post(ctx context.Context, url string, body io.Reader) ([]byte, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")

if ctx != nil {
req = req.WithContext(ctx)
}
res, err := c.client.Do(req)
if err != nil {
return nil, err
}
Expand All @@ -79,13 +97,16 @@ func (c *HTTPClient) Post(url string, body io.Reader) ([]byte, error) {
}

// Delete send a DELETE request to the url and returns the response and status code.
func (c *HTTPClient) Delete(url string, body io.Reader) ([]byte, int, error) {
func (c *HTTPClient) Delete(ctx context.Context, url string, body io.Reader) ([]byte, int, error) {
var statusCode int
req, err := http.NewRequest("DELETE", url, body)
if err != nil {
return nil, statusCode, err
}

if ctx != nil {
req = req.WithContext(ctx)
}
res, err := c.client.Do(req)
if err != nil {
return nil, statusCode, err
Expand Down

0 comments on commit ef79639

Please sign in to comment.