diff --git a/containerd-shim-v2/service.go b/containerd-shim-v2/service.go index 9103885080..1667b28cf4 100644 --- a/containerd-shim-v2/service.go +++ b/containerd-shim-v2/service.go @@ -16,7 +16,6 @@ import ( eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/namespaces" cdruntime "github.com/containerd/containerd/runtime" cdshim "github.com/containerd/containerd/runtime/v2/shim" @@ -58,7 +57,7 @@ var ( var vci vc.VC = &vc.VCImpl{} // New returns a new shim service that can be used via GRPC -func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shim, error) { +func New(ctx context.Context, id string, publisher cdshim.Publisher, shutdown func()) (cdshim.Shim, error) { logger := logrus.WithField("ID", id) // Discard the log before shim init its log output. Otherwise // it will output into stdio, from which containerd would like @@ -67,8 +66,6 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi vci.SetLogger(ctx, logger) katautils.SetLogger(ctx, logger, logger.Logger.Level) - ctx, cancel := context.WithCancel(ctx) - s := &service{ id: id, pid: uint32(os.Getpid()), @@ -76,13 +73,13 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi containers: make(map[string]*container), events: make(chan interface{}, chSize), ec: make(chan exit, bufferSize), - cancel: cancel, + cancel: shutdown, mount: false, } go s.processExits() - go s.forward(publisher) + go s.forward(ctx, publisher) return s, nil } @@ -216,15 +213,20 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container return address, nil } -func (s *service) forward(publisher events.Publisher) { +func (s *service) forward(ctx context.Context, publisher cdshim.Publisher) { + ns, _ := namespaces.Namespace(ctx) + ctx = namespaces.WithNamespace(context.Background(), ns) + for e := range s.events { - ctx, cancel := context.WithTimeout(s.ctx, timeOut) + ctx, cancel := context.WithTimeout(ctx, timeOut) err := publisher.Publish(ctx, getTopic(e), e) cancel() if err != nil { logrus.WithError(err).Error("post event") } } + + publisher.Close() } func (s *service) send(evt interface{}) { @@ -769,11 +771,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ * s.mu.Unlock() s.cancel() + close(s.events) - os.Exit(0) - - // This will never be called, but this is only there to make sure the - // program can compile. return empty, nil }