Skip to content

Commit

Permalink
Merge pull request #102 from BSWANG/master
Browse files Browse the repository at this point in the history
fix golangci-lint issues & popResult starve
  • Loading branch information
BSWANG authored Jun 2, 2020
2 parents ba44188 + 4217554 commit f322e8c
Show file tree
Hide file tree
Showing 16 changed files with 130 additions and 70 deletions.
12 changes: 12 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,22 @@ jobs:
command: |
find . -name \*.md | grep -v '^./vendor' | grep -v ./README-zh_C | xargs mdspell --ignore-numbers --ignore-acronyms --en-us -r -x
golangci-lint:
docker:
- image: golangci/golangci-lint:v1.27.0
working_directory: /go/src/github.com/AliyunContainerService/terway
steps:
- checkout
- run:
name: golangci-lint
command: |
golangci-lint run -v
workflows:
version: 2
ci:
jobs:
- golangci-lint
- build
- code-check
- markdown-spellcheck
9 changes: 6 additions & 3 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ func (networkService *networkService) AllocIP(grpcContext context.Context, r *rp
networkContext.Log().Warnf("error cleanup allocated network resource %s, %s: %v", res.ID, res.Type, err)
continue
}
mgr.Release(networkContext, res.ID)
err = mgr.Release(networkContext, res.ID)
if err != nil {
networkContext.Log().Infof("rollback res error: %+v", err)
}
}
} else {
networkContext.Log().Infof("alloc result: %+v", allocIPReply)
Expand Down Expand Up @@ -507,8 +510,8 @@ func (networkService *networkService) startGarbageCollectionLoop() {
}
for _, res := range resRelate.Resources {
if _, ok := inUseSet[res.Type]; !ok {
inUseSet[res.Type] = make(map[string]interface{}, 0)
expireSet[res.Type] = make(map[string]interface{}, 0)
inUseSet[res.Type] = make(map[string]interface{})
expireSet[res.Type] = make(map[string]interface{})
}
// already in use by others
if _, ok := inUseSet[res.Type][res.ID]; ok {
Expand Down
14 changes: 14 additions & 0 deletions daemon/eni-multi-ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ func (f *eniIPFactory) initialENI(eni *ENI) {
}

eni.lock.Lock()
extraAlloc := eni.pending - len(ips)
for _, ip := range ips {
eniip := &types.ENIIP{
Eni: eni.ENI,
Expand All @@ -421,6 +422,19 @@ func (f *eniIPFactory) initialENI(eni *ENI) {
err: nil,
}
}
for i := 0; i < extraAlloc; i++ {
select {
case eni.ipBacklog <- struct{}{}:
default:
f.ipResultChan <- &ENIIP{
ENIIP: &types.ENIIP{
Eni: nil,
},
err: errors.Errorf("need retry to allocate eni ip: %v", err),
}
}

}

eni.lock.Unlock()
go eni.allocateWorker(f.ipResultChan)
Expand Down
1 change: 0 additions & 1 deletion daemon/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ func convertPod(daemonMode string, pod *corev1.Pod) *podInfo {
switch strings.ToLower(pod.OwnerReferences[0].Kind) {
case "statefulset":
pi.IPStickTime = defaultStickTimeForSts
break
}
}

Expand Down
6 changes: 4 additions & 2 deletions daemon/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ func Run(pidFilePath, socketFilePath, debugSocketListen, configFilePath, kubecon
}

if _, err := os.Stat(path.Dir(pidFilePath)); err != nil && os.IsNotExist(err) {
os.MkdirAll(path.Dir(pidFilePath), 0666)
if err = os.MkdirAll(path.Dir(pidFilePath), 0666); err != nil {
return fmt.Errorf("error create pid file: %+v", err)
}
}
if err := ioutil.WriteFile(pidFilePath, []byte(fmt.Sprintf("%d", os.Getpid())), 0644); err != nil {
return fmt.Errorf("error writing pidfile %q: %v", pidFilePath, err)
Expand Down Expand Up @@ -126,7 +128,7 @@ func runDebugServer(debugSocketListen string) error {
err error
)
if strings.HasPrefix(debugSocketListen, "unix://") {
debugSocketListen = strings.TrimLeft(debugSocketListen, "unix://")
debugSocketListen = strings.TrimPrefix(debugSocketListen, "unix://")
if err := os.MkdirAll(filepath.Dir(debugSocketListen), 0700); err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion daemon/veth.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ type vethResourceManager struct {
}

func (*vethResourceManager) Allocate(context *networkContext, prefer string) (types.NetworkResource, error) {
vethName, _ := link.VethNameForPod(context.pod.Name, context.pod.Namespace, defaultPrefix)
return &types.Veth{
HostVeth: link.VethNameForPod(context.pod.Name, context.pod.Namespace, defaultPrefix),
HostVeth: vethName,
}, nil
}

Expand Down
49 changes: 33 additions & 16 deletions deviceplugin/eni.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,23 @@ func NewEniDevicePlugin(count int) *EniDevicePlugin {
}

// dial establishes the gRPC communication with the registered device plugin.
func dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error) {
c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithTimeout(timeout),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
func dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, func(), error) {
timeoutCtx, cancel := context.WithTimeout(context.Background(), timeout)
c, err := grpc.DialContext(timeoutCtx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),
)

if err != nil {
return nil, err
cancel()
return nil, nil, err
}

return c, nil
return c, func() {
err = c.Close()
cancel()
}, nil
}

// Start starts the gRPC server of the device plugin
Expand All @@ -82,15 +86,19 @@ func (m *EniDevicePlugin) Start() error {
pluginapi.RegisterDevicePluginServer(m.server, m)

m.stop = make(chan struct{}, 1)
go m.server.Serve(sock)
go func() {
err := m.server.Serve(sock)
if err != nil {
log.Errorf("error start device plugin server, %+v", err)
}
}()

// Wait for server to start by launching a blocking connection
conn, err := dial(m.socket, 5*time.Second)
_, closeConn, err := dial(m.socket, 5*time.Second)
if err != nil {
return err
}
conn.Close()

closeConn()
return nil
}

Expand Down Expand Up @@ -119,11 +127,11 @@ func (m *EniDevicePlugin) Stop() error {

// Register registers the device plugin for the given resourceName with Kubelet.
func (m *EniDevicePlugin) Register(request pluginapi.RegisterRequest) error {
conn, err := dial(pluginapi.KubeletSocket, 5*time.Second)
conn, closeConn, err := dial(pluginapi.KubeletSocket, 5*time.Second)
if err != nil {
return err
}
defer conn.Close()
defer closeConn()

client := pluginapi.NewRegistrationClient(conn)

Expand All @@ -140,7 +148,10 @@ func (m *EniDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlu
for i := 0; i < m.count; i++ {
devs = append(devs, &pluginapi.Device{ID: fmt.Sprintf("eni-%d", i), Health: pluginapi.Healthy})
}
s.Send(&pluginapi.ListAndWatchResponse{Devices: devs})
err := s.Send(&pluginapi.ListAndWatchResponse{Devices: devs})
if err != nil {
return err
}
ticker := time.NewTicker(time.Second * 5)
for {
select {
Expand Down Expand Up @@ -223,8 +234,11 @@ func (m *EniDevicePlugin) watchKubeletRestart() {
}
if os.IsNotExist(err) {
log.Infof("device plugin socket %s removed, restarting.", m.socket)
m.Stop()
err := m.Start()
err := m.Stop()
if err != nil {
log.Errorf("stop current device plugin server with error: %v", err)
}
err = m.Start()
if err != nil {
log.Fatalf("error restart device plugin after kubelet restart %+v", err)
}
Expand Down Expand Up @@ -263,7 +277,10 @@ func (m *EniDevicePlugin) Serve(resourceName string) error {
)
if err != nil {
log.Errorf("Could not register device plugin: %v", err)
m.Stop()
err := m.Stop()
if err != nil {
log.Errorf("stop current device plugin server with error: %v", err)
}
return err
}
log.Infof("Registered device plugin with Kubelet")
Expand Down
4 changes: 3 additions & 1 deletion pkg/aliyun/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ func (e *ecsImpl) AllocateENI(vSwitch string, securityGroup string, instanceID s
eniDestroy := &types.ENI{
ID: createNetworkInterfaceResponse.NetworkInterfaceId,
}
e.destroyInterface(eniDestroy.ID, instanceID, true)
if err = e.destroyInterface(eniDestroy.ID, instanceID, true); err != nil {
logrus.Errorf("error rollback interface, may cause eni leak: %+v", err)
}
}
}()

Expand Down
9 changes: 6 additions & 3 deletions pkg/link/veth.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (

// VethNameForPod return host-side veth name for pod
// max veth length is 15
func VethNameForPod(name, namespace, prefix string) string {
func VethNameForPod(name, namespace, prefix string) (string, error) {
// A SHA1 is always 20 bytes long, and so is sufficient for generating the
// veth name and mac addr.
h := sha1.New()
h.Write([]byte(namespace + "." + name))
return fmt.Sprintf("%s%s", prefix, hex.EncodeToString(h.Sum(nil))[:11])
_, err := h.Write([]byte(namespace + "." + name))
if err != nil {
return "", err
}
return fmt.Sprintf("%s%s", prefix, hex.EncodeToString(h.Sum(nil))[:11]), nil
}
2 changes: 1 addition & 1 deletion pkg/link/veth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package link
import "testing"

func TestVethNameForPod(t *testing.T) {
if veth := VethNameForPod("client-b6989bf87-2bgtc", "default", "cali"); veth != "calic95a4947e07" {
if veth, _ := VethNameForPod("client-b6989bf87-2bgtc", "default", "cali"); veth != "calic95a4947e07" {
t.Fatalf("veth name failed: expect: %s, actual: %s", "calic95a4947e07", veth)
}
}
27 changes: 8 additions & 19 deletions pkg/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,14 @@ type ObjectFactory interface {
}

type simpleObjectPool struct {
inuse map[string]poolItem
idle *priorityQeueu // Todo: Fix this typo
lock sync.Mutex
factory ObjectFactory
maxIdle int
minIdle int
capacity int
maxBackoff time.Duration
notifyCh chan interface{}
inuse map[string]poolItem
idle *priorityQeueu // Todo: Fix this typo
lock sync.Mutex
factory ObjectFactory
maxIdle int
minIdle int
capacity int
notifyCh chan interface{}
// concurrency to create resource. tokenCh = capacity - (idle + inuse + dispose)
tokenCh chan struct{}
backoffTime time.Duration
Expand Down Expand Up @@ -161,16 +160,6 @@ func queueKeys(q *priorityQeueu) string {
return strings.Join(keys, ", ")
}

func (p *simpleObjectPool) dispose(res types.NetworkResource) {
log.Infof("try dispose res %+v", res)
if err := p.factory.Dispose(res); err != nil {
//put it back on dispose fail
log.Warnf("failed dispose %s: %v, put it back to idle", res.GetResourceID(), err)
} else {
p.tokenCh <- struct{}{}
}
}

func (p *simpleObjectPool) tooManyIdleLocked() bool {
return p.idle.Size() > p.maxIdle || (p.idle.Size() > 0 && p.sizeLocked() > p.capacity)
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,22 @@ func TestRelease(t *testing.T) {
n5, _ := pool.Acquire(context.Background(), "", "")
n6, _ := pool.Acquire(context.Background(), "", "")
assert.Equal(t, 3, factory.getTotalCreated())
pool.Release(n1.GetResourceID())
pool.Release(n2.GetResourceID())
pool.Release(n3.GetResourceID())
err := pool.Release(n1.GetResourceID())
assert.Equal(t, err, nil)
err = pool.Release(n2.GetResourceID())
assert.Equal(t, err, nil)
err = pool.Release(n3.GetResourceID())
assert.Equal(t, err, nil)
time.Sleep(1 * time.Second)
assert.Equal(t, 0, factory.getTotalDisposed())
pool.Release(n4.GetResourceID())
pool.Release(n5.GetResourceID())
err = pool.Release(n4.GetResourceID())
assert.Equal(t, err, nil)
err = pool.Release(n5.GetResourceID())
assert.Equal(t, err, nil)
time.Sleep(1 * time.Second)
assert.Equal(t, 0, factory.getTotalDisposed())
pool.Release(n6.GetResourceID())
err = pool.Release(n6.GetResourceID())
assert.Equal(t, err, nil)
time.Sleep(1 * time.Second)
assert.Equal(t, 1, factory.getTotalDisposed())
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ func (d *DiskStorage) load() error {
if err != nil {
return err
}
d.memory.Put(string(k), obj)
err = d.memory.Put(string(k), obj)
if err != nil {
return err
}
}
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/tc/tc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const latencyInMillis = 25

// SetRule set the traffic rule on interface
func SetRule(dev netlink.Link, rule *TrafficShapingRule) error {
if rule.Rate < 0 {
if rule.Rate <= 0 {
return fmt.Errorf("invalid rate %d", rule.Rate)
}

Expand Down
12 changes: 8 additions & 4 deletions plugin/driver/raw_nic.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ func (r *rawNicDriver) Setup(hostVeth string,
return errors.Wrapf(err, "NicDriver, cannot found spec nic link")
}
hostCurrentNs, err := ns.GetCurrentNS()
defer hostCurrentNs.Close()
defer func() {
err = hostCurrentNs.Close()
}()
if err != nil {
return errors.Wrapf(err, "NicDriver, cannot get host netns")
}
Expand All @@ -44,7 +46,7 @@ func (r *rawNicDriver) Setup(hostVeth string,

defer func() {
if err != nil {
netNS.Do(func(netNS ns.NetNS) error {
err = netNS.Do(func(netNS ns.NetNS) error {
nicLink, err = netlink.LinkByName(containerVeth)
if err == nil {
nicName, err1 := r.randomNicName()
Expand All @@ -62,7 +64,7 @@ func (r *rawNicDriver) Setup(hostVeth string,
nicLink, err = netlink.LinkByName(nicLink.Attrs().Name)
}
if err == nil {
netlink.LinkSetDown(nicLink)
err = netlink.LinkSetDown(nicLink)
return netlink.LinkSetNsFd(nicLink, int(hostCurrentNs.Fd()))
}
return err
Expand Down Expand Up @@ -159,7 +161,9 @@ func (r *rawNicDriver) Teardown(hostVeth string,
containerIP net.IP) error {
// 1. move link out
hostCurrentNs, err := ns.GetCurrentNS()
defer hostCurrentNs.Close()
defer func() {
err = hostCurrentNs.Close()
}()
if err != nil {
return errors.Wrapf(err, "NicDriver, cannot get host netns")
}
Expand Down
Loading

0 comments on commit f322e8c

Please sign in to comment.