From 82063bcf17a38e506d0c7069489d27ed386ada4e Mon Sep 17 00:00:00 2001 From: Shaun Crampton Date: Tue, 27 Jun 2017 16:41:33 +0100 Subject: [PATCH] Acquire the iptables lock before running iptables-restore. - Copy and adapt kube-proxy's iptables lock implementation. - Implement a shared iptables lock that allows for writes to different tables (which are orthogonal). - Add a test executable that grabs the lock for a specified length of time along with FV tests that test correct blocking of iptables -w. These are the Felix changes required for #1470. --- Makefile | 19 ++- README.md | 19 ++- config/config_params.go | 14 +- config/config_params_test.go | 8 + config/param_types.go | 14 ++ felix.go | 5 + fv/.gitignore | 1 + fv/fv.go | 16 ++ fv/fv_suite_test.go | 35 +++++ fv/iptables-locker/iptables-locker.go | 55 +++++++ fv/iptables_lock_test.go | 130 ++++++++++++++++ health/health_test.go | 1 + intdataplane/int_dataplane.go | 138 ++++++++++------- iptables/lock.go | 209 ++++++++++++++++++++++++++ iptables/lock_test.go | 175 +++++++++++++++++++++ iptables/rules_test.go | 3 + iptables/table.go | 9 ++ iptables/table_test.go | 42 ++++++ k8sfv/calc_graph_test.go | 3 +- 19 files changed, 835 insertions(+), 61 deletions(-) create mode 100644 fv/.gitignore create mode 100644 fv/fv.go create mode 100644 fv/fv_suite_test.go create mode 100644 fv/iptables-locker/iptables-locker.go create mode 100644 fv/iptables_lock_test.go create mode 100644 iptables/lock.go create mode 100644 iptables/lock_test.go diff --git a/Makefile b/Makefile index 6657b29985..f61ab12fac 100644 --- a/Makefile +++ b/Makefile @@ -78,7 +78,7 @@ help: .SUFFIXES: all: deb rpm calico/felix -test: ut +test: ut fv GO_BUILD_CONTAINER?=calico/go-build:v0.6 @@ -292,6 +292,12 @@ bin/calico-felix: $(FELIX_GO_FILES) vendor/.up-to-date ( ldd bin/calico-felix 2>&1 | grep -q "Not a valid dynamic program" || \ ( echo "Error: bin/calico-felix was not statically linked"; false ) )' +bin/iptables-locker: $(FELIX_GO_FILES) vendor/.up-to-date + @echo Building iptables-locker... + mkdir -p bin + $(DOCKER_GO_BUILD) \ + sh -c 'go build -v -i -o $@ -v $(LDFLAGS) "github.com/projectcalico/felix/fv/iptables-locker"' + bin/k8sfv.test: $(K8SFV_GO_FILES) vendor/.up-to-date @echo Building $@... $(DOCKER_GO_BUILD) \ @@ -324,6 +330,16 @@ ut combined.coverprofile: vendor/.up-to-date $(FELIX_GO_FILES) @echo Running Go UTs. $(DOCKER_GO_BUILD) ./utils/run-coverage +fv/fv.test: vendor/.up-to-date $(FELIX_GO_FILES) + $(DOCKER_GO_BUILD) go test ./fv -c --tags fvtests -o fv/fv.test + +.PHONY: fv +fv: calico/felix bin/iptables-locker fv/fv.test + @echo Running Go FVs. + # For now, we pre-build the binary so that we can run it outside a container and allow it + # to interact with docker. + cd fv && ./fv.test + bin/check-licenses: $(FELIX_GO_FILES) $(DOCKER_GO_BUILD) go build -v -i -o $@ "github.com/projectcalico/felix/check-licenses" @@ -403,6 +419,7 @@ clean: docker-image/bin \ dist \ build \ + fv/fv.test \ $(GENERATED_GO_FILES) \ go/docs/calc.pdf \ .glide \ diff --git a/README.md b/README.md index 87b4aed8e1..fc64f77875 100644 --- a/README.md +++ b/README.md @@ -110,7 +110,24 @@ Ginkgo will re-run tests as files are modified and saved. After building the docker image (see above), you can run Felix and log to screen with, for example: -`docker run --privileged --net=host -e FELIX_LOGSEVERITYSCREEN=INFO calico/felix` + +``` +docker run --privileged \ + --net=host \ + -v /run:/run \ + -e FELIX_LOGSEVERITYSCREEN=INFO \ + calico/felix +``` + +Notes: + +- `--privileged` is required because Felix needs to execute iptables and other privileged commands. +- `--net=host` is required so that Felix can manipulate the routes and iptables tables in the host + namespace (outside its container). +- `-v /run:/run` is required so that Felix shares the global iptables file lock with other + processes; this allows Felix and other daemons that manipulate iptables to avoid clobbering each + other's updates. +- `-e FELIX_LOGSEVERITYSCREEN=INFO` tells Felix to log at info level to stderr. ### Debs and RPMs diff --git a/config/config_params.go b/config/config_params.go index 7752438276..90c1cef71f 100644 --- a/config/config_params.go +++ b/config/config_params.go @@ -109,8 +109,14 @@ type Config struct { Ipv6Support bool `config:"bool;true"` IgnoreLooseRPF bool `config:"bool;false"` - IptablesRefreshInterval time.Duration `config:"seconds;10"` - IptablesPostWriteCheckIntervalSecs time.Duration `config:"seconds;1"` + RouteRefreshInterval time.Duration `config:"seconds;90"` + IptablesRefreshInterval time.Duration `config:"seconds;90"` + IptablesPostWriteCheckIntervalSecs time.Duration `config:"seconds;30"` + IptablesLockFilePath string `config:"file;/run/xtables.lock"` + IptablesLockTimeoutSecs time.Duration `config:"seconds;30"` + IptablesLockProbeIntervalMillis time.Duration `config:"millis;50"` + IpsetsRefreshInterval time.Duration `config:"seconds;10"` + MaxIpsetSize int `config:"int;1048576;non-zero"` MetadataAddr string `config:"hostname;127.0.0.1;die-on-fail"` MetadataPort int `config:"int(0,65535);8775;die-on-fail"` @@ -138,8 +144,6 @@ type Config struct { EndpointReportingEnabled bool `config:"bool;false"` EndpointReportingDelaySecs time.Duration `config:"seconds;1"` - MaxIpsetSize int `config:"int;1048576;non-zero"` - IptablesMarkMask uint32 `config:"mark-bitmask;0xff000000;non-zero,die-on-fail"` DisableConntrackInvalidCheck bool `config:"bool;false"` @@ -459,6 +463,8 @@ func loadParams() { param = &FloatParam{} case "seconds": param = &SecondsParam{} + case "millis": + param = &MillisParam{} case "iface-list": param = &RegexpParam{Regexp: IfaceListRegexp, Msg: "invalid Linux interface name"} diff --git a/config/config_params_test.go b/config/config_params_test.go index 1219b915f9..af89f4a3d5 100644 --- a/config/config_params_test.go +++ b/config/config_params_test.go @@ -76,6 +76,14 @@ var _ = DescribeTable("Config parsing", Entry("IptablesPostWriteCheckIntervalSecs", "IptablesPostWriteCheckIntervalSecs", "1.5", 1500*time.Millisecond), + Entry("IptablesLockFilePath", "IptablesLockFilePath", + "/host/run/xtables.lock", "/host/run/xtables.lock"), + Entry("IptablesLockTimeoutSecs", "IptablesLockTimeoutSecs", + "123", 123*time.Second), + Entry("IptablesLockProbeIntervalMillis", "IptablesLockProbeIntervalMillis", + "123", 123*time.Millisecond), + Entry("IptablesLockProbeIntervalMillis garbage", "IptablesLockProbeIntervalMillis", + "garbage", 50*time.Millisecond), Entry("DefaultEndpointToHostAction", "DefaultEndpointToHostAction", "RETURN", "RETURN"), diff --git a/config/param_types.go b/config/param_types.go index 72ce10156c..4729836913 100644 --- a/config/param_types.go +++ b/config/param_types.go @@ -140,6 +140,20 @@ func (p *SecondsParam) Parse(raw string) (result interface{}, err error) { return } +type MillisParam struct { + Metadata +} + +func (p *MillisParam) Parse(raw string) (result interface{}, err error) { + millis, err := strconv.ParseFloat(raw, 64) + if err != nil { + err = p.parseFailed(raw, "invalid float") + return + } + result = time.Duration(millis * float64(time.Millisecond)) + return +} + type RegexpParam struct { Metadata Regexp *regexp.Regexp diff --git a/felix.go b/felix.go index 7371b3adee..7d01506fe0 100644 --- a/felix.go +++ b/felix.go @@ -283,8 +283,13 @@ configRetry: }, IPIPMTU: configParams.IpInIpMtu, IptablesRefreshInterval: configParams.IptablesRefreshInterval, + RouteRefreshInterval: configParams.RouteRefreshInterval, + IPSetsRefreshInterval: configParams.IpsetsRefreshInterval, IptablesPostWriteCheckInterval: configParams.IptablesPostWriteCheckIntervalSecs, IptablesInsertMode: configParams.ChainInsertMode, + IptablesLockFilePath: configParams.IptablesLockFilePath, + IptablesLockTimeout: configParams.IptablesLockTimeoutSecs, + IptablesLockProbeInterval: configParams.IptablesLockProbeIntervalMillis, MaxIPSetSize: configParams.MaxIpsetSize, IgnoreLooseRPF: configParams.IgnoreLooseRPF, IPv6Enabled: configParams.Ipv6Support, diff --git a/fv/.gitignore b/fv/.gitignore new file mode 100644 index 0000000000..2b55091800 --- /dev/null +++ b/fv/.gitignore @@ -0,0 +1 @@ +fv.test diff --git a/fv/fv.go b/fv/fv.go new file mode 100644 index 0000000000..b6a6d43dde --- /dev/null +++ b/fv/fv.go @@ -0,0 +1,16 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The fv packge contains FV tests that execute Felix for-real. +package fv diff --git a/fv/fv_suite_test.go b/fv/fv_suite_test.go new file mode 100644 index 0000000000..f605c8719c --- /dev/null +++ b/fv/fv_suite_test.go @@ -0,0 +1,35 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fv_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + "github.com/onsi/ginkgo/reporters" + . "github.com/onsi/gomega" + + "github.com/projectcalico/libcalico-go/lib/testutils" +) + +func init() { + testutils.HookLogrusForGinkgo() +} + +func TestFv(t *testing.T) { + RegisterFailHandler(Fail) + junitReporter := reporters.NewJUnitReporter("junit.xml") + RunSpecsWithDefaultAndCustomReporters(t, "FV Suite", []Reporter{junitReporter}) +} diff --git a/fv/iptables-locker/iptables-locker.go b/fv/iptables-locker/iptables-locker.go new file mode 100644 index 0000000000..83879237ed --- /dev/null +++ b/fv/iptables-locker/iptables-locker.go @@ -0,0 +1,55 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "time" + + log "github.com/Sirupsen/logrus" + "github.com/docopt/docopt-go" + + "github.com/projectcalico/felix/iptables" +) + +const usage = `iptables-locker, test tool for grabbing the iptables lock. + +Usage: + iptables-locker + +` + +func main() { + arguments, err := docopt.Parse(usage, nil, true, "v0.1", false) + if err != nil { + println(usage) + log.WithError(err).Fatal("Failed to parse usage") + } + durationStr := arguments[""].(string) + duration, err := time.ParseDuration(durationStr) + if err != nil { + println(usage) + log.WithError(err).Fatal("Failed to parse usage") + } + + iptablesLock := iptables.NewSharedLock( + "/run/xtables.lock", + 1*time.Second, + 50*time.Millisecond, + ) + iptablesLock.Lock() + println("LOCKED") + time.Sleep(duration) + iptablesLock.Unlock() +} diff --git a/fv/iptables_lock_test.go b/fv/iptables_lock_test.go new file mode 100644 index 0000000000..c4b11f2655 --- /dev/null +++ b/fv/iptables_lock_test.go @@ -0,0 +1,130 @@ +// +build fvtests + +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fv_test + +import ( + "bufio" + "fmt" + "os" + "os/exec" + "strings" + + "time" + + log "github.com/Sirupsen/logrus" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("with running container", func() { + var containerIdx int + var containerName string + var felixCmd *exec.Cmd + + cmdInContainer := func(cmd ...string) *exec.Cmd { + arg := []string{"exec", containerName} + arg = append(arg, cmd...) + return exec.Command("docker", arg...) + } + + BeforeEach(func() { + containerName = fmt.Sprintf("felix-fv-%d-%d", os.Getpid(), containerIdx) + containerIdx++ + myDir, err := os.Getwd() + Expect(err).NotTo(HaveOccurred()) + log.WithFields(log.Fields{ + "name": containerName, + "myDir": myDir, + }).Info("Starting a Felix container") + // Run a felix container. The tests in this file don't actually rely on Felix + // but the calico/felix container has all the iptables dependencies we need to + // check the lock behaviour. Note: we don't map the host's iptables lock into the + // container so the scope of the lock is limited to the container. + felixCmd = exec.Command("docker", "run", + "--rm", + "--name", containerName, + "-v", fmt.Sprintf("%s/..:/codebase", myDir), + "--privileged", + "calico/felix") + err = felixCmd.Start() + Expect(err).NotTo(HaveOccurred()) + + log.Info("Waiting for container to be listed in docker ps") + start := time.Now() + for { + cmd := exec.Command("docker", "ps") + out, err := cmd.CombinedOutput() + Expect(err).NotTo(HaveOccurred()) + if strings.Contains(string(out), containerName) { + break + } + if time.Since(start) > 10*time.Second { + log.Panic("Timed out waiting for container to be listed.") + } + } + }) + AfterEach(func() { + // Send an interrupt to ensure that docker gracefully shuts down the container. + // If we kill the docker process then it detaches the container. + log.Info("Stopping Felix container") + felixCmd.Process.Signal(os.Interrupt) + }) + + Describe("with the lock being held for 2s", func() { + var lockCmd *exec.Cmd + BeforeEach(func() { + // Start the iptables-locker, which is a simple test app that locks + // the iptables lock and then releases it after a timeout. + log.Info("Starting iptables-locker") + lockCmd = cmdInContainer("/codebase/bin/iptables-locker", "2s") + stdErr, err := lockCmd.StderrPipe() + Expect(err).NotTo(HaveOccurred()) + lockCmd.Start() + + // Wait for the iptables-locker to tell us that it actually acquired the + // lock. + log.Info("Waiting for iptables-locker to acquire lock") + scanner := bufio.NewScanner(stdErr) + Expect(scanner.Scan()).To(BeTrue()) + Expect(scanner.Text()).To(Equal("LOCKED")) + Expect(scanner.Err()).NotTo(HaveOccurred()) + log.Info("iptables-locker acquired lock") + }) + + It("iptables should fail to get the lock in 1s", func() { + iptCmd := cmdInContainer("iptables", "-w", "1", "-A", "FORWARD") + out, err := iptCmd.CombinedOutput() + Expect(string(out)).To(ContainSubstring("Stopped waiting")) + Expect(err).To(HaveOccurred()) + }) + + It("iptables should succeed in getting the lock after 3s", func() { + iptCmd := cmdInContainer("iptables", "-w", "3", "-A", "FORWARD") + out, err := iptCmd.CombinedOutput() + Expect(string(out)).To(ContainSubstring("Another app is currently holding the xtables lock")) + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + if lockCmd != nil { + log.Info("waiting for iptables-locker to finish") + err := lockCmd.Wait() + Expect(err).NotTo(HaveOccurred()) + } + }) + }) +}) diff --git a/health/health_test.go b/health/health_test.go index b871e4e92e..6da2fe92b1 100644 --- a/health/health_test.go +++ b/health/health_test.go @@ -19,6 +19,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/projectcalico/felix/health" ) diff --git a/intdataplane/int_dataplane.go b/intdataplane/int_dataplane.go index 4b2576f81d..5d133c0d3b 100644 --- a/intdataplane/int_dataplane.go +++ b/intdataplane/int_dataplane.go @@ -97,9 +97,14 @@ type Config struct { MaxIPSetSize int + IPSetsRefreshInterval time.Duration + RouteRefreshInterval time.Duration IptablesRefreshInterval time.Duration IptablesPostWriteCheckInterval time.Duration IptablesInsertMode string + IptablesLockFilePath string + IptablesLockTimeout time.Duration + IptablesLockProbeInterval time.Duration RulesConfig rules.Config @@ -163,10 +168,18 @@ type InternalDataplane struct { routeTables []*routetable.RouteTable - dataplaneNeedsSync bool - forceDataplaneRefresh bool - cleanupPending bool - doneFirstApply bool + // dataplaneNeedsSync is set if the dataplane is dirty in some way, i.e. we need to + // call apply(). + dataplaneNeedsSync bool + // forceIPSetsRefresh is set by the IP sets refresh timer to indicate that we should + // check the IP sets in the dataplane. + forceIPSetsRefresh bool + // forceRouteRefresh is set by the route refresh timer to indicate that we should + // check the routes in the dataplane. + forceRouteRefresh bool + // doneFirstApply is set after we finish the first update to the dataplane. It indicates + // that the dataplane should now be in sync. + doneFirstApply bool reschedTimer *time.Timer reschedC <-chan time.Time @@ -192,7 +205,6 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane { fromDataplane: make(chan interface{}, 100), ruleRenderer: ruleRenderer, interfacePrefixes: config.RulesConfig.WorkloadIfacePrefixes, - cleanupPending: true, ifaceMonitor: ifacemonitor.New(), ifaceUpdates: make(chan *ifaceUpdate, 100), ifaceAddrUpdates: make(chan *ifaceAddrsUpdate, 100), @@ -203,38 +215,46 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane { dp.ifaceMonitor.Callback = dp.onIfaceStateChange dp.ifaceMonitor.AddrCallback = dp.onIfaceAddrsChange + // Most iptables tables need the same options. + iptablesOptions := iptables.TableOptions{ + HistoricChainPrefixes: rules.AllHistoricChainNamePrefixes, + InsertMode: config.IptablesInsertMode, + RefreshInterval: config.IptablesRefreshInterval, + PostWriteInterval: config.IptablesPostWriteCheckInterval, + } + + // However, the NAT tables need an extra cleanup regex. + iptablesNATOptions := iptablesOptions + iptablesNATOptions.ExtraCleanupRegexPattern = rules.HistoricInsertedNATRuleRegex + + // Create the shared iptables lock. This allows us to block other processes from + // manipulating iptables while we make our updates. We use a shared lock because we + // actually do multiple updates in parallel (but to different tables), which is safe. + iptablesLock := iptables.NewSharedLock( + config.IptablesLockFilePath, + config.IptablesLockTimeout, + config.IptablesLockProbeInterval, + ) + natTableV4 := iptables.NewTable( "nat", 4, rules.RuleHashPrefix, - iptables.TableOptions{ - HistoricChainPrefixes: rules.AllHistoricChainNamePrefixes, - ExtraCleanupRegexPattern: rules.HistoricInsertedNATRuleRegex, - InsertMode: config.IptablesInsertMode, - RefreshInterval: config.IptablesRefreshInterval, - PostWriteInterval: config.IptablesPostWriteCheckInterval, - }, + iptablesLock, + iptablesNATOptions, ) rawTableV4 := iptables.NewTable( "raw", 4, rules.RuleHashPrefix, - iptables.TableOptions{ - HistoricChainPrefixes: rules.AllHistoricChainNamePrefixes, - InsertMode: config.IptablesInsertMode, - RefreshInterval: config.IptablesRefreshInterval, - PostWriteInterval: config.IptablesPostWriteCheckInterval, - }) + iptablesLock, + iptablesOptions) filterTableV4 := iptables.NewTable( "filter", 4, rules.RuleHashPrefix, - iptables.TableOptions{ - HistoricChainPrefixes: rules.AllHistoricChainNamePrefixes, - InsertMode: config.IptablesInsertMode, - RefreshInterval: config.IptablesRefreshInterval, - PostWriteInterval: config.IptablesPostWriteCheckInterval, - }) + iptablesLock, + iptablesOptions) ipSetsConfigV4 := config.RulesConfig.IPSetConfigV4 ipSetsV4 := ipsets.NewIPSets(ipSetsConfigV4) dp.iptablesNATTables = append(dp.iptablesNATTables, natTableV4) @@ -269,35 +289,22 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane { "nat", 6, rules.RuleHashPrefix, - iptables.TableOptions{ - HistoricChainPrefixes: rules.AllHistoricChainNamePrefixes, - ExtraCleanupRegexPattern: rules.HistoricInsertedNATRuleRegex, - InsertMode: config.IptablesInsertMode, - RefreshInterval: config.IptablesRefreshInterval, - PostWriteInterval: config.IptablesPostWriteCheckInterval, - }, + iptablesLock, + iptablesNATOptions, ) rawTableV6 := iptables.NewTable( "raw", 6, rules.RuleHashPrefix, - iptables.TableOptions{ - HistoricChainPrefixes: rules.AllHistoricChainNamePrefixes, - InsertMode: config.IptablesInsertMode, - RefreshInterval: config.IptablesRefreshInterval, - PostWriteInterval: config.IptablesPostWriteCheckInterval, - }, + iptablesLock, + iptablesOptions, ) filterTableV6 := iptables.NewTable( "filter", 6, rules.RuleHashPrefix, - iptables.TableOptions{ - HistoricChainPrefixes: rules.AllHistoricChainNamePrefixes, - InsertMode: config.IptablesInsertMode, - RefreshInterval: config.IptablesRefreshInterval, - PostWriteInterval: config.IptablesPostWriteCheckInterval, - }, + iptablesLock, + iptablesOptions, ) ipSetsConfigV6 := config.RulesConfig.IPSetConfigV6 @@ -483,13 +490,27 @@ func (d *InternalDataplane) loopUpdatingDataplane() { // Retry any failed operations every 10s. retryTicker := time.NewTicker(10 * time.Second) - var refreshC <-chan time.Time - if d.config.IptablesRefreshInterval > 0 { + + // If configured, start tickers to refresh the IP sets and routing table entries. + var ipSetsRefreshC <-chan time.Time + if d.config.IPSetsRefreshInterval > 0 { + log.WithField("interval", d.config.IptablesRefreshInterval).Info( + "Will refresh IP sets on timer") refreshTicker := jitter.NewTicker( - d.config.IptablesRefreshInterval, - d.config.IptablesRefreshInterval/10, + d.config.IPSetsRefreshInterval, + d.config.IPSetsRefreshInterval/10, ) - refreshC = refreshTicker.C + ipSetsRefreshC = refreshTicker.C + } + var routeRefreshC <-chan time.Time + if d.config.RouteRefreshInterval > 0 { + log.WithField("interval", d.config.RouteRefreshInterval).Info( + "Will refresh routes on timer") + refreshTicker := jitter.NewTicker( + d.config.RouteRefreshInterval, + d.config.RouteRefreshInterval/10, + ) + routeRefreshC = refreshTicker.C } // Fill the apply throttle leaky bucket. @@ -584,9 +605,13 @@ func (d *InternalDataplane) loopUpdatingDataplane() { } summaryAddrBatchSize.Observe(float64(batchSize)) d.dataplaneNeedsSync = true - case <-refreshC: - log.Debug("Refreshing dataplane state") - d.forceDataplaneRefresh = true + case <-ipSetsRefreshC: + log.Debug("Refreshing IP sets state") + d.forceIPSetsRefresh = true + d.dataplaneNeedsSync = true + case <-routeRefreshC: + log.Debug("Refreshing routes") + d.forceRouteRefresh = true d.dataplaneNeedsSync = true case <-d.reschedC: log.Debug("Reschedule kick received") @@ -727,17 +752,22 @@ func (d *InternalDataplane) apply() { } } - if d.forceDataplaneRefresh { - // Refresh timer popped, ask the dataplane to resync as part of its update. + if d.forceRouteRefresh { + // Refresh timer popped. for _, r := range d.routeTables { // Queue a resync on the next Apply(). r.QueueResync() } + d.forceRouteRefresh = false + } + + if d.forceIPSetsRefresh { + // Refresh timer popped. for _, r := range d.ipSets { // Queue a resync on the next Apply(). r.QueueResync() } - d.forceDataplaneRefresh = false + d.forceIPSetsRefresh = false } // Next, create/update IP sets. We defer deletions of IP sets until after we update diff --git a/iptables/lock.go b/iptables/lock.go new file mode 100644 index 0000000000..9d04b0bf65 --- /dev/null +++ b/iptables/lock.go @@ -0,0 +1,209 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// Copyright 2017 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file is based on that extracted from Kubernetes at pkg/util/iptables/iptables_linux.go. + +package iptables + +import ( + "errors" + "fmt" + "net" + "os" + "time" + + "io" + "sync" + + log "github.com/Sirupsen/logrus" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sys/unix" +) + +var ( + summaryLockAcquisitionTime = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "felix_iptables_lock_acquire_secs", + Help: "Time in seconds that it took to acquire the iptables lock(s).", + }) + countLockRetries = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "felix_iptables_lock_retries", + Help: "Number of times the iptables lock was held by someone else and we had to retry.", + }, []string{"version"}) + countLockRetriesV14 = countLockRetries.WithLabelValues("1.4") + countLockRetriesV16 = countLockRetries.WithLabelValues("1.6") +) + +func init() { + prometheus.MustRegister( + summaryLockAcquisitionTime, + countLockRetries, + ) +} + +func NewSharedLock(lockFilePath string, lockTimeout, lockProbeInterval time.Duration) *SharedLock { + return &SharedLock{ + lockFilePath: lockFilePath, + lockTimeout: lockTimeout, + lockProbeInterval: lockProbeInterval, + GrabIptablesLocks: GrabIptablesLocks, + } +} + +// SharedLock allows for multiple goroutines to share the iptables lock without blocking on each +// other. That is safe because each of our goroutines is accessing a different iptables table, so +// they do not conflict. +type SharedLock struct { + lock sync.Mutex + referenceCount int + + iptablesLockHandle io.Closer + + lockFilePath string + lockTimeout time.Duration + lockProbeInterval time.Duration + + GrabIptablesLocks func(lockFilePath, socketName string, timeout, probeInterval time.Duration) (io.Closer, error) +} + +func (l *SharedLock) Lock() { + l.lock.Lock() + defer l.lock.Unlock() + + if l.referenceCount == 0 { + // The lock isn't currently held. Acquire it. + lockHandle, err := l.GrabIptablesLocks( + l.lockFilePath, + "@xtables", + l.lockTimeout, + l.lockProbeInterval, + ) + if err != nil { + // We give the lock plenty of time so err on the side of assuming a + // programming bug. + log.WithError(err).Panic("Failed to acquire iptables lock") + } + l.iptablesLockHandle = lockHandle + } + l.referenceCount++ +} + +func (l *SharedLock) Unlock() { + l.lock.Lock() + defer l.lock.Unlock() + + l.referenceCount-- + if l.referenceCount < 0 { + log.Panic("Unmatched Unlock()") + } + if l.referenceCount == 0 { + log.Debug("Releasing iptables lock.") + err := l.iptablesLockHandle.Close() + if err != nil { + // We haven't done anything with the file or socket so we shouldn't be + // able to hit any "deferred flush" type errors from the close. Panic + // since we're not sure what's going on. + log.WithError(err).Panic("Error while closing iptables lock.") + } + l.iptablesLockHandle = nil + } +} + +type Locker struct { + Lock16 io.Closer + Lock14 io.Closer +} + +func (l *Locker) Close() error { + var err error + if l.Lock16 != nil { + err = l.Lock16.Close() + if err != nil { + log.WithError(err).Error("Error while closing lock file.") + } + } + if l.Lock14 != nil { + err14 := l.Lock14.Close() + if err14 != nil { + log.WithError(err14).Error("Error while closing lock socket.") + } + if err14 != nil && err == nil { + err = err14 + } + } + return err +} + +var ( + Err14LockTimeout = errors.New("Timed out waiting for iptables 1.4 lock") + Err16LockTimeout = errors.New("Timed out waiting for iptables 1.6 lock") +) + +func GrabIptablesLocks(lockFilePath, socketName string, timeout, probeInterval time.Duration) (io.Closer, error) { + var err error + var success bool + + l := &Locker{} + defer func(l *Locker) { + // Clean up immediately on failure + if !success { + l.Close() + } + }(l) + + // Grab both 1.6.x and 1.4.x-style locks; we don't know what the + // iptables-restore version is if it doesn't support --wait, so we + // can't assume which lock method it'll use. + + // Roughly duplicate iptables 1.6.x xtables_lock() function. + f, err := os.OpenFile(lockFilePath, os.O_CREATE, 0600) + l.Lock16 = f + if err != nil { + return nil, fmt.Errorf("failed to open iptables lock %s: %v", lockFilePath, err) + } + + startTime := time.Now() + for { + if err := grabIptablesFileLock(f); err == nil { + break + } + if time.Since(startTime) > timeout { + return nil, Err16LockTimeout + } + time.Sleep(probeInterval) + countLockRetriesV16.Inc() + } + + startTime14 := time.Now() + for { + l.Lock14, err = net.ListenUnix("unix", &net.UnixAddr{Name: socketName, Net: "unix"}) + if err == nil { + break + } + if time.Since(startTime14) > timeout { + return nil, Err14LockTimeout + } + time.Sleep(probeInterval) + countLockRetriesV14.Inc() + } + + summaryLockAcquisitionTime.Observe(time.Since(startTime).Seconds()) + + success = true + return l, nil +} + +func grabIptablesFileLock(f *os.File) error { + return unix.Flock(int(f.Fd()), unix.LOCK_EX|unix.LOCK_NB) +} diff --git a/iptables/lock_test.go b/iptables/lock_test.go new file mode 100644 index 0000000000..9f382c6a42 --- /dev/null +++ b/iptables/lock_test.go @@ -0,0 +1,175 @@ +// Copyright (c) 2017 Tigera, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package iptables_test + +import ( + "io" + "io/ioutil" + "os" + "time" + + . "github.com/projectcalico/felix/iptables" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("SharedLock", func() { + var lock *SharedLock + var mockIptablesLock *mockLock + var mockErr error + + mockGrabLock := func(lockFilePath, socketName string, timeout, probeInterval time.Duration) (io.Closer, error) { + return mockIptablesLock, mockErr + } + + BeforeEach(func() { + mockErr = nil + mockIptablesLock = &mockLock{} + lock = NewSharedLock("/foo/bar.lock", time.Second, time.Millisecond) + lock.GrabIptablesLocks = mockGrabLock + }) + + It("should close the lock after final unlock call", func() { + lock.Lock() + Expect(mockIptablesLock.Closed).To(BeFalse()) + lock.Unlock() + Expect(mockIptablesLock.Closed).To(BeTrue()) + }) + It("should allow multiple holders", func() { + lock.Lock() + lock.Lock() + Expect(mockIptablesLock.Closed).To(BeFalse()) + lock.Unlock() + Expect(mockIptablesLock.Closed).To(BeFalse()) + lock.Unlock() + Expect(mockIptablesLock.Closed).To(BeTrue()) + }) + It("should panic on misuse", func() { + Expect(lock.Unlock).To(Panic()) + }) + It("should panic on failure to acquire", func() { + mockErr = Err14LockTimeout + Expect(lock.Lock).To(Panic()) + }) + It("should panic on failure to close", func() { + lock.Lock() + mockIptablesLock.Err = Err14LockTimeout + Expect(lock.Unlock).To(Panic()) + }) +}) + +type mockLock struct { + Closed bool + Err error +} + +func (l *mockLock) Close() error { + Expect(l.Closed).To(BeFalse()) + l.Closed = true + return l.Err +} + +var _ = Describe("GrabIptablesLocks FV", func() { + var fileName string + + BeforeEach(func() { + f, err := ioutil.TempFile("", "iptlocktest") + Expect(err).NotTo(HaveOccurred()) + fileName = f.Name() + f.Close() + }) + AfterEach(func() { + os.Remove(fileName) + }) + + It("should block concurrent invocations", func() { + l, err := GrabIptablesLocks(fileName, "@dummytables", 1*time.Second, 50*time.Millisecond) + Expect(err).NotTo(HaveOccurred()) + defer l.Close() + + l2, err := GrabIptablesLocks(fileName, "@dummytables", 100*time.Millisecond, 10*time.Millisecond) + Expect(err).To(Equal(Err16LockTimeout)) + Expect(l2).To(BeNil()) + }) + It("should allow access after being released", func() { + l, err := GrabIptablesLocks(fileName, "@dummytables", 1*time.Second, 50*time.Millisecond) + Expect(err).NotTo(HaveOccurred()) + l.Close() + + l2, err := GrabIptablesLocks(fileName, "@dummytables", 1*time.Second, 50*time.Millisecond) + Expect(err).NotTo(HaveOccurred()) + l2.Close() + }) + It("should block concurrent invocations using only iptables 1.4 version of lock", func() { + l, err := GrabIptablesLocks(fileName, "@dummytables", 1*time.Second, 50*time.Millisecond) + // Sneakily remove the lockfile after it's been locked so that we fall through to + // the v1.4 lock. + os.Remove(fileName) + Expect(err).NotTo(HaveOccurred()) + defer l.Close() + + l2, err := GrabIptablesLocks(fileName, "@dummytables", 100*time.Millisecond, 10*time.Millisecond) + Expect(err).To(Equal(Err14LockTimeout)) + Expect(l2).To(BeNil()) + }) + It("should allow access after being released using only iptables 1.4 version of lock", func() { + l, err := GrabIptablesLocks(fileName, "@dummytables", 1*time.Second, 50*time.Millisecond) + // Sneakily remove the lockfile after it's been locked so that we fall through to + // the v1.4 lock. + os.Remove(fileName) + Expect(err).NotTo(HaveOccurred()) + l.Close() + + l2, err := GrabIptablesLocks(fileName, "@dummytables", 1*time.Second, 50*time.Millisecond) + Expect(err).NotTo(HaveOccurred()) + l2.Close() + }) +}) + +var _ = Describe("locker", func() { + var l *Locker + var lock14 mockCloser + var lock16 mockCloser + + BeforeEach(func() { + lock14 = mockCloser{} + lock16 = mockCloser{} + l = &Locker{ + Lock14: &lock14, + Lock16: &lock16, + } + }) + + It("should return nil with no err", func() { + Expect(l.Close()).NotTo(HaveOccurred()) + }) + It("should return lock16 err", func() { + lock16.Err = Err16LockTimeout + Expect(l.Close()).To(Equal(Err16LockTimeout)) + }) + It("should return lock14 err", func() { + lock14.Err = Err14LockTimeout + Expect(l.Close()).To(Equal(Err14LockTimeout)) + }) +}) + +type mockCloser struct { + Err error +} + +func (c *mockCloser) Close() error { + return c.Err +} diff --git a/iptables/rules_test.go b/iptables/rules_test.go index 584797548d..9caf9d224a 100644 --- a/iptables/rules_test.go +++ b/iptables/rules_test.go @@ -17,6 +17,8 @@ package iptables import ( "bytes" + "sync" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -70,6 +72,7 @@ var _ = Describe("Hash extraction tests", func() { "filter", 4, "cali:", + &sync.Mutex{}, TableOptions{ HistoricChainPrefixes: []string{"felix-", "cali"}, ExtraCleanupRegexPattern: "an-old-rule", diff --git a/iptables/table.go b/iptables/table.go index 8e7e791564..e5f003c636 100644 --- a/iptables/table.go +++ b/iptables/table.go @@ -28,6 +28,8 @@ import ( log "github.com/Sirupsen/logrus" "github.com/prometheus/client_golang/prometheus" + "sync" + "github.com/projectcalico/felix/set" ) @@ -226,6 +228,8 @@ type Table struct { postWriteInterval time.Duration refreshInterval time.Duration + writeLock sync.Locker + logCxt *log.Entry gaugeNumChains prometheus.Gauge @@ -258,6 +262,7 @@ func NewTable( name string, ipVersion uint8, hashPrefix string, + iptablesWriteLock sync.Locker, options TableOptions, ) *Table { // Calculate the regex used to match the hash comment. The comment looks like this: @@ -347,6 +352,8 @@ func NewTable( refreshInterval: options.RefreshInterval, + writeLock: iptablesWriteLock, + newCmd: newCmd, timeSleep: sleep, timeNow: now, @@ -959,7 +966,9 @@ func (t *Table) applyUpdates() error { cmd.SetStdout(&outputBuf) cmd.SetStderr(&errBuf) countNumRestoreCalls.Inc() + t.writeLock.Lock() err := cmd.Run() + t.writeLock.Unlock() if err != nil { t.logCxt.WithFields(log.Fields{ "output": outputBuf.String(), diff --git a/iptables/table_test.go b/iptables/table_test.go index 060dcc9911..53fc0cff83 100644 --- a/iptables/table_test.go +++ b/iptables/table_test.go @@ -30,16 +30,19 @@ import ( var _ = Describe("Table with an empty dataplane", func() { var dataplane *mockDataplane var table *Table + var iptLock *mockMutex BeforeEach(func() { dataplane = newMockDataplane("filter", map[string][]string{ "FORWARD": {}, "INPUT": {}, "OUTPUT": {}, }) + iptLock = &mockMutex{} table = NewTable( "filter", 4, rules.RuleHashPrefix, + iptLock, TableOptions{ HistoricChainPrefixes: rules.AllHistoricChainNamePrefixes, NewCmdOverride: dataplane.newCmd, @@ -56,6 +59,8 @@ var _ = Describe("Table with an empty dataplane", func() { Expect(dataplane.CmdNames).To(Equal([]string{ "iptables-save", })) + Expect(iptLock.Held).To(BeFalse()) + Expect(iptLock.WasTaken).To(BeFalse()) }) It("should have a refresh scheduled at start-of-day", func() { @@ -91,6 +96,7 @@ var _ = Describe("Table with an empty dataplane", func() { "filter", 4, rules.RuleHashPrefix, + &mockMutex{}, TableOptions{ HistoricChainPrefixes: rules.AllHistoricChainNamePrefixes, NewCmdOverride: dataplane.newCmd, @@ -108,6 +114,12 @@ var _ = Describe("Table with an empty dataplane", func() { }) table.Apply() }) + It("should acquire the iptables lock", func() { + Expect(iptLock.WasTaken).To(BeTrue()) + }) + It("should release the iptables lock", func() { + Expect(iptLock.Held).To(BeFalse()) + }) It("should be in the dataplane", func() { Expect(dataplane.Chains).To(Equal(map[string][]string{ "FORWARD": {`-m comment --comment "cali:hecdSCslEjdBPBPo" --jump DROP`}, @@ -425,6 +437,7 @@ func describePostUpdateCheckTests(enableRefresh bool) { "filter", 4, rules.RuleHashPrefix, + &mockMutex{}, options, ) table.SetRuleInsertions("FORWARD", []Rule{ @@ -613,6 +626,7 @@ func describeDirtyDataplaneTests(appendMode bool) { "filter", 4, rules.RuleHashPrefix, + &mockMutex{}, TableOptions{ HistoricChainPrefixes: rules.AllHistoricChainNamePrefixes, ExtraCleanupRegexPattern: "sneaky-rule", @@ -973,15 +987,18 @@ func describeDirtyDataplaneTests(appendMode bool) { var _ = Describe("Table with inserts and a non-Calico chain", func() { var dataplane *mockDataplane var table *Table + var iptLock *mockMutex BeforeEach(func() { dataplane = newMockDataplane("filter", map[string][]string{ "FORWARD": {}, "non-calico": {"-m comment \"foo\""}, }) + iptLock = &mockMutex{} table = NewTable( "filter", 6, rules.RuleHashPrefix, + iptLock, TableOptions{ HistoricChainPrefixes: rules.AllHistoricChainNamePrefixes, NewCmdOverride: dataplane.newCmd, @@ -1008,6 +1025,8 @@ var _ = Describe("Table with inserts and a non-Calico chain", func() { "FORWARD": {"-m comment --comment \"cali:hecdSCslEjdBPBPo\" --jump DROP"}, } dataplane.ResetCmds() + iptLock.WasTaken = false + iptLock.Held = false table.Apply() }) @@ -1019,5 +1038,28 @@ var _ = Describe("Table with inserts and a non-Calico chain", func() { It("should make no changes to the dataplane", func() { Expect(dataplane.CmdNames).To(BeEmpty()) }) + It("should not take the lock", func() { + Expect(iptLock.WasTaken).To(BeFalse()) + }) }) }) + +type mockMutex struct { + Held bool + WasTaken bool +} + +func (m *mockMutex) Lock() { + if m.Held { + Fail("Mutex already held") + } + m.Held = true + m.WasTaken = true +} + +func (m *mockMutex) Unlock() { + if !m.Held { + Fail("Mutex not held") + } + m.Held = false +} diff --git a/k8sfv/calc_graph_test.go b/k8sfv/calc_graph_test.go index 38792ee9cc..53e91977ac 100644 --- a/k8sfv/calc_graph_test.go +++ b/k8sfv/calc_graph_test.go @@ -22,8 +22,9 @@ import ( log "github.com/Sirupsen/logrus" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/projectcalico/felix/health" "k8s.io/client-go/kubernetes" + + "github.com/projectcalico/felix/health" ) var _ = Describe("calculation graph scale test", func() {