diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 593ff80e217..7e27e1e4f35 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -248,6 +248,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix ARN parsing function to work for ELB ARNs. {pull}14316[14316] - Update azure configuration example. {issue}14224[14224] - Fix cloudwatch metricset with names and dimensions in config. {issue}14376[14376] {pull}14391[14391] +- Fix marshaling of ms-since-epoch values in `elasticsearch/cluster_stats` metricset. {pull}14378[14378] *Packetbeat* diff --git a/metricbeat/module/elasticsearch/cluster_stats/data_xpack.go b/metricbeat/module/elasticsearch/cluster_stats/data_xpack.go index 02545542ea6..0dacb45d56f 100644 --- a/metricbeat/module/elasticsearch/cluster_stats/data_xpack.go +++ b/metricbeat/module/elasticsearch/cluster_stats/data_xpack.go @@ -33,11 +33,6 @@ import ( "github.com/elastic/beats/metricbeat/module/elasticsearch" ) -type clusterStatsLicense struct { - *elasticsearch.License - ClusterNeedsTLS bool `json:"cluster_needs_tls"` -} - func clusterNeedsTLSEnabled(license *elasticsearch.License, stackStats common.MapStr) (bool, error) { // TLS does not need to be enabled if license type is something other than trial if !license.IsOneOf("trial") { @@ -205,7 +200,8 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, c return errors.Wrap(err, "failed to determine if cluster needs TLS enabled") } - l := clusterStatsLicense{license, clusterNeedsTLS} + l := license.ToMapStr() + l["cluster_needs_tls"] = clusterNeedsTLS isAPMFound, err := apmIndicesExist(clusterState) if err != nil { diff --git a/metricbeat/module/elasticsearch/elasticsearch.go b/metricbeat/module/elasticsearch/elasticsearch.go index b8c905c64f2..920ea4e14b3 100644 --- a/metricbeat/module/elasticsearch/elasticsearch.go +++ b/metricbeat/module/elasticsearch/elasticsearch.go @@ -480,6 +480,26 @@ func (l *License) IsOneOf(candidateLicenses ...string) bool { return false } +// ToMapStr converts the license to a common.MapStr. This is necessary +// for proper marshaling of the data before it's sent over the wire. In +// particular it ensures that ms-since-epoch values are marshaled as longs +// and not floats in scientific notation as Elasticsearch does not like that. +func (l *License) ToMapStr() common.MapStr { + return common.MapStr{ + "status": l.Status, + "id": l.ID, + "type": l.Type, + "issue_date": l.IssueDate, + "issue_date_in_millis": l.IssueDateInMillis, + "expiry_date": l.ExpiryDate, + "expiry_date_in_millis": l.ExpiryDateInMillis, + "max_nodes": l.MaxNodes, + "issued_to": l.IssuedTo, + "issuer": l.Issuer, + "start_date_in_millis": l.StartDateInMillis, + } +} + func getSettingGroup(allSettings common.MapStr, groupKey string) (common.MapStr, error) { hasSettingGroup, err := allSettings.HasKey(groupKey) if err != nil { diff --git a/metricbeat/module/elasticsearch/test_elasticsearch.py b/metricbeat/module/elasticsearch/test_elasticsearch.py index 0f89f6511b2..9b64ce4df89 100644 --- a/metricbeat/module/elasticsearch/test_elasticsearch.py +++ b/metricbeat/module/elasticsearch/test_elasticsearch.py @@ -109,6 +109,46 @@ def test_xpack(self): proc.check_kill_and_wait() self.assert_no_logged_warnings() + @unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test") + def test_xpack_cluster_stats(self): + """ + elasticsearch-xpack module test for type:cluster_stats + """ + self.render_config_template(modules=[{ + "name": "elasticsearch", + "metricsets": [ + "ccr", + "cluster_stats", + "enrich", + "index", + "index_recovery", + "index_summary", + "ml_job", + "node_stats", + "shard" + ], + "hosts": self.get_hosts(), + "period": "1s", + "extras": { + "xpack.enabled": "true" + } + }]) + proc = self.start_beat() + self.wait_log_contains('"type": "cluster_stats"') + + # self.wait_until(lambda: self.output_has_message('"type":"cluster_stats"')) + proc.check_kill_and_wait() + self.assert_no_logged_warnings() + + docs = self.read_output_json() + for doc in docs: + t = doc["type"] + if t != "cluster_stats": + continue + license = doc["license"] + issue_date = license["issue_date_in_millis"] + self.assertIsNot(type(issue_date), float) + def create_ml_job(self): # Check if an ml job already exists response = self.ml_es.get_jobs()