Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.6] libct/cg/sd: reconnect and retry on dbus connection error #7

Closed
wants to merge 9 commits into from
120 changes: 73 additions & 47 deletions libcontainer/cgroups/systemd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,8 @@ import (
)

var (
connOnce sync.Once
connDbus *systemdDbus.Conn
connErr error

versionOnce sync.Once
version int
versionErr error

isRunningSystemdOnce sync.Once
isRunningSystemd bool
Expand Down Expand Up @@ -284,19 +279,6 @@ func generateDeviceProperties(rules []*configs.DeviceRule) ([]systemdDbus.Proper
return properties, nil
}

// getDbusConnection lazy initializes systemd dbus connection
// and returns it
func getDbusConnection(rootless bool) (*systemdDbus.Conn, error) {
connOnce.Do(func() {
if rootless {
connDbus, connErr = NewUserSystemdDbus()
} else {
connDbus, connErr = systemdDbus.New()
}
})
return connDbus, connErr
}

func newProp(name string, units interface{}) systemdDbus.Property {
return systemdDbus.Property{
Name: name,
Expand All @@ -312,29 +294,43 @@ func getUnitName(c *configs.Cgroup) string {
return c.Name
}

// isUnitExists returns true if the error is that a systemd unit already exists.
func isUnitExists(err error) bool {
// isDbusError returns true if the error is a specific dbus error.
func isDbusError(err error, name string) bool {
if err != nil {
if dbusError, ok := err.(dbus.Error); ok {
return strings.Contains(dbusError.Name, "org.freedesktop.systemd1.UnitExists")
var derr *dbus.Error
if errors.As(err, &derr) {
return strings.Contains(derr.Name, name)
}
}
return false
}

func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []systemdDbus.Property) error {
// isUnitExists returns true if the error is that a systemd unit already exists.
func isUnitExists(err error) bool {
return isDbusError(err, "org.freedesktop.systemd1.UnitExists")
}

func startUnit(cm *dbusConnManager, unitName string, properties []systemdDbus.Property) error {
statusChan := make(chan string, 1)
if _, err := dbusConnection.StartTransientUnit(unitName, "replace", properties, statusChan); err == nil {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
_, err := c.StartTransientUnit(unitName, "replace", properties, statusChan)
return err
})
if err == nil {
timeout := time.NewTimer(30 * time.Second)
defer timeout.Stop()

select {
case s := <-statusChan:
close(statusChan)
// Please refer to https://godoc.org/github.com/coreos/go-systemd/dbus#Conn.StartUnit
if s != "done" {
dbusConnection.ResetFailedUnit(unitName)
resetFailedUnit(cm, unitName)
return errors.Errorf("error creating systemd unit `%s`: got `%s`", unitName, s)
}
case <-time.After(time.Second):
logrus.Warnf("Timed out while waiting for StartTransientUnit(%s) completion signal from dbus. Continuing...", unitName)
case <-timeout.C:
resetFailedUnit(cm, unitName)
return errors.New("Timeout waiting for systemd to create " + unitName)
}
} else if !isUnitExists(err) {
return err
Expand All @@ -343,9 +339,13 @@ func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []s
return nil
}

func stopUnit(dbusConnection *systemdDbus.Conn, unitName string) error {
func stopUnit(cm *dbusConnManager, unitName string) error {
statusChan := make(chan string, 1)
if _, err := dbusConnection.StopUnit(unitName, "replace", statusChan); err == nil {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
_, err := c.StopUnit(unitName, "replace", statusChan)
return err
})
if err == nil {
select {
case s := <-statusChan:
close(statusChan)
Expand All @@ -360,29 +360,57 @@ func stopUnit(dbusConnection *systemdDbus.Conn, unitName string) error {
return nil
}

func systemdVersion(conn *systemdDbus.Conn) (int, error) {
func resetFailedUnit(cm *dbusConnManager, name string) {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
return c.ResetFailedUnit(name)
})
if err != nil {
logrus.Warnf("unable to reset failed unit: %v", err)
}
}

func setUnitProperties(cm *dbusConnManager, name string, properties ...systemdDbus.Property) error {
return cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
return c.SetUnitProperties(name, true, properties...)
})
}

func getManagerProperty(cm *dbusConnManager, name string) (string, error) {
str := ""
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
var err error
str, err = c.GetManagerProperty(name)
return err
})
if err != nil {
return "", err
}
return strconv.Unquote(str)
}

func systemdVersion(cm *dbusConnManager) int {
versionOnce.Do(func() {
version = -1
verStr, err := conn.GetManagerProperty("Version")
if err != nil {
versionErr = err
return
verStr, err := getManagerProperty(cm, "Version")
if err == nil {
version, err = systemdVersionAtoi(verStr)
}

version, versionErr = systemdVersionAtoi(verStr)
return
if err != nil {
logrus.WithError(err).Error("unable to get systemd version")
}
})

return version, versionErr
return version
}

func systemdVersionAtoi(verStr string) (int, error) {
// verStr should be of the form:
// "v245.4-1.fc32", "245", "v245-1.fc32", "245-1.fc32"
// all the input strings include quotes, and the output int should be 245
// thus, we unconditionally remove the `"v`
// and then match on the first integer we can grab
re := regexp.MustCompile(`"?v?([0-9]+)`)
// "v245.4-1.fc32", "245", "v245-1.fc32", "245-1.fc32" (without quotes).
// The result for all of the above should be 245.
// Thus, we unconditionally remove the "v" prefix
// and then match on the first integer we can grab.
re := regexp.MustCompile(`v?([0-9]+)`)
matches := re.FindStringSubmatch(verStr)
if len(matches) < 2 {
return 0, errors.Errorf("can't parse version %s: incorrect number of matches %v", verStr, matches)
Expand All @@ -391,13 +419,11 @@ func systemdVersionAtoi(verStr string) (int, error) {
return ver, errors.Wrapf(err, "can't parse version %s", verStr)
}

func addCpuQuota(conn *systemdDbus.Conn, properties *[]systemdDbus.Property, quota int64, period uint64) {
func addCpuQuota(cm *dbusConnManager, properties *[]systemdDbus.Property, quota int64, period uint64) {
if period != 0 {
// systemd only supports CPUQuotaPeriodUSec since v242
sdVer, err := systemdVersion(conn)
if err != nil {
logrus.Warnf("systemdVersion: %s", err)
} else if sdVer >= 242 {
sdVer := systemdVersion(cm)
if sdVer >= 242 {
*properties = append(*properties,
newProp("CPUQuotaPeriodUSec", period))
}
Expand Down
95 changes: 95 additions & 0 deletions libcontainer/cgroups/systemd/dbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// +build linux

package systemd

import (
"sync"

systemdDbus "github.com/coreos/go-systemd/v22/dbus"
dbus "github.com/godbus/dbus/v5"
)

var (
dbusC *systemdDbus.Conn
dbusMu sync.RWMutex
dbusInited bool
dbusRootless bool
)

type dbusConnManager struct {
}

// newDbusConnManager initializes systemd dbus connection manager.
func newDbusConnManager(rootless bool) *dbusConnManager {
if dbusInited && rootless != dbusRootless {
panic("can't have both root and rootless dbus")
}
dbusRootless = rootless
return &dbusConnManager{}
}

// getConnection lazily initializes and returns systemd dbus connection.
func (d *dbusConnManager) getConnection() (*systemdDbus.Conn, error) {
// In the case where dbusC != nil
// Use the read lock the first time to ensure
// that Conn can be acquired at the same time.
dbusMu.RLock()
if conn := dbusC; conn != nil {
dbusMu.RUnlock()
return conn, nil
}
dbusMu.RUnlock()

// In the case where dbusC == nil
// Use write lock to ensure that only one
// will be created
dbusMu.Lock()
defer dbusMu.Unlock()
if conn := dbusC; conn != nil {
return conn, nil
}

conn, err := d.newConnection()
if err != nil {
return nil, err
}
dbusC = conn
return conn, nil
}

func (d *dbusConnManager) newConnection() (*systemdDbus.Conn, error) {
if dbusRootless {
return newUserSystemdDbus()
}
return systemdDbus.New()
}

// resetConnection resets the connection to its initial state
// (so it can be reconnected if necessary).
func (d *dbusConnManager) resetConnection(conn *systemdDbus.Conn) {
dbusMu.Lock()
defer dbusMu.Unlock()
if dbusC != nil && dbusC == conn {
dbusC.Close()
dbusC = nil
}
}

var errDbusConnClosed = dbus.ErrClosed.Error()

// retryOnDisconnect calls op, and if the error it returns is about closed dbus
// connection, the connection is re-established and the op is retried. This helps
// with the situation when dbus is restarted and we have a stale connection.
func (d *dbusConnManager) retryOnDisconnect(op func(*systemdDbus.Conn) error) error {
for {
conn, err := d.getConnection()
if err != nil {
return err
}
err = op(conn)
if !isDbusError(err, errDbusConnClosed) {
return err
}
d.resetConnection(conn)
}
}
4 changes: 2 additions & 2 deletions libcontainer/cgroups/systemd/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/pkg/errors"
)

// NewUserSystemdDbus creates a connection for systemd user-instance.
func NewUserSystemdDbus() (*systemdDbus.Conn, error) {
// newUserSystemdDbus creates a connection for systemd user-instance.
func newUserSystemdDbus() (*systemdDbus.Conn, error) {
addr, err := DetectUserDbusSessionBusAddress()
if err != nil {
return nil, err
Expand Down
30 changes: 9 additions & 21 deletions libcontainer/cgroups/systemd/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ type legacyManager struct {
mu sync.Mutex
cgroups *configs.Cgroup
paths map[string]string
dbus *dbusConnManager
}

func NewLegacyManager(cg *configs.Cgroup, paths map[string]string) cgroups.Manager {
return &legacyManager{
cgroups: cg,
paths: paths,
dbus: newDbusConnManager(false),
}
}

Expand Down Expand Up @@ -68,7 +70,7 @@ var legacySubsystems = subsystemSet{
&fs.NameGroup{GroupName: "name=systemd"},
}

func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]systemdDbus.Property, error) {
func genV1ResourcesProperties(c *configs.Cgroup, cm *dbusConnManager) ([]systemdDbus.Property, error) {
var properties []systemdDbus.Property
r := c.Resources

Expand All @@ -88,7 +90,7 @@ func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]syst
newProp("CPUShares", r.CpuShares))
}

addCpuQuota(conn, &properties, r.CpuQuota, r.CpuPeriod)
addCpuQuota(cm, &properties, r.CpuQuota, r.CpuPeriod)

if r.BlkioWeight != 0 {
properties = append(properties,
Expand Down Expand Up @@ -167,11 +169,7 @@ func (m *legacyManager) Apply(pid int) error {
properties = append(properties,
newProp("DefaultDependencies", false))

dbusConnection, err := getDbusConnection(false)
if err != nil {
return err
}
resourcesProperties, err := genV1ResourcesProperties(c, dbusConnection)
resourcesProperties, err := genV1ResourcesProperties(c, m.dbus)
if err != nil {
return err
}
Expand All @@ -186,7 +184,7 @@ func (m *legacyManager) Apply(pid int) error {
}
}

if err := startUnit(dbusConnection, unitName, properties); err != nil {
if err := startUnit(m.dbus, unitName, properties); err != nil {
return err
}

Expand Down Expand Up @@ -217,13 +215,7 @@ func (m *legacyManager) Destroy() error {
m.mu.Lock()
defer m.mu.Unlock()

dbusConnection, err := getDbusConnection(false)
if err != nil {
return err
}
unitName := getUnitName(m.cgroups)

err = stopUnit(dbusConnection, unitName)
err := stopUnit(m.dbus, getUnitName(m.cgroups))
// Both on success and on error, cleanup all the cgroups we are aware of.
// Some of them were created directly by Apply() and are not managed by systemd.
if err := cgroups.RemovePaths(m.paths); err != nil {
Expand Down Expand Up @@ -374,11 +366,7 @@ func (m *legacyManager) Set(container *configs.Config) error {
if m.cgroups.Paths != nil {
return nil
}
dbusConnection, err := getDbusConnection(false)
if err != nil {
return err
}
properties, err := genV1ResourcesProperties(container.Cgroups, dbusConnection)
properties, err := genV1ResourcesProperties(container.Cgroups, m.dbus)
if err != nil {
return err
}
Expand Down Expand Up @@ -406,7 +394,7 @@ func (m *legacyManager) Set(container *configs.Config) error {
}
}

if err := dbusConnection.SetUnitProperties(getUnitName(container.Cgroups), true, properties...); err != nil {
if err := setUnitProperties(m.dbus, getUnitName(container.Cgroups), properties...); err != nil {
_ = m.Freeze(targetFreezerState)
return err
}
Expand Down
Loading