diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ee5bc6b054f3..d04bf505c8c7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -17,6 +17,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fixed a crash under Windows when fetching processes information. {pull}12833[12833] - Update to Golang 1.12.7. {pull}12931[12931] - Remove `in_cluster` configuration parameter for Kuberentes, now in-cluster configuration is used only if no other kubeconfig is specified {pull}13051[13051] +- Disable Alibaba Cloud and Tencent Cloud metadata providers by default. {pull}13812[13812] *Auditbeat* @@ -257,6 +258,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add autodetection mode for add_docker_metadata and enable it by default in included configuration files{pull}13374[13374] - Added `monitoring.cluster_uuid` setting to associate Beat data with specified ES cluster in Stack Monitoring UI. {pull}13182[13182] - Add autodetection mode for add_kubernetes_metadata and enable it by default in included configuration files. {pull}13473[13473] +- Add `providers` setting to `add_cloud_metadata` processor. {pull}13812[13812] *Auditbeat* diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index 17c312e6f056..c01aad4e895c 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -505,8 +505,8 @@ not: === Add cloud metadata The `add_cloud_metadata` processor enriches each event with instance metadata -from the machine's hosting provider. At startup it will detect the hosting -provider and cache the instance metadata. +from the machine's hosting provider. At startup it will query a list of hosting +providers and cache the instance metadata. 
The following cloud providers are supported: @@ -518,6 +518,10 @@ The following cloud providers are supported: - Azure Virtual Machine - Openstack Nova +The Alibaba Cloud and Tencent Cloud providers are disabled by default, because +they require access to a remote host. The `providers` setting allows users to +select the list of providers to query. + The simple configuration below enables the processor. [source,yaml] @@ -526,7 +530,7 @@ processors: - add_cloud_metadata: ~ ------------------------------------------------------------------------------- -The `add_cloud_metadata` processor has two optional configuration settings. +The `add_cloud_metadata` processor has three optional configuration settings. The first one is `timeout` which specifies the maximum amount of time to wait for a successful response when detecting the hosting provider. The default timeout value is `3s`. @@ -535,7 +539,20 @@ If a timeout occurs then no instance metadata will be added to the events. This makes it possible to enable this processor for all your deployments (in the cloud or on-premise). -The second optional configuration setting is `overwrite`. When `overwrite` is +The second optional setting is `providers`. The `providers` setting accepts a +list of cloud provider names to be used. If `providers` is not configured, then +all providers that do not access a remote endpoint are enabled by default. + +List of names the `providers` setting supports: +- "alibaba", or "ecs" for the Alibaba Cloud provider (disabled by default). +- "azure" for Azure Virtual Machine (enabled by default). +- "digitalocean" for Digital Ocean (enabled by default). +- "aws", or "ec2" for Amazon Web Services (enabled by default). +- "gcp" for Google Compute Engine (enabled by default). +- "openstack", or "nova" for Openstack Nova (enabled by default). +- "tencent", or "qcloud" for Tencent Cloud (disabled by default). + +The third optional configuration setting is `overwrite`. 
When `overwrite` is `true`, `add_cloud_metadata` overwrites existing `cloud.*` fields (`false` by default). diff --git a/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go b/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go index 451763801d5f..d4a8ea22baee 100644 --- a/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go +++ b/libbeat/processors/add_cloud_metadata/add_cloud_metadata.go @@ -18,13 +18,7 @@ package add_cloud_metadata import ( - "bytes" - "context" - "encoding/json" "fmt" - "io/ioutil" - "net" - "net/http" "sync" "time" @@ -41,12 +35,6 @@ const ( // metadataHost is the IP that each of the cloud providers supported here // use for their metadata service. metadataHost = "169.254.169.254" - - // Default config - defaultTimeOut = 3 * time.Second - - // Default overwrite - defaultOverwrite = false ) var debugf = logp.MakeDebug("filters") @@ -57,305 +45,58 @@ func init() { jsprocessor.RegisterPlugin("AddCloudMetadata", New) } -type schemaConv func(m map[string]interface{}) common.MapStr - -// responseHandler is the callback function that used to write something -// to the result according the HTTP response. -type responseHandler func(all []byte, res *result) error - -type metadataFetcher struct { - provider string - headers map[string]string - responseHandlers map[string]responseHandler - conv schemaConv -} - -// fetchRaw queries raw metadata from a hosting provider's metadata service. 
-func (f *metadataFetcher) fetchRaw( - ctx context.Context, - client http.Client, - url string, - responseHandler responseHandler, - result *result, -) { - req, err := http.NewRequest("GET", url, nil) - if err != nil { - result.err = errors.Wrapf(err, "failed to create http request for %v", f.provider) - return - } - for k, v := range f.headers { - req.Header.Add(k, v) - } - req = req.WithContext(ctx) - - rsp, err := client.Do(req) - if err != nil { - result.err = errors.Wrapf(err, "failed requesting %v metadata", f.provider) - return - } - defer rsp.Body.Close() - - if rsp.StatusCode != http.StatusOK { - result.err = errors.Errorf("failed with http status code %v", rsp.StatusCode) - return - } - - all, err := ioutil.ReadAll(rsp.Body) - if err != nil { - result.err = errors.Wrapf(err, "failed requesting %v metadata", f.provider) - return - } - - // Decode JSON. - err = responseHandler(all, result) - if err != nil { - result.err = err - return - } - - return -} - -// fetchMetadata queries metadata from a hosting provider's metadata service. -// Some providers require multiple HTTP requests to gather the whole metadata, -// len(f.responseHandlers) > 1 indicates that multiple requests are needed. -func (f *metadataFetcher) fetchMetadata(ctx context.Context, client http.Client) result { - res := result{provider: f.provider, metadata: common.MapStr{}} - for url, responseHandler := range f.responseHandlers { - f.fetchRaw(ctx, client, url, responseHandler, &res) - if res.err != nil { - return res - } - } - - // Apply schema. - res.metadata = f.conv(res.metadata) - res.metadata["provider"] = f.provider - - return res -} - -// result is the result of a query for a specific hosting provider's metadata. -type result struct { - provider string // Hosting provider type. - err error // Error that occurred while fetching (if any). - metadata common.MapStr // A specific subset of the metadata received from the hosting provider. 
-} - -func (r result) String() string { - return fmt.Sprintf("result=[provider:%v, error=%v, metadata=%v]", - r.provider, r.err, r.metadata) -} - -// writeResult blocks until it can write the result r to the channel c or until -// the context times out. -func writeResult(ctx context.Context, c chan result, r result) error { - select { - case <-ctx.Done(): - return ctx.Err() - case c <- r: - return nil - } -} - -// fetchMetadata attempts to fetch metadata in parallel from each of the -// hosting providers supported by this processor. It wait for the results to -// be returned or for a timeout to occur then returns the results that -// completed in time. -func fetchMetadata(metadataFetchers []*metadataFetcher, timeout time.Duration) *result { - debugf("add_cloud_metadata: starting to fetch metadata, timeout=%v", timeout) - start := time.Now() - defer func() { - debugf("add_cloud_metadata: fetchMetadata ran for %v", time.Since(start)) - }() - - // Create HTTP client with our timeouts and keep-alive disabled. - client := http.Client{ - Timeout: timeout, - Transport: &http.Transport{ - DisableKeepAlives: true, - DialContext: (&net.Dialer{ - Timeout: timeout, - KeepAlive: 0, - }).DialContext, - }, - } - - // Create context to enable explicit cancellation of the http requests. - ctx, cancel := context.WithTimeout(context.TODO(), timeout) - defer cancel() - - c := make(chan result) - for _, fetcher := range metadataFetchers { - go func(fetcher *metadataFetcher) { - writeResult(ctx, c, fetcher.fetchMetadata(ctx, client)) - }(fetcher) - } - - for i := 0; i < len(metadataFetchers); i++ { - select { - case result := <-c: - debugf("add_cloud_metadata: received disposition for %v after %v. %v", - result.provider, time.Since(start), result) - // Bail out on first success. 
- if result.err == nil && result.metadata != nil { - return &result - } - case <-ctx.Done(): - debugf("add_cloud_metadata: timed-out waiting for all responses") - return nil - } - } - - return nil -} - -// getMetadataURLs loads config and generates the metadata URLs. -func getMetadataURLs(c *common.Config, defaultHost string, metadataURIs []string) ([]string, error) { - var urls []string - config := struct { - MetadataHostAndPort string `config:"host"` // Specifies the host and port of the metadata service (for testing purposes only). - }{ - MetadataHostAndPort: defaultHost, - } - err := c.Unpack(&config) - if err != nil { - return urls, errors.Wrap(err, "failed to unpack add_cloud_metadata config") - } - for _, uri := range metadataURIs { - urls = append(urls, "http://"+config.MetadataHostAndPort+uri) - } - return urls, nil -} - -// makeJSONPicker returns a responseHandler function that unmarshals JSON -// from a hosting provider's HTTP response and writes it to the result. -func makeJSONPicker(provider string) responseHandler { - return func(all []byte, res *result) error { - dec := json.NewDecoder(bytes.NewReader(all)) - dec.UseNumber() - err := dec.Decode(&res.metadata) - if err != nil { - err = errors.Wrapf(err, "failed to unmarshal %v JSON of '%v'", provider, string(all)) - return err - } - return nil - } -} - -// newMetadataFetcher return metadataFetcher with one pass JSON responseHandler. 
-func newMetadataFetcher( - c *common.Config, - provider string, - headers map[string]string, - host string, - conv schemaConv, - uri string, -) (*metadataFetcher, error) { - urls, err := getMetadataURLs(c, host, []string{uri}) - if err != nil { - return nil, err - } - responseHandlers := map[string]responseHandler{urls[0]: makeJSONPicker(provider)} - fetcher := &metadataFetcher{provider, headers, responseHandlers, conv} - return fetcher, nil +type addCloudMetadata struct { + initOnce sync.Once + initData *initData + metadata common.MapStr } -func setupFetchers(c *common.Config) ([]*metadataFetcher, error) { - var fetchers []*metadataFetcher - doFetcher, err := newDoMetadataFetcher(c) - if err != nil { - return fetchers, err - } - ec2Fetcher, err := newEc2MetadataFetcher(c) - if err != nil { - return fetchers, err - } - gceFetcher, err := newGceMetadataFetcher(c) - if err != nil { - return fetchers, err - } - qcloudFetcher, err := newQcloudMetadataFetcher(c) - if err != nil { - return fetchers, err - } - ecsFetcher, err := newAlibabaCloudMetadataFetcher(c) - if err != nil { - return fetchers, err - } - azFetcher, err := newAzureVmMetadataFetcher(c) - if err != nil { - return fetchers, err - } - osFetcher, err := newOpenstackNovaMetadataFetcher(c) - if err != nil { - return fetchers, err - } - - fetchers = []*metadataFetcher{ - doFetcher, - ec2Fetcher, - gceFetcher, - qcloudFetcher, - ecsFetcher, - azFetcher, - osFetcher, - } - return fetchers, nil +type initData struct { + fetchers []metadataFetcher + timeout time.Duration + overwrite bool } // New constructs a new add_cloud_metadata processor. func New(c *common.Config) (processors.Processor, error) { - config := struct { - Timeout time.Duration `config:"timeout"` // Amount of time to wait for responses from the metadata services. - Overwrite bool `config:"overwrite"` // Overwrite if cloud.* fields already exist. 
- }{ - Timeout: defaultTimeOut, - Overwrite: defaultOverwrite, - } - err := c.Unpack(&config) - if err != nil { + config := defaultConfig() + if err := c.Unpack(&config); err != nil { return nil, errors.Wrap(err, "failed to unpack add_cloud_metadata config") } - fetchers, err := setupFetchers(c) + initProviders := selectProviders(config.Providers, cloudMetaProviders) + fetchers, err := setupFetchers(initProviders, c) if err != nil { return nil, err } - p := &addCloudMetadata{ initData: &initData{fetchers, config.Timeout, config.Overwrite}, } - go p.initOnce.Do(p.init) + go p.init() return p, nil } -type initData struct { - fetchers []*metadataFetcher - timeout time.Duration - overwrite bool -} - -type addCloudMetadata struct { - initOnce sync.Once - initData *initData - metadata common.MapStr +func (r result) String() string { + return fmt.Sprintf("result=[provider:%v, error=%v, metadata=%v]", + r.provider, r.err, r.metadata) } func (p *addCloudMetadata) init() { - result := fetchMetadata(p.initData.fetchers, p.initData.timeout) - if result == nil { - logp.Info("add_cloud_metadata: hosting provider type not detected.") - return - } - p.metadata = result.metadata - logp.Info("add_cloud_metadata: hosting provider type detected as %v, metadata=%v", - result.provider, result.metadata.String()) + p.initOnce.Do(func() { + result := fetchMetadata(p.initData.fetchers, p.initData.timeout) + if result == nil { + logp.Info("add_cloud_metadata: hosting provider type not detected.") + return + } + p.metadata = result.metadata + logp.Info("add_cloud_metadata: hosting provider type detected as %v, metadata=%v", + result.provider, result.metadata.String()) + }) } func (p *addCloudMetadata) getMeta() common.MapStr { - p.initOnce.Do(p.init) + p.init() return p.metadata } @@ -376,7 +117,6 @@ func (p *addCloudMetadata) Run(event *beat.Event) (*beat.Event, error) { } _, err := event.PutValue("cloud", meta) - return event, err } diff --git 
a/libbeat/processors/add_cloud_metadata/config.go b/libbeat/processors/add_cloud_metadata/config.go new file mode 100644 index 000000000000..08f4a241483a --- /dev/null +++ b/libbeat/processors/add_cloud_metadata/config.go @@ -0,0 +1,76 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_cloud_metadata + +import ( + "fmt" + "time" +) + +type config struct { + Timeout time.Duration `config:"timeout"` // Amount of time to wait for responses from the metadata services. + Overwrite bool `config:"overwrite"` // Overwrite if cloud.* fields already exist. + Providers providerList `config:"providers"` // List of providers to probe +} + +type providerList []string + +const ( + // Default config + defaultTimeout = 3 * time.Second + + // Default overwrite + defaultOverwrite = false +) + +func defaultConfig() config { + return config{ + Timeout: defaultTimeout, + Overwrite: defaultOverwrite, + Providers: nil, // enable all local-only providers by default + } +} + +func (c *config) Validate() error { + // XXX: remove this check. 
A bug in go-ucfg prevents the correct validation + // on providerList + return c.Providers.Validate() +} + +func (l providerList) Has(name string) bool { + for _, elem := range l { + if string(elem) == name { + return true + } + } + return false +} + +func (l *providerList) Validate() error { + if l == nil { + return nil + } + + for _, name := range *l { + if _, ok := cloudMetaProviders[name]; !ok { + return fmt.Errorf("unknown provider '%v'", name) + } + } + return nil + +} diff --git a/libbeat/processors/add_cloud_metadata/http_fetcher.go b/libbeat/processors/add_cloud_metadata/http_fetcher.go new file mode 100644 index 000000000000..30e312443337 --- /dev/null +++ b/libbeat/processors/add_cloud_metadata/http_fetcher.go @@ -0,0 +1,159 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package add_cloud_metadata + +import ( + "bytes" + "context" + "encoding/json" + "io/ioutil" + "net/http" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" +) + +type httpMetadataFetcher struct { + provider string + headers map[string]string + responseHandlers map[string]responseHandler + conv schemaConv +} + +// responseHandler is the callback function that used to write something +// to the result according the HTTP response. +type responseHandler func(all []byte, res *result) error + +type schemaConv func(m map[string]interface{}) common.MapStr + +// newMetadataFetcher return metadataFetcher with one pass JSON responseHandler. +func newMetadataFetcher( + c *common.Config, + provider string, + headers map[string]string, + host string, + conv schemaConv, + uri string, +) (*httpMetadataFetcher, error) { + urls, err := getMetadataURLs(c, host, []string{uri}) + if err != nil { + return nil, err + } + responseHandlers := map[string]responseHandler{urls[0]: makeJSONPicker(provider)} + fetcher := &httpMetadataFetcher{provider, headers, responseHandlers, conv} + return fetcher, nil +} + +// fetchMetadata queries metadata from a hosting provider's metadata service. +// Some providers require multiple HTTP requests to gather the whole metadata, +// len(f.responseHandlers) > 1 indicates that multiple requests are needed. +func (f *httpMetadataFetcher) fetchMetadata(ctx context.Context, client http.Client) result { + res := result{provider: f.provider, metadata: common.MapStr{}} + for url, responseHandler := range f.responseHandlers { + f.fetchRaw(ctx, client, url, responseHandler, &res) + if res.err != nil { + return res + } + } + + // Apply schema. + res.metadata = f.conv(res.metadata) + res.metadata["provider"] = f.provider + + return res +} + +// fetchRaw queries raw metadata from a hosting provider's metadata service. 
+func (f *httpMetadataFetcher) fetchRaw( + ctx context.Context, + client http.Client, + url string, + responseHandler responseHandler, + result *result, +) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + result.err = errors.Wrapf(err, "failed to create http request for %v", f.provider) + return + } + for k, v := range f.headers { + req.Header.Add(k, v) + } + req = req.WithContext(ctx) + + rsp, err := client.Do(req) + if err != nil { + result.err = errors.Wrapf(err, "failed requesting %v metadata", f.provider) + return + } + defer rsp.Body.Close() + + if rsp.StatusCode != http.StatusOK { + result.err = errors.Errorf("failed with http status code %v", rsp.StatusCode) + return + } + + all, err := ioutil.ReadAll(rsp.Body) + if err != nil { + result.err = errors.Wrapf(err, "failed requesting %v metadata", f.provider) + return + } + + // Decode JSON. + err = responseHandler(all, result) + if err != nil { + result.err = err + return + } + + return +} + +// getMetadataURLs loads config and generates the metadata URLs. +func getMetadataURLs(c *common.Config, defaultHost string, metadataURIs []string) ([]string, error) { + var urls []string + config := struct { + MetadataHostAndPort string `config:"host"` // Specifies the host and port of the metadata service (for testing purposes only). + }{ + MetadataHostAndPort: defaultHost, + } + err := c.Unpack(&config) + if err != nil { + return urls, errors.Wrap(err, "failed to unpack add_cloud_metadata config") + } + for _, uri := range metadataURIs { + urls = append(urls, "http://"+config.MetadataHostAndPort+uri) + } + return urls, nil +} + +// makeJSONPicker returns a responseHandler function that unmarshals JSON +// from a hosting provider's HTTP response and writes it to the result. 
+func makeJSONPicker(provider string) responseHandler { + return func(all []byte, res *result) error { + dec := json.NewDecoder(bytes.NewReader(all)) + dec.UseNumber() + err := dec.Decode(&res.metadata) + if err != nil { + err = errors.Wrapf(err, "failed to unmarshal %v JSON of '%v'", provider, string(all)) + return err + } + return nil + } +} diff --git a/libbeat/processors/add_cloud_metadata/provider_alibaba_cloud.go b/libbeat/processors/add_cloud_metadata/provider_alibaba_cloud.go index 02fc158d4053..afa502990a20 100644 --- a/libbeat/processors/add_cloud_metadata/provider_alibaba_cloud.go +++ b/libbeat/processors/add_cloud_metadata/provider_alibaba_cloud.go @@ -21,38 +21,44 @@ import "github.com/elastic/beats/libbeat/common" // Alibaba Cloud Metadata Service // Document https://help.aliyun.com/knowledge_detail/49122.html -func newAlibabaCloudMetadataFetcher(c *common.Config) (*metadataFetcher, error) { - ecsMetadataHost := "100.100.100.200" - ecsMetadataInstanceIDURI := "/latest/meta-data/instance-id" - ecsMetadataRegionURI := "/latest/meta-data/region-id" - ecsMetadataZoneURI := "/latest/meta-data/zone-id" +var alibabaCloudMetadataFetcher = provider{ + Name: "alibaba-ecs", - ecsSchema := func(m map[string]interface{}) common.MapStr { - return common.MapStr(m) - } + Local: false, - urls, err := getMetadataURLs(c, ecsMetadataHost, []string{ - ecsMetadataInstanceIDURI, - ecsMetadataRegionURI, - ecsMetadataZoneURI, - }) - if err != nil { - return nil, err - } - responseHandlers := map[string]responseHandler{ - urls[0]: func(all []byte, result *result) error { - result.metadata.Put("instance.id", string(all)) - return nil - }, - urls[1]: func(all []byte, result *result) error { - result.metadata["region"] = string(all) - return nil - }, - urls[2]: func(all []byte, result *result) error { - result.metadata["availability_zone"] = string(all) - return nil - }, - } - fetcher := &metadataFetcher{"ecs", nil, responseHandlers, ecsSchema} - return fetcher, nil + Create: 
func(_ string, c *common.Config) (metadataFetcher, error) { + ecsMetadataHost := "100.100.100.200" + ecsMetadataInstanceIDURI := "/latest/meta-data/instance-id" + ecsMetadataRegionURI := "/latest/meta-data/region-id" + ecsMetadataZoneURI := "/latest/meta-data/zone-id" + + ecsSchema := func(m map[string]interface{}) common.MapStr { + return common.MapStr(m) + } + + urls, err := getMetadataURLs(c, ecsMetadataHost, []string{ + ecsMetadataInstanceIDURI, + ecsMetadataRegionURI, + ecsMetadataZoneURI, + }) + if err != nil { + return nil, err + } + responseHandlers := map[string]responseHandler{ + urls[0]: func(all []byte, result *result) error { + result.metadata.Put("instance.id", string(all)) + return nil + }, + urls[1]: func(all []byte, result *result) error { + result.metadata["region"] = string(all) + return nil + }, + urls[2]: func(all []byte, result *result) error { + result.metadata["availability_zone"] = string(all) + return nil + }, + } + fetcher := &httpMetadataFetcher{"ecs", nil, responseHandlers, ecsSchema} + return fetcher, nil + }, } diff --git a/libbeat/processors/add_cloud_metadata/provider_alibaba_cloud_test.go b/libbeat/processors/add_cloud_metadata/provider_alibaba_cloud_test.go index a35fb5c3de34..7a6b7265d693 100644 --- a/libbeat/processors/add_cloud_metadata/provider_alibaba_cloud_test.go +++ b/libbeat/processors/add_cloud_metadata/provider_alibaba_cloud_test.go @@ -55,7 +55,8 @@ func TestRetrieveAlibabaCloudMetadata(t *testing.T) { defer server.Close() config, err := common.NewConfigFrom(map[string]interface{}{ - "host": server.Listener.Addr().String(), + "providers": []string{"alibaba"}, + "host": server.Listener.Addr().String(), }) if err != nil { diff --git a/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go b/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go index 77b7869334f9..ed28f3b38aa5 100644 --- a/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go +++ b/libbeat/processors/add_cloud_metadata/provider_aws_ec2.go @@ 
-26,19 +26,25 @@ import ( const ec2InstanceIdentityURI = "/2014-02-25/dynamic/instance-identity/document" // AWS EC2 Metadata Service -func newEc2MetadataFetcher(config *common.Config) (*metadataFetcher, error) { - ec2Schema := func(m map[string]interface{}) common.MapStr { - out, _ := s.Schema{ - "instance": s.Object{"id": c.Str("instanceId")}, - "machine": s.Object{"type": c.Str("instanceType")}, - "region": c.Str("region"), - "availability_zone": c.Str("availabilityZone"), - "account": s.Object{"id": c.Str("accountId")}, - "image": s.Object{"id": c.Str("imageId")}, - }.Apply(m) - return out - } +var ec2MetadataFetcher = provider{ + Name: "aws-ec2", - fetcher, err := newMetadataFetcher(config, "aws", nil, metadataHost, ec2Schema, ec2InstanceIdentityURI) - return fetcher, err + Local: true, + + Create: func(_ string, config *common.Config) (metadataFetcher, error) { + ec2Schema := func(m map[string]interface{}) common.MapStr { + out, _ := s.Schema{ + "instance": s.Object{"id": c.Str("instanceId")}, + "machine": s.Object{"type": c.Str("instanceType")}, + "region": c.Str("region"), + "availability_zone": c.Str("availabilityZone"), + "account": s.Object{"id": c.Str("accountId")}, + "image": s.Object{"id": c.Str("imageId")}, + }.Apply(m) + return out + } + + fetcher, err := newMetadataFetcher(config, "aws", nil, metadataHost, ec2Schema, ec2InstanceIdentityURI) + return fetcher, err + }, } diff --git a/libbeat/processors/add_cloud_metadata/provider_azure_vm.go b/libbeat/processors/add_cloud_metadata/provider_azure_vm.go index bbf5e19ff044..8ee84f417e3b 100644 --- a/libbeat/processors/add_cloud_metadata/provider_azure_vm.go +++ b/libbeat/processors/add_cloud_metadata/provider_azure_vm.go @@ -24,23 +24,29 @@ import ( ) // Azure VM Metadata Service -func newAzureVmMetadataFetcher(config *common.Config) (*metadataFetcher, error) { - azMetadataURI := "/metadata/instance/compute?api-version=2017-04-02" - azHeaders := map[string]string{"Metadata": "true"} - azSchema := func(m 
map[string]interface{}) common.MapStr { - out, _ := s.Schema{ - "instance": s.Object{ - "id": c.Str("vmId"), - "name": c.Str("name"), - }, - "machine": s.Object{ - "type": c.Str("vmSize"), - }, - "region": c.Str("location"), - }.Apply(m) - return out - } +var azureVMMetadataFetcher = provider{ + Name: "azure-compute", - fetcher, err := newMetadataFetcher(config, "az", azHeaders, metadataHost, azSchema, azMetadataURI) - return fetcher, err + Local: true, + + Create: func(_ string, config *common.Config) (metadataFetcher, error) { + azMetadataURI := "/metadata/instance/compute?api-version=2017-04-02" + azHeaders := map[string]string{"Metadata": "true"} + azSchema := func(m map[string]interface{}) common.MapStr { + out, _ := s.Schema{ + "instance": s.Object{ + "id": c.Str("vmId"), + "name": c.Str("name"), + }, + "machine": s.Object{ + "type": c.Str("vmSize"), + }, + "region": c.Str("location"), + }.Apply(m) + return out + } + + fetcher, err := newMetadataFetcher(config, "az", azHeaders, metadataHost, azSchema, azMetadataURI) + return fetcher, err + }, } diff --git a/libbeat/processors/add_cloud_metadata/provider_digital_ocean.go b/libbeat/processors/add_cloud_metadata/provider_digital_ocean.go index d3500dd49bc3..fe016ffb95e9 100644 --- a/libbeat/processors/add_cloud_metadata/provider_digital_ocean.go +++ b/libbeat/processors/add_cloud_metadata/provider_digital_ocean.go @@ -24,18 +24,24 @@ import ( ) // DigitalOcean Metadata Service -func newDoMetadataFetcher(config *common.Config) (*metadataFetcher, error) { - doSchema := func(m map[string]interface{}) common.MapStr { - out, _ := s.Schema{ - "instance": s.Object{ - "id": c.StrFromNum("droplet_id"), - }, - "region": c.Str("region"), - }.Apply(m) - return out - } - doMetadataURI := "/metadata/v1.json" +var doMetadataFetcher = provider{ + Name: "digitalocean", - fetcher, err := newMetadataFetcher(config, "digitalocean", nil, metadataHost, doSchema, doMetadataURI) - return fetcher, err + Local: true, + + Create: 
func(provider string, config *common.Config) (metadataFetcher, error) { + doSchema := func(m map[string]interface{}) common.MapStr { + out, _ := s.Schema{ + "instance": s.Object{ + "id": c.StrFromNum("droplet_id"), + }, + "region": c.Str("region"), + }.Apply(m) + return out + } + doMetadataURI := "/metadata/v1.json" + + fetcher, err := newMetadataFetcher(config, provider, nil, metadataHost, doSchema, doMetadataURI) + return fetcher, err + }, } diff --git a/libbeat/processors/add_cloud_metadata/provider_google_gce.go b/libbeat/processors/add_cloud_metadata/provider_google_gce.go index c69fb389531c..27c574f9f1dc 100644 --- a/libbeat/processors/add_cloud_metadata/provider_google_gce.go +++ b/libbeat/processors/add_cloud_metadata/provider_google_gce.go @@ -26,50 +26,56 @@ import ( ) // Google GCE Metadata Service -func newGceMetadataFetcher(config *common.Config) (*metadataFetcher, error) { - gceMetadataURI := "/computeMetadata/v1/?recursive=true&alt=json" - gceHeaders := map[string]string{"Metadata-Flavor": "Google"} - gceSchema := func(m map[string]interface{}) common.MapStr { - out := common.MapStr{} +var gceMetadataFetcher = provider{ + Name: "google-gce", - trimLeadingPath := func(key string) { - v, err := out.GetValue(key) - if err != nil { - return + Local: true, + + Create: func(provider string, config *common.Config) (metadataFetcher, error) { + gceMetadataURI := "/computeMetadata/v1/?recursive=true&alt=json" + gceHeaders := map[string]string{"Metadata-Flavor": "Google"} + gceSchema := func(m map[string]interface{}) common.MapStr { + out := common.MapStr{} + + trimLeadingPath := func(key string) { + v, err := out.GetValue(key) + if err != nil { + return + } + p, ok := v.(string) + if !ok { + return + } + out.Put(key, path.Base(p)) } - p, ok := v.(string) - if !ok { - return + + if instance, ok := m["instance"].(map[string]interface{}); ok { + s.Schema{ + "instance": s.Object{ + "id": c.StrFromNum("id"), + "name": c.Str("name"), + }, + "machine": s.Object{ + 
"type": c.Str("machineType"), + }, + "availability_zone": c.Str("zone"), + }.ApplyTo(out, instance) + trimLeadingPath("machine.type") + trimLeadingPath("availability_zone") } - out.Put(key, path.Base(p)) - } - if instance, ok := m["instance"].(map[string]interface{}); ok { - s.Schema{ - "instance": s.Object{ - "id": c.StrFromNum("id"), - "name": c.Str("name"), - }, - "machine": s.Object{ - "type": c.Str("machineType"), - }, - "availability_zone": c.Str("zone"), - }.ApplyTo(out, instance) - trimLeadingPath("machine.type") - trimLeadingPath("availability_zone") - } + if project, ok := m["project"].(map[string]interface{}); ok { + s.Schema{ + "project": s.Object{ + "id": c.Str("projectId"), + }, + }.ApplyTo(out, project) + } - if project, ok := m["project"].(map[string]interface{}); ok { - s.Schema{ - "project": s.Object{ - "id": c.Str("projectId"), - }, - }.ApplyTo(out, project) + return out } - return out - } - - fetcher, err := newMetadataFetcher(config, "gcp", gceHeaders, metadataHost, gceSchema, gceMetadataURI) - return fetcher, err + fetcher, err := newMetadataFetcher(config, provider, gceHeaders, metadataHost, gceSchema, gceMetadataURI) + return fetcher, err + }, } diff --git a/libbeat/processors/add_cloud_metadata/provider_openstack_nova.go b/libbeat/processors/add_cloud_metadata/provider_openstack_nova.go index d5b505601695..cbb516059b9f 100644 --- a/libbeat/processors/add_cloud_metadata/provider_openstack_nova.go +++ b/libbeat/processors/add_cloud_metadata/provider_openstack_nova.go @@ -31,40 +31,45 @@ const ( // newOpenstackNovaMetadataFetcher returns a metadataFetcher for the // OpenStack Nova Metadata Service // Document https://docs.openstack.org/nova/latest/user/metadata-service.html -func newOpenstackNovaMetadataFetcher(c *common.Config) (*metadataFetcher, error) { +var openstackNovaMetadataFetcher = provider{ + Name: "openstack-nova", - osSchema := func(m map[string]interface{}) common.MapStr { - return common.MapStr(m) - } + Local: true, - urls, err 
:= getMetadataURLs(c, metadataHost, []string{ - osMetadataInstanceIDURI, - osMetadataInstanceTypeURI, - osMetadataHostnameURI, - osMetadataZoneURI, - }) - if err != nil { - return nil, err - } + Create: func(provider string, c *common.Config) (metadataFetcher, error) { + osSchema := func(m map[string]interface{}) common.MapStr { + return common.MapStr(m) + } - responseHandlers := map[string]responseHandler{ - urls[0]: func(all []byte, result *result) error { - result.metadata.Put("instance.id", string(all)) - return nil - }, - urls[1]: func(all []byte, result *result) error { - result.metadata.Put("machine.type", string(all)) - return nil - }, - urls[2]: func(all []byte, result *result) error { - result.metadata.Put("instance.name", string(all)) - return nil - }, - urls[3]: func(all []byte, result *result) error { - result.metadata["availability_zone"] = string(all) - return nil - }, - } - fetcher := &metadataFetcher{"openstack", nil, responseHandlers, osSchema} - return fetcher, nil + urls, err := getMetadataURLs(c, metadataHost, []string{ + osMetadataInstanceIDURI, + osMetadataInstanceTypeURI, + osMetadataHostnameURI, + osMetadataZoneURI, + }) + if err != nil { + return nil, err + } + + responseHandlers := map[string]responseHandler{ + urls[0]: func(all []byte, result *result) error { + result.metadata.Put("instance.id", string(all)) + return nil + }, + urls[1]: func(all []byte, result *result) error { + result.metadata.Put("machine.type", string(all)) + return nil + }, + urls[2]: func(all []byte, result *result) error { + result.metadata.Put("instance.name", string(all)) + return nil + }, + urls[3]: func(all []byte, result *result) error { + result.metadata["availability_zone"] = string(all) + return nil + }, + } + fetcher := &httpMetadataFetcher{"openstack", nil, responseHandlers, osSchema} + return fetcher, nil + }, } diff --git a/libbeat/processors/add_cloud_metadata/provider_tencent_cloud.go b/libbeat/processors/add_cloud_metadata/provider_tencent_cloud.go 
index 6a380377de1e..d24778c6850c 100644 --- a/libbeat/processors/add_cloud_metadata/provider_tencent_cloud.go +++ b/libbeat/processors/add_cloud_metadata/provider_tencent_cloud.go @@ -21,38 +21,44 @@ import "github.com/elastic/beats/libbeat/common" // Tencent Cloud Metadata Service // Document https://www.qcloud.com/document/product/213/4934 -func newQcloudMetadataFetcher(c *common.Config) (*metadataFetcher, error) { - qcloudMetadataHost := "metadata.tencentyun.com" - qcloudMetadataInstanceIDURI := "/meta-data/instance-id" - qcloudMetadataRegionURI := "/meta-data/placement/region" - qcloudMetadataZoneURI := "/meta-data/placement/zone" +var qcloudMetadataFetcher = provider{ + Name: "tencent-qcloud", - qcloudSchema := func(m map[string]interface{}) common.MapStr { - return common.MapStr(m) - } + Local: false, - urls, err := getMetadataURLs(c, qcloudMetadataHost, []string{ - qcloudMetadataInstanceIDURI, - qcloudMetadataRegionURI, - qcloudMetadataZoneURI, - }) - if err != nil { - return nil, err - } - responseHandlers := map[string]responseHandler{ - urls[0]: func(all []byte, result *result) error { - result.metadata.Put("instance.id", string(all)) - return nil - }, - urls[1]: func(all []byte, result *result) error { - result.metadata["region"] = string(all) - return nil - }, - urls[2]: func(all []byte, result *result) error { - result.metadata["availability_zone"] = string(all) - return nil - }, - } - fetcher := &metadataFetcher{"qcloud", nil, responseHandlers, qcloudSchema} - return fetcher, nil + Create: func(_ string, c *common.Config) (metadataFetcher, error) { + qcloudMetadataHost := "metadata.tencentyun.com" + qcloudMetadataInstanceIDURI := "/meta-data/instance-id" + qcloudMetadataRegionURI := "/meta-data/placement/region" + qcloudMetadataZoneURI := "/meta-data/placement/zone" + + qcloudSchema := func(m map[string]interface{}) common.MapStr { + return common.MapStr(m) + } + + urls, err := getMetadataURLs(c, qcloudMetadataHost, []string{ + 
qcloudMetadataInstanceIDURI, + qcloudMetadataRegionURI, + qcloudMetadataZoneURI, + }) + if err != nil { + return nil, err + } + responseHandlers := map[string]responseHandler{ + urls[0]: func(all []byte, result *result) error { + result.metadata.Put("instance.id", string(all)) + return nil + }, + urls[1]: func(all []byte, result *result) error { + result.metadata["region"] = string(all) + return nil + }, + urls[2]: func(all []byte, result *result) error { + result.metadata["availability_zone"] = string(all) + return nil + }, + } + fetcher := &httpMetadataFetcher{"qcloud", nil, responseHandlers, qcloudSchema} + return fetcher, nil + }, } diff --git a/libbeat/processors/add_cloud_metadata/provider_tencent_cloud_test.go b/libbeat/processors/add_cloud_metadata/provider_tencent_cloud_test.go index c7868148ad19..72be1934c2d6 100644 --- a/libbeat/processors/add_cloud_metadata/provider_tencent_cloud_test.go +++ b/libbeat/processors/add_cloud_metadata/provider_tencent_cloud_test.go @@ -55,7 +55,8 @@ func TestRetrieveQCloudMetadata(t *testing.T) { defer server.Close() config, err := common.NewConfigFrom(map[string]interface{}{ - "host": server.Listener.Addr().String(), + "providers": []string{"tencent"}, + "host": server.Listener.Addr().String(), }) if err != nil { diff --git a/libbeat/processors/add_cloud_metadata/providers.go b/libbeat/processors/add_cloud_metadata/providers.go new file mode 100644 index 000000000000..301e7d4731f6 --- /dev/null +++ b/libbeat/processors/add_cloud_metadata/providers.go @@ -0,0 +1,175 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_cloud_metadata + +import ( + "context" + "net" + "net/http" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" +) + +type provider struct { + // Name contains the long name of the provider and service the metadata is fetched from. + Name string + + // Local is set to true if only a local IP is accessed. + Local bool + + // Create returns an actual metadataFetcher + Create func(string, *common.Config) (metadataFetcher, error) +} + +type metadataFetcher interface { + fetchMetadata(context.Context, http.Client) result +} + +// result is the result of a query for a specific hosting provider's metadata. +type result struct { + provider string // Hosting provider type. + err error // Error that occurred while fetching (if any). + metadata common.MapStr // A specific subset of the metadata received from the hosting provider. 
+} + +var cloudMetaProviders = map[string]provider{ + "alibaba": alibabaCloudMetadataFetcher, + "ecs": alibabaCloudMetadataFetcher, + "azure": azureVMMetadataFetcher, + "digitalocean": doMetadataFetcher, + "aws": ec2MetadataFetcher, + "ec2": ec2MetadataFetcher, + "gcp": gceMetadataFetcher, + "openstack": openstackNovaMetadataFetcher, + "nova": openstackNovaMetadataFetcher, + "qcloud": qcloudMetadataFetcher, + "tencent": qcloudMetadataFetcher, +} + +func selectProviders(configList providerList, providers map[string]provider) map[string]provider { + return filterMetaProviders(providersFilter(configList, providers), providers) +} + +func providersFilter(configList providerList, allProviders map[string]provider) func(string) bool { + if len(configList) == 0 { + return func(name string) bool { + ff, ok := allProviders[name] + return ok && ff.Local + } + } + return func(name string) (ok bool) { + if ok = configList.Has(name); ok { + _, ok = allProviders[name] + } + return ok + } +} + +func filterMetaProviders(filter func(string) bool, fetchers map[string]provider) map[string]provider { + out := map[string]provider{} + for name, ff := range fetchers { + if filter(name) { + out[name] = ff + } + } + return out +} + +func setupFetchers(providers map[string]provider, c *common.Config) ([]metadataFetcher, error) { + mf := make([]metadataFetcher, 0, len(providers)) + visited := map[string]bool{} + + // Iterate over all providers and create a unique meta-data fetcher per provider type. + // Some providers might appear twice in the set of providers to support aliases on provider names. + // For example aws and ec2 both use the same provider. + // The loop tracks already seen providers in the `visited` set, to ensure that we do not create + // duplicate providers for aliases. 
+ for name, ff := range providers { + if visited[ff.Name] { + continue + } + visited[ff.Name] = true + + fetcher, err := ff.Create(name, c) + if err != nil { + return nil, errors.Wrapf(err, "failed to initialize the %v fetcher", name) + } + + mf = append(mf, fetcher) + } + return mf, nil +} + +// fetchMetadata attempts to fetch metadata in parallel from each of the +// hosting providers supported by this processor. It waits for the results to +// be returned or for a timeout to occur, then returns the first result that +// completed in time. +func fetchMetadata(metadataFetchers []metadataFetcher, timeout time.Duration) *result { + debugf("add_cloud_metadata: starting to fetch metadata, timeout=%v", timeout) + start := time.Now() + defer func() { + debugf("add_cloud_metadata: fetchMetadata ran for %v", time.Since(start)) + }() + + // Create HTTP client with our timeouts and keep-alive disabled. + client := http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + DisableKeepAlives: true, + DialContext: (&net.Dialer{ + Timeout: timeout, + KeepAlive: 0, + }).DialContext, + }, + } + + // Create context to enable explicit cancellation of the http requests. + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + + results := make(chan result) + for _, fetcher := range metadataFetchers { + fetcher := fetcher + go func() { + select { + case <-ctx.Done(): + case results <- fetcher.fetchMetadata(ctx, client): + } + }() + } + + for i := 0; i < len(metadataFetchers); i++ { + select { + case result := <-results: + debugf("add_cloud_metadata: received disposition for %v after %v. %v", + result.provider, time.Since(start), result) + // Bail out on first success. 
+ if result.err == nil && result.metadata != nil { + return &result + } + case <-ctx.Done(): + debugf("add_cloud_metadata: timed-out waiting for all responses") + return nil + } + } + + return nil +} diff --git a/libbeat/processors/add_cloud_metadata/providers_test.go b/libbeat/processors/add_cloud_metadata/providers_test.go new file mode 100644 index 000000000000..e9c8732c54a2 --- /dev/null +++ b/libbeat/processors/add_cloud_metadata/providers_test.go @@ -0,0 +1,100 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package add_cloud_metadata + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +func TestProvidersFilter(t *testing.T) { + var all []string + var allLocal []string + for name, ff := range cloudMetaProviders { + all = append(all, name) + if ff.Local { + allLocal = append(allLocal, name) + } + } + + cases := map[string]struct { + config map[string]interface{} + fail bool + expected []string + }{ + "all with local access only if not configured": { + config: map[string]interface{}{}, + expected: allLocal, + }, + "fail to load if unknown name is used": { + config: map[string]interface{}{ + "providers": []string{"unknown"}, + }, + fail: true, + }, + "only selected": { + config: map[string]interface{}{ + "providers": []string{"aws", "gcp", "digitalocean"}, + }, + }, + } + + copyStrings := func(in []string) (out []string) { + for _, str := range in { + out = append(out, str) + } + return out + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + rawConfig := common.MustNewConfigFrom(test.config) + + config := defaultConfig() + err := rawConfig.Unpack(&config) + if err == nil && test.fail { + t.Fatal("Did expect to fail on unpack") + } else if err != nil && !test.fail { + t.Fatal("Unpack failed", err) + } else if test.fail && err != nil { + return + } + + // compute list of providers that should have matched + var expected []string + if len(test.expected) == 0 && len(config.Providers) > 0 { + expected = copyStrings(config.Providers) + } else { + expected = copyStrings(test.expected) + } + sort.Strings(expected) + + var actual []string + for name := range selectProviders(config.Providers, cloudMetaProviders) { + actual = append(actual, name) + } + + sort.Strings(actual) + assert.Equal(t, expected, actual) + }) + } +}