Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

additional logging #35

Merged
merged 1 commit into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ require (
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20220307203707-22a9840ba4d7 // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -759,8 +759,9 @@ golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220307203707-22a9840ba4d7 h1:8IVLkfbr2cLhv0a/vKq4UFUcJym8RmDoDboxCFWEjYE=
golang.org/x/sys v0.0.0-20220307203707-22a9840ba4d7/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
Expand Down
48 changes: 25 additions & 23 deletions pkg/api/runtime/grpc/grpc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@ import (
"context"
"errors"
"fmt"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"k8s.io/klog/v2"
"net"
"rusi/pkg/api/runtime"
"rusi/pkg/messaging"
"rusi/pkg/messaging/serdes"
v1 "rusi/pkg/proto/runtime/v1"
"sync"

"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"k8s.io/klog/v2"
)

func NewGrpcAPI(port int, serverOptions ...grpc.ServerOption) runtime.Api {
Expand Down Expand Up @@ -133,7 +134,7 @@ func (srv *rusiServerImpl) Subscribe(stream v1.Rusi_SubscribeServer) error {
exit := false
subId := uuid.NewString()
defer srv.removeRefreshChan(subId)
handler := srv.buildSubscribeHandler(stream)
handler := srv.buildSubscribeHandler(stream, request)
for {
hCtx, hCancel := context.WithCancel(context.Background())
unsub, err := srv.subscribeHandler(hCtx, messaging.SubscribeRequest{
Expand Down Expand Up @@ -177,23 +178,23 @@ type subAck struct {
errCh chan error
}

func (srv *rusiServerImpl) buildSubscribeHandler(stream v1.Rusi_SubscribeServer) func(context.Context) messaging.Handler {
func (srv *rusiServerImpl) buildSubscribeHandler(stream v1.Rusi_SubscribeServer, sr *v1.SubscriptionRequest) func(context.Context) messaging.Handler {

subAckMap := map[string]*subAck{}
mu := &sync.RWMutex{}

//monitor incoming ack stream for the current subscription
go startAckReceiverForStream(subAckMap, mu, stream)
go startAckReceiverForStream(subAckMap, mu, stream, sr)

return func(buildCtx context.Context) messaging.Handler {
return func(ctx context.Context, env *messaging.MessageEnvelope) error {
if env.Id == "" {
return errors.New("message id is missing")
return fmt.Errorf("message id is missing for topic %s", env.Subject)
}

errChan := make(chan error)
ackChan := make(chan error)
mu.Lock()
subAckMap[env.Id] = &subAck{nil, errChan}
subAckMap[env.Id] = &subAck{nil, ackChan}
mu.Unlock()
//cleanup
defer func() {
Expand All @@ -220,21 +221,21 @@ func (srv *rusiServerImpl) buildSubscribeHandler(stream v1.Rusi_SubscribeServer)
select {
//handler builder closed context
case <-buildCtx.Done():
klog.V(4).InfoS("Context done before ack", "message", buildCtx.Err())
klog.V(4).InfoS("Context done before ack", "message", buildCtx.Err(), "topic", env.Subject)
return buildCtx.Err()
//subscriber context is done
case <-ctx.Done():
klog.V(4).InfoS("Context done before ack", "message", ctx.Err())
klog.V(4).InfoS("Context done before ack", "message", ctx.Err(), "topic", env.Subject)
return ctx.Err()
case err = <-errChan:
case err = <-ackChan:
klog.V(4).InfoS("Ack sent to pubsub", "topic", env.Subject, "Id", env.Id, "error", err)
return err
}
}
}
}

func startAckReceiverForStream(subAckMap map[string]*subAck, mu *sync.RWMutex, stream v1.Rusi_SubscribeServer) {
func startAckReceiverForStream(subAckMap map[string]*subAck, mu *sync.RWMutex, stream v1.Rusi_SubscribeServer, sr *v1.SubscriptionRequest) {

//wait for ack from the client
for {
Expand All @@ -245,20 +246,21 @@ func startAckReceiverForStream(subAckMap map[string]*subAck, mu *sync.RWMutex, s
default:
r, err := stream.Recv() //blocks
if err != nil {
klog.V(4).ErrorS(err, "ack stream error")
klog.V(4).ErrorS(err, "ack stream error", "topic", sr.GetTopic())
break
}
if r.GetAckRequest() == nil {
klog.V(4).InfoS("invalid ack response")
ar := r.GetAckRequest()
if ar == nil {
klog.V(4).InfoS("invalid ack response", "topic", sr.GetTopic())
break
}
if r.GetAckRequest().GetError() != "" {
err = errors.New(r.GetAckRequest().GetError())
if ar.GetError() != "" {
err = errors.New(ar.GetError())
}

mu.RLock()
mid := r.GetAckRequest().GetMessageId()
klog.V(4).InfoS("Ack received for message", "Id", mid)
mid := ar.GetMessageId()
klog.V(4).InfoS("Ack received for message", "Id", mid, "topic", sr.GetTopic())
for id, ack := range subAckMap {
if id == mid {
if ack.ackHandler != nil {
Expand Down
7 changes: 6 additions & 1 deletion pkg/messaging/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"errors"
"fmt"
"github.com/nats-io/nats.go"
"math/rand"
"rusi/pkg/healthcheck"
"rusi/pkg/messaging"
"rusi/pkg/messaging/serdes"
"strconv"
"time"

"github.com/nats-io/nats.go"

"k8s.io/klog/v2"

stan "github.com/nats-io/stan.go"
Expand Down Expand Up @@ -272,6 +273,10 @@ func (n *natsStreamingPubSub) Subscribe(topic string, handler messaging.Handler,
subs, err = n.natStreamingConn.QueueSubscribe(topic, n.options.natsQueueGroupName, natsMsgHandler, stanOptions...)
}

if err != nil {
klog.ErrorS(err, "nats-streaming: subscribe error", "topic", topic)
}

if err != nil || subs == nil {
return nil, fmt.Errorf("nats-streaming: subscribe error %s", err)
}
Expand Down