Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Begin exec rework #5088

Merged
merged 3 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
338 changes: 337 additions & 1 deletion libpod/boltdb_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type BoltState struct {
// containing the path to the container's network namespace, a dependencies
// bucket containing the container's dependencies, and an optional pod key
// containing the ID of the pod the container is joined to.
// After updates to include exec sessions, may also include an exec bucket
// with the IDs of exec sessions currently in use by the container.
// - allCtrsBkt: Map of ID to name containing only containers. Used for
// container lookup operations.
// - podBkt: Contains a sub-bucket for each pod in the state.
Expand All @@ -49,6 +51,10 @@ type BoltState struct {
// containers in the pod.
// - allPodsBkt: Map of ID to name containing only pods. Used for pod lookup
// operations.
// - execBkt: Map of exec session ID to exec session - contains a sub-bucket for
// each exec session in the DB.
// - execRegistryBkt: Map of exec session ID to nothing. Contains one entry for
// each exec session. Used for iterating through all exec sessions.
// - runtimeConfigBkt: Contains configuration of the libpod instance that
// initially created the database. This must match for any further instances
// that access the database, to ensure that state mismatches with
Expand Down Expand Up @@ -86,6 +92,7 @@ func NewBoltState(path string, runtime *Runtime) (State, error) {
allPodsBkt,
volBkt,
allVolsBkt,
execBkt,
runtimeConfigBkt,
}

Expand Down Expand Up @@ -171,6 +178,11 @@ func (s *BoltState) Refresh() error {
return err
}

execBucket, err := getExecBucket(tx)
if err != nil {
return err
}

// Iterate through all IDs. Check if they are containers.
// If they are, unmarshal their state, and then clear
// PID, mountpoint, and state for all of them
Expand Down Expand Up @@ -245,6 +257,26 @@ func (s *BoltState) Refresh() error {
return errors.Wrapf(err, "error updating state for container %s in DB", string(id))
}

// Delete all exec sessions, if there are any
ctrExecBkt := ctrBkt.Bucket(execBkt)
if ctrExecBkt != nil {
// Can't delete in a ForEach, so build a list of
// what to remove then remove.
toRemove := []string{}
err = ctrExecBkt.ForEach(func(id, unused []byte) error {
toRemove = append(toRemove, string(id))
return nil
})
if err != nil {
return err
}
for _, execId := range toRemove {
if err := ctrExecBkt.Delete([]byte(execId)); err != nil {
return errors.Wrapf(err, "error removing exec session %s from container %s", execId, string(id))
}
}
}

return nil
})
if err != nil {
Expand Down Expand Up @@ -285,7 +317,30 @@ func (s *BoltState) Refresh() error {

return nil
})
return err
if err != nil {
return err
}

// Now refresh exec sessions
// We want to remove them all, but for-each can't modify buckets
// So we have to make a list of what to operate on, then do the
// work.
toRemoveExec := []string{}
err = execBucket.ForEach(func(id, unused []byte) error {
toRemoveExec = append(toRemoveExec, string(id))
return nil
})
if err != nil {
return err
}

for _, execSession := range toRemoveExec {
if err := execBucket.Delete([]byte(execSession)); err != nil {
return errors.Wrapf(err, "error deleting exec session %s registry from database", execSession)
}
}

return nil
})
return err
}
Expand Down Expand Up @@ -895,6 +950,287 @@ func (s *BoltState) GetContainerConfig(id string) (*ContainerConfig, error) {
return config, nil
}

// AddExecSession adds an exec session to the state.
func (s *BoltState) AddExecSession(ctr *Container, session *ExecSession) error {
if !s.valid {
return define.ErrDBClosed
}

if !ctr.valid {
return define.ErrCtrRemoved
}

db, err := s.getDBCon()
if err != nil {
return err
}
defer s.deferredCloseDBCon(db)

ctrID := []byte(ctr.ID())
sessionID := []byte(session.ID())

err = db.Update(func(tx *bolt.Tx) error {
execBucket, err := getExecBucket(tx)
if err != nil {
return err
}
ctrBucket, err := getCtrBucket(tx)
if err != nil {
return err
}

dbCtr := ctrBucket.Bucket(ctrID)
if dbCtr == nil {
ctr.valid = false
return errors.Wrapf(define.ErrNoSuchCtr, "container %s is not present in the database", ctr.ID())
}

ctrExecSessionBucket, err := dbCtr.CreateBucketIfNotExists(execBkt)
if err != nil {
return errors.Wrapf(err, "error creating exec sessions bucket for container %s", ctr.ID())
}

execExists := execBucket.Get(sessionID)
if execExists != nil {
return errors.Wrapf(define.ErrExecSessionExists, "an exec session with ID %s already exists", session.ID())
}

if err := execBucket.Put(sessionID, ctrID); err != nil {
return errors.Wrapf(err, "error adding exec session %s to DB", session.ID())
}

if err := ctrExecSessionBucket.Put(sessionID, ctrID); err != nil {
return errors.Wrapf(err, "error adding exec session %s to container %s in DB", session.ID(), ctr.ID())
}

return nil
})
return err
}

// GetExecSession returns the ID of the container an exec session is associated
// with.
func (s *BoltState) GetExecSession(id string) (string, error) {
if !s.valid {
return "", define.ErrDBClosed
}

if id == "" {
return "", define.ErrEmptyID
}

db, err := s.getDBCon()
if err != nil {
return "", err
}
defer s.deferredCloseDBCon(db)

ctrID := ""
err = db.View(func(tx *bolt.Tx) error {
execBucket, err := getExecBucket(tx)
if err != nil {
return err
}

ctr := execBucket.Get([]byte(id))
if ctr == nil {
return errors.Wrapf(define.ErrNoSuchExecSession, "no exec session with ID %s found", id)
}
ctrID = string(ctr)
return nil
})
return ctrID, err
}

// RemoveExecSession removes references to the given exec session in the
// database.
func (s *BoltState) RemoveExecSession(session *ExecSession) error {
if !s.valid {
return define.ErrDBClosed
}

db, err := s.getDBCon()
if err != nil {
return err
}
defer s.deferredCloseDBCon(db)

sessionID := []byte(session.ID())
containerID := []byte(session.ContainerID())
err = db.Update(func(tx *bolt.Tx) error {
execBucket, err := getExecBucket(tx)
if err != nil {
return err
}
ctrBucket, err := getCtrBucket(tx)
if err != nil {
return err
}

sessionExists := execBucket.Get(sessionID)
if sessionExists == nil {
return define.ErrNoSuchExecSession
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap with id?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

up to you i guess, if the caller wraps it, then ok too

}
// Check that container ID matches
if string(sessionExists) != session.ContainerID() {
return errors.Wrapf(define.ErrInternal, "database inconsistency: exec session %s points to container %s in state but %s in database", session.ID(), session.ContainerID(), string(sessionExists))
}

if err := execBucket.Delete(sessionID); err != nil {
return errors.Wrapf(err, "error removing exec session %s from database", session.ID())
}

dbCtr := ctrBucket.Bucket(containerID)
if dbCtr == nil {
// State is inconsistent. We refer to a container that
// is no longer in the state.
// Return without error, to attempt to recover.
return nil
}

ctrExecBucket := dbCtr.Bucket(execBkt)
if ctrExecBucket == nil {
// Again, state is inconsistent. We should have an exec
// bucket, and it should have this session.
// Again, nothing we can do, so proceed and try to
// recover.
return nil
}

ctrSessionExists := ctrExecBucket.Get(sessionID)
if ctrSessionExists != nil {
if err := ctrExecBucket.Delete(sessionID); err != nil {
return errors.Wrapf(err, "error removing exec session %s from container %s in database", session.ID(), session.ContainerID())
}
}

return nil
})
return err
}

// GetContainerExecSessions retrieves the IDs of all exec sessions running in a
// container that the database is aware of (IE, were added via AddExecSession).
func (s *BoltState) GetContainerExecSessions(ctr *Container) ([]string, error) {
if !s.valid {
return nil, define.ErrDBClosed
}

if !ctr.valid {
return nil, define.ErrCtrRemoved
}

db, err := s.getDBCon()
if err != nil {
return nil, err
}
defer s.deferredCloseDBCon(db)

ctrID := []byte(ctr.ID())
sessions := []string{}
err = db.View(func(tx *bolt.Tx) error {
ctrBucket, err := getCtrBucket(tx)
if err != nil {
return err
}

dbCtr := ctrBucket.Bucket(ctrID)
if dbCtr == nil {
ctr.valid = false
return define.ErrNoSuchCtr
}

ctrExecSessions := dbCtr.Bucket(execBkt)
if ctrExecSessions == nil {
return nil
}

return ctrExecSessions.ForEach(func(id, unused []byte) error {
sessions = append(sessions, string(id))
return nil
})
})
if err != nil {
return nil, err
}

return sessions, nil
}

// RemoveContainerExecSessions removes all exec sessions attached to a given
// container.
func (s *BoltState) RemoveContainerExecSessions(ctr *Container) error {
if !s.valid {
return define.ErrDBClosed
}

if !ctr.valid {
return define.ErrCtrRemoved
}

db, err := s.getDBCon()
if err != nil {
return err
}
defer s.deferredCloseDBCon(db)

ctrID := []byte(ctr.ID())
sessions := []string{}

err = db.Update(func(tx *bolt.Tx) error {
execBucket, err := getExecBucket(tx)
if err != nil {
return err
}
ctrBucket, err := getCtrBucket(tx)
if err != nil {
return err
}

dbCtr := ctrBucket.Bucket(ctrID)
if dbCtr == nil {
ctr.valid = false
return define.ErrNoSuchCtr
}

ctrExecSessions := dbCtr.Bucket(execBkt)
if ctrExecSessions == nil {
return nil
}

err = ctrExecSessions.ForEach(func(id, unused []byte) error {
sessions = append(sessions, string(id))
return nil
})
if err != nil {
return err
}

for _, session := range sessions {
if err := ctrExecSessions.Delete([]byte(session)); err != nil {
return errors.Wrapf(err, "error removing container %s exec session %s from database", ctr.ID(), session)
}
// Check if the session exists in the global table
// before removing. It should, but in cases where the DB
// has become inconsistent, we should try and proceed
// so we can recover.
sessionExists := execBucket.Get([]byte(session))
if sessionExists == nil {
continue
}
if string(sessionExists) != ctr.ID() {
return errors.Wrapf(define.ErrInternal, "database mismatch: exec session %s is associated with containers %s and %s", session, ctr.ID(), string(sessionExists))
}
if err := execBucket.Delete([]byte(session)); err != nil {
return errors.Wrapf(err, "error removing container %s exec session %s from exec sessions", ctr.ID(), session)
}
}

return nil
})
return err
}

// RewriteContainerConfig rewrites a container's configuration.
// WARNING: This function is DANGEROUS. Do not use without reading the full
// comment on this function in state.go.
Expand Down
Loading