Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update trisolaris dynamic config #8450

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions server/controller/trisolaris/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ const (
)

const (
CONFIG_KEY_ENABLED = "global.common.enabled"
CONFIG_KEY_HOSTNAME = "global.self_monitoring.hostname"
CONFIG_KEY_AGENT_TYPE = "global.common.agent_type"
CONFIG_KEY_MAX_MEMORY = "global.limits.max_memory"
CONFIG_KEY_MAX_ESCAPE_DURATION = "global.communication.max_escape_duration"
CONFIG_KEY_INGESTER_IP = "global.communication.ingester_ip"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/deepflowio/deepflow/server/controller/trisolaris"
. "github.com/deepflowio/deepflow/server/controller/trisolaris/common"
"github.com/deepflowio/deepflow/server/controller/trisolaris/pushmanager"
"github.com/deepflowio/deepflow/server/controller/trisolaris/utils"
"github.com/deepflowio/deepflow/server/controller/trisolaris/vtap"
"github.com/deepflowio/deepflow/server/libs/logger"
)
Expand All @@ -58,16 +59,13 @@ func NewAgentEvent() *AgentEvent {
return &AgentEvent{}
}

func (e *AgentEvent) generateUserConfig(c *vtap.VTapCache, clusterID string, gAgentInfo *vtap.VTapInfo, orgID int) string {
func (e *AgentEvent) generateUserConfig(c *vtap.VTapCache, clusterID string, gAgentInfo *vtap.VTapInfo, orgID int) *viper.Viper {
userConfig := c.GetUserConfig()
viperConfig := viper.New()
viperConfig.SetConfigType("yaml")
if err := viperConfig.ReadConfig(bytes.NewBufferString(userConfig)); err != nil {
log.Errorf("viper read agent(%d) config yaml error: %v", c.GetVTapID(), err)
}
if clusterID != "" { // if agent report cluster_id, force set tridentType = VTAP_TYPE_POD_VM
viperConfig.Set(CONFIG_KEY_AGENT_TYPE, VTAP_TYPE_POD_VM)
}

configTSDBIP := gAgentInfo.GetConfigTSDBIP()
if configTSDBIP != "" {
Expand All @@ -91,24 +89,24 @@ func (e *AgentEvent) generateUserConfig(c *vtap.VTapCache, clusterID string, gAg
log.Errorf("agent(%s) has no proxy_controller_ip, "+
"Please check whether the agent allocs controller IP or If nat-ip is enabled, whether the controller is configured with nat-ip", c.GetCtrlIP())
}
if viperConfig.GetString(CONFIG_KEY_INGESTER_IP) == "" {
viperConfig.Set(CONFIG_KEY_ENABLED, false)
log.Errorf("agent(%s) has no tsdb_ip, "+
"Please check whether the agent allocs tsdb IP or If nat-ip is enabled, whether the tsdb is configured with nat-ip", c.GetCtrlIP())
}

if c.GetVTapEnabled() == 0 {
viperConfig.Set(CONFIG_KEY_HYPERVISOR_RESOURCE_ENABLED, false)
viperConfig.Set(CONFIG_KEY_ENABLED, false)
}

return e.formateViperConfigToString(viperConfig)
return viperConfig
}

func (e *AgentEvent) generateDynamicConfig(c *vtap.VTapCache) *api.DynamicConfig {

func (e *AgentEvent) generateDynamicConfig(clusterID string, c *vtap.VTapCache) *api.DynamicConfig {
agentType := c.GetVTapType()
if clusterID != "" { // if agent report cluster_id, force set tridentType = VTAP_TYPE_POD_VM
agentType = VTAP_TYPE_POD_VM
}
return &api.DynamicConfig{
AgentType: utils.Int2AgentTypePtr(agentType),
Enabled: proto.Bool(c.GetVTapEnabled() != 0),
KubernetesApiEnabled: proto.Bool(false),
Hostname: proto.String(c.GetVTapHost()),
RegionId: proto.Uint32(uint32(c.GetRegionID())),
PodClusterId: proto.Uint32(uint32(c.GetPodClusterID())),
VpcId: proto.Uint32(uint32(c.GetVPCID())),
Expand Down Expand Up @@ -180,6 +178,7 @@ func (e *AgentEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRe
ctrlIP := in.GetCtrlIp()
ctrlMac := in.GetCtrlMac()
teamIDStr := in.GetTeamId()
clusterID := in.GetKubernetesClusterId()
orgID, teamIDInt := trisolaris.GetOrgInfoByTeamID(teamIDStr)
gAgentInfo := trisolaris.GetORGVTapInfo(orgID)
if gAgentInfo == nil {
Expand All @@ -200,7 +199,7 @@ func (e *AgentEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRe
}
log.Warningf("vtap (ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), host_ips: %s, kubernetes_cluster_id: %s, kubernetes_force_watch: %t, group_id: %s) not found in cache. "+
"NAME:%s REVISION:%s BOOT_TIME:%d",
ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetHostIps(), in.GetKubernetesClusterId(), in.GetKubernetesForceWatch(),
ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetHostIps(), clusterID, in.GetKubernetesForceWatch(),
in.GetAgentGroupIdRequest(), in.GetProcessName(), in.GetRevision(), in.GetBootTime(), logger.NewORGPrefix(orgID))
// If the kubernetes_force_watch field is true, the ctrl_ip and ctrl_mac of the vtap will not change,
// resulting in unsuccessful registration and a large number of error logs.
Expand Down Expand Up @@ -303,19 +302,24 @@ func (e *AgentEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRe
tapTypes = gAgentInfo.GetCaptureNetworkTypes()
}

dynamicConfig := e.generateDynamicConfig(vtapCache)
dynamicConfig := e.generateDynamicConfig(clusterID, vtapCache)
// When the request carries a cluster_id, select one agent to enable the cloud-platform (Kubernetes API) sync switch
if in.GetKubernetesClusterId() != "" && isOpenK8sSyn(vtapCache.GetVTapType()) == true {
value := gAgentInfo.GetKubernetesClusterID(in.GetKubernetesClusterId(), vtapCacheKey, in.GetKubernetesForceWatch(), int(in.GetKubernetesWatchPolicy()))
if clusterID != "" && isOpenK8sSyn(vtapCache.GetVTapType()) == true {
value := gAgentInfo.GetKubernetesClusterID(clusterID, vtapCacheKey, in.GetKubernetesForceWatch(), int(in.GetKubernetesWatchPolicy()))
if value == vtapCacheKey {
log.Infof(
"open cluster(%s) kubernetes_api_enabled Agent(ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), kubernetes_force_watch: %t)",
in.GetKubernetesClusterId(), ctrlIP, ctrlMac,
clusterID, ctrlIP, ctrlMac,
teamIDStr, teamIDInt, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID))
dynamicConfig.KubernetesApiEnabled = proto.Bool(true)
}
}
userConfig := e.generateUserConfig(vtapCache, in.GetKubernetesClusterId(), gAgentInfo, orgID)
userConfig := e.generateUserConfig(vtapCache, clusterID, gAgentInfo, orgID)
if userConfig.GetString(CONFIG_KEY_INGESTER_IP) == "" {
dynamicConfig.Enabled = proto.Bool(false)
log.Errorf("agent(%s) has no ingester_ip, "+
"Please check whether the agent allocs tsdb IP or If nat-ip is enabled, whether the tsdb is configured with nat-ip", vtapCache.GetCtrlIP())
}
localSegments := vtapCache.GetAgentLocalSegments()
remoteSegments := vtapCache.GetAgentRemoteSegments()
upgradeRevision := vtapCache.GetExpectedRevision()
Expand All @@ -325,7 +329,7 @@ func (e *AgentEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRe
Status: &STATUS_SUCCESS,
LocalSegments: localSegments,
RemoteSegments: remoteSegments,
UserConfig: proto.String(userConfig),
UserConfig: proto.String(e.formateViperConfigToString(userConfig)),
DynamicConfig: dynamicConfig,
PlatformData: platformData,
Groups: groups,
Expand All @@ -343,6 +347,7 @@ func (e *AgentEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRe

// generateNoAgentCacheDynamicConfig builds the dynamic config used when no
// agent cache entry is available: both Enabled and KubernetesApiEnabled are
// explicitly set to false.
func (e *AgentEvent) generateNoAgentCacheDynamicConfig() *api.DynamicConfig {
	dynamicConfig := new(api.DynamicConfig)
	dynamicConfig.Enabled = proto.Bool(false)
	dynamicConfig.KubernetesApiEnabled = proto.Bool(false)
	return dynamicConfig
}
Expand Down Expand Up @@ -374,22 +379,21 @@ func (e *AgentEvent) noAgentResponse(in *api.SyncRequest, orgID int) *api.SyncRe
ctrlMac := in.GetCtrlMac()
vtapCacheKey := ctrlIP + "-" + ctrlMac

clusterID := in.GetKubernetesClusterId()
dynamicConfigInfo := e.generateNoAgentCacheDynamicConfig()
viperConfig := e.generateNoAgentCacheUserViperConfig(in.GetAgentGroupIdRequest(), orgID)
gAgentInfo := trisolaris.GetORGVTapInfo(orgID)
if in.GetKubernetesClusterId() != "" {
value := gAgentInfo.GetKubernetesClusterID(in.GetKubernetesClusterId(), vtapCacheKey, in.GetKubernetesForceWatch(), int(in.GetKubernetesWatchPolicy()))
if clusterID != "" {
dynamicConfigInfo.AgentType = utils.Int2AgentTypePtr(VTAP_TYPE_POD_VM)
value := gAgentInfo.GetKubernetesClusterID(clusterID, vtapCacheKey, in.GetKubernetesForceWatch(), int(in.GetKubernetesWatchPolicy()))
if value == vtapCacheKey {
dynamicConfigInfo.KubernetesApiEnabled = proto.Bool(true)
log.Infof(
"open cluster(%s) kubernetes_api_enabled Agent(ctrl_ip: %s, ctrl_mac: %s, kubernetes_force_watch: %t)",
in.GetKubernetesClusterId(), ctrlIP, ctrlMac, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID))
clusterID, ctrlIP, ctrlMac, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID))
}

viperConfig.Set(CONFIG_KEY_MAX_ESCAPE_DURATION, gAgentInfo.GetDefaultMaxEscapeSecondsStr())
viperConfig.Set(CONFIG_KEY_MAX_MEMORY, gAgentInfo.GetDefaultMaxMemory())
viperConfig.Set(CONFIG_KEY_AGENT_TYPE, VTAP_TYPE_POD_VM)
viperConfig.Set(CONFIG_KEY_ENABLED, false)

return &api.SyncResponse{
Status: &STATUS_SUCCESS,
Expand All @@ -400,10 +404,9 @@ func (e *AgentEvent) noAgentResponse(in *api.SyncRequest, orgID int) *api.SyncRe

agentTypeForUnknowAgent := gAgentInfo.GetTridentTypeForUnknowVTap()
if agentTypeForUnknowAgent != 0 {
dynamicConfigInfo.AgentType = utils.Int2AgentTypePtr(agentTypeForUnknowAgent)
viperConfig.Set(CONFIG_KEY_MAX_ESCAPE_DURATION, gAgentInfo.GetDefaultMaxEscapeSecondsStr())
viperConfig.Set(CONFIG_KEY_MAX_MEMORY, gAgentInfo.GetDefaultMaxMemory())
viperConfig.Set(CONFIG_KEY_AGENT_TYPE, int(agentTypeForUnknowAgent))
viperConfig.Set(CONFIG_KEY_ENABLED, false)
viperConfig.Set(CONFIG_KEY_HYPERVISOR_RESOURCE_ENABLED, true)

return &api.SyncResponse{
Expand All @@ -412,8 +415,6 @@ func (e *AgentEvent) noAgentResponse(in *api.SyncRequest, orgID int) *api.SyncRe
UserConfig: proto.String(e.formateViperConfigToString(viperConfig)),
}
}
// if vtap not exist & not k8s/agent sync, set vtap disable
viperConfig.Set(CONFIG_KEY_ENABLED, false)

return &api.SyncResponse{
Status: &STATUS_SUCCESS,
Expand Down Expand Up @@ -448,6 +449,7 @@ func (e *AgentEvent) pushResponse(in *api.SyncRequest, all bool) (*api.SyncRespo
ctrlIP := in.GetCtrlIp()
ctrlMac := in.GetCtrlMac()
teamIDStr := in.GetTeamId()
clusterID := in.GetKubernetesClusterId()
orgID, teamIDInt := trisolaris.GetOrgInfoByTeamID(teamIDStr)
vtapCacheKey := ctrlIP + "-" + ctrlMac
gAgentInfo := trisolaris.GetORGVTapInfo(orgID)
Expand Down Expand Up @@ -525,19 +527,24 @@ func (e *AgentEvent) pushResponse(in *api.SyncRequest, all bool) (*api.SyncRespo
tapTypes = gAgentInfo.GetCaptureNetworkTypes()
}

dynamicConfig := e.generateDynamicConfig(vtapCache)
dynamicConfig := e.generateDynamicConfig(clusterID, vtapCache)
// When the request carries a cluster_id, select one agent to enable the cloud-platform (Kubernetes API) sync switch
if in.GetKubernetesClusterId() != "" && isOpenK8sSyn(vtapCache.GetVTapType()) == true {
value := gAgentInfo.GetKubernetesClusterID(in.GetKubernetesClusterId(), vtapCacheKey, in.GetKubernetesForceWatch(), int(in.GetKubernetesWatchPolicy()))
if clusterID != "" && isOpenK8sSyn(vtapCache.GetVTapType()) == true {
value := gAgentInfo.GetKubernetesClusterID(clusterID, vtapCacheKey, in.GetKubernetesForceWatch(), int(in.GetKubernetesWatchPolicy()))
if value == vtapCacheKey {
log.Infof(
"open cluster(%s) kubernetes_api_enabled Agent(ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), kubernetes_force_watch: %t)",
in.GetKubernetesClusterId(), ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID))
clusterID, ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID))
dynamicConfig.KubernetesApiEnabled = proto.Bool(true)
}
}

userConfig := e.generateUserConfig(vtapCache, in.GetKubernetesClusterId(), gAgentInfo, orgID)
userConfig := e.generateUserConfig(vtapCache, clusterID, gAgentInfo, orgID)
if userConfig.GetString(CONFIG_KEY_INGESTER_IP) == "" {
dynamicConfig.Enabled = proto.Bool(false)
log.Errorf("agent(%s) has no ingester_ip, "+
"Please check whether the agent allocs tsdb IP or If nat-ip is enabled, whether the tsdb is configured with nat-ip", vtapCache.GetCtrlIP())
}
localSegments := vtapCache.GetAgentLocalSegments()
remoteSegments := vtapCache.GetAgentRemoteSegments()
skipInterface := gAgentInfo.GetAgentSkipInterface(vtapCache)
Expand All @@ -547,7 +554,7 @@ func (e *AgentEvent) pushResponse(in *api.SyncRequest, all bool) (*api.SyncRespo
LocalSegments: localSegments,
RemoteSegments: remoteSegments,
DynamicConfig: dynamicConfig,
UserConfig: proto.String(userConfig),
UserConfig: proto.String(e.formateViperConfigToString(userConfig)),
PlatformData: platformData,
SkipInterface: skipInterface,
VersionPlatformData: proto.Uint64(versionPlatformData),
Expand Down
6 changes: 6 additions & 0 deletions server/controller/trisolaris/utils/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/deepflowio/deepflow/message/agent"
"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/libs/logger"
"github.com/vishvananda/netlink"
Expand Down Expand Up @@ -190,3 +191,8 @@ func Int2Bool(i int) bool {

return true
}

// Int2AgentTypePtr converts an integer agent type code (int or uint16) into
// an agent.AgentType and returns a pointer to a newly allocated copy of it.
// The input is converted to int32 before being cast to agent.AgentType.
func Int2AgentTypePtr[T int | uint16](i T) *agent.AgentType {
	agentType := new(agent.AgentType)
	*agentType = agent.AgentType(int32(i))
	return agentType
}
3 changes: 0 additions & 3 deletions server/controller/trisolaris/vtap/vtap_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,6 @@ func (f *VTapConfig) modifyUserConfig(c *VTapCache) {
log.Error("vtap configure is nil")
return
}
f.UserConfig.Set(CONFIG_KEY_AGENT_TYPE, c.GetVTapType())
f.UserConfig.Set(CONFIG_KEY_ENABLED, Int2Bool(c.GetVTapEnabled()))
f.UserConfig.Set(CONFIG_KEY_HOSTNAME, c.GetVTapHost())
if !f.UserConfig.IsSet(CONFIG_KEY_PROXY_CONTROLLER_IP) {
f.UserConfig.Set(CONFIG_KEY_PROXY_CONTROLLER_IP, c.GetControllerIP())
}
Expand Down
Loading