diff --git a/libcontainer/cgroups/systemd/common.go b/libcontainer/cgroups/systemd/common.go index b567f3e1fcb..0a876fb9751 100644 --- a/libcontainer/cgroups/systemd/common.go +++ b/libcontainer/cgroups/systemd/common.go @@ -20,13 +20,8 @@ import ( ) var ( - connOnce sync.Once - connDbus *systemdDbus.Conn - connErr error - versionOnce sync.Once version int - versionErr error isRunningSystemdOnce sync.Once isRunningSystemd bool @@ -284,19 +279,6 @@ func generateDeviceProperties(rules []*configs.DeviceRule) ([]systemdDbus.Proper return properties, nil } -// getDbusConnection lazy initializes systemd dbus connection -// and returns it -func getDbusConnection(rootless bool) (*systemdDbus.Conn, error) { - connOnce.Do(func() { - if rootless { - connDbus, connErr = NewUserSystemdDbus() - } else { - connDbus, connErr = systemdDbus.New() - } - }) - return connDbus, connErr -} - func newProp(name string, units interface{}) systemdDbus.Property { return systemdDbus.Property{ Name: name, @@ -312,29 +294,43 @@ func getUnitName(c *configs.Cgroup) string { return c.Name } -// isUnitExists returns true if the error is that a systemd unit already exists. -func isUnitExists(err error) bool { +// isDbusError returns true if the error is a specific dbus error. +func isDbusError(err error, name string) bool { if err != nil { - if dbusError, ok := err.(dbus.Error); ok { - return strings.Contains(dbusError.Name, "org.freedesktop.systemd1.UnitExists") + var derr *dbus.Error + if errors.As(err, &derr) { + return strings.Contains(derr.Name, name) } } return false } -func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []systemdDbus.Property) error { +// isUnitExists returns true if the error is that a systemd unit already exists. +func isUnitExists(err error) bool { + return isDbusError(err, "org.freedesktop.systemd1.UnitExists") +} + +func startUnit(cm *dbusConnManager, unitName string, properties []systemdDbus.Property) error { statusChan := make(chan string, 1) - if _, err := dbusConnection.StartTransientUnit(unitName, "replace", properties, statusChan); err == nil { + err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error { + _, err := c.StartTransientUnit(unitName, "replace", properties, statusChan) + return err + }) + if err == nil { + timeout := time.NewTimer(30 * time.Second) + defer timeout.Stop() + select { case s := <-statusChan: close(statusChan) // Please refer to https://godoc.org/github.com/coreos/go-systemd/dbus#Conn.StartUnit if s != "done" { - dbusConnection.ResetFailedUnit(unitName) + resetFailedUnit(cm, unitName) return errors.Errorf("error creating systemd unit `%s`: got `%s`", unitName, s) } - case <-time.After(time.Second): - logrus.Warnf("Timed out while waiting for StartTransientUnit(%s) completion signal from dbus. Continuing...", unitName) + case <-timeout.C: + resetFailedUnit(cm, unitName) + return errors.New("Timeout waiting for systemd to create " + unitName) } } else if !isUnitExists(err) { return err @@ -343,9 +339,13 @@ func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []s return nil } -func stopUnit(dbusConnection *systemdDbus.Conn, unitName string) error { +func stopUnit(cm *dbusConnManager, unitName string) error { statusChan := make(chan string, 1) - if _, err := dbusConnection.StopUnit(unitName, "replace", statusChan); err == nil { + err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error { + _, err := c.StopUnit(unitName, "replace", statusChan) + return err + }) + if err == nil { select { case s := <-statusChan: close(statusChan) @@ -360,29 +360,57 @@ func stopUnit(dbusConnection *systemdDbus.Conn, unitName string) error { return nil } -func systemdVersion(conn *systemdDbus.Conn) (int, error) { +func resetFailedUnit(cm *dbusConnManager, name string) { + err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error { + return c.ResetFailedUnit(name) + }) + if err != nil { + logrus.Warnf("unable to reset failed unit: %v", err) + } +} + +func setUnitProperties(cm *dbusConnManager, name string, properties ...systemdDbus.Property) error { + return cm.retryOnDisconnect(func(c *systemdDbus.Conn) error { + return c.SetUnitProperties(name, true, properties...) + }) +} + +func getManagerProperty(cm *dbusConnManager, name string) (string, error) { + str := "" + err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error { + var err error + str, err = c.GetManagerProperty(name) + return err + }) + if err != nil { + return "", err + } + return strconv.Unquote(str) +} + +func systemdVersion(cm *dbusConnManager) int { versionOnce.Do(func() { version = -1 - verStr, err := conn.GetManagerProperty("Version") - if err != nil { - versionErr = err - return + verStr, err := getManagerProperty(cm, "Version") + if err == nil { + version, err = systemdVersionAtoi(verStr) } - version, versionErr = systemdVersionAtoi(verStr) - return + if err != nil { + logrus.WithError(err).Error("unable to get systemd version") + } }) - return version, versionErr + return version } func systemdVersionAtoi(verStr string) (int, error) { // verStr should be of the form: - // "v245.4-1.fc32", "245", "v245-1.fc32", "245-1.fc32" - // all the input strings include quotes, and the output int should be 245 - // thus, we unconditionally remove the `"v` - // and then match on the first integer we can grab - re := regexp.MustCompile(`"?v?([0-9]+)`) + // "v245.4-1.fc32", "245", "v245-1.fc32", "245-1.fc32" (without quotes). + // The result for all of the above should be 245. + // Thus, we unconditionally remove the "v" prefix + // and then match on the first integer we can grab. + re := regexp.MustCompile(`v?([0-9]+)`) matches := re.FindStringSubmatch(verStr) if len(matches) < 2 { return 0, errors.Errorf("can't parse version %s: incorrect number of matches %v", verStr, matches) @@ -391,13 +419,11 @@ func systemdVersionAtoi(verStr string) (int, error) { return ver, errors.Wrapf(err, "can't parse version %s", verStr) } -func addCpuQuota(conn *systemdDbus.Conn, properties *[]systemdDbus.Property, quota int64, period uint64) { +func addCpuQuota(cm *dbusConnManager, properties *[]systemdDbus.Property, quota int64, period uint64) { if period != 0 { // systemd only supports CPUQuotaPeriodUSec since v242 - sdVer, err := systemdVersion(conn) - if err != nil { - logrus.Warnf("systemdVersion: %s", err) - } else if sdVer >= 242 { + sdVer := systemdVersion(cm) + if sdVer >= 242 { *properties = append(*properties, newProp("CPUQuotaPeriodUSec", period)) } diff --git a/libcontainer/cgroups/systemd/dbus.go b/libcontainer/cgroups/systemd/dbus.go new file mode 100644 index 00000000000..973970c2c6c --- /dev/null +++ b/libcontainer/cgroups/systemd/dbus.go @@ -0,0 +1,95 @@ +// +build linux + +package systemd + +import ( + "sync" + + systemdDbus "github.com/coreos/go-systemd/v22/dbus" + dbus "github.com/godbus/dbus/v5" +) + +var ( + dbusC *systemdDbus.Conn + dbusMu sync.RWMutex + dbusInited bool + dbusRootless bool +) + +type dbusConnManager struct { +} + +// newDbusConnManager initializes systemd dbus connection manager. +func newDbusConnManager(rootless bool) *dbusConnManager { + if dbusInited && rootless != dbusRootless { + panic("can't have both root and rootless dbus") + } + dbusRootless = rootless + return &dbusConnManager{} +} + +// getConnection lazily initializes and returns systemd dbus connection. +func (d *dbusConnManager) getConnection() (*systemdDbus.Conn, error) { + // In the case where dbusC != nil + // Use the read lock the first time to ensure + // that Conn can be acquired at the same time. + dbusMu.RLock() + if conn := dbusC; conn != nil { + dbusMu.RUnlock() + return conn, nil + } + dbusMu.RUnlock() + + // In the case where dbusC == nil + // Use write lock to ensure that only one + // will be created + dbusMu.Lock() + defer dbusMu.Unlock() + if conn := dbusC; conn != nil { + return conn, nil + } + + conn, err := d.newConnection() + if err != nil { + return nil, err + } + dbusC = conn + return conn, nil +} + +func (d *dbusConnManager) newConnection() (*systemdDbus.Conn, error) { + if dbusRootless { + return newUserSystemdDbus() + } + return systemdDbus.New() +} + +// resetConnection resets the connection to its initial state +// (so it can be reconnected if necessary). +func (d *dbusConnManager) resetConnection(conn *systemdDbus.Conn) { + dbusMu.Lock() + defer dbusMu.Unlock() + if dbusC != nil && dbusC == conn { + dbusC.Close() + dbusC = nil + } +} + +var errDbusConnClosed = dbus.ErrClosed.Error() + +// retryOnDisconnect calls op, and if the error it returns is about closed dbus +// connection, the connection is re-established and the op is retried. This helps +// with the situation when dbus is restarted and we have a stale connection. +func (d *dbusConnManager) retryOnDisconnect(op func(*systemdDbus.Conn) error) error { + for { + conn, err := d.getConnection() + if err != nil { + return err + } + err = op(conn) + if !isDbusError(err, errDbusConnClosed) { + return err + } + d.resetConnection(conn) + } +} diff --git a/libcontainer/cgroups/systemd/user.go b/libcontainer/cgroups/systemd/user.go index cb070cec8b7..6a0bc1fa1c2 100644 --- a/libcontainer/cgroups/systemd/user.go +++ b/libcontainer/cgroups/systemd/user.go @@ -17,8 +17,8 @@ import ( "github.com/pkg/errors" ) -// NewUserSystemdDbus creates a connection for systemd user-instance. -func NewUserSystemdDbus() (*systemdDbus.Conn, error) { +// newUserSystemdDbus creates a connection for systemd user-instance. +func newUserSystemdDbus() (*systemdDbus.Conn, error) { addr, err := DetectUserDbusSessionBusAddress() if err != nil { return nil, err diff --git a/libcontainer/cgroups/systemd/v1.go b/libcontainer/cgroups/systemd/v1.go index c3cc9c8b3bb..e88c763225a 100644 --- a/libcontainer/cgroups/systemd/v1.go +++ b/libcontainer/cgroups/systemd/v1.go @@ -21,12 +21,14 @@ type legacyManager struct { mu sync.Mutex cgroups *configs.Cgroup paths map[string]string + dbus *dbusConnManager } func NewLegacyManager(cg *configs.Cgroup, paths map[string]string) cgroups.Manager { return &legacyManager{ cgroups: cg, paths: paths, + dbus: newDbusConnManager(false), } } @@ -68,7 +70,7 @@ var legacySubsystems = subsystemSet{ &fs.NameGroup{GroupName: "name=systemd"}, } -func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]systemdDbus.Property, error) { +func genV1ResourcesProperties(c *configs.Cgroup, cm *dbusConnManager) ([]systemdDbus.Property, error) { var properties []systemdDbus.Property r := c.Resources @@ -88,7 +90,7 @@ func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]syst newProp("CPUShares", r.CpuShares)) } - addCpuQuota(conn, &properties, r.CpuQuota, r.CpuPeriod) + addCpuQuota(cm, &properties, r.CpuQuota, r.CpuPeriod) if r.BlkioWeight != 0 { properties = append(properties, @@ -167,11 +169,7 @@ func (m *legacyManager) Apply(pid int) error { properties = append(properties, newProp("DefaultDependencies", false)) - dbusConnection, err := getDbusConnection(false) - if err != nil { - return err - } - resourcesProperties, err := genV1ResourcesProperties(c, dbusConnection) + resourcesProperties, err := genV1ResourcesProperties(c, m.dbus) if err != nil { return err } @@ -186,7 +184,7 @@ func (m *legacyManager) Apply(pid int) error { } } - if err := startUnit(dbusConnection, unitName, properties); err != nil { + if err := startUnit(m.dbus, unitName, properties); err != nil { return err } @@ -217,13 +215,7 @@ func (m *legacyManager) Destroy() error { m.mu.Lock() defer m.mu.Unlock() - dbusConnection, err := getDbusConnection(false) - if err != nil { - return err - } - unitName := getUnitName(m.cgroups) - - err = stopUnit(dbusConnection, unitName) + err := stopUnit(m.dbus, getUnitName(m.cgroups)) // Both on success and on error, cleanup all the cgroups we are aware of. // Some of them were created directly by Apply() and are not managed by systemd. if err := cgroups.RemovePaths(m.paths); err != nil { @@ -374,11 +366,7 @@ func (m *legacyManager) Set(container *configs.Config) error { if m.cgroups.Paths != nil { return nil } - dbusConnection, err := getDbusConnection(false) - if err != nil { - return err - } - properties, err := genV1ResourcesProperties(container.Cgroups, dbusConnection) + properties, err := genV1ResourcesProperties(container.Cgroups, m.dbus) if err != nil { return err } @@ -406,7 +394,7 @@ func (m *legacyManager) Set(container *configs.Config) error { } } - if err := dbusConnection.SetUnitProperties(getUnitName(container.Cgroups), true, properties...); err != nil { + if err := setUnitProperties(m.dbus, getUnitName(container.Cgroups), properties...); err != nil { _ = m.Freeze(targetFreezerState) return err } diff --git a/libcontainer/cgroups/systemd/v2.go b/libcontainer/cgroups/systemd/v2.go index e1a6622a0d3..1cc77390265 100644 --- a/libcontainer/cgroups/systemd/v2.go +++ b/libcontainer/cgroups/systemd/v2.go @@ -24,6 +24,7 @@ type unifiedManager struct { // path is like "/sys/fs/cgroup/user.slice/user-1001.slice/session-1.scope" path string rootless bool + dbus *dbusConnManager } func NewUnifiedManager(config *configs.Cgroup, path string, rootless bool) cgroups.Manager { @@ -31,10 +32,11 @@ func NewUnifiedManager(config *configs.Cgroup, path string, rootless bool) cgrou cgroups: config, path: path, rootless: rootless, + dbus: newDbusConnManager(rootless), } } -func genV2ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]systemdDbus.Property, error) { +func genV2ResourcesProperties(c *configs.Cgroup, cm *dbusConnManager) ([]systemdDbus.Property, error) { var properties []systemdDbus.Property r := c.Resources @@ -72,7 +74,7 @@ func genV2ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]syst newProp("CPUWeight", r.CpuWeight)) } - addCpuQuota(conn, &properties, r.CpuQuota, r.CpuPeriod) + addCpuQuota(cm, &properties, r.CpuQuota, r.CpuPeriod) if r.PidsLimit > 0 || r.PidsLimit == -1 { properties = append(properties, @@ -136,22 +138,18 @@ func (m *unifiedManager) Apply(pid int) error { properties = append(properties, newProp("DefaultDependencies", false)) - dbusConnection, err := getDbusConnection(m.rootless) - if err != nil { - return err - } - resourcesProperties, err := genV2ResourcesProperties(c, dbusConnection) + resourcesProperties, err := genV2ResourcesProperties(c, m.dbus) if err != nil { return err } properties = append(properties, resourcesProperties...) properties = append(properties, c.SystemdProps...) - if err := startUnit(dbusConnection, unitName, properties); err != nil { + if err := startUnit(m.dbus, unitName, properties); err != nil { return errors.Wrapf(err, "error while starting unit %q with properties %+v", unitName, properties) } - if err = m.initPath(); err != nil { + if err := m.initPath(); err != nil { return err } if err := fs2.CreateCgroupPath(m.path, m.cgroups); err != nil { @@ -167,17 +165,13 @@ func (m *unifiedManager) Destroy() error { m.mu.Lock() defer m.mu.Unlock() - dbusConnection, err := getDbusConnection(m.rootless) - if err != nil { - return err - } unitName := getUnitName(m.cgroups) - if err := stopUnit(dbusConnection, unitName); err != nil { + if err := stopUnit(m.dbus, unitName); err != nil { return err } // XXX this is probably not needed, systemd should handle it - err = os.Remove(m.path) + err := os.Remove(m.path) if err != nil && !os.IsNotExist(err) { return err } @@ -206,16 +200,8 @@ func (m *unifiedManager) getSliceFull() (string, error) { } if m.rootless { - dbusConnection, err := getDbusConnection(m.rootless) - if err != nil { - return "", err - } - // managerCGQuoted is typically "/user.slice/user-${uid}.slice/user@${uid}.service" including the quote symbols - managerCGQuoted, err := dbusConnection.GetManagerProperty("ControlGroup") - if err != nil { - return "", err - } - managerCG, err := strconv.Unquote(managerCGQuoted) + // managerCG is typically "/user.slice/user-${uid}.slice/user@${uid}.service". + managerCG, err := getManagerProperty(m.dbus, "ControlGroup") if err != nil { return "", err } @@ -289,11 +275,7 @@ func (m *unifiedManager) GetStats() (*cgroups.Stats, error) { } func (m *unifiedManager) Set(container *configs.Config) error { - dbusConnection, err := getDbusConnection(m.rootless) - if err != nil { - return err - } - properties, err := genV2ResourcesProperties(m.cgroups, dbusConnection) + properties, err := genV2ResourcesProperties(m.cgroups, m.dbus) if err != nil { return err } @@ -321,7 +303,7 @@ func (m *unifiedManager) Set(container *configs.Config) error { } } - if err := dbusConnection.SetUnitProperties(getUnitName(m.cgroups), true, properties...); err != nil { + if err := setUnitProperties(m.dbus, getUnitName(m.cgroups), properties...); err != nil { _ = m.Freeze(targetFreezerState) return errors.Wrap(err, "error while setting unit properties") }