-
Notifications
You must be signed in to change notification settings - Fork 82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow multiple ES outputs as long as they are the same ES #1684
Changes from all commits
129eefe
0d99f4b
03a9134
7fcb743
7192db9
96976f8
26f08df
b9f7464
7d372c6
37d116d
6c80253
91518f3
282fb10
a322ddc
5fa764e
ce387fa
d14e71e
211e9b9
d7928b4
88dd2b6
6059b03
cddd9d7
152c1d9
77f0fd6
a487054
1bbad27
d61c8ee
1404587
eb58160
d2ec527
b463a77
91c78f2
008f273
e311321
68e6bf2
58a7eab
495ea7d
52b8aa0
35c05f0
a7e3217
be82596
7b5e4aa
faa3ead
40490c6
d492cfd
35d11ec
311e6b0
fc460ac
87a411a
e266c20
c865859
eba4f71
135ddcb
89c920b
c4cc70e
21b3984
935706a
1fa74ec
17787e1
f2e70c5
056d3f0
7847924
5f2ff23
ebc2131
130b979
c39e365
cd02061
9a156b5
77b9541
2fc804b
959431c
7c2e881
f522d01
9d8ea7d
1eb7bd7
4250ad6
8fd7b62
e5f4b74
0fb8c56
7a79d4c
8ea0ede
31d0537
6e5c500
3124b02
b87920f
6f0c0db
9bbd267
cdac944
d0af9ea
2966e63
7090f88
ed3f8d4
2a7ffaf
aa4ad32
a64a820
a3c36d6
bd4b7af
1269b68
96b3b87
b42d5f8
d78e50a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,8 @@ import ( | |
"strings" | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
|
||
"github.com/elastic/fleet-server/v7/internal/pkg/bulk" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/cache" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/config" | ||
|
@@ -24,7 +26,6 @@ import ( | |
"github.com/elastic/fleet-server/v7/internal/pkg/logger" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/model" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/policy" | ||
"github.com/pkg/errors" | ||
|
||
"github.com/julienschmidt/httprouter" | ||
"github.com/rs/zerolog" | ||
|
@@ -337,8 +338,9 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag | |
Int64("rev.coordinatorIdx", rev.CoordinatorIdx). | ||
Msg("ack policy revision") | ||
|
||
if ok && rev.PolicyID == agent.PolicyID && (rev.RevisionIdx > currRev || | ||
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) { | ||
if ok && rev.PolicyID == agent.PolicyID && | ||
(rev.RevisionIdx > currRev || | ||
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) { | ||
found = true | ||
currRev = rev.RevisionIdx | ||
currCoord = rev.CoordinatorIdx | ||
|
@@ -349,17 +351,7 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag | |
return nil | ||
} | ||
|
||
sz := len(agent.DefaultAPIKeyHistory) | ||
if sz > 0 { | ||
ids := make([]string, sz) | ||
for i := 0; i < sz; i++ { | ||
ids[i] = agent.DefaultAPIKeyHistory[i].ID | ||
} | ||
log.Info().Strs("ids", ids).Msg("Invalidate old API keys") | ||
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil { | ||
log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys") | ||
} | ||
} | ||
ack.invalidateAPIKeys(ctx, agent) | ||
|
||
body := makeUpdatePolicyBody( | ||
agent.PolicyID, | ||
|
@@ -385,8 +377,24 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag | |
return errors.Wrap(err, "handlePolicyChange update") | ||
} | ||
|
||
func (ack *AckT) invalidateAPIKeys(ctx context.Context, agent *model.Agent) { | ||
var ids []string | ||
for _, out := range agent.Outputs { | ||
for _, k := range out.ToRetireAPIKeyIds { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Q: when this is cleared? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. below, within the |
||
ids = append(ids, k.ID) | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. doesn't look like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the problem is that |
||
|
||
if len(ids) > 0 { | ||
log.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys") | ||
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil { | ||
log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys") | ||
} | ||
} | ||
} | ||
|
||
func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error { | ||
apiKeys := _getAPIKeyIDs(agent) | ||
apiKeys := agent.APIKeyIDs() | ||
if len(apiKeys) > 0 { | ||
zlog = zlog.With().Strs(LogAPIKeyID, apiKeys).Logger() | ||
|
||
|
@@ -440,17 +448,6 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent * | |
return nil | ||
} | ||
|
||
func _getAPIKeyIDs(agent *model.Agent) []string { | ||
keys := make([]string, 0, 1) | ||
if agent.AccessAPIKeyID != "" { | ||
keys = append(keys, agent.AccessAPIKeyID) | ||
} | ||
if agent.DefaultAPIKeyID != "" { | ||
keys = append(keys, agent.DefaultAPIKeyID) | ||
} | ||
return keys | ||
} | ||
|
||
// Generate an update script that validates that the policy_id | ||
// has not changed underneath us by an upstream process (Kibana or otherwise). | ||
// We have a race condition where a user could have assigned a new policy to | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,13 +15,14 @@ import ( | |
"net/http" | ||
"testing" | ||
|
||
"github.com/google/go-cmp/cmp" | ||
|
||
"github.com/elastic/fleet-server/v7/internal/pkg/cache" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/config" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/es" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/model" | ||
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" | ||
testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" | ||
"github.com/google/go-cmp/cmp" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/mock" | ||
|
@@ -439,3 +440,39 @@ func TestHandleAckEvents(t *testing.T) { | |
}) | ||
} | ||
} | ||
|
||
func TestInvalidateAPIKeys(t *testing.T) { | ||
toRetire1 := []model.ToRetireAPIKeyIdsItems{{ | ||
ID: "toRetire1", | ||
}} | ||
toRetire2 := []model.ToRetireAPIKeyIdsItems{{ | ||
ID: "toRetire2_0", | ||
}, { | ||
ID: "toRetire2_1", | ||
}} | ||
var toRetire3 []model.ToRetireAPIKeyIdsItems | ||
|
||
want := []string{"toRetire1", "toRetire2_0", "toRetire2_1"} | ||
|
||
agent := model.Agent{ | ||
Outputs: map[string]*model.PolicyOutput{ | ||
"1": {ToRetireAPIKeyIds: toRetire1}, | ||
"2": {ToRetireAPIKeyIds: toRetire2}, | ||
"3": {ToRetireAPIKeyIds: toRetire3}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question: any handling for the same ids in the different outputs? probably not possible in real life scenario. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
no, it should not happen. If it happens, it's either another bug, or the same I'm trying to fix. Anyway, here we're invalidating them, so it'd be fine, we'd try to invalidate the same key twice There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One thing, that error would be bubbled up in the call would that be a problem for other execution, how idempotent the call is? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idempotent part on on ES side. Because of this bug, there were calls to the invalidate API with the same API key listed twice. As far as I could see, there was no problem. |
||
}, | ||
} | ||
|
||
bulker := ftesting.NewMockBulk() | ||
bulker.On("APIKeyInvalidate", | ||
context.Background(), mock.MatchedBy(func(ids []string) bool { | ||
// if A contains B and B contains A => A = B | ||
return assert.Subset(t, ids, want) && | ||
assert.Subset(t, want, ids) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TIL, never use subset before. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Neither had I :) |
||
})). | ||
Return(nil) | ||
|
||
ack := &AckT{bulk: bulker} | ||
ack.invalidateAPIKeys(context.Background(), &agent) | ||
|
||
bulker.AssertExpectations(t) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,12 +6,18 @@ | |
package apikey | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/base64" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"net/http" | ||
"strings" | ||
"unicode/utf8" | ||
|
||
"github.com/elastic/go-elasticsearch/v7" | ||
"github.com/elastic/go-elasticsearch/v7/esapi" | ||
) | ||
|
||
const ( | ||
|
@@ -28,6 +34,61 @@ var ( | |
|
||
var AuthKey = http.CanonicalHeaderKey("Authorization") | ||
|
||
// APIKeyMetadata tracks Metadata associated with an APIKey. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you give an example of what would be tracker and why we are tracking it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just moves it from another file. But it's the metadata on the ES document for an API key |
||
type APIKeyMetadata struct { | ||
ID string | ||
Metadata Metadata | ||
} | ||
|
||
// Read gathers APIKeyMetadata from Elasticsearch using the given client. | ||
func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKeyMetadata, error) { | ||
opts := []func(*esapi.SecurityGetAPIKeyRequest){ | ||
client.Security.GetAPIKey.WithContext(ctx), | ||
client.Security.GetAPIKey.WithID(id), | ||
} | ||
|
||
res, err := client.Security.GetAPIKey( | ||
opts..., | ||
) | ||
if err != nil { | ||
return nil, fmt.Errorf("request to elasticsearch failed: %w", err) | ||
} | ||
defer res.Body.Close() | ||
|
||
if res.IsError() { | ||
return nil, fmt.Errorf("%s: %w", res.String(), ErrAPIKeyNotFound) | ||
} | ||
|
||
type APIKeyResponse struct { | ||
ID string `json:"id"` | ||
Metadata Metadata `json:"metadata"` | ||
} | ||
type GetAPIKeyResponse struct { | ||
APIKeys []APIKeyResponse `json:"api_keys"` | ||
} | ||
|
||
var buff bytes.Buffer | ||
if _, err := buff.ReadFrom(res.Body); err != nil { | ||
return nil, fmt.Errorf("could not read from response body: %w", err) | ||
} | ||
|
||
var resp GetAPIKeyResponse | ||
if err = json.Unmarshal(buff.Bytes(), &resp); err != nil { | ||
aleksmaus marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return nil, fmt.Errorf( | ||
"could not Unmarshal elasticsearch GetAPIKeyResponse: %w", err) | ||
} | ||
|
||
if len(resp.APIKeys) == 0 { | ||
return nil, ErrAPIKeyNotFound | ||
} | ||
|
||
first := resp.APIKeys[0] | ||
return &APIKeyMetadata{ | ||
ID: first.ID, | ||
Metadata: first.Metadata, | ||
}, nil | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is only used for testing AFAIK? Right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, it's used to invalidate API keys as well |
||
// APIKey is used to represent an Elasticsearch API Key. | ||
type APIKey struct { | ||
ID string | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
more of a question: is Outputs new outputs or old outputs?
what happens in case you change output (replace output A with output B so output A is no longer part of the policy) in policy to different ES? will this contain old output so we can invalidate apikeys?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these are the outputs in the agent document. the current ones, there isn't really new or old output here.
it all breaks as currently fleet-server cannot support any other ES besides it own ES. Also the UI does not let the user input any credentials to access ES