Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Features/tracing: grafana dashboard & metrics yaml & event recorder #111

Merged
merged 7 commits into from
Jun 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func main() {
}

// initialize gRPC
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
grpcConn, err := grpc.DialContext(ctx, defaultSocketPath, grpc.WithInsecure(), grpc.WithContextDialer(
func(ctx context.Context, s string) (net.Conn, error) {
unixAddr, err := net.ResolveUnixAddr("unix", defaultSocketPath)
Expand Down
31 changes: 29 additions & 2 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,34 @@ func (networkService *networkService) GetIPInfo(ctx context.Context, r *rpc.GetI
}
}

// RecordEvent serves the event-recording RPC by forwarding the event carried
// in r to the Kubernetes helper held in networkService.k8s.
//
// The event is recorded as a warning only when r.EventType is
// EventType_EventTypeWarning; every other value is recorded as a normal event.
// When r.EventTarget is EventTarget_EventTargetNode the event is attached to
// the node and the reply always reports success. Otherwise the event is
// attached to the pod named by r.K8SPodName / r.K8SPodNamespace; if that
// fails, the reply carries Succeed=false plus the error text, and the same
// error is also returned to the caller.
func (networkService *networkService) RecordEvent(_ context.Context, r *rpc.EventRequest) (*rpc.EventReply, error) {
	// Map the RPC enum onto the Kubernetes event type string.
	var level string
	switch r.EventType {
	case rpc.EventType_EventTypeWarning:
		level = eventTypeWarning
	default:
		level = eventTypeNormal
	}

	// Node events have no error path.
	if r.EventTarget == rpc.EventTarget_EventTargetNode {
		networkService.k8s.RecordNodeEvent(level, r.Reason, r.Message)
		return &rpc.EventReply{Succeed: true}, nil
	}

	// Everything else is treated as a pod event.
	if err := networkService.k8s.RecordPodEvent(r.K8SPodName, r.K8SPodNamespace, level, r.Reason, r.Message); err != nil {
		return &rpc.EventReply{Succeed: false, Error: err.Error()}, err
	}

	return &rpc.EventReply{Succeed: true}, nil
}

func (networkService *networkService) verifyPodNetworkType(podNetworkMode string) bool {
return (networkService.daemonMode == daemonModeVPC && //vpc
(podNetworkMode == podNetworkTypeVPCENI || podNetworkMode == podNetworkTypeVPCIP)) ||
Expand Down Expand Up @@ -618,7 +646,6 @@ func (networkService *networkService) Execute(cmd string, _ []string, message ch
}

func (networkService *networkService) GetResourceMapping() ([]tracing.PodResourceMapping, error) {
log.Println("get network_service resource mapping")
var resourceMapping []tracing.ResourceMapping

var err error
Expand Down Expand Up @@ -676,7 +703,6 @@ func (networkService *networkService) GetResourceMapping() ([]tracing.PodResourc
mapping[i].Resource = res
}

log.Printf("get network_service resource mapping done: %v\n", mapping)
return mapping, nil
}

Expand Down Expand Up @@ -831,6 +857,7 @@ func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (r
// register for tracing
_ = tracing.Register(tracing.ResourceTypeNetworkService, "default", netSrv)
tracing.RegisterResourceMapping(netSrv)
tracing.RegisterEventRecorder(netSrv.k8s.RecordNodeEvent, netSrv.k8s.RecordPodEvent)

return netSrv, nil
}
Expand Down
70 changes: 68 additions & 2 deletions daemon/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"time"
"unicode"

"github.com/AliyunContainerService/terway/pkg/tracing"

"k8s.io/client-go/kubernetes/scheme"

"k8s.io/client-go/tools/clientcmd"

"github.com/AliyunContainerService/terway/deviceplugin"
Expand All @@ -24,6 +28,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
)

const (
Expand All @@ -35,6 +41,9 @@ const (

apiServerTimeout = 70 * time.Second
apiServerReconnectThrottle = 2 * time.Minute

eventTypeNormal = corev1.EventTypeNormal
eventTypeWarning = corev1.EventTypeWarning
)

type podInfo struct {
Expand All @@ -56,14 +65,19 @@ type Kubernetes interface {
GetServiceCidr() *net.IPNet
GetNodeCidr() *net.IPNet
SetNodeAllocatablePod(count int) error
RecordNodeEvent(eventType, reason, message string)
RecordPodEvent(podName, podNamespace, eventType, reason, message string) error
}

type k8s struct {
client kubernetes.Interface
storage storage.Storage
broadcaster record.EventBroadcaster
recorder record.EventRecorder
mode string
nodeName string
nodeCidr *net.IPNet
node *corev1.Node
svcCidr *net.IPNet
apiConn *connTracker
apiConnTime time.Time
Expand Down Expand Up @@ -94,6 +108,13 @@ func newK8S(master, kubeconfig string, svcCidr *net.IPNet, daemonMode string) (K
return nil, errors.Wrap(err, "failed getting node name")
}

node, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{
ResourceVersion: "0",
})
if err != nil {
return nil, errors.Wrap(err, "failed getting node")
}

if svcCidr == nil {
svcCidr, err = serviceCidrFromAPIServer(client)
if err != nil {
Expand All @@ -114,18 +135,29 @@ func newK8S(master, kubeconfig string, svcCidr *net.IPNet, daemonMode string) (K
return nil, errors.Wrapf(err, "failed init db storage with path %s and bucket %s", dbPath, dbName)
}

broadcaster := record.NewBroadcaster()
source := corev1.EventSource{Component: "terway-daemon"}
recorder := broadcaster.NewRecorder(scheme.Scheme, source)

sink := &typedv1.EventSinkImpl{
Interface: typedv1.New(client.CoreV1().RESTClient()).Events(""),
}
broadcaster.StartRecordingToSink(sink)

k8sObj := &k8s{
client: client,
mode: daemonMode,
node: node,
nodeName: nodeName,
nodeCidr: nodeCidr,
svcCidr: svcCidr,
storage: storage,
apiConn: t,
broadcaster: broadcaster,
recorder: recorder,
apiConnTime: time.Now(),
Locker: &sync.RWMutex{},
}

go func() {
for range time.Tick(storageCleanPeriod) {
err := k8sObj.clean()
Expand Down Expand Up @@ -271,7 +303,9 @@ func convertPod(daemonMode string, pod *corev1.Pod) *podInfo {
if ingress, err := parseBandwidth(ingressBandwidth); err == nil {
pi.TcIngress = ingress
}
//TODO write event on pod if parse bandwidth fail

_ = tracing.RecordPodEvent(pod.Name, pod.Namespace, eventTypeWarning,
"ParseFailed", fmt.Sprintf("Parse bandwidth %s failed.", ingressBandwidth))
}
if egressBandwidth, ok := podAnnotation[podEgressBandwidth]; ok {
if egress, err := parseBandwidth(egressBandwidth); err == nil {
Expand Down Expand Up @@ -478,6 +512,38 @@ func (k *k8s) reconnectOnTimeoutError(err error) {
}
}

// RecordNodeEvent emits an event of the given type, reason and message through
// k.recorder, attached to the node object stored in k.node. The object
// reference is built from that node's Name and UID and carries no namespace.
func (k *k8s) RecordNodeEvent(eventType, reason, message string) {
	node := k.node

	k.recorder.Event(&corev1.ObjectReference{
		Kind: "Node",
		Name: node.Name,
		UID:  node.UID,
	}, eventType, reason, message)
}

// RecordPodEvent emits an event of the given type, reason and message through
// k.recorder, attached to the pod podNamespace/podName.
//
// The pod is first fetched through k.client (with ResourceVersion "0") so the
// event reference can carry the pod's UID. If the fetch fails, the error is
// passed to reconnectOnTimeoutError and then returned unchanged; no event is
// recorded in that case. Returns nil once the event has been handed to the
// recorder.
func (k *k8s) RecordPodEvent(podName, podNamespace, eventType, reason, message string) error {
	opts := metav1.GetOptions{ResourceVersion: "0"}

	target, err := k.client.CoreV1().Pods(podNamespace).Get(podName, opts)
	if err != nil {
		k.reconnectOnTimeoutError(err)
		return err
	}

	involved := &corev1.ObjectReference{
		Kind:      "Pod",
		Namespace: target.Namespace,
		Name:      target.Name,
		UID:       target.UID,
	}
	k.recorder.Event(involved, eventType, reason, message)

	return nil
}

// connTracker is a dialer that tracks all open connections it creates.
type connTracker struct {
dialer *net.Dialer
Expand Down
70 changes: 70 additions & 0 deletions monitoring/terway-metric-proxy.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Use this config to expose the terway metrics API on each node
# and register it with Prometheus.
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: terway-metric-svcmonitor
  # Set this to the namespace where your Prometheus runs;
  # arms-prom is the namespace used by Aliyun ARMS Prometheus.
  namespace: arms-prom
spec:
  jobLabel: terway-metric
  selector:
    # Selects Services by label; must match the labels on the
    # terway-metric Service defined at the end of this file.
    matchLabels:
      app: terway-metric-proxy
  namespaceSelector:
    matchNames:
    - kube-system
  endpoints:
  - port: metric
    path: /metrics
    # pull interval
    interval: 15s

---

# Per-node proxy that forwards TCP port 15432 to the terway debug unix socket.
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: terway-metric-proxy
  namespace: kube-system
spec:
  # apps/v1 requires an explicit selector that matches the pod template labels.
  selector:
    matchLabels:
      app: terway-metric-proxy
  template:
    metadata:
      labels:
        app: terway-metric-proxy
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ''
    spec:
      nodeSelector:
        beta.kubernetes.io/arch: amd64
      tolerations:
      - operator: "Exists"
      hostNetwork: true
      containers:
      - name: proxy
        image: alpine/socat
        command: [ "socat", "-d", "-d", "TCP4-LISTEN:15432,fork", "UNIX-CONNECT:/var/run/eni/eni_debug.socket" ]
        volumeMounts:
        - name: terway-metric
          mountPath: /var/run/eni/
      volumes:
      - name: terway-metric
        hostPath:
          path: /var/run/eni/
---

# Headless Service in front of the proxy pods; this is what the
# ServiceMonitor above discovers and scrapes.
apiVersion: v1
kind: Service
metadata:
  name: terway-metric
  namespace: kube-system
  # Labels the ServiceMonitor selector matches on.
  labels:
    app: terway-metric-proxy
spec:
  selector:
    app: terway-metric-proxy
  clusterIP: None
  ports:
  - name: metric
    protocol: TCP
    port: 15432
    targetPort: 15432
Loading