-
Notifications
You must be signed in to change notification settings - Fork 1
/
receiver.go
132 lines (112 loc) · 3.8 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package leaderreceivercreator
import (
"fmt"
"context"
"os"
"path/filepath"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
var _ receiver.Metrics = (*leaderReceiverCreator)(nil)
// leaderReceiverCreator implements consumer.Metrics.
type leaderReceiverCreator struct {
params receiver.CreateSettings
cfg *Config
nextLogsConsumer consumer.Logs
nextMetricsConsumer consumer.Metrics
nextTracesConsumer consumer.Traces
host component.Host
subReceiverRunner *receiverRunner
cancel context.CancelFunc
}
func newLeaderReceiverCreator(params receiver.CreateSettings, cfg *Config) component.Component {
return &leaderReceiverCreator{
params: params,
cfg: cfg,
}
}
// Start receiver_creator.
func (ler *leaderReceiverCreator) Start(ctx context.Context, host component.Host) error {
ler.host = host
ctx = context.Background()
ctx, ler.cancel = context.WithCancel(ctx)
ler.params.TelemetrySettings.Logger.Info("Starting leader election receiver...")
client, err := ler.newClient()
if err != nil {
return fmt.Errorf("failed to create Kubernetes client: %w", err)
}
ler.params.TelemetrySettings.Logger.Info("Creating leader elector...")
leaderElector, err := newLeaderElector(
client,
func(ctx context.Context) {
ler.params.TelemetrySettings.Logger.Info("Elected as leader")
if err := ler.startSubReceiver(); err != nil {
ler.params.TelemetrySettings.Logger.Error("Failed to start subreceiver", zap.Error(err))
}
},
func() {
ler.params.TelemetrySettings.Logger.Info("Lost leadership")
if err := ler.stopSubReceiver(); err != nil {
ler.params.TelemetrySettings.Logger.Error("Failed to stop subreceiver", zap.Error(err))
}
},
)
if err != nil {
return fmt.Errorf("failed to create leader elector: %w", err)
}
leaderElector.Run(ctx)
return nil
}
func (ler *leaderReceiverCreator) newClient() (kubernetes.Interface, error) {
kubeConfigPath := filepath.Join(os.Getenv("HOME"), ".kube/config")
config, err := rest.InClusterConfig()
if err != nil {
ler.params.TelemetrySettings.Logger.Warn("Cannot find in cluster config", zap.Error(err))
config, err = clientcmd.BuildConfigFromFlags("", kubeConfigPath)
if err != nil {
ler.params.TelemetrySettings.Logger.Error("Cannot build ClientConfig", zap.Error(err))
return nil, err
}
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
ler.params.TelemetrySettings.Logger.Error("Cannot create Kubernetes client", zap.Error(err))
return nil, err
}
return client, nil
}
func (ler *leaderReceiverCreator) startSubReceiver() error {
ler.params.TelemetrySettings.Logger.Info("Starting subreceiver",
zap.String("name", ler.cfg.subreceiverConfig.id.String()))
ler.subReceiverRunner = newReceiverRunner(ler.params, ler.host)
if err := ler.subReceiverRunner.start(
receiverConfig{
id: ler.cfg.subreceiverConfig.id,
config: ler.cfg.subreceiverConfig.config,
},
ler.nextLogsConsumer,
ler.nextMetricsConsumer,
ler.nextTracesConsumer,
); err != nil {
return fmt.Errorf("failed to start subreceiver %s: %w", ler.cfg.subreceiverConfig.id.String(), err)
}
return nil
}
func (ler *leaderReceiverCreator) stopSubReceiver() error {
ler.params.TelemetrySettings.Logger.Info("Stopping subreceiver",
zap.String("name", ler.cfg.subreceiverConfig.id.String()))
ler.subReceiverRunner.shutdown(context.Background())
return nil
}
// Shutdown stops the receiver_creator and all its receivers started at runtime.
func (ler *leaderReceiverCreator) Shutdown(context.Context) error {
ler.cancel()
return nil
}