Skip to content

Commit

Permalink
Merge pull request #45 from shogo82148/implement-eks-plugin
Browse files Browse the repository at this point in the history
implement EKS plugin
  • Loading branch information
shogo82148 authored Apr 1, 2020
2 parents 26e0e38 + 9f71c07 commit 9a5f4ed
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 36 deletions.
31 changes: 30 additions & 1 deletion plugins/ecs/ecs.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package ecs

import (
"bufio"
"os"
"runtime"
"strings"

"github.com/shogo82148/aws-xray-yasdk-go/xray"
"github.com/shogo82148/aws-xray-yasdk-go/xray/schema"
)

const cgroupPath = "/proc/self/cgroup"

type plugin struct {
ECS *schema.ECS
}

// Init activates ECS Plugin at runtime.
func Init() {
if runtime.GOOS != "linux" {
return
}
uri := os.Getenv("ECS_CONTAINER_METADATA_URI")
if !strings.HasPrefix(uri, "http://") {
return
Expand All @@ -24,7 +31,8 @@ func Init() {
}
xray.AddPlugin(&plugin{
ECS: &schema.ECS{
Container: hostname,
Container: hostname,
ContainerID: containerID(cgroupPath),
},
})
}
Expand All @@ -39,3 +47,24 @@ func (p *plugin) HandleSegment(seg *xray.Segment, doc *schema.Segment) {

// Origin implements Plugin.
func (*plugin) Origin() string { return schema.OriginECSContainer }

// Reads the docker-generated cgroup file that lists the full (untruncated) docker container ID at the end of each line.
// This method takes advantage of that fact by just reading the 64-character ID from the end of the first line.
func containerID(cgroup string) string {
const idLength = 64
f, err := os.Open(cgroup)
if err != nil {
return ""
}
defer f.Close()

scanner := bufio.NewScanner(f)
if !scanner.Scan() {
return ""
}
line := scanner.Text()
if len(line) < idLength {
return ""
}
return line[len(line)-idLength:]
}
26 changes: 26 additions & 0 deletions plugins/ecs/ecs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package ecs

import (
"io/ioutil"
"os"
"path/filepath"
"testing"
)

func TestContainerID(t *testing.T) {
tmp, err := ioutil.TempDir("", "")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)

want := "42e85902377f5b9e758dfa6537377e2da86338b4b40c20d875251082e8a1da84"
dummyCGroup := filepath.Join(tmp, "tmpfile")
if err := ioutil.WriteFile(dummyCGroup, []byte("14:name=systemd:/docker/"+want+"\n"), 0644); err != nil {
t.Fatal(err)
}
got := containerID(dummyCGroup)
if got != want {
t.Errorf("want %s, got %s", want, got)
}
}
171 changes: 171 additions & 0 deletions plugins/eks/eks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package eks

import (
"bufio"
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"io/ioutil"
"net"
"net/http"
"os"
"runtime"
"time"

"github.com/shogo82148/aws-xray-yasdk-go/xray"
"github.com/shogo82148/aws-xray-yasdk-go/xray/schema"
"github.com/shogo82148/aws-xray-yasdk-go/xray/xraylog"
)

const (
caCertificateFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
cgroupPath = "/proc/self/cgroup"
)

type plugin struct {
EKS *schema.EKS
}

// Init activates EKS Plugin at runtime.
func Init() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if runtime.GOOS != "linux" {
return
}
caCert, err := ioutil.ReadFile(caCertificateFile)
if err != nil {
// it seems not to be in kubernetes environment.
// just ignore error.
xraylog.Debugf(ctx, "failed to read ca.crt: %v", err)
return
}
token, err := ioutil.ReadFile(tokenFile)
if err != nil {
xraylog.Debugf(ctx, "failed to read token: %v", err)
return
}

certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(caCert)
client := &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
TLSClientConfig: &tls.Config{
RootCAs: certPool,
},
},
}
// we don't reuse the client, so release its resources.
defer closeIdleConnections(client)

hostname, err := os.Hostname()
if err != nil {
xraylog.Debugf(ctx, "failed to get hostname: %v", err)
return
}
xray.AddPlugin(&plugin{
EKS: &schema.EKS{
ClusterName: clusterName(ctx, client, string(bytes.TrimSpace(token))),
ContainerID: containerID(ctx, cgroupPath),
Pod: hostname,
},
})
}

// HandleSegment implements Plugin.
func (p *plugin) HandleSegment(seg *xray.Segment, doc *schema.Segment) {
if doc.AWS == nil {
doc.AWS = schema.AWS{}
}
doc.AWS.SetEKS(p.EKS)
}

// Origin implements Plugin.
func (*plugin) Origin() string { return schema.OriginEKSContainer }

// Reads the docker-generated cgroup file that lists the full (untruncated) docker container ID at the end of each line.
// This method takes advantage of that fact by just reading the 64-character ID from the end of the first line.
func containerID(ctx context.Context, cgroup string) string {
const idLength = 64
f, err := os.Open(cgroup)
if err != nil {
xraylog.Debugf(ctx, "failed to read cgroup: %v", err)
return ""
}
defer f.Close()

scanner := bufio.NewScanner(f)
if !scanner.Scan() {
if err := scanner.Err(); err != nil {
xraylog.Debugf(ctx, "failed to read cgroup: %v", err)
}
return ""
}
line := scanner.Text()
if len(line) < idLength {
return ""
}
line = line[len(line)-idLength:]
xraylog.Debugf(ctx, "container id is %s", line)
return line
}

func clusterName(ctx context.Context, client *http.Client, token string) string {
const apiEndpoint = "https://kubernetes.default.svc"
const configMapPath = "/api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info"
req, err := http.NewRequest(http.MethodGet, apiEndpoint+configMapPath, nil)
if err != nil {
xraylog.Debugf(ctx, "failed to create a new request: %v", err)
return ""
}
if token != "" {
req.Header.Set("Authorization", "Bearer "+token)
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req = req.WithContext(ctx)

resp, err := client.Do(req)
if err != nil {
xraylog.Debugf(ctx, "failed to get the cluster name: %v", err)
return ""
}
defer resp.Body.Close()

var data struct {
Data struct {
ClusterName string `json:"cluster.name"`
} `json:"data"`
}
dec := json.NewDecoder(resp.Body)
if err := dec.Decode(&data); err != nil {
xraylog.Debugf(ctx, "failed to decode: %v", err)
return ""
}
xraylog.Debugf(ctx, "cluster name is %s", data.Data.ClusterName)
return data.Data.ClusterName
}

// call CloseIdleConnections() if the client have the method.
// for Go 1.11
func closeIdleConnections(client interface{}) {
type IdleConnectionsCloser interface {
CloseIdleConnections()
}
if c, ok := client.(IdleConnectionsCloser); ok {
c.CloseIdleConnections()
}
}
27 changes: 27 additions & 0 deletions plugins/eks/eks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package eks

import (
"context"
"io/ioutil"
"os"
"path/filepath"
"testing"
)

func TestContainerID(t *testing.T) {
tmp, err := ioutil.TempDir("", "")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp)

want := "42e85902377f5b9e758dfa6537377e2da86338b4b40c20d875251082e8a1da84"
dummyCGroup := filepath.Join(tmp, "tmpfile")
if err := ioutil.WriteFile(dummyCGroup, []byte("14:name=systemd:/docker/"+want+"\n"), 0644); err != nil {
t.Fatal(err)
}
got := containerID(context.Background(), dummyCGroup)
if got != want {
t.Errorf("want %s, got %s", want, got)
}
}
7 changes: 7 additions & 0 deletions plugins/eks/init/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package init

import "github.com/shogo82148/aws-xray-yasdk-go/plugins/eks"

func init() {
eks.Init()
}
50 changes: 15 additions & 35 deletions xray/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// ref. https://docs.aws.amazon.com/xray/latest/devguide/xray-api-segmentdocuments.html
package schema

import "strings"
import (
"strings"
)

// The value of Segment.Origin.
const (
Expand Down Expand Up @@ -167,57 +169,26 @@ func (aws AWS) SetAccountID(accountID string) {
aws["account_id"] = accountID
}

// ECS returns the information about an Amazon ECS container.
func (aws AWS) ECS() *ECS {
if aws == nil {
return nil
}
v, _ := aws["ecs"].(*ECS)
return v
}

// SetECS sets ECS.
func (aws AWS) SetECS(ecs *ECS) {
aws["ecs"] = ecs
}

// EC2 returns the information about an Amazon EC2 instance.
func (aws AWS) EC2() *EC2 {
if aws == nil {
return nil
}
v, _ := aws["ec2"].(*EC2)
return v
// SetEKS sets EKS.
func (aws AWS) SetEKS(eks *EKS) {
aws["eks"] = eks
}

// SetEC2 sets EC2.
func (aws AWS) SetEC2(ec2 *EC2) {
aws["ec2"] = ec2
}

// ElasticBeanstalk returns the information about an Elastic Beanstalk environment.
func (aws AWS) ElasticBeanstalk() *ElasticBeanstalk {
if aws == nil {
return nil
}
v, _ := aws["elastic_beanstalk"].(*ElasticBeanstalk)
return v
}

// SetElasticBeanstalk sets ElasticBeanstalk.
func (aws AWS) SetElasticBeanstalk(bean *ElasticBeanstalk) {
aws["elastic_beanstalk"] = bean
}

// XRay returns the information about the X-Ray SDK.
func (aws AWS) XRay() *XRay {
if aws == nil {
return nil
}
v, _ := aws["xray"].(*XRay)
return v
}

// SetXRay sets XRay.
func (aws AWS) SetXRay(xray *XRay) {
aws["xray"] = xray
Expand All @@ -227,6 +198,15 @@ func (aws AWS) SetXRay(xray *XRay) {
type ECS struct {
// The container ID of the container running your application.
Container string `json:"container,omitempty"`

ContainerID string `json:"containerId,omitempty"`
}

// EKS is information about an Amazon EKS container.
type EKS struct {
ClusterName string `json:"cluster_name,omitempty"`
Pod string `json:"pod,omitempty"`
ContainerID string `json:"containerId,omitempty"`
}

// EC2 is information about an EC2 instance.
Expand Down

0 comments on commit 9a5f4ed

Please sign in to comment.