Skip to content

Commit

Permalink
Add go-elasticsearch dependency
Browse files Browse the repository at this point in the history
Enable requests with the official go elasticsearch library.
  • Loading branch information
simitt committed Nov 12, 2019
1 parent 09056ad commit 6d9e0a9
Show file tree
Hide file tree
Showing 353 changed files with 81,396 additions and 0 deletions.
18 changes: 18 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,24 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

--------------------------------------------------------------------
Dependency: github.com/elastic/go-elasticsearch
Revision: 8efb73c32e7fa9d1e2e905d9ec112d51382cabf4
License type (autodetected): Apache-2.0
./vendor/github.com/elastic/go-elasticsearch/LICENSE:
--------------------------------------------------------------------
Apache License 2.0


--------------------------------------------------------------------
Dependency: github.com/elastic/go-elasticsearch/v8
Revision: 8efb73c32e7fa9d1e2e905d9ec112d51382cabf4
License type (autodetected): Apache-2.0
./vendor/github.com/elastic/go-elasticsearch/v8/LICENSE:
--------------------------------------------------------------------
Apache License 2.0


--------------------------------------------------------------------
Dependency: github.com/elastic/go-lumber
Revision: 616041e345fc33c97bc0eb0fa6b388aa07bca3e1
Expand Down
146 changes: 146 additions & 0 deletions elasticsearch/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package elasticsearch

import (
"net/http"
"net/url"
"strings"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/outputs/transport"

goelasticsearch "github.com/elastic/go-elasticsearch/v8"
)

const (
prefixHTTP = "http"
prefixHTTPSchema = prefixHTTP + "://"
esDefaultPort = 9200
)

var (
errInvalidHosts = errors.New("`Hosts` must at least contain one hostname")
errConfigMissing = errors.New("config missing")
)

// Config holds all configurable fields that are used to create a Client
type Config struct {
Hosts Hosts `config:"hosts" validate:"required"`
Protocol string `config:"protocol"`
Path string `config:"path"`
ProxyURL string `config:"proxy_url"`
ProxyDisable bool `config:"proxy_disable"`
Timeout time.Duration `config:"timeout"`
TLS *tlscommon.Config `config:"ssl"`
}

// Hosts is an array of host strings and needs to have at least one entry
type Hosts []string

// NewClient creates an elasticsearch client from given config
func NewClient(config *Config) (*goelasticsearch.Client, error) {
if config == nil {
return nil, errConfigMissing
}

//following logic is inspired by libbeat functionality

var err error
proxy, err := httpProxyURL(config)
if err != nil {
return nil, err
}

addresses, err := addresses(config)
if err != nil {
return nil, err
}

dialer, tlsDialer, err := dialer(config)
if err != nil {
return nil, err
}

return goelasticsearch.NewClient(goelasticsearch.Config{
Addresses: addresses,
Transport: &http.Transport{
Proxy: proxy,
Dial: dialer.Dial,
DialTLS: tlsDialer.Dial,
},
})
}

// Validate ensures Hosts instance has at least one entry
func (h Hosts) Validate() error {
if len(h) == 0 {
return errInvalidHosts
}
return nil
}

func httpProxyURL(cfg *Config) (func(*http.Request) (*url.URL, error), error) {
if cfg.ProxyDisable {
return nil, nil
}

if cfg.ProxyURL == "" {
return http.ProxyFromEnvironment, nil
}

proxyStr := cfg.ProxyURL
if !strings.HasPrefix(proxyStr, prefixHTTP) {
proxyStr = prefixHTTPSchema + proxyStr
}
u, err := url.Parse(proxyStr)
if err != nil {
return nil, err
}
return http.ProxyURL(u), nil
}

func addresses(cfg *Config) ([]string, error) {
var addresses []string
for _, host := range cfg.Hosts {
address, err := common.MakeURL(cfg.Protocol, cfg.Path, host, esDefaultPort)
if err != nil {
return nil, err
}
addresses = append(addresses, address)
}
return addresses, nil
}

func dialer(cfg *Config) (transport.Dialer, transport.Dialer, error) {
var tlsConfig *tlscommon.TLSConfig
var err error
if cfg.TLS.IsEnabled() {
if tlsConfig, err = tlscommon.LoadTLSConfig(cfg.TLS); err != nil {
return nil, nil, err
}
}

dialer := transport.NetDialer(cfg.Timeout)
tlsDialer, err := transport.TLSDialer(dialer, tlsConfig, cfg.Timeout)
return dialer, tlsDialer, err
}
106 changes: 106 additions & 0 deletions elasticsearch/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package elasticsearch

import (
"net/http"
"net/http/httptest"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestClient(t *testing.T) {
t.Run("no config", func(t *testing.T) {
goESClient, err := NewClient(nil)
assert.Error(t, err)
assert.Nil(t, goESClient)
})

t.Run("valid config", func(t *testing.T) {
cfg := Config{Hosts: Hosts{"localhost:9200", "localhost:9201"}}
goESClient, err := NewClient(&cfg)
require.NoError(t, err)
assert.NotNil(t, goESClient)
})
}

func TestClient_httpProxyUrl(t *testing.T) {
t.Run("proxy disabled", func(t *testing.T) {
proxy, err := httpProxyURL(&Config{ProxyDisable: true})
require.Nil(t, err)
assert.Nil(t, proxy)
})

t.Run("proxy from ENV", func(t *testing.T) {
// set env var for http proxy
os.Setenv("HTTP_PROXY", "proxy")

// create proxy function
proxy, err := httpProxyURL(&Config{})
require.Nil(t, err)
// ensure proxy function is called and check url
url, err := proxy(httptest.NewRequest(http.MethodGet, "http://example.com", nil))
require.Nil(t, err)
assert.Equal(t, "http://proxy", url.String())
})

t.Run("proxy from URL", func(t *testing.T) {
// set env var for http proxy
os.Setenv("HTTP_PROXY", "proxy")

// create proxy function from URL without `http` prefix
proxy, err := httpProxyURL(&Config{ProxyURL: "foo"})
require.Nil(t, err)
// ensure proxy function is called and check url
url, err := proxy(httptest.NewRequest(http.MethodGet, "http://example.com/", nil))
require.Nil(t, err)
assert.Equal(t, "http://foo", url.String())

// create proxy function from URL with `http` prefix
proxy, err = httpProxyURL(&Config{ProxyURL: "http://foo"})
require.Nil(t, err)
// ensure proxy function is called and check url
url, err = proxy(httptest.NewRequest(http.MethodGet, "http://example.com/", nil))
require.Nil(t, err)
assert.Equal(t, "http://foo", url.String())
})
}

func TestClient_addresses(t *testing.T) {
t.Run("no protocol and path", func(t *testing.T) {
addresses, err := addresses(&Config{Hosts: []string{
"http://localhost", "http://localhost:9300", "localhost", "192.0.0.1", "192.0.0.2:8080"}})
require.NoError(t, err)
expected := []string{"http://localhost:9200", "http://localhost:9300",
"http://localhost:9200", "http://192.0.0.1:9200", "http://192.0.0.2:8080"}
assert.ElementsMatch(t, expected, addresses)
})

t.Run("with protocol and path", func(t *testing.T) {
addresses, err := addresses(&Config{Protocol: "https", Path: "xyz",
Hosts: []string{"http://localhost", "http://localhost:9300/abc",
"localhost/abc", "192.0.0.2:8080"}})
require.NoError(t, err)
expected := []string{"http://localhost:9200/xyz", "http://localhost:9300/abc",
"https://localhost:9200/abc", "https://192.0.0.2:8080/xyz"}
assert.ElementsMatch(t, expected, addresses)
})
}
Loading

0 comments on commit 6d9e0a9

Please sign in to comment.