Skip to content

Commit

Permalink
addressing comments by Renato in PR 123619
Browse files Browse the repository at this point in the history
  • Loading branch information
nameisbhaskar committed May 15, 2024
1 parent 8d3b7c9 commit 9587ca8
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 54 deletions.
2 changes: 1 addition & 1 deletion pkg/roachprod/promhelperclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_google_cloud_go_storage//:storage",
"@org_golang_google_api//idtoken",
"@org_golang_google_api//option",
"@org_golang_x_oauth2//:oauth2",
],
)
Expand All @@ -24,7 +25,6 @@ go_test(
embed = [":promhelperclient"],
deps = [
"//pkg/roachprod/logger",
"//pkg/util/httputil",
"@com_github_stretchr_testify//require",
"@org_golang_google_api//idtoken",
"@org_golang_x_oauth2//:oauth2",
Expand Down
30 changes: 14 additions & 16 deletions pkg/roachprod/promhelperclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ const (
type PromClient struct {
// httpPut is used for http PUT operation.
httpPut func(
ctx context.Context, url string, h *httputil.RequestHeaders, body io.Reader,
ctx context.Context, url string, h *http.Header, body io.Reader,
) (resp *http.Response, err error)
// httpDelete is used for http DELETE operation.
httpDelete func(ctx context.Context, url string, h *httputil.RequestHeaders) (
httpDelete func(ctx context.Context, url string, h *http.Header) (
resp *http.Response, err error)
// newTokenSource is the token generator source.
newTokenSource func(ctx context.Context, audience string, opts ...idtoken.ClientOption) (
Expand Down Expand Up @@ -76,7 +76,7 @@ func (c *PromClient) UpdatePrometheusTargets(
ctx context.Context,
promUrl, clusterName string,
forceFetchCreds bool,
nodes []string,
nodes map[int]string,
insecure bool,
l *logger.Logger,
) error {
Expand All @@ -90,10 +90,12 @@ func (c *PromClient) UpdatePrometheusTargets(
}
url := getUrl(promUrl, clusterName)
l.Printf("invoking PUT for URL: %s", url)
response, err := c.httpPut(ctx, url, &httputil.RequestHeaders{
ContentType: "application/json",
Authorization: token,
}, req)
h := &http.Header{}
h.Set("ContentType", "application/json")
if token != "" {
h.Set("Authorization", token)
}
response, err := c.httpPut(ctx, url, h, req)
if err != nil {
return err
}
Expand Down Expand Up @@ -123,9 +125,9 @@ func (c *PromClient) DeleteClusterConfig(
}
url := getUrl(promUrl, clusterName)
l.Printf("invoking DELETE for URL: %s", url)
response, err := c.httpDelete(ctx, url, &httputil.RequestHeaders{
Authorization: token,
})
h := &http.Header{}
h.Set("Authorization", token)
response, err := c.httpDelete(ctx, url, h)
if err != nil {
return err
}
Expand Down Expand Up @@ -162,19 +164,15 @@ const clusterConfFileTemplate = `- targets:
`

// createClusterConfigFile creates the cluster config file per node
func buildCreateRequest(nodes []string, insecure bool) (io.Reader, error) {
func buildCreateRequest(nodes map[int]string, insecure bool) (io.Reader, error) {
buffer := bytes.NewBufferString("---\n")
for i, n := range nodes {
if n == "" {
// this is an unsupported node
continue
}
params := &ccParams{
Targets: []string{n},
Labels: make([]string, 0),
}
params.Labels = append(params.Labels,
fmt.Sprintf("node: \"%s\"", strconv.Itoa(i+1)),
fmt.Sprintf("node: \"%s\"", strconv.Itoa(i)),
"tenant: system",
)
t := template.Must(template.New("start").Parse(clusterConfFileTemplate))
Expand Down
13 changes: 6 additions & 7 deletions pkg/roachprod/promhelperclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/stretchr/testify/require"
"golang.org/x/oauth2"
"google.golang.org/api/idtoken"
Expand All @@ -40,20 +39,20 @@ func TestUpdatePrometheusTargets(t *testing.T) {
promUrl := "http://prom_url.com"
c := NewPromClient()
t.Run("UpdatePrometheusTargets fails with 400", func(t *testing.T) {
c.httpPut = func(ctx context.Context, reqUrl string, h *httputil.RequestHeaders, body io.Reader) (
c.httpPut = func(ctx context.Context, reqUrl string, h *http.Header, body io.Reader) (
resp *http.Response, err error) {
require.Equal(t, getUrl(promUrl, "c1"), reqUrl)
return &http.Response{
StatusCode: 400,
Body: io.NopCloser(strings.NewReader("failed")),
}, nil
}
err := c.UpdatePrometheusTargets(ctx, promUrl, "c1", false, []string{"n1"}, true, l)
err := c.UpdatePrometheusTargets(ctx, promUrl, "c1", false, map[int]string{1: "n1"}, true, l)
require.NotNil(t, err)
require.Equal(t, "request failed with status 400 and error failed", err.Error())
})
t.Run("UpdatePrometheusTargets succeeds", func(t *testing.T) {
c.httpPut = func(ctx context.Context, url string, h *httputil.RequestHeaders, body io.Reader) (
c.httpPut = func(ctx context.Context, url string, h *http.Header, body io.Reader) (
resp *http.Response, err error) {
require.Equal(t, getUrl(promUrl, "c1"), url)
ir, err := getInstanceConfigRequest(io.NopCloser(body))
Expand All @@ -76,7 +75,7 @@ func TestUpdatePrometheusTargets(t *testing.T) {
StatusCode: 200,
}, nil
}
err := c.UpdatePrometheusTargets(ctx, promUrl, "c1", false, []string{"n1", "", "n3"}, true, l)
err := c.UpdatePrometheusTargets(ctx, promUrl, "c1", false, map[int]string{1: "n1", 3: "n3"}, true, l)
require.Nil(t, err)
})
}
Expand All @@ -93,7 +92,7 @@ func TestDeleteClusterConfig(t *testing.T) {
promUrl := "http://prom_url.com"
c := NewPromClient()
t.Run("DeleteClusterConfig fails with 400", func(t *testing.T) {
c.httpDelete = func(ctx context.Context, url string, h *httputil.RequestHeaders) (
c.httpDelete = func(ctx context.Context, url string, h *http.Header) (
resp *http.Response, err error) {
require.Equal(t, getUrl(promUrl, "c1"), url)
return &http.Response{
Expand All @@ -106,7 +105,7 @@ func TestDeleteClusterConfig(t *testing.T) {
require.Equal(t, "request failed with status 400 and error failed", err.Error())
})
t.Run("DeleteClusterConfig succeeds", func(t *testing.T) {
c.httpDelete = func(ctx context.Context, url string, h *httputil.RequestHeaders) (
c.httpDelete = func(ctx context.Context, url string, h *http.Header) (
resp *http.Response, err error) {
require.Equal(t, getUrl(promUrl, "c1"), url)
return &http.Response{
Expand Down
11 changes: 6 additions & 5 deletions pkg/roachprod/promhelperclient/promhelper_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"cloud.google.com/go/storage"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"google.golang.org/api/option"
)

var (
Expand All @@ -35,7 +36,7 @@ type FetchedFrom string
const (
Env FetchedFrom = "Env" // fetched from environment
File FetchedFrom = "File" // fetched from the promCredFile
Store FetchedFrom = "Store" // fetched from the secrets manager
Store FetchedFrom = "Store" // fetched from the store

// secretsDelimiter is used as a delimiter between service account audience and JSON when stored in
// promCredFile or cloud storage
Expand All @@ -62,7 +63,7 @@ func SetPromHelperCredsEnv(
) (FetchedFrom, error) {
creds := ""
fetchedFrom := Env
if !forceFetch { // bypass environment anf creds file if forceFetch is false
if !forceFetch { // bypass environment and creds file if forceFetch is false
// check if environment is set
audience := os.Getenv(ServiceAccountAudience)
saJson := os.Getenv(ServiceAccountJson)
Expand All @@ -79,9 +80,9 @@ func SetPromHelperCredsEnv(
}
}
if creds == "" {
// creds == "" means (env is not set and the file does not have the creds) or forFetch is true
l.Printf("creds need to be fetched from secret manager.")
client, err := storage.NewClient(ctx)
// creds == "" means (env is not set and the file does not have the creds) or forceFetch is true
l.Printf("creds need to be fetched from store.")
client, err := storage.NewClient(ctx, option.WithScopes(storage.ScopeReadOnly))
if err != nil {
return fetchedFrom, err
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/roachprod/roachprod.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,23 +775,23 @@ func UpdateTargets(

// updatePrometheusTargets updates the prometheus instance cluster config. Any error is logged and ignored.
func updatePrometheusTargets(ctx context.Context, l *logger.Logger, c *install.SyncedCluster) {
nodeIPPorts := make([]string, len(c.Nodes))
nodeIPPorts := make(map[int]string)
var wg sync.WaitGroup
for i, node := range c.Nodes {
for _, node := range c.Nodes {
if c.VMs[node-1].Provider == gce.ProviderName {
wg.Add(1)
go func(index int, v vm.VM) {
defer wg.Done()
// only gce is supported for prometheus
desc, err := c.DiscoverService(ctx, install.Node(index+1), "", install.ServiceTypeUI, 0)
desc, err := c.DiscoverService(ctx, install.Node(index), "", install.ServiceTypeUI, 0)
if err != nil {
l.Errorf("error getting the port for node %d: %v", index+1, err)
l.Errorf("error getting the port for node %d: %v", index, err)
return
}
nodeInfo := fmt.Sprintf("%s:%d", v.PublicIP, desc.Port)
l.Printf("node information obtained for node index %d: %s", index, nodeInfo)
nodeIPPorts[index] = nodeInfo
}(i, c.VMs[node-1])
}(int(node), c.VMs[node-1])
}
}
wg.Wait()
Expand Down Expand Up @@ -849,7 +849,6 @@ func Stop(ctx context.Context, l *logger.Logger, clusterName string, opts StopOp
return err
}

_ = deleteClusterConfig(clusterName, l)
return c.Stop(ctx, l, opts.Sig, opts.Wait, opts.MaxWait, "")
}

Expand Down Expand Up @@ -1421,7 +1420,9 @@ func destroyCluster(cld *cloud.Cloud, l *logger.Logger, clusterName string) erro
l.Printf("Destroying cluster %s with %d nodes", clusterName, len(c.VMs))
}

_ = deleteClusterConfig(clusterName, l)
if err := deleteClusterConfig(clusterName, l); err != nil {
l.Printf("Failed to delete cluster config: %v", err)
}

return cloud.DestroyCluster(l, c)
}
Expand Down
25 changes: 7 additions & 18 deletions pkg/util/httputil/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ var DefaultClient = NewClientWithTimeout(StandardHTTPTimeout)
// StandardHTTPTimeout is the default timeout to use for HTTP connections.
const StandardHTTPTimeout time.Duration = 3 * time.Second

// RequestHeaders are the headers to be part of the request
type RequestHeaders struct {
ContentType string
Authorization string
}

// NewClientWithTimeout defines a http.Client with the given timeout.
func NewClientWithTimeout(timeout time.Duration) *Client {
return NewClientWithTimeouts(timeout, timeout)
Expand Down Expand Up @@ -90,14 +84,14 @@ func Post(
// Put is like http.Put but uses the provided context and obeys its cancellation.
// It also uses the default client with a default 3 second timeout.
func Put(
ctx context.Context, url string, h *RequestHeaders, body io.Reader,
ctx context.Context, url string, h *http.Header, body io.Reader,
) (resp *http.Response, err error) {
return DefaultClient.Put(ctx, url, h, body)
}

// Delete is like http.Delete but uses the provided context and obeys its cancellation.
// It also uses the default client with a default 3 second timeout.
func Delete(ctx context.Context, url string, h *RequestHeaders) (resp *http.Response, err error) {
func Delete(ctx context.Context, url string, h *http.Header) (resp *http.Response, err error) {
return DefaultClient.Delete(ctx, url, h)
}

Expand All @@ -115,33 +109,28 @@ func (c *Client) Get(ctx context.Context, url string) (resp *http.Response, err
// 1. ContentType
// 2. Authorization
func (c *Client) Put(
ctx context.Context, url string, h *RequestHeaders, body io.Reader,
ctx context.Context, url string, h *http.Header, body io.Reader,
) (resp *http.Response, err error) {
req, err := http.NewRequestWithContext(ctx, "PUT", url, body)
if err != nil {
return nil, err
}
if h != nil {
if h.ContentType != "" {
req.Header.Set("Content-Type", h.ContentType)
}
if h.Authorization != "" {
req.Header.Set("Authorization", h.Authorization)
}
req.Header = *h
}
return c.Do(req)
}

// Delete is like http.Client.Delete but uses the provided context and obeys its cancellation.
func (c *Client) Delete(
ctx context.Context, url string, h *RequestHeaders,
ctx context.Context, url string, h *http.Header,
) (resp *http.Response, err error) {
req, err := http.NewRequestWithContext(ctx, "DELETE", url, nil)
if err != nil {
return nil, err
}
if h != nil && h.Authorization != "" {
req.Header.Set("Authorization", h.Authorization)
if h != nil {
req.Header = *h
}
return c.Do(req)
}
Expand Down

0 comments on commit 9587ca8

Please sign in to comment.