diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 6122c1a7181..5ca627a7cfa 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -16,6 +16,10 @@ package config import ( "context" + "crypto/tls" + "crypto/x509" + "io/ioutil" + "net/http" "sync" "time" @@ -37,6 +41,7 @@ type Configuration struct { MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads NumShards int64 `yaml:"shards"` NumReplicas int64 `yaml:"replicas"` + Timeout time.Duration `validate:"min=500"` BulkSize int BulkWorkers int BulkActions int @@ -45,6 +50,15 @@ type Configuration struct { TagsFilePath string AllTagsAsFields bool TagDotReplacement string + TLS TLSConfig +} + +// TLSConfig describes the configuration properties to connect tls enabled ElasticSearch cluster +type TLSConfig struct { + Enabled bool + CertPath string + KeyPath string + CaPath string } // ClientBuilder creates new es.Client @@ -64,7 +78,12 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac if len(c.Servers) < 1 { return nil, errors.New("No servers specified") } - rawClient, err := elastic.NewClient(c.GetConfigs()...) + options, err := c.getConfigOptions() + if err != nil { + return nil, err + } + + rawClient, err := elastic.NewClient(options...) if err != nil { return nil, err } @@ -187,11 +206,60 @@ func (c *Configuration) GetTagDotReplacement() string { return c.TagDotReplacement } -// GetConfigs wraps the configs to feed to the ElasticSearch client init -func (c *Configuration) GetConfigs() []elastic.ClientOptionFunc { - options := make([]elastic.ClientOptionFunc, 3) - options[0] = elastic.SetURL(c.Servers...) - options[1] = elastic.SetBasicAuth(c.Username, c.Password) - options[2] = elastic.SetSniff(c.Sniffer) - return options +// getConfigOptions wraps the configs to feed to the ElasticSearch client init +func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) { + options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)} + httpClient := &http.Client{ + Timeout: c.Timeout, + } + options = append(options, elastic.SetHttpClient(httpClient)) + if c.TLS.Enabled { + ctlsConfig, err := c.TLS.createTLSConfig() + if err != nil { + return nil, err + } + httpClient.Transport = &http.Transport{ + TLSClientConfig: ctlsConfig, + } + } else { + options = append(options, elastic.SetBasicAuth(c.Username, c.Password)) + } + return options, nil +} + +// createTLSConfig creates TLS Configuration to connect with ES Cluster. +func (tlsConfig *TLSConfig) createTLSConfig() (*tls.Config, error) { + rootCerts, err := tlsConfig.loadCertificate() + if err != nil { + return nil, err + } + clientPrivateKey, err := tlsConfig.loadPrivateKey() + if err != nil { + return nil, err + } + return &tls.Config{ + RootCAs: rootCerts, + Certificates: []tls.Certificate{*clientPrivateKey}, + }, nil + +} + +// loadCertificate is used to load root certification +func (tlsConfig *TLSConfig) loadCertificate() (*x509.CertPool, error) { + caCert, err := ioutil.ReadFile(tlsConfig.CaPath) + if err != nil { + return nil, err + } + certificates := x509.NewCertPool() + certificates.AppendCertsFromPEM(caCert) + return certificates, nil +} + +// loadPrivateKey is used to load the private certificate and key for TLS +func (tlsConfig *TLSConfig) loadPrivateKey() (*tls.Certificate, error) { + privateKey, err := tls.LoadX509KeyPair(tlsConfig.CertPath, tlsConfig.KeyPath) + if err != nil { + return nil, err + } + return &privateKey, nil } diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 0e650499e7a..81be05f6588 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -36,6 +36,11 @@ const ( suffixBulkWorkers = ".bulk.workers" suffixBulkActions = ".bulk.actions" suffixBulkFlushInterval = ".bulk.flush-interval" + suffixTimeout = ".timeout" + suffixTLS = ".tls" + suffixCert = ".tls.cert" + suffixKey = ".tls.key" + suffixCA = ".tls.ca" suffixIndexPrefix = ".index-prefix" suffixTagsAsFields = ".tags-as-fields" suffixTagsAsFieldsAll = suffixTagsAsFields + ".all" @@ -119,6 +124,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixServerURLs, nsConfig.servers, "The comma-separated list of ElasticSearch servers, must be full url i.e. http://localhost:9200") + flagSet.Duration( + nsConfig.namespace+suffixTimeout, + nsConfig.Timeout, + "Timeout used for queries") flagSet.Duration( nsConfig.namespace+suffixMaxSpanAge, nsConfig.MaxSpanAge, @@ -147,6 +156,22 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixBulkFlushInterval, nsConfig.BulkFlushInterval, "A time.Duration after which bulk requests are committed, regardless of other tresholds. Set to zero to disable. By default, this is disabled.") + flagSet.Bool( + nsConfig.namespace+suffixTLS, + nsConfig.TLS.Enabled, + "Enable TLS") + flagSet.String( + nsConfig.namespace+suffixCert, + nsConfig.TLS.CertPath, + "Path to TLS certificate file") + flagSet.String( + nsConfig.namespace+suffixKey, + nsConfig.TLS.KeyPath, + "Path to TLS key file") + flagSet.String( + nsConfig.namespace+suffixCA, + nsConfig.TLS.CaPath, + "Path to TLS CA file") flagSet.String( nsConfig.namespace+suffixIndexPrefix, nsConfig.IndexPrefix, @@ -185,6 +210,11 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.BulkWorkers = v.GetInt(cfg.namespace + suffixBulkWorkers) cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions) cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval) + cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) + cfg.TLS.Enabled = v.GetBool(cfg.namespace + suffixTLS) + cfg.TLS.CertPath = v.GetString(cfg.namespace + suffixCert) + cfg.TLS.KeyPath = v.GetString(cfg.namespace + suffixKey) + cfg.TLS.CaPath = v.GetString(cfg.namespace + suffixCA) cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix) cfg.AllTagsAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll) cfg.TagsFilePath = v.GetString(cfg.namespace + suffixTagsFile)