Skip to content

Commit

Permalink
Beats CM now handle the new response format from Kibana. (elastic#11377)
Browse files Browse the repository at this point in the history
Format is defined in elastic/kibana#27408
Main objective is to easier bubbling or errors and messages.

```
export interface ReturnType {
  error?: {
    message: string;
    code?: number;
  };
  success: boolean;
}

```
  • Loading branch information
ph authored Mar 28, 2019
1 parent 1342d5f commit a07dec3
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 42 deletions.
8 changes: 4 additions & 4 deletions x-pack/libbeat/management/api/auth_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ type EventRequest struct {

// EventAPIResponse is the top level response for the events endpoints.
type EventAPIResponse struct {
Response []EventResponse `json:"response"`
BaseResponse
Response []EventResponse `json:"results"`
}

// EventResponse is the indiviual response for each event request.
type EventResponse struct {
Success bool `json:"success"`
Reason string `json:"reason"`
BaseResponse
}

// AuthClienter is the interface exposed by the auth client and is useful for testing without calling
Expand Down Expand Up @@ -102,7 +102,7 @@ func (c *AuthClient) SendEvents(requests []EventRequest) error {
var errors multierror.Errors
for _, response := range resp.Response {
if !response.Success {
errors = append(errors, fmt.Errorf("error sending event, reason: %+v", response.Reason))
errors = append(errors, fmt.Errorf("error sending event, reason: %+v", response.Error.Message))
}
}

Expand Down
18 changes: 10 additions & 8 deletions x-pack/libbeat/management/api/auth_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestReportEvents(t *testing.T) {
}

apiResponse := EventAPIResponse{
Response: []EventResponse{EventResponse{Success: true}},
Response: []EventResponse{EventResponse{BaseResponse: BaseResponse{Success: true}}},
}

w.WriteHeader(http.StatusOK)
Expand All @@ -118,10 +118,12 @@ func TestReportEvents(t *testing.T) {
t.Run("bubble up any errors", func(t *testing.T) {
server, client := newServerClientPair(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
response := struct {
Message string
}{
Message: "bad request",
response := BaseResponse{
Success: false,
Error: ErrorResponse{
Message: "bad request",
Code: http.StatusBadRequest,
},
}
json.NewEncoder(w).Encode(response)
}))
Expand All @@ -139,8 +141,8 @@ func TestReportEvents(t *testing.T) {
server, client := newServerClientPair(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
apiResponse := EventAPIResponse{
Response: []EventResponse{
EventResponse{Success: true},
EventResponse{Success: false},
EventResponse{BaseResponse: BaseResponse{Success: true}},
EventResponse{BaseResponse: BaseResponse{Success: false}},
},
}
w.WriteHeader(http.StatusOK)
Expand All @@ -163,7 +165,7 @@ func TestReportEvents(t *testing.T) {
server, client := newServerClientPair(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
apiResponse := EventAPIResponse{
Response: []EventResponse{
EventResponse{Success: true},
EventResponse{BaseResponse: BaseResponse{Success: true}},
},
}
w.WriteHeader(http.StatusOK)
Expand Down
12 changes: 5 additions & 7 deletions x-pack/libbeat/management/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,10 @@ func (c *Client) request(method, extraPath string,
return statusCode, err
}

func extractError(result []byte) error {
var kibanaResult struct {
Message string
func extractError(b []byte) error {
var result BaseResponse
if err := json.Unmarshal(b, &result); err != nil {
return errors.Wrap(err, "error while parsing Kibana response")
}
if err := json.Unmarshal(result, &kibanaResult); err != nil {
return errors.Wrap(err, "parsing Kibana response")
}
return errors.New(kibanaResult.Message)
return errors.New(result.Error.Message)
}
3 changes: 2 additions & 1 deletion x-pack/libbeat/management/api/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func (c *configResponse) UnmarshalJSON(b []byte) error {
// Configuration retrieves the list of configuration blocks from Kibana
func (c *AuthClient) Configuration() (ConfigBlocks, error) {
resp := struct {
ConfigBlocks []*configResponse `json:"configuration_blocks"`
BaseResponse
ConfigBlocks []*configResponse `json:"list"`
}{}
url := fmt.Sprintf("/api/beats/agent/%s/configuration", c.BeatUUID)
statusCode, err := c.Client.request("GET", url, nil, c.headers(), &resp)
Expand Down
3 changes: 2 additions & 1 deletion x-pack/libbeat/management/api/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"
)

// {"list":[{"id":"6c385a04-f315-489e-9208-c87f41911782","type":"filebeat.inputs","config":{"paths":["/tmp/hello.log"]},"tag":"89be4cfd-6249-4ac2-abe2-8f82520ba435"},{"id":"315ff7e9-ae24-4c99-a9d0-ed4314bc8e60","type":"output","config":{"_sub_type":"elasticsearch","username":"elastic","password":"changeme"},"tag":"89be4cfd-6249-4ac2-abe2-8f82520ba435"}],"success":true}
func TestConfiguration(t *testing.T) {
beatUUID, err := uuid.NewV4()
if err != nil {
Expand All @@ -26,7 +27,7 @@ func TestConfiguration(t *testing.T) {
// Check enrollment token is correct
assert.Equal(t, "thisismyenrollmenttoken", r.Header.Get("kbn-beats-access-token"))

fmt.Fprintf(w, `{"configuration_blocks":[{"type":"filebeat.modules","config":{"_sub_type":"apache2"}},{"type":"metricbeat.modules","config":{"_sub_type":"system","period":"10s"}}]}`)
fmt.Fprintf(w, `{"success": true, "list":[{"type":"filebeat.modules","config":{"_sub_type":"apache2"}},{"type":"metricbeat.modules","config":{"_sub_type":"system","period":"10s"}}]}`)
}))
defer server.Close()

Expand Down
7 changes: 4 additions & 3 deletions x-pack/libbeat/management/api/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ Translations:
Type: output
{
"configuration_blocks": [
"success": true,
"list": [
{
"config": {
Expand Down Expand Up @@ -42,8 +43,8 @@ YAML representation:
Type: *.modules
{
"configuration_blocks": [
"success": true,
"list": [
{
"config": {
"_sub_type": "system"
Expand Down
14 changes: 10 additions & 4 deletions x-pack/libbeat/management/api/enroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,24 @@ import (
)

type enrollResponse struct {
AccessToken string `json:"access_token"`
BaseResponse
AccessToken string `json:"item"`
}

func (e *enrollResponse) Validate() error {
if len(e.AccessToken) == 0 {
if !e.Success || len(e.AccessToken) == 0 {
return errors.New("empty access_token")
}
return nil
}

// Enroll a beat in central management, this call returns a valid access token to retrieve configurations
func (c *Client) Enroll(beatType, beatName, beatVersion, hostname string, beatUUID uuid.UUID, enrollmentToken string) (string, error) {
// Enroll a beat in central management, this call returns a valid access token to retrieve
// configurations
func (c *Client) Enroll(
beatType, beatName, beatVersion, hostname string,
beatUUID uuid.UUID,
enrollmentToken string,
) (string, error) {
params := common.MapStr{
"type": beatType,
"name": beatName,
Expand Down
7 changes: 4 additions & 3 deletions x-pack/libbeat/management/api/enroll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestEnrollValid(t *testing.T) {
assert.Equal(t, "6.3.0", request.Version)
assert.Equal(t, "beatname", request.Name)

fmt.Fprintf(w, `{"access_token": "fooo"}`)
fmt.Fprintf(w, `{"success": true, "item": "fooo"}`)
}))
defer server.Close()

Expand All @@ -67,11 +67,12 @@ func TestEnrollError(t *testing.T) {
responseCode int
}{
"invalid enrollment token": {
body: `{"message": "Invalid enrollment token"}`,
body: `{"success": false, "error": { "message": "Invalid enrollment token", "code": 400 }}`,
responseCode: 400,
},
//NOTE(ph): I believe this is now fixed in the API.
"invalid token response": {
body: `{"access_token": ""}`,
body: `{"success": true, "item": ""}`,
responseCode: 200,
},
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/libbeat/management/api/event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
)

var debugK = "event_reporter"
const debugK = "event_reporter"

// EventReporter is an object that will periodically send asyncronously events to the
// CM events endpoints.
Expand Down
53 changes: 53 additions & 0 deletions x-pack/libbeat/management/api/response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package api

import "fmt"

// Action are the actions executed on the API.
type Action int

// List of the valid Actions executed by the API.
//go:generate stringer -type=LicenseType -linecomment=true
const (
Created Action = iota + 1 // created
)

var mapStringToAction = map[string]Action{
"created": Created,
}

// UnmarshalJSON unmarshal an action string into a constant.
func (a *Action) UnmarshalJSON(b []byte) error {
k := string(b)
if len(b) <= 2 {
return fmt.Errorf(
"invalid string for action type, received: '%s'",
k,
)
}
v, found := mapStringToAction[k[1:len(k)-1]]
if !found {
return fmt.Errorf(
"unknown action '%s' returned from the API, valid actions are: 'created'",
k,
)
}
*a = v
return nil
}

// BaseResponse the common response from all the API calls.
type BaseResponse struct {
Action Action `json:"action,omitempty"`
Success bool `json:"success"`
Error ErrorResponse `json:"error,omitempty"`
}

// ErrorResponse contains human readable and machine readable information when an error happens.
type ErrorResponse struct {
Message string `json:"message"`
Code int `json:"code"`
}
18 changes: 9 additions & 9 deletions x-pack/libbeat/management/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ func TestConfigManager(t *testing.T) {
i := 0
responses := []string{
// Initial load
`{"configuration_blocks":[{"type":"test.block","config":{"module":"apache2"}}]}`,
`{"success": true, "list":[{"type":"test.block","config":{"module":"apache2"}}]}`,

// No change, no reload
`{"configuration_blocks":[{"type":"test.block","config":{"module":"apache2"}}]}`,
`{"success": true, "list":[{"type":"test.block","config":{"module":"apache2"}}]}`,

// Changed, reload
`{"configuration_blocks":[{"type":"test.block","config":{"module":"system"}}]}`,
`{"success": true, "list":[{"type":"test.block","config":{"module":"system"}}]}`,
}
mux.Handle(fmt.Sprintf("/api/beats/agent/%s/configuration", id), http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, responses[i])
Expand Down Expand Up @@ -128,10 +128,10 @@ func TestRemoveItems(t *testing.T) {
i := 0
responses := []string{
// Initial load
`{"configuration_blocks":[{"type":"test.blocks","config":{"module":"apache2"}}]}`,
`{"success": true, "list":[{"type":"test.blocks","config":{"module":"apache2"}}]}`,

// Return no blocks
`{"configuration_blocks":[]}`,
`{"success": true, "list":[]}`,
}
mux.Handle(fmt.Sprintf("/api/beats/agent/%s/configuration", id), http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, responses[i])
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestUnEnroll(t *testing.T) {
mux := http.NewServeMux()
i := 0
responses := []http.HandlerFunc{ // Initial load
responseText(`{"configuration_blocks":[{"type":"test.blocks","config":{"module":"apache2"}}]}`),
responseText(`{"success": true, "list":[{"type":"test.blocks","config":{"module":"apache2"}}]}`),
http.NotFound,
}

Expand Down Expand Up @@ -277,9 +277,9 @@ func TestBadConfig(t *testing.T) {
mux := http.NewServeMux()
i := 0
responses := []http.HandlerFunc{ // Initial load
responseText(`{"configuration_blocks":[{"type":"output","config":{"_sub_type": "console", "path": "/tmp/bad"}}]}`),
responseText(`{"success": true, "list":[{"type":"output","config":{"_sub_type": "console", "path": "/tmp/bad"}}]}`),
// will not resend new events
responseText(`{"configuration_blocks":[{"type":"output","config":{"_sub_type": "console", "path": "/tmp/bad"}}]}`),
responseText(`{"success": true, "list":[{"type":"output","config":{"_sub_type": "console", "path": "/tmp/bad"}}]}`),
// recover on call
http.NotFound,
}
Expand Down Expand Up @@ -406,7 +406,7 @@ func addEventsReporterHandle(mux *http.ServeMux, uuid uuid.UUID) *collectEventRe
resp := api.EventAPIResponse{Response: make([]api.EventResponse, len(requests))}

for i := 0; i < len(requests); i++ {
resp.Response[i] = api.EventResponse{Success: true}
resp.Response[i] = api.EventResponse{BaseResponse: api.BaseResponse{Success: true}}
}

json.NewEncoder(w).Encode(resp)
Expand Down
4 changes: 3 additions & 1 deletion x-pack/libbeat/tests/system/test_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
from base import BaseTest


INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False)
# Disable because waiting artifacts from https://github.com/elastic/kibana/pull/31660
# INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False)
INTEGRATION_TESTS = False
TIMEOUT = 2 * 60


Expand Down

0 comments on commit a07dec3

Please sign in to comment.