Skip to content

Commit

Permalink
record pod metadata and allocationTime in IP allocation state file (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
M00nF1sh authored and sushrk committed Jun 17, 2022
1 parent 61edde1 commit cf97e60
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 70 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/coreos/go-iptables v0.4.5
github.com/golang/mock v1.4.1
github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.2
github.com/google/go-jsonnet v0.16.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
Expand Down Expand Up @@ -41,7 +42,6 @@ require (
github.com/go-logr/logr v0.3.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/google/go-cmp v0.5.2 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.1.2 // indirect
github.com/googleapis/gnostic v0.5.1 // indirect
Expand All @@ -66,6 +66,7 @@ require (
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
golang.org/x/tools v0.1.5 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gomodules.xyz/jsonpatch/v2 v2.1.0 // indirect
google.golang.org/appengine v1.6.6 // indirect
google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a // indirect
Expand Down
39 changes: 29 additions & 10 deletions pkg/cri/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,29 @@ const (
dockerSocketPath = "unix:///var/run/dockershim.sock"
)

// PodSandboxMetadata contains metadata about pod sandboxes.
// It identifies the pod a sandbox belongs to by namespace and name.
type PodSandboxMetadata struct {
// Namespace is the pod's namespace.
Namespace string
// Name is the pod's name.
Name string
}

// SandboxInfo describes a single pod sandbox: its ID, IP addresses,
// creation time and, when available, the metadata of its pod.
type SandboxInfo struct {
// Pod sandbox's ID.
ID string
IP string
// Pod sandbox's IP addresses.
IPs []string
// Timestamp (in nanoseconds) when the sandbox was created.
CreationTimestamp int64
// Pod sandbox's metadata, if any; nil when none is available.
Metadata *PodSandboxMetadata
}

// APIs is the CRI interface
type APIs interface {
GetRunningPodSandboxes(log logger.Logger) ([]*SandboxInfo, error)
GetRunningPodSandboxes(log logger.Logger) ([]SandboxInfo, error)
}

// Client is an empty struct
Expand All @@ -48,7 +62,7 @@ func New() *Client {
}

// GetRunningPodSandboxes gets the running pod sandboxes.
func (c *Client) GetRunningPodSandboxes(log logger.Logger) ([]*SandboxInfo, error) {
func (c *Client) GetRunningPodSandboxes(log logger.Logger) ([]SandboxInfo, error) {
ctx := context.TODO()

socketPath := dockerSocketPath
Expand Down Expand Up @@ -76,7 +90,7 @@ func (c *Client) GetRunningPodSandboxes(log logger.Logger) ([]*SandboxInfo, erro
return nil, err
}

sandboxInfos := make([]*SandboxInfo, 0, len(sandboxes.GetItems()))
sandboxInfos := make([]SandboxInfo, 0, len(sandboxes.GetItems()))
for _, sandbox := range sandboxes.GetItems() {
status, err := client.PodSandboxStatus(ctx, &runtimeapi.PodSandboxStatusRequest{
PodSandboxId: sandbox.GetId(),
Expand All @@ -99,14 +113,19 @@ func (c *Client) GetRunningPodSandboxes(log logger.Logger) ([]*SandboxInfo, erro
for _, ip := range status.GetStatus().GetNetwork().GetAdditionalIps() {
ips = append(ips, ip.GetIp())
}

for _, ip := range ips {
info := SandboxInfo{
ID: sandbox.GetId(),
IP: ip,
sandBoxInfo := SandboxInfo{
ID: sandbox.GetId(),
IPs: ips,
CreationTimestamp: status.GetStatus().GetCreatedAt(),
}
if metadata := status.GetStatus().GetMetadata(); metadata != nil {
sandBoxInfo.Metadata = &PodSandboxMetadata{
Namespace: metadata.GetNamespace(),
Name: metadata.GetName(),
}
sandboxInfos = append(sandboxInfos, &info)
}

sandboxInfos = append(sandboxInfos, sandBoxInfo)
}
return sandboxInfos, nil
}
101 changes: 67 additions & 34 deletions pkg/ipamd/datastore/data_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ func (k IPAMKey) String() string {
return fmt.Sprintf("%s/%s/%s", k.NetworkName, k.ContainerID, k.IfName)
}

// IPAMMetadata is the metadata associated with IP allocations.
// It records which pod an IP address is allocated to; empty fields are
// omitted when the struct is marshaled to JSON.
type IPAMMetadata struct {
// K8SPodNamespace is the namespace of the pod holding the allocation.
K8SPodNamespace string `json:"k8sPodNamespace,omitempty"`
// K8SPodName is the name of the pod holding the allocation.
K8SPodName string `json:"k8sPodName,omitempty"`
}

// ENI represents a single ENI. Exported fields will be marshaled for introspection.
type ENI struct {
// AWS ENI ID
Expand All @@ -170,8 +176,11 @@ type ENI struct {

// AddressInfo contains information about an IP, Exported fields will be marshaled for introspection.
type AddressInfo struct {
Address string

IPAMKey IPAMKey
Address string
IPAMMetadata IPAMMetadata
AssignedTime time.Time
UnassignedTime time.Time
}

Expand Down Expand Up @@ -360,8 +369,10 @@ type CheckpointData struct {
// in checkpoints.
type CheckpointEntry struct {
IPAMKey
IPv4 string `json:"ipv4,omitempty"`
IPv6 string `json:"ipv6,omitempty"`
IPv4 string `json:"ipv4,omitempty"`
IPv6 string `json:"ipv6,omitempty"`
AllocationTimestamp int64 `json:"allocationTimestamp"`
Metadata IPAMMetadata `json:"metadata"`
}

// ReadBackingStore initialises the IP allocation state from the
Expand All @@ -383,21 +394,35 @@ func (ds *DataStore) ReadBackingStore(isv6Enabled bool) error {
entries := make([]CheckpointEntry, 0, len(sandboxes))
for _, s := range sandboxes {
ds.log.Debugf("Adding container ID: %v", s.ID)
if isv6Enabled {
ipv6Addr = s.IP
} else {
ipv4Addr = s.IP

metadata := IPAMMetadata{}
// Both containerd and dockershim populate the metadata; this nil check is purely defensive.
if s.Metadata != nil {
metadata.K8SPodNamespace = s.Metadata.Namespace
metadata.K8SPodName = s.Metadata.Name
}

// Note: ideally each sandbox should contain only one IP;
// looping through them here just preserves the legacy code's behavior.
for _, ip := range s.IPs {
if isv6Enabled {
ipv6Addr = ip
} else {
ipv4Addr = ip
}
entries = append(entries, CheckpointEntry{
// NB: These Backfill values are also assumed in UnassignPodIPAddress
IPAMKey: IPAMKey{
NetworkName: backfillNetworkName,
ContainerID: s.ID,
IfName: backfillNetworkIface,
},
IPv4: ipv4Addr,
IPv6: ipv6Addr,
AllocationTimestamp: s.CreationTimestamp,
Metadata: metadata,
})
}
entries = append(entries, CheckpointEntry{
// NB: These Backfill values are also assumed in UnassignPodIPAddress
IPAMKey: IPAMKey{
NetworkName: backfillNetworkName,
ContainerID: s.ID,
IfName: backfillNetworkIface,
},
IPv4: ipv4Addr,
IPv6: ipv6Addr,
})
}
data = CheckpointData{
Version: CheckpointFormatVersion,
Expand Down Expand Up @@ -455,7 +480,7 @@ func (ds *DataStore) ReadBackingStore(isv6Enabled bool) error {
}
addr := &AddressInfo{Address: ipAddr.String()}
cidr.IPAddresses[ipAddr.String()] = addr
ds.assignPodIPAddressUnsafe(allocation.IPAMKey, eni, addr)
ds.assignPodIPAddressUnsafe(addr, allocation.IPAMKey, allocation.Metadata, time.Unix(0, allocation.AllocationTimestamp))
ds.log.Debugf("Recovered %s => %s/%s", allocation.IPAMKey, eni.ID, addr.Address)
//Update prometheus for ips per cidr
//Secondary IP mode will have /32:1 and Prefix mode will have /28:<number of /32s>
Expand Down Expand Up @@ -493,8 +518,10 @@ func (ds *DataStore) writeBackingStoreUnsafe() error {
for _, addr := range assignedAddr.IPAddresses {
if addr.Assigned() {
entry := CheckpointEntry{
IPAMKey: addr.IPAMKey,
IPv4: addr.Address,
IPAMKey: addr.IPAMKey,
IPv4: addr.Address,
AllocationTimestamp: addr.AssignedTime.UnixNano(),
Metadata: addr.IPAMMetadata,
}
allocations = append(allocations, entry)
}
Expand All @@ -505,8 +532,10 @@ func (ds *DataStore) writeBackingStoreUnsafe() error {
for _, addr := range assignedAddr.IPAddresses {
if addr.Assigned() {
entry := CheckpointEntry{
IPAMKey: addr.IPAMKey,
IPv6: addr.Address,
IPAMKey: addr.IPAMKey,
IPv6: addr.Address,
AllocationTimestamp: addr.AssignedTime.UnixNano(),
Metadata: addr.IPAMMetadata,
}
allocations = append(allocations, entry)
}
Expand Down Expand Up @@ -677,19 +706,19 @@ func (ds *DataStore) AddIPv6CidrToStore(eniID string, ipv6Cidr net.IPNet, isPref
return nil
}

func (ds *DataStore) AssignPodIPAddress(ipamKey IPAMKey, isIPv4Enabled bool, isIPv6Enabled bool) (ipv4Address string,
func (ds *DataStore) AssignPodIPAddress(ipamKey IPAMKey, ipamMetadata IPAMMetadata, isIPv4Enabled bool, isIPv6Enabled bool) (ipv4Address string,
ipv6Address string, deviceNumber int, err error) {
//Currently it's either v4 or v6. Dual Stack mode isn't supported.
if isIPv4Enabled {
ipv4Address, deviceNumber, err = ds.AssignPodIPv4Address(ipamKey)
ipv4Address, deviceNumber, err = ds.AssignPodIPv4Address(ipamKey, ipamMetadata)
} else if isIPv6Enabled {
ipv6Address, deviceNumber, err = ds.AssignPodIPv6Address(ipamKey)
ipv6Address, deviceNumber, err = ds.AssignPodIPv6Address(ipamKey, ipamMetadata)
}
return ipv4Address, ipv6Address, deviceNumber, err
}

// AssignPodIPv6Address assigns an IPv6 address to pod. Returns the assigned IPv6 address along with device number
func (ds *DataStore) AssignPodIPv6Address(ipamKey IPAMKey) (ipv6Address string, deviceNumber int, err error) {
func (ds *DataStore) AssignPodIPv6Address(ipamKey IPAMKey, ipamMetadata IPAMMetadata) (ipv6Address string, deviceNumber int, err error) {
ds.lock.Lock()
defer ds.lock.Unlock()

Expand Down Expand Up @@ -723,7 +752,7 @@ func (ds *DataStore) AssignPodIPv6Address(ipamKey IPAMKey) (ipv6Address string,
addr := &AddressInfo{Address: ipv6Address}
V6Cidr.IPAddresses[ipv6Address] = addr

ds.assignPodIPAddressUnsafe(ipamKey, eni, addr)
ds.assignPodIPAddressUnsafe(addr, ipamKey, ipamMetadata, time.Now())
if err := ds.writeBackingStoreUnsafe(); err != nil {
ds.log.Warnf("Failed to update backing store: %v", err)
// Important! Unwind assignment
Expand All @@ -740,7 +769,7 @@ func (ds *DataStore) AssignPodIPv6Address(ipamKey IPAMKey) (ipv6Address string,

// AssignPodIPv4Address assigns an IPv4 address to pod
// It returns the assigned IPv4 address, device number, error
func (ds *DataStore) AssignPodIPv4Address(ipamKey IPAMKey) (ipv4address string, deviceNumber int, err error) {
func (ds *DataStore) AssignPodIPv4Address(ipamKey IPAMKey, ipamMetadata IPAMMetadata) (ipv4address string, deviceNumber int, err error) {
ds.lock.Lock()
defer ds.lock.Unlock()

Expand Down Expand Up @@ -785,7 +814,7 @@ func (ds *DataStore) AssignPodIPv4Address(ipamKey IPAMKey) (ipv4address string,
}

availableCidr.IPAddresses[strPrivateIPv4] = addr
ds.assignPodIPAddressUnsafe(ipamKey, eni, addr)
ds.assignPodIPAddressUnsafe(addr, ipamKey, ipamMetadata, time.Now())

if err := ds.writeBackingStoreUnsafe(); err != nil {
ds.log.Warnf("Failed to update backing store: %v", err)
Expand All @@ -806,23 +835,24 @@ func (ds *DataStore) AssignPodIPv4Address(ipamKey IPAMKey) (ipv4address string,
return "", -1, errors.New("assignPodIPv4AddressUnsafe: no available IP/Prefix addresses")
}

// It returns the assigned IPv4 address, device number
func (ds *DataStore) assignPodIPAddressUnsafe(ipamKey IPAMKey, eni *ENI, addr *AddressInfo) (string, int) {
// assignPodIPAddressUnsafe marks the address as assigned.
func (ds *DataStore) assignPodIPAddressUnsafe(addr *AddressInfo, ipamKey IPAMKey, ipamMetadata IPAMMetadata, assignedTime time.Time) {
ds.log.Infof("AssignPodIPv4Address: Assign IP %v to sandbox %s",
addr.Address, ipamKey)

if addr.Assigned() {
panic("addr already assigned")
}
addr.IPAMKey = ipamKey // This marks the addr as assigned
addr.IPAMMetadata = ipamMetadata
addr.AssignedTime = assignedTime

ds.assigned++
// Prometheus gauge
assignedIPs.Set(float64(ds.assigned))

return addr.Address, eni.DeviceNumber
}

// unassignPodIPAddressUnsafe marks the address as unassigned.
func (ds *DataStore) unassignPodIPAddressUnsafe(addr *AddressInfo) {
if !addr.Assigned() {
// Already unassigned
Expand All @@ -831,6 +861,7 @@ func (ds *DataStore) unassignPodIPAddressUnsafe(addr *AddressInfo) {
ds.log.Infof("UnAssignPodIPAddress: Unassign IP %v from sandbox %s",
addr.Address, addr.IPAMKey)
addr.IPAMKey = IPAMKey{} // unassign the addr
addr.IPAMMetadata = IPAMMetadata{}
ds.assigned--
// Prometheus gauge
assignedIPs.Set(float64(ds.assigned))
Expand Down Expand Up @@ -1179,10 +1210,12 @@ func (ds *DataStore) UnassignPodIPAddress(ipamKey IPAMKey) (e *ENI, ip string, d
return nil, "", 0, ErrUnknownPod
}

originalIPAMMetadata := addr.IPAMMetadata
originalAssignedTime := addr.AssignedTime
ds.unassignPodIPAddressUnsafe(addr)
if err := ds.writeBackingStoreUnsafe(); err != nil {
// Unwind un-assignment
ds.assignPodIPAddressUnsafe(ipamKey, eni, addr)
ds.assignPodIPAddressUnsafe(addr, ipamKey, originalIPAMMetadata, originalAssignedTime)
return nil, "", 0, err
}
addr.UnassignedTime = time.Now()
Expand Down
Loading

0 comments on commit cf97e60

Please sign in to comment.