Skip to content

Commit

Permalink
add more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
lghinet committed Dec 6, 2023
1 parent 898585a commit 7ec2214
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 29 deletions.
54 changes: 29 additions & 25 deletions pkg/operator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
jsoniter "github.com/json-iterator/go"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/klog/v2"
"rusi/internal/kube"
"rusi/pkg/custom-resource/components"
Expand All @@ -13,33 +13,34 @@ import (
"sync"
)

var conn *grpc.ClientConn
func newClient(ctx context.Context, address string) (operatorv1.RusiOperatorClient, error) {
//var retryPolicy = `{
// "methodConfig": [{
// "name": [{"service": "rusi.proto.operator.v1.RusiOperator"}],
// "waitForReady": true,
// "retryPolicy": {
// "MaxAttempts": 4,
// "InitialBackoff": ".01s",
// "MaxBackoff": ".01s",
// "BackoffMultiplier": 1.0,
// "RetryableStatusCodes": [ "UNAVAILABLE" ]
// }
// }]}`

func newClient(ctx context.Context, address string) (cl operatorv1.RusiOperatorClient, err error) {
var retryPolicy = `{
"methodConfig": [{
"name": [{"service": "rusi.proto.operator.v1.RusiOperator"}],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": 4,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]}`
//conn, err = grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithDefaultServiceConfig(retryPolicy))

if conn == nil {
conn, err = grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithDefaultServiceConfig(retryPolicy))
if err != nil {
return nil, err
}
conn, conErr := grpc.DialContext(ctx, address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if conErr != nil {
return nil, conErr
}
return operatorv1.NewRusiOperatorClient(conn), nil
}

func GetComponentsWatcher(ctx context.Context, address string, wg *sync.WaitGroup) func(context.Context) (<-chan components.Spec, error) {
client, _ := newClient(ctx, address)
client, err := newClient(ctx, address)
if err != nil {
klog.ErrorS(err, "error creating grpc operator client")
}
return func(ctx context.Context) (<-chan components.Spec, error) {
c := make(chan components.Spec)
namespace := kube.GetCurrentNamespace()
Expand Down Expand Up @@ -84,7 +85,10 @@ func GetComponentsWatcher(ctx context.Context, address string, wg *sync.WaitGrou
}

func GetConfigurationWatcher(ctx context.Context, address, configName string, wg *sync.WaitGroup) func(context.Context) (<-chan configuration.Spec, error) {
client, _ := newClient(ctx, address)
client, err := newClient(ctx, address)
if err != nil {
klog.ErrorS(err, "error creating grpc operator client")
}
return func(ctx context.Context) (<-chan configuration.Spec, error) {
c := make(chan configuration.Spec)
namespace := kube.GetCurrentNamespace()
Expand Down Expand Up @@ -125,6 +129,6 @@ func GetConfigurationWatcher(ctx context.Context, address, configName string, wg
}
}

func IsOperatorClientAlive() bool {
return conn != nil && conn.GetState() == connectivity.Ready
}
//func IsOperatorClientAlive() bool {
// return conn != nil && conn.GetState() == connectivity.Ready
//}
4 changes: 0 additions & 4 deletions pkg/runtime/components_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ func NewComponentsManager(ctx context.Context, appId string,
for _, opt := range opts {
opt(&runtimeOpts)
}
klog.V(4).InfoS("Components channel creation started")

compChan, err := componentsLoader(ctx)
klog.V(4).InfoS("Components channel created")

if err != nil {
klog.ErrorS(err, "error loading components")
Expand All @@ -61,7 +58,6 @@ func NewComponentsManager(ctx context.Context, appId string,

manager.pubSubRegistry.Register(runtimeOpts.pubsubs...)
manager.middlewareRegistry.Register(runtimeOpts.pubsubMiddleware...)
klog.V(4).InfoS("Components added to registry")

go manager.watchComponentsUpdates()

Expand Down

0 comments on commit 7ec2214

Please sign in to comment.