Skip to content

Commit

Permalink
Add structure for new exec session tracking to DB
Browse files Browse the repository at this point in the history
As part of the rework of exec sessions, we need to address them
independently of containers. In the new API, we need to be able
to fetch them by their ID, regardless of what container they are
associated with. Unfortunately, our existing exec sessions are
tied to individual containers; there's no way to tell what
container a session belongs to and retrieve it without getting
every exec session for every container.

This adds a pointer to the container an exec session is
associated with to the database. The sessions themselves are
still stored in the container.

Exec-related APIs have been restructured to work with the new
database representation. The originally monolithic API has been
split into a number of smaller calls to allow more fine-grained
control of lifecycle. Support for legacy exec sessions has been
retained, but in a deprecated fashion; we should remove this in
a few releases.

Signed-off-by: Matthew Heon <[email protected]>
  • Loading branch information
mheon committed Mar 18, 2020
1 parent f138405 commit 118e78c
Show file tree
Hide file tree
Showing 19 changed files with 1,430 additions and 489 deletions.
338 changes: 337 additions & 1 deletion libpod/boltdb_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type BoltState struct {
// containing the path to the container's network namespace, a dependencies
// bucket containing the container's dependencies, and an optional pod key
// containing the ID of the pod the container is joined to.
// After updates to include exec sessions, may also include an exec bucket
// with the IDs of exec sessions currently in use by the container.
// - allCtrsBkt: Map of ID to name containing only containers. Used for
// container lookup operations.
// - podBkt: Contains a sub-bucket for each pod in the state.
Expand All @@ -49,6 +51,10 @@ type BoltState struct {
// containers in the pod.
// - allPodsBkt: Map of ID to name containing only pods. Used for pod lookup
// operations.
// - execBkt: Map of exec session ID to exec session - contains a sub-bucket for
// each exec session in the DB.
// - execRegistryBkt: Map of exec session ID to nothing. Contains one entry for
// each exec session. Used for iterating through all exec sessions.
// - runtimeConfigBkt: Contains configuration of the libpod instance that
// initially created the database. This must match for any further instances
// that access the database, to ensure that state mismatches with
Expand Down Expand Up @@ -86,6 +92,7 @@ func NewBoltState(path string, runtime *Runtime) (State, error) {
allPodsBkt,
volBkt,
allVolsBkt,
execBkt,
runtimeConfigBkt,
}

Expand Down Expand Up @@ -171,6 +178,11 @@ func (s *BoltState) Refresh() error {
return err
}

execBucket, err := getExecBucket(tx)
if err != nil {
return err
}

// Iterate through all IDs. Check if they are containers.
// If they are, unmarshal their state, and then clear
// PID, mountpoint, and state for all of them
Expand Down Expand Up @@ -245,6 +257,26 @@ func (s *BoltState) Refresh() error {
return errors.Wrapf(err, "error updating state for container %s in DB", string(id))
}

// Delete all exec sessions, if there are any
ctrExecBkt := ctrBkt.Bucket(execBkt)
if ctrExecBkt != nil {
// Can't delete in a ForEach, so build a list of
// what to remove then remove.
toRemove := []string{}
err = ctrExecBkt.ForEach(func(id, unused []byte) error {
toRemove = append(toRemove, string(id))
return nil
})
if err != nil {
return err
}
for _, execId := range toRemove {
if err := ctrExecBkt.Delete([]byte(execId)); err != nil {
return errors.Wrapf(err, "error removing exec session %s from container %s", execId, string(id))
}
}
}

return nil
})
if err != nil {
Expand Down Expand Up @@ -285,7 +317,30 @@ func (s *BoltState) Refresh() error {

return nil
})
return err
if err != nil {
return err
}

// Now refresh exec sessions
// We want to remove them all, but for-each can't modify buckets
// So we have to make a list of what to operate on, then do the
// work.
toRemoveExec := []string{}
err = execBucket.ForEach(func(id, unused []byte) error {
toRemoveExec = append(toRemoveExec, string(id))
return nil
})
if err != nil {
return err
}

for _, execSession := range toRemoveExec {
if err := execBucket.Delete([]byte(execSession)); err != nil {
return errors.Wrapf(err, "error deleting exec session %s registry from database", execSession)
}
}

return nil
})
return err
}
Expand Down Expand Up @@ -895,6 +950,287 @@ func (s *BoltState) GetContainerConfig(id string) (*ContainerConfig, error) {
return config, nil
}

// AddExecSession adds an exec session to the state.
func (s *BoltState) AddExecSession(ctr *Container, session *ExecSession) error {
if !s.valid {
return define.ErrDBClosed
}

if !ctr.valid {
return define.ErrCtrRemoved
}

db, err := s.getDBCon()
if err != nil {
return err
}
defer s.deferredCloseDBCon(db)

ctrID := []byte(ctr.ID())
sessionID := []byte(session.ID())

err = db.Update(func(tx *bolt.Tx) error {
execBucket, err := getExecBucket(tx)
if err != nil {
return err
}
ctrBucket, err := getCtrBucket(tx)
if err != nil {
return err
}

dbCtr := ctrBucket.Bucket(ctrID)
if dbCtr == nil {
ctr.valid = false
return errors.Wrapf(define.ErrNoSuchCtr, "container %s is not present in the database", ctr.ID())
}

ctrExecSessionBucket, err := dbCtr.CreateBucketIfNotExists(execBkt)
if err != nil {
return errors.Wrapf(err, "error creating exec sessions bucket for container %s", ctr.ID())
}

execExists := execBucket.Get(sessionID)
if execExists != nil {
return errors.Wrapf(define.ErrExecSessionExists, "an exec session with ID %s already exists", session.ID())
}

if err := execBucket.Put(sessionID, ctrID); err != nil {
return errors.Wrapf(err, "error adding exec session %s to DB", session.ID())
}

if err := ctrExecSessionBucket.Put(sessionID, ctrID); err != nil {
return errors.Wrapf(err, "error adding exec session %s to container %s in DB", session.ID(), ctr.ID())
}

return nil
})
return err
}

// GetExecSession returns the ID of the container an exec session is associated
// with.
func (s *BoltState) GetExecSession(id string) (string, error) {
if !s.valid {
return "", define.ErrDBClosed
}

if id == "" {
return "", define.ErrEmptyID
}

db, err := s.getDBCon()
if err != nil {
return "", err
}
defer s.deferredCloseDBCon(db)

ctrID := ""
err = db.View(func(tx *bolt.Tx) error {
execBucket, err := getExecBucket(tx)
if err != nil {
return err
}

ctr := execBucket.Get([]byte(id))
if ctr == nil {
return errors.Wrapf(define.ErrNoSuchExecSession, "no exec session with ID %s found", id)
}
ctrID = string(ctr)
return nil
})
return ctrID, err
}

// RemoveExecSession removes references to the given exec session in the
// database.
func (s *BoltState) RemoveExecSession(session *ExecSession) error {
if !s.valid {
return define.ErrDBClosed
}

db, err := s.getDBCon()
if err != nil {
return err
}
defer s.deferredCloseDBCon(db)

sessionID := []byte(session.ID())
containerID := []byte(session.ContainerID())
err = db.Update(func(tx *bolt.Tx) error {
execBucket, err := getExecBucket(tx)
if err != nil {
return err
}
ctrBucket, err := getCtrBucket(tx)
if err != nil {
return err
}

sessionExists := execBucket.Get(sessionID)
if sessionExists == nil {
return define.ErrNoSuchExecSession
}
// Check that container ID matches
if string(sessionExists) != session.ContainerID() {
return errors.Wrapf(define.ErrInternal, "database inconsistency: exec session %s points to container %s in state but %s in database", session.ID(), session.ContainerID(), string(sessionExists))
}

if err := execBucket.Delete(sessionID); err != nil {
return errors.Wrapf(err, "error removing exec session %s from database", session.ID())
}

dbCtr := ctrBucket.Bucket(containerID)
if dbCtr == nil {
// State is inconsistent. We refer to a container that
// is no longer in the state.
// Return without error, to attempt to recover.
return nil
}

ctrExecBucket := dbCtr.Bucket(execBkt)
if ctrExecBucket == nil {
// Again, state is inconsistent. We should have an exec
// bucket, and it should have this session.
// Again, nothing we can do, so proceed and try to
// recover.
return nil
}

ctrSessionExists := ctrExecBucket.Get(sessionID)
if ctrSessionExists != nil {
if err := ctrExecBucket.Delete(sessionID); err != nil {
return errors.Wrapf(err, "error removing exec session %s from container %s in database", session.ID(), session.ContainerID())
}
}

return nil
})
return err
}

// GetContainerExecSessions retrieves the IDs of all exec sessions running in a
// container that the database is aware of (IE, were added via AddExecSession).
func (s *BoltState) GetContainerExecSessions(ctr *Container) ([]string, error) {
if !s.valid {
return nil, define.ErrDBClosed
}

if !ctr.valid {
return nil, define.ErrCtrRemoved
}

db, err := s.getDBCon()
if err != nil {
return nil, err
}
defer s.deferredCloseDBCon(db)

ctrID := []byte(ctr.ID())
sessions := []string{}
err = db.View(func(tx *bolt.Tx) error {
ctrBucket, err := getCtrBucket(tx)
if err != nil {
return err
}

dbCtr := ctrBucket.Bucket(ctrID)
if dbCtr == nil {
ctr.valid = false
return define.ErrNoSuchCtr
}

ctrExecSessions := dbCtr.Bucket(execBkt)
if ctrExecSessions == nil {
return nil
}

return ctrExecSessions.ForEach(func(id, unused []byte) error {
sessions = append(sessions, string(id))
return nil
})
})
if err != nil {
return nil, err
}

return sessions, nil
}

// RemoveContainerExecSessions removes all exec sessions attached to a given
// container.
func (s *BoltState) RemoveContainerExecSessions(ctr *Container) error {
if !s.valid {
return define.ErrDBClosed
}

if !ctr.valid {
return define.ErrCtrRemoved
}

db, err := s.getDBCon()
if err != nil {
return err
}
defer s.deferredCloseDBCon(db)

ctrID := []byte(ctr.ID())
sessions := []string{}

err = db.Update(func(tx *bolt.Tx) error {
execBucket, err := getExecBucket(tx)
if err != nil {
return err
}
ctrBucket, err := getCtrBucket(tx)
if err != nil {
return err
}

dbCtr := ctrBucket.Bucket(ctrID)
if dbCtr == nil {
ctr.valid = false
return define.ErrNoSuchCtr
}

ctrExecSessions := dbCtr.Bucket(execBkt)
if ctrExecSessions == nil {
return nil
}

err = ctrExecSessions.ForEach(func(id, unused []byte) error {
sessions = append(sessions, string(id))
return nil
})
if err != nil {
return err
}

for _, session := range sessions {
if err := ctrExecSessions.Delete([]byte(session)); err != nil {
return errors.Wrapf(err, "error removing container %s exec session %s from database", ctr.ID(), session)
}
// Check if the session exists in the global table
// before removing. It should, but in cases where the DB
// has become inconsistent, we should try and proceed
// so we can recover.
sessionExists := execBucket.Get([]byte(session))
if sessionExists == nil {
continue
}
if string(sessionExists) != ctr.ID() {
return errors.Wrapf(define.ErrInternal, "database mismatch: exec session %s is associated with containers %s and %s", session, ctr.ID(), string(sessionExists))
}
if err := execBucket.Delete([]byte(session)); err != nil {
return errors.Wrapf(err, "error removing container %s exec session %s from exec sessions", ctr.ID(), session)
}
}

return nil
})
return err
}

// RewriteContainerConfig rewrites a container's configuration.
// WARNING: This function is DANGEROUS. Do not use without reading the full
// comment on this function in state.go.
Expand Down
Loading

0 comments on commit 118e78c

Please sign in to comment.