Skip to content

Commit

Permalink
feat: update trisolaris dynamic config
Browse files Browse the repository at this point in the history
  • Loading branch information
askyrie committed Nov 5, 2024
1 parent e0ca365 commit aedece7
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 42 deletions.
3 changes: 0 additions & 3 deletions server/controller/trisolaris/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ const (
)

const (
CONFIG_KEY_ENABLED = "global.common.enabled"
CONFIG_KEY_HOSTNAME = "global.self_monitoring.hostname"
CONFIG_KEY_AGENT_TYPE = "global.common.agent_type"
CONFIG_KEY_MAX_MEMORY = "global.limits.max_memory"
CONFIG_KEY_MAX_ESCAPE_DURATION = "global.communication.max_escape_duration"
CONFIG_KEY_INGESTER_IP = "global.communication.ingester_ip"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,13 @@ func NewAgentEvent() *AgentEvent {
return &AgentEvent{}
}

func (e *AgentEvent) generateUserConfig(c *vtap.VTapCache, clusterID string, gAgentInfo *vtap.VTapInfo, orgID int) string {
func (e *AgentEvent) generateUserConfig(c *vtap.VTapCache, clusterID string, gAgentInfo *vtap.VTapInfo, orgID int) *viper.Viper {
userConfig := c.GetUserConfig()
viperConfig := viper.New()
viperConfig.SetConfigType("yaml")
if err := viperConfig.ReadConfig(bytes.NewBufferString(userConfig)); err != nil {
log.Errorf("viper read agent(%d) config yaml error: %v", c.GetVTapID(), err)
}
if clusterID != "" { // if agent report cluster_id, force set tridentType = VTAP_TYPE_POD_VM
viperConfig.Set(CONFIG_KEY_AGENT_TYPE, VTAP_TYPE_POD_VM)
}

configTSDBIP := gAgentInfo.GetConfigTSDBIP()
if configTSDBIP != "" {
Expand All @@ -91,24 +88,24 @@ func (e *AgentEvent) generateUserConfig(c *vtap.VTapCache, clusterID string, gAg
log.Errorf("agent(%s) has no proxy_controller_ip, "+
"Please check whether the agent allocs controller IP or If nat-ip is enabled, whether the controller is configured with nat-ip", c.GetCtrlIP())
}
if viperConfig.GetString(CONFIG_KEY_INGESTER_IP) == "" {
viperConfig.Set(CONFIG_KEY_ENABLED, false)
log.Errorf("agent(%s) has no tsdb_ip, "+
"Please check whether the agent allocs tsdb IP or If nat-ip is enabled, whether the tsdb is configured with nat-ip", c.GetCtrlIP())
}

if c.GetVTapEnabled() == 0 {
viperConfig.Set(CONFIG_KEY_HYPERVISOR_RESOURCE_ENABLED, false)
viperConfig.Set(CONFIG_KEY_ENABLED, false)
}

return e.formateViperConfigToString(viperConfig)
return viperConfig
}

func (e *AgentEvent) generateDynamicConfig(c *vtap.VTapCache) *api.DynamicConfig {

func (e *AgentEvent) generateDynamicConfig(clusterID string, c *vtap.VTapCache) *api.DynamicConfig {
agentType := c.GetVTapType()
if clusterID != "" { // if agent report cluster_id, force set tridentType = VTAP_TYPE_POD_VM
agentType = VTAP_TYPE_POD_VM
}
return &api.DynamicConfig{
AgentType: &api.AgentType{agentType},
Enabled: proto.Bool(c.GetVTapEnabled() != 0),
KubernetesApiEnabled: proto.Bool(false),
Hostname: proto.String(c.GetVTapHost()),
RegionId: proto.Uint32(uint32(c.GetRegionID())),
PodClusterId: proto.Uint32(uint32(c.GetPodClusterID())),
VpcId: proto.Uint32(uint32(c.GetVPCID())),
Expand Down Expand Up @@ -180,6 +177,7 @@ func (e *AgentEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRe
ctrlIP := in.GetCtrlIp()
ctrlMac := in.GetCtrlMac()
teamIDStr := in.GetTeamId()
clusterID := in.GetKubernetesClusterId()
orgID, teamIDInt := trisolaris.GetOrgInfoByTeamID(teamIDStr)
gAgentInfo := trisolaris.GetORGVTapInfo(orgID)
if gAgentInfo == nil {
Expand All @@ -200,7 +198,7 @@ func (e *AgentEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRe
}
log.Warningf("vtap (ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), host_ips: %s, kubernetes_cluster_id: %s, kubernetes_force_watch: %t, group_id: %s) not found in cache. "+
"NAME:%s REVISION:%s BOOT_TIME:%d",
ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetHostIps(), in.GetKubernetesClusterId(), in.GetKubernetesForceWatch(),
ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetHostIps(), clusterID, in.GetKubernetesForceWatch(),
in.GetAgentGroupIdRequest(), in.GetProcessName(), in.GetRevision(), in.GetBootTime(), logger.NewORGPrefix(orgID))
// If the kubernetes_force_watch field is true, the ctrl_ip and ctrl_mac of the vtap will not change,
// resulting in unsuccessful registration and a large number of error logs.
Expand Down Expand Up @@ -303,19 +301,24 @@ func (e *AgentEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRe
tapTypes = gAgentInfo.GetCaptureNetworkTypes()
}

dynamicConfig := e.generateDynamicConfig(vtapCache)
dynamicConfig := e.generateDynamicConfig(clusterID, vtapCache)
// 携带信息有cluster_id时选择一个采集器开启云平台同步开关
if in.GetKubernetesClusterId() != "" && isOpenK8sSyn(vtapCache.GetVTapType()) == true {
value := gAgentInfo.GetKubernetesClusterID(in.GetKubernetesClusterId(), vtapCacheKey, in.GetKubernetesForceWatch(), int(in.GetKubernetesWatchPolicy()))
if clusterID != "" && isOpenK8sSyn(vtapCache.GetVTapType()) == true {
value := gAgentInfo.GetKubernetesClusterID(clusterID, vtapCacheKey, in.GetKubernetesForceWatch(), int(in.GetKubernetesWatchPolicy()))
if value == vtapCacheKey {
log.Infof(
"open cluster(%s) kubernetes_api_enabled Agent(ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), kubernetes_force_watch: %t)",
in.GetKubernetesClusterId(), ctrlIP, ctrlMac,
clusterID, ctrlIP, ctrlMac,
teamIDStr, teamIDInt, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID))
dynamicConfig.KubernetesApiEnabled = proto.Bool(true)
}
}
userConfig := e.generateUserConfig(vtapCache, in.GetKubernetesClusterId(), gAgentInfo, orgID)
userConfig := e.generateUserConfig(vtapCache, clusterID, gAgentInfo, orgID)
if userConfig.GetString(CONFIG_KEY_INGESTER_IP) == "" {
dynamicConfig.Enabled = proto.Bool(false)
log.Errorf("agent(%s) has no ingester_ip, "+
"Please check whether the agent allocs tsdb IP or If nat-ip is enabled, whether the tsdb is configured with nat-ip", vtapCache.GetCtrlIP())
}
localSegments := vtapCache.GetAgentLocalSegments()
remoteSegments := vtapCache.GetAgentRemoteSegments()
upgradeRevision := vtapCache.GetExpectedRevision()
Expand All @@ -325,7 +328,7 @@ func (e *AgentEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRe
Status: &STATUS_SUCCESS,
LocalSegments: localSegments,
RemoteSegments: remoteSegments,
UserConfig: proto.String(userConfig),
UserConfig: proto.String(e.formateViperConfigToString(userConfig)),
DynamicConfig: dynamicConfig,
PlatformData: platformData,
Groups: groups,
Expand All @@ -343,6 +346,7 @@ func (e *AgentEvent) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncRe

func (e *AgentEvent) generateNoAgentCacheDynamicConfig() *api.DynamicConfig {
return &api.DynamicConfig{
Enabled: proto.Bool(false),
KubernetesApiEnabled: proto.Bool(false),
}
}
Expand Down Expand Up @@ -374,22 +378,21 @@ func (e *AgentEvent) noAgentResponse(in *api.SyncRequest, orgID int) *api.SyncRe
ctrlMac := in.GetCtrlMac()
vtapCacheKey := ctrlIP + "-" + ctrlMac

clusterID := in.GetKubernetesClusterId()
dynamicConfigInfo := e.generateNoAgentCacheDynamicConfig()
viperConfig := e.generateNoAgentCacheUserViperConfig(in.GetAgentGroupIdRequest(), orgID)
gAgentInfo := trisolaris.GetORGVTapInfo(orgID)
if in.GetKubernetesClusterId() != "" {
value := gAgentInfo.GetKubernetesClusterID(in.GetKubernetesClusterId(), vtapCacheKey, in.GetKubernetesForceWatch(), int(in.GetKubernetesWatchPolicy()))
if clusterID != "" {
dynamicConfigInfo.AgentType = &api.AgentType{VTAP_TYPE_POD_VM}
value := gAgentInfo.GetKubernetesClusterID(clusterID, vtapCacheKey, in.GetKubernetesForceWatch(), int(in.GetKubernetesWatchPolicy()))
if value == vtapCacheKey {
dynamicConfigInfo.KubernetesApiEnabled = proto.Bool(true)
log.Infof(
"open cluster(%s) kubernetes_api_enabled Agent(ctrl_ip: %s, ctrl_mac: %s, kubernetes_force_watch: %t)",
in.GetKubernetesClusterId(), ctrlIP, ctrlMac, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID))
clusterID, ctrlIP, ctrlMac, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID))
}

viperConfig.Set(CONFIG_KEY_MAX_ESCAPE_DURATION, gAgentInfo.GetDefaultMaxEscapeSecondsStr())
viperConfig.Set(CONFIG_KEY_MAX_MEMORY, gAgentInfo.GetDefaultMaxMemory())
viperConfig.Set(CONFIG_KEY_AGENT_TYPE, VTAP_TYPE_POD_VM)
viperConfig.Set(CONFIG_KEY_ENABLED, false)

return &api.SyncResponse{
Status: &STATUS_SUCCESS,
Expand All @@ -400,10 +403,9 @@ func (e *AgentEvent) noAgentResponse(in *api.SyncRequest, orgID int) *api.SyncRe

agentTypeForUnknowAgent := gAgentInfo.GetTridentTypeForUnknowVTap()
if agentTypeForUnknowAgent != 0 {
dynamicConfigInfo.AgentType = &api.AgentType{agentTypeForUnknowAgent}
viperConfig.Set(CONFIG_KEY_MAX_ESCAPE_DURATION, gAgentInfo.GetDefaultMaxEscapeSecondsStr())
viperConfig.Set(CONFIG_KEY_MAX_MEMORY, gAgentInfo.GetDefaultMaxMemory())
viperConfig.Set(CONFIG_KEY_AGENT_TYPE, int(agentTypeForUnknowAgent))
viperConfig.Set(CONFIG_KEY_ENABLED, false)
viperConfig.Set(CONFIG_KEY_HYPERVISOR_RESOURCE_ENABLED, true)

return &api.SyncResponse{
Expand All @@ -412,8 +414,6 @@ func (e *AgentEvent) noAgentResponse(in *api.SyncRequest, orgID int) *api.SyncRe
UserConfig: proto.String(e.formateViperConfigToString(viperConfig)),
}
}
// if vtap not exist & not k8s/agent sync, set vtap disable
viperConfig.Set(CONFIG_KEY_ENABLED, false)

return &api.SyncResponse{
Status: &STATUS_SUCCESS,
Expand Down Expand Up @@ -448,6 +448,7 @@ func (e *AgentEvent) pushResponse(in *api.SyncRequest, all bool) (*api.SyncRespo
ctrlIP := in.GetCtrlIp()
ctrlMac := in.GetCtrlMac()
teamIDStr := in.GetTeamId()
clusterID := in.GetKubernetesClusterId()
orgID, teamIDInt := trisolaris.GetOrgInfoByTeamID(teamIDStr)
vtapCacheKey := ctrlIP + "-" + ctrlMac
gAgentInfo := trisolaris.GetORGVTapInfo(orgID)
Expand Down Expand Up @@ -525,19 +526,24 @@ func (e *AgentEvent) pushResponse(in *api.SyncRequest, all bool) (*api.SyncRespo
tapTypes = gAgentInfo.GetCaptureNetworkTypes()
}

dynamicConfig := e.generateDynamicConfig(vtapCache)
dynamicConfig := e.generateDynamicConfig(clusterID, vtapCache)
// 携带信息有cluster_id时选择一个采集器开启云平台同步开关
if in.GetKubernetesClusterId() != "" && isOpenK8sSyn(vtapCache.GetVTapType()) == true {
value := gAgentInfo.GetKubernetesClusterID(in.GetKubernetesClusterId(), vtapCacheKey, in.GetKubernetesForceWatch(), int(in.GetKubernetesWatchPolicy()))
if clusterID != "" && isOpenK8sSyn(vtapCache.GetVTapType()) == true {
value := gAgentInfo.GetKubernetesClusterID(clusterID, vtapCacheKey, in.GetKubernetesForceWatch(), int(in.GetKubernetesWatchPolicy()))
if value == vtapCacheKey {
log.Infof(
"open cluster(%s) kubernetes_api_enabled Agent(ctrl_ip: %s, ctrl_mac: %s, team_id: (str=%s,int=%d), kubernetes_force_watch: %t)",
in.GetKubernetesClusterId(), ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID))
clusterID, ctrlIP, ctrlMac, teamIDStr, teamIDInt, in.GetKubernetesForceWatch(), logger.NewORGPrefix(orgID))
dynamicConfig.KubernetesApiEnabled = proto.Bool(true)
}
}

userConfig := e.generateUserConfig(vtapCache, in.GetKubernetesClusterId(), gAgentInfo, orgID)
userConfig := e.generateUserConfig(vtapCache, clusterID, gAgentInfo, orgID)
if userConfig.GetString(CONFIG_KEY_INGESTER_IP) == "" {
dynamicConfig.Enabled = proto.Bool(false)
log.Errorf("agent(%s) has no ingester_ip, "+
"Please check whether the agent allocs tsdb IP or If nat-ip is enabled, whether the tsdb is configured with nat-ip", vtapCache.GetCtrlIP())
}
localSegments := vtapCache.GetAgentLocalSegments()
remoteSegments := vtapCache.GetAgentRemoteSegments()
skipInterface := gAgentInfo.GetAgentSkipInterface(vtapCache)
Expand All @@ -547,7 +553,7 @@ func (e *AgentEvent) pushResponse(in *api.SyncRequest, all bool) (*api.SyncRespo
LocalSegments: localSegments,
RemoteSegments: remoteSegments,
DynamicConfig: dynamicConfig,
UserConfig: proto.String(userConfig),
UserConfig: proto.String(e.formateViperConfigToString(userConfig)),
PlatformData: platformData,
SkipInterface: skipInterface,
VersionPlatformData: proto.Uint64(versionPlatformData),
Expand Down
3 changes: 0 additions & 3 deletions server/controller/trisolaris/vtap/vtap_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,6 @@ func (f *VTapConfig) modifyUserConfig(c *VTapCache) {
log.Error("vtap configure is nil")
return
}
f.UserConfig.Set(CONFIG_KEY_AGENT_TYPE, c.GetVTapType())
f.UserConfig.Set(CONFIG_KEY_ENABLED, Int2Bool(c.GetVTapEnabled()))
f.UserConfig.Set(CONFIG_KEY_HOSTNAME, c.GetVTapHost())
if !f.UserConfig.IsSet(CONFIG_KEY_PROXY_CONTROLLER_IP) {
f.UserConfig.Set(CONFIG_KEY_PROXY_CONTROLLER_IP, c.GetControllerIP())
}
Expand Down

0 comments on commit aedece7

Please sign in to comment.