Skip to content

Commit

Permalink
feature: copies the auto-ingestered domain api for agent file
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhengYa-0110 committed Oct 28, 2024
1 parent 0386bb7 commit 6e46cec
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 0 deletions.
1 change: 1 addition & 0 deletions server/controller/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/deepflowio/deepflow/server/controller/election"
"github.com/deepflowio/deepflow/server/controller/genesis"
"github.com/deepflowio/deepflow/server/controller/grpc"
_ "github.com/deepflowio/deepflow/server/controller/grpc/agent"
_ "github.com/deepflowio/deepflow/server/controller/grpc/controller"
_ "github.com/deepflowio/deepflow/server/controller/grpc/synchronizer"
"github.com/deepflowio/deepflow/server/controller/http"
Expand Down
64 changes: 64 additions & 0 deletions server/controller/grpc/agent/k8s_cluster_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package synchronize

import (
"fmt"

api "github.com/deepflowio/deepflow/message/agent"
context "golang.org/x/net/context"

"github.com/deepflowio/deepflow/server/controller/common"
grpccommon "github.com/deepflowio/deepflow/server/controller/grpc/common"
"github.com/deepflowio/deepflow/server/controller/trisolaris"
)

type KubernetesClusterIDEvent struct {
}

func NewKubernetesClusterIDEvent() *KubernetesClusterIDEvent {
return &KubernetesClusterIDEvent{}
}

func (k *KubernetesClusterIDEvent) GetKubernetesClusterID(ctx context.Context, in *api.KubernetesClusterIDRequest) (*api.KubernetesClusterIDResponse, error) {
remote := grpccommon.GetRemote(ctx)
log.Infof("call me from ip: %s to get kubernetes cluster_id", remote)
log.Debugf("ca_md5: %#v", in.GetCaMd5())

clusterID, err := common.GenerateKuberneteClusterIDByMD5(in.GetCaMd5())
if err != nil {
errorMsg := err.Error()
log.Error(errorMsg)
return &api.KubernetesClusterIDResponse{ErrorMsg: &errorMsg}, nil
}
if !trisolaris.GetConfig().DomainAutoRegister {
return &api.KubernetesClusterIDResponse{ClusterId: &clusterID}, nil
}

// cache clusterID & create kubernetes domain
kubernetesInfo := trisolaris.GetGKubernetesInfo(in.GetTeamId())
if kubernetesInfo == nil {
errorMsg := fmt.Sprintf("failed to get kubernetes info for team_id: %d", in.GetTeamId())
log.Error(errorMsg)
return &api.KubernetesClusterIDResponse{ErrorMsg: &errorMsg}, nil
}
kubernetesInfo.CacheClusterID(in.GetTeamId(), clusterID, in.GetKubernetesClusterName())

log.Infof("response kubernetes cluster_id: %s to ip: %s", clusterID, remote)
log.Debugf("ca_md5: %#v", in.GetCaMd5())
return &api.KubernetesClusterIDResponse{ClusterId: &clusterID}, nil
}
95 changes: 95 additions & 0 deletions server/controller/grpc/agent/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package synchronize

import (
"time"

"github.com/op/go-logging"
"golang.org/x/net/context"
"google.golang.org/grpc"

api "github.com/deepflowio/deepflow/message/agent"

grpcserver "github.com/deepflowio/deepflow/server/controller/grpc"
"github.com/deepflowio/deepflow/server/controller/grpc/statsd"
)

var log = logging.MustGetLogger("grpc.agent")

type service struct {
kubernetesClusterIDEvent *KubernetesClusterIDEvent
}

func init() {
grpcserver.Add(newService())
}

func newService() *service {
return &service{
kubernetesClusterIDEvent: NewKubernetesClusterIDEvent(),
}
}

func (s *service) Register(gs *grpc.Server) error {
api.RegisterSynchronizerServer(gs, s)
return nil
}

func (s *service) Sync(ctx context.Context, in *api.SyncRequest) (*api.SyncResponse, error) {
return &api.SyncResponse{}, nil
}

func (s *service) Push(r *api.SyncRequest, in api.Synchronizer_PushServer) error {
return nil
}

func (s *service) Upgrade(r *api.UpgradeRequest, in api.Synchronizer_UpgradeServer) error {
return nil
}

func (s *service) Query(ctx context.Context, in *api.NtpRequest) (*api.NtpResponse, error) {
return &api.NtpResponse{}, nil
}

func (s *service) GenesisSync(ctx context.Context, in *api.GenesisSyncRequest) (*api.GenesisSyncResponse, error) {
return &api.GenesisSyncResponse{}, nil
}

func (s *service) KubernetesAPISync(ctx context.Context, in *api.KubernetesAPISyncRequest) (*api.KubernetesAPISyncResponse, error) {
return &api.KubernetesAPISyncResponse{}, nil
}

func (s *service) GetKubernetesClusterID(ctx context.Context, in *api.KubernetesClusterIDRequest) (*api.KubernetesClusterIDResponse, error) {
startTime := time.Now()
defer func() {
statsd.AddGrpcCostStatsd(statsd.GetKubernetesClusterID, int(time.Now().Sub(startTime).Milliseconds()))
}()
return s.kubernetesClusterIDEvent.GetKubernetesClusterID(ctx, in)
}

func (s *service) GPIDSync(ctx context.Context, in *api.GPIDSyncRequest) (*api.GPIDSyncResponse, error) {
return &api.GPIDSyncResponse{}, nil
}

func (s *service) Plugin(r *api.PluginRequest, in api.Synchronizer_PluginServer) error {
return nil
}

func (s *service) RemoteExecute(in api.Synchronizer_RemoteExecuteServer) error {
return nil
}
29 changes: 29 additions & 0 deletions server/controller/grpc/common/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package common

import (
"golang.org/x/net/context"
"google.golang.org/grpc/peer"
)

func GetRemote(ctx context.Context) string {
remote := ""
peerIP, _ := peer.FromContext(ctx)
remote = peerIP.Addr.String()
return remote
}

0 comments on commit 6e46cec

Please sign in to comment.