Skip to content

Commit

Permalink
Merge pull request #832 from jlandowner/event-watch3
Browse files Browse the repository at this point in the history
  • Loading branch information
oruharo authored Jun 4, 2024
2 parents ec4bcdd + 1da43c2 commit 9f8359c
Show file tree
Hide file tree
Showing 55 changed files with 4,167 additions and 1,407 deletions.
35 changes: 8 additions & 27 deletions charts/cosmo/templates/dashboard/roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,6 @@ rules:
resources:
- workspaces
- users
verbs:
- create
- delete
- patch
- update
- get
- list
- watch
- apiGroups:
- cosmo-workspace.github.io
resources:
- workspaces/status
- users/status
verbs:
- get
- list
- watch
- apiGroups:
- cosmo-workspace.github.io
resources:
- instances
- templates
- clusterinstances
Expand All @@ -70,27 +50,28 @@ rules:
- list
- watch
- apiGroups:
- cosmo-workspace.github.io
- events.k8s.io
resources:
- instances/status
- events
verbs:
- get
- list
- watch
- update
- apiGroups:
- events.k8s.io
- authentication.k8s.io
resources:
- events
- tokenreviews
verbs:
- create
- get
- list
- watch
- apiGroups:
- authentication.k8s.io
- '*'
resources:
- tokenreviews
- '*'
verbs:
- create
- get
- list
- watch
Expand Down
21 changes: 9 additions & 12 deletions internal/cmd/user/get_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"time"

"github.com/spf13/cobra"
"google.golang.org/protobuf/types/known/timestamppb"
"k8s.io/utils/ptr"

"github.com/cosmo-workspace/cosmo/api/v1alpha1"
"github.com/cosmo-workspace/cosmo/pkg/apiconv"
Expand Down Expand Up @@ -88,35 +86,34 @@ func (o *GetEventsOption) RunE(cmd *cobra.Command, args []string) error {
}

func (o *GetEventsOption) GetEventsWithDashClient(ctx context.Context) ([]*dashv1alpha1.Event, error) {
req := &dashv1alpha1.GetUserRequest{
req := &dashv1alpha1.GetEventsRequest{
UserName: o.UserName,
WithRaw: ptr.To(true),
}
c := o.CosmoDashClient
res, err := c.UserServiceClient.GetUser(ctx, cli.NewRequestWithToken(req, o.CliConfig))
res, err := c.UserServiceClient.GetEvents(ctx, cli.NewRequestWithToken(req, o.CliConfig))
if err != nil {
return nil, fmt.Errorf("failed to get user: %w", err)
return nil, fmt.Errorf("failed to get events: %w", err)
}
o.Logr.DebugAll().Info("UserServiceClient.GetUser", "res", res)
return res.Msg.User.Events, nil
return res.Msg.Items, nil
}

func (o *GetEventsOption) OutputTable(w io.Writer, events []*dashv1alpha1.Event) {
data := [][]string{}

for _, v := range events {
data = append(data, []string{lastSeen(v.EventTime, v.Series), v.Type, v.Reason, regarding(v.Regarding), v.ReportingController, v.Note})
data = append(data, []string{lastSeen(v.Series), v.Type, v.Reason, regarding(v.Regarding), v.ReportingController, v.Note})
}
cli.OutputTable(w,
[]string{"LAST SEEN", "TYPE", "REASON", "OBJECT", "REPORTER", "MESSAGE"},
data)
}

func lastSeen(t *timestamppb.Timestamp, series *dashv1alpha1.EventSeries) string {
if series != nil {
return fmt.Sprintf("%s (%vx)", time.Since(t.AsTime()).Round(time.Second), series.Count)
func lastSeen(series *dashv1alpha1.EventSeries) string {
if series.Count > 0 {
return fmt.Sprintf("%s (%vx)", time.Since(series.LastObservedTime.AsTime()).Round(time.Second), series.Count)
}
return time.Since(t.AsTime()).Round(time.Second).String()
return time.Since(series.LastObservedTime.AsTime()).Round(time.Second).String()
}

func regarding(v *dashv1alpha1.ObjectReference) string {
Expand Down
2 changes: 1 addition & 1 deletion internal/dashboard/auth_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

func (s *Server) AuthServiceHandler(mux *http.ServeMux) {
path, handler := dashboardv1alpha1connect.NewAuthServiceHandler(s)
mux.Handle(path, s.contextMiddleware(handler))
mux.Handle(path, s.timeoutHandler(s.contextMiddleware(handler)))
}

func (s *Server) CreateSession(w http.ResponseWriter, r *http.Request, sesInfo session.Info) error {
Expand Down
54 changes: 38 additions & 16 deletions internal/dashboard/auth_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"net/http"
"time"

"github.com/bufbuild/connect-go"
connect_go "github.com/bufbuild/connect-go"
apierrs "k8s.io/apimachinery/pkg/api/errors"

cosmov1alpha1 "github.com/cosmo-workspace/cosmo/api/v1alpha1"
Expand Down Expand Up @@ -52,25 +52,47 @@ func (s *Server) contextMiddleware(next http.Handler) http.Handler {
})
}

func (s *Server) authorizationInterceptor() connect.UnaryInterceptorFunc {
interceptor := func(next connect.UnaryFunc) connect.UnaryFunc {
return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
type authorizationInterceptorFunc func(ctx context.Context) (loginUser *cosmov1alpha1.User, deadline time.Time, err error)

log := clog.FromContext(ctx).WithName("authorization")
// WrapUnary implements [Interceptor] with an authorization.
func (f authorizationInterceptorFunc) WrapUnary(next connect_go.UnaryFunc) connect_go.UnaryFunc {
return connect_go.UnaryFunc(func(ctx context.Context, req connect_go.AnyRequest) (connect_go.AnyResponse, error) {
log := clog.FromContext(ctx).WithName("unary authorization")

loginUser, deadline, err := s.verifyAndGetLoginUser(ctx)
if err != nil {
return nil, ErrResponse(log, err)
}
loginUser, deadline, err := f(ctx)
if err != nil {
return nil, ErrResponse(log, err)
}

ctx = newContextWithCaller(ctx, loginUser)
ctx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
ctx = newContextWithCaller(ctx, loginUser)
ctx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()

return next(ctx, req)
})
}
return connect.UnaryInterceptorFunc(interceptor)
return next(ctx, req)
})
}

// WrapStreamingClient implements [Interceptor] with a no-op.
func (f authorizationInterceptorFunc) WrapStreamingClient(next connect_go.StreamingClientFunc) connect_go.StreamingClientFunc {
return next
}

// WrapStreamingHandler implements [Interceptor] with an authorization.
func (f authorizationInterceptorFunc) WrapStreamingHandler(next connect_go.StreamingHandlerFunc) connect_go.StreamingHandlerFunc {
return connect_go.StreamingHandlerFunc(func(ctx context.Context, conn connect_go.StreamingHandlerConn) error {
log := clog.FromContext(ctx).WithName("streaming handler authorization")

loginUser, deadline, err := f(ctx)
if err != nil {
return ErrResponse(log, err)
}

ctx = newContextWithCaller(ctx, loginUser)
ctx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()

return next(ctx, conn)
})
}

func (s *Server) verifyAndGetLoginUser(ctx context.Context) (loginUser *cosmov1alpha1.User, deadline time.Time, err error) {
Expand Down
169 changes: 169 additions & 0 deletions internal/dashboard/event_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package dashboard

import (
"context"
"crypto/sha256"
"fmt"
"net/http"
"sync"
"time"

eventv1 "k8s.io/api/events/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

connect_go "github.com/bufbuild/connect-go"

cosmov1alpha1 "github.com/cosmo-workspace/cosmo/api/v1alpha1"
"github.com/cosmo-workspace/cosmo/pkg/apiconv"
"github.com/cosmo-workspace/cosmo/pkg/clog"
"github.com/cosmo-workspace/cosmo/pkg/kosmo"
dashv1alpha1 "github.com/cosmo-workspace/cosmo/proto/gen/dashboard/v1alpha1"
"github.com/cosmo-workspace/cosmo/proto/gen/dashboard/v1alpha1/dashboardv1alpha1connect"
)

func (s *Server) StreamServiceHandler(mux *http.ServeMux) {
path, handler := dashboardv1alpha1connect.NewStreamServiceHandler(s,
connect_go.WithInterceptors(authorizationInterceptorFunc(s.verifyAndGetLoginUser)),
)
mux.Handle(path, s.contextMiddleware(handler))
}

// StreamingEvents implements dashboardv1alpha1connect.UserServiceHandler.
func (s *Server) StreamingEvents(ctx context.Context, req *connect_go.Request[dashv1alpha1.GetEventsRequest], stream *connect_go.ServerStream[dashv1alpha1.GetEventsResponse]) error {
log := clog.FromContext(ctx).WithCaller()
log.Info("request", "req", req)

if err := userAuthentication(ctx, req.Msg.UserName); err != nil {
return err
}
key := sha256.Sum256([]byte(stream.Conn().RequestHeader().Get("Cookie")))

ctx, cancel := context.WithTimeout(ctx, time.Second*300)
defer cancel()

events, err := s.Klient.ListEvents(ctx, cosmov1alpha1.UserNamespace(req.Msg.UserName))
if err != nil {
return ErrResponse(log, err)
}

for _, v := range events {
if req.Msg.From != nil {
events := make([]eventv1.Event, 0)
if _, last := apiconv.EventObservedTime(v); last.After(req.Msg.From.AsTime()) {
events = append(events, v)
}
if len(events) > 0 {
res := &dashv1alpha1.GetEventsResponse{
Items: apiconv.K2D_Events([]eventv1.Event{v}),
}
if err := stream.Send(res); err != nil {
log.Error(err, "send error")
return err
}
}
}
}

eventCh := s.watcher.subscribe(ctx, fmt.Sprintf("%x", key))
if eventCh == nil {
return fmt.Errorf("channel already exists")
}

for {
select {
case <-ctx.Done():
log.Debug().Info("ctx done")
return nil
case event, ok := <-eventCh:
if ok {
log.Debug().Info("delegating event", "user", req.Msg.UserName)
if event.Namespace != cosmov1alpha1.UserNamespace(req.Msg.UserName) {
continue
}
res := &dashv1alpha1.GetEventsResponse{
Items: apiconv.K2D_Events([]eventv1.Event{event}),
}
log.Info("sending event", "event", event)
if err := stream.Send(res); err != nil {
log.Error(err, "send error")
return err
}
} else {
log.Debug().Info("event channel closed")
return nil
}
}
}
}

type watcher struct {
Klient kosmo.Client
Log *clog.Logger
subscribers sync.Map
cancelSubscribe sync.Map
sendingLock sync.Mutex
}

func (r *watcher) subscribe(ctx context.Context, key string) <-chan eventv1.Event {
log := r.Log.WithValues("key", key)
log.Debug().Info("create new channel")

ctx, cancel := context.WithCancel(ctx)
preCancel, ok := r.cancelSubscribe.Load(key)
if ok {
preCancel.(context.CancelFunc)()
}

ch := make(chan eventv1.Event)
r.subscribers.Store(key, ch)
r.cancelSubscribe.Store(key, cancel)

go func(ch chan eventv1.Event) {
log.Debug().Info("wait channel closed...")
<-ctx.Done()
log.Debug().Info("close channel")
r.sendingLock.Lock()
defer r.sendingLock.Unlock()
r.subscribers.Delete(key)
r.cancelSubscribe.Delete(key)
close(ch)
}(ch)

return ch
}

func (r *watcher) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := r.Log.WithValues("req", req)
log.Debug().Info("start reconcile")

var event eventv1.Event
if err := r.Klient.Get(ctx, req.NamespacedName, &event); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
if err := r.Klient.UpdateEventAnnotations(ctx, &event); err != nil {
log.Error(err, "failed to set regaining instance on event annotation")
}

r.sendingLock.Lock()
defer r.sendingLock.Unlock()
r.subscribers.Range(func(key, value any) bool {
log.Debug().Info("send event to channel", "key", key, "event", event)
ch := value.(chan eventv1.Event)
ch <- event
return true
})
log.Debug().Info("finish reconcile")
return reconcile.Result{}, nil
}

func (r *watcher) SetupWithManager(mgr ctrl.Manager) error {
_, err := ctrl.NewControllerManagedBy(mgr).
For(&eventv1.Event{}).
Build(r)
if err != nil {
return err
}
return nil
}
10 changes: 10 additions & 0 deletions internal/dashboard/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,15 @@ func (o *options) RunE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("failed to create webauthn instance: %w", err)
}

eventWatcher := &watcher{
Log: clog.NewLogger(ctrl.Log.WithName("eventwatcher")),
Klient: kosmo.NewClient(mgr.GetClient()),
}
if err := eventWatcher.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to event watcher")
os.Exit(1)
}

serv := &Server{
Log: clog.NewLogger(ctrl.Log.WithName("dashboard")),
Klient: klient,
Expand All @@ -272,6 +281,7 @@ func (o *options) RunE(cmd *cobra.Command, args []string) error {
http: &http.Server{Addr: fmt.Sprintf(":%d", o.ServerPort)},
sessionStore: nil,
webauthn: wa,
watcher: eventWatcher,
}

if err := mgr.Add(serv); err != nil {
Expand Down
Loading

0 comments on commit 9f8359c

Please sign in to comment.