diff --git a/go.mod b/go.mod index c11661b781e..fa3dd960c03 100644 --- a/go.mod +++ b/go.mod @@ -59,6 +59,7 @@ require ( github.com/moby/buildkit v0.11.5 github.com/onsi/gomega v1.27.6 github.com/openshift/api v0.0.0-20200929171550-c99a4deebbe5 + github.com/openshift/client-go v0.0.0-20200929181438-91d71ef2122c github.com/pires/go-proxyproto v0.7.0 github.com/pmezard/go-difflib v1.0.0 github.com/prometheus/client_golang v1.14.0 @@ -225,7 +226,7 @@ require ( github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/stoewer/go-strcase v1.3.0 - github.com/stretchr/testify v1.8.2 // indirect + github.com/stretchr/testify v1.8.2 github.com/subosito/gotenv v1.4.2 // indirect github.com/vbatts/tar-split v0.11.2 // indirect github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect diff --git a/go.sum b/go.sum index 0bcee85f1a6..990cd75cd78 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,7 @@ cloud.google.com/go v0.44.3/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxK cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc= cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0= cloud.google.com/go v0.50.0/go.mod h1:r9sluTvynVuxRIOHXQEHMFffphuXHOMZMycpNR5e6To= +cloud.google.com/go v0.51.0/go.mod h1:hWtGJ6gnXH+KgDv+V0zFGDvpi07n3z8ZNj3T1RW0Gcw= cloud.google.com/go v0.52.0/go.mod h1:pXajvRH/6o3+F9jDHZWQ5PbGhn+o8w9qiu/CffaVdO4= cloud.google.com/go v0.53.0/go.mod h1:fp/UouUEsRkN6ryDKNW/Upv/JBKnv6WDthjR6+vze6M= cloud.google.com/go v0.54.0/go.mod h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bPc= @@ -63,10 +64,14 @@ github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI= +github.com/Azure/go-autorest/autorest v0.9.6/go.mod h1:/FALq9T/kS7b5J5qsQ+RSTUdAmGFqi0vUdVNNx8q630= github.com/Azure/go-autorest/autorest/adal v0.5.0/go.mod h1:8Z9fGy2MpX0PvDjB1pEgQTmVqjGhiHBW7RJJEciWzS0= +github.com/Azure/go-autorest/autorest/adal v0.8.2/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q= github.com/Azure/go-autorest/autorest/date v0.1.0/go.mod h1:plvfp3oPSKwf2DNjlBjWF/7vwR+cUD/ELuzDCXwHUVA= +github.com/Azure/go-autorest/autorest/date v0.2.0/go.mod h1:vcORJHLJEh643/Ioh9+vPmf1Ij9AEBM5FuBIXLmIy0g= github.com/Azure/go-autorest/autorest/mocks v0.1.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0= github.com/Azure/go-autorest/autorest/mocks v0.2.0/go.mod h1:OTyCOPRA2IgIlWxVYxBee2F5Gr4kF2zd2J5cFRaIDN0= +github.com/Azure/go-autorest/autorest/mocks v0.3.0/go.mod h1:a8FDP3DYzQ4RYfVAxAN3SVSiiO77gL2j2ronKKP0syM= github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc= github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -702,6 +707,8 @@ github.com/opencontainers/image-spec v1.1.0-rc3/go.mod h1:X4pATf0uXsnn3g5aiGIsVn github.com/openshift/api v0.0.0-20200929171550-c99a4deebbe5 h1:ZyeIVj2qV9fNF4SKVMAIhCSdGOnQcOxYDah2wXVqeiY= github.com/openshift/api v0.0.0-20200929171550-c99a4deebbe5/go.mod h1:Si/I9UGeRR3qzg01YWPmtlr0GeGk2fnuggXJRmjAZ6U= github.com/openshift/build-machinery-go v0.0.0-20200819073603-48aa266c95f7/go.mod h1:b1BuldmJlbA/xYtdZvKi+7j5YGB44qJUJDZ9zwiNCfE= +github.com/openshift/client-go v0.0.0-20200929181438-91d71ef2122c h1:DQTWW8DGRN7fu5qwEPcbdP9hAxXi7dm5cvi0hrdR3UE= +github.com/openshift/client-go v0.0.0-20200929181438-91d71ef2122c/go.mod h1:MwESrlhzumQGcGtPCpz/WjDrlvhu1fMNlLBcNYjO0fY= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= @@ -933,6 +940,7 @@ golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190617133340-57b3e21c3d56/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= @@ -1461,6 +1469,7 @@ k8s.io/cli-runtime v0.27.0 h1:kYVGqjmBbaj22nJ7je/3tigjiSlB04kVbjW+51zivu8= k8s.io/cli-runtime v0.27.0/go.mod h1:kN+Q+5L37DFCdpNuCLTHO7w+dwlJb0xzn8jveB3bPSw= k8s.io/client-go v0.18.2/go.mod h1:Xcm5wVGXX9HAA2JJ2sSBUn3tCJ+4SVlCbl2MNNv+CIU= k8s.io/client-go v0.18.4/go.mod h1:f5sXwL4yAZRkAtzOxRWUhA/N8XzGCb+nPZI8PfobZ9g= +k8s.io/client-go v0.19.0/go.mod h1:H9E/VT95blcFQnlyShFgnFT9ZnJOAceiUHM3MlRC+mU= k8s.io/client-go v0.27.0 h1:DyZS1fJkv73tEy7rWv4VF6NwGeJ7SKvNaLRXZBYLA+4= k8s.io/client-go v0.27.0/go.mod h1:XVEmpNnM+4JYO3EENoFV/ZDv3KxKVJUnzGo70avk+C4= k8s.io/code-generator v0.18.2/go.mod h1:+UHX5rSbxmR8kzS+FAv7um6dtYrZokQvjHpDSYRVkTc= @@ -1479,6 +1488,7 @@ k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= +k8s.io/klog/v2 v2.3.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20200121204235-bf4fb3bd569c/go.mod h1:GRQhZsXIAJ1xR0C9bd8UpWHZ5plfAS9fzPjJuQ6JL3E= @@ -1490,6 +1500,7 @@ k8s.io/kubectl v0.27.0 h1:ZcWS6ufixDXwovWtzF149gd5GzxdpsIl4YqfioSkq5w= k8s.io/kubectl v0.27.0/go.mod h1:tyFzo+6WfbUEccm8rFIliQ79FAmm9uTFN+1oC5Ytamo= k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= k8s.io/utils v0.0.0-20200603063816-c1c6865ac451/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= +k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20230313181309-38a27ef9d749 h1:xMMXJlJbsU8w3V5N2FLDQ8YgU8s1EoULdbQBcAeNJkY= k8s.io/utils v0.0.0-20230313181309-38a27ef9d749/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= maistra.io/api v0.0.0-20230704084350-dfc96815fb16 h1:Dg/R1GHZ03eUpOHSOmunttNQBmPcoyEUeaphd7OXCxk= diff --git a/licenses/github.com/openshift/client-go/LICENSE b/licenses/github.com/openshift/client-go/LICENSE new file mode 100644 index 00000000000..c4ea8b6f9d8 --- /dev/null +++ b/licenses/github.com/openshift/client-go/LICENSE @@ -0,0 +1,191 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + Copyright 2014 Red Hat, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/pilot/pkg/bootstrap/configcontroller.go b/pilot/pkg/bootstrap/configcontroller.go index f35c6a19270..6020f86ffd0 100644 --- a/pilot/pkg/bootstrap/configcontroller.go +++ b/pilot/pkg/bootstrap/configcontroller.go @@ -27,6 +27,7 @@ import ( "istio.io/istio/pilot/pkg/config/kube/crdclient" "istio.io/istio/pilot/pkg/config/kube/gateway" ingress "istio.io/istio/pilot/pkg/config/kube/ingress" + "istio.io/istio/pilot/pkg/config/kube/ior" "istio.io/istio/pilot/pkg/config/memory" configmonitor "istio.io/istio/pilot/pkg/config/monitor" "istio.io/istio/pilot/pkg/features" @@ -123,6 +124,8 @@ func (s *Server) initConfigController(args *PilotArgs) error { // Create the config store. s.environment.ConfigStore = aggregateConfigController + s.startIOR(args) + // Defer starting the controller until after the service is created. s.addStartFunc(func(stop <-chan struct{}) error { go s.configController.Run(stop) @@ -132,6 +135,32 @@ func (s *Server) initConfigController(args *PilotArgs) error { return nil } +// startIOR tries to start IOR, if it's enabled. If it encounters any failure, it logs an error and continue +func (s *Server) startIOR(args *PilotArgs) { + if !features.EnableIOR { + return + } + + routerClient, err := ior.NewRouterClient() + if err != nil { + ior.IORLog.Errorf("error creating an openshift router client: %v", err) + return + } + + iorKubeClient := ior.NewKubeClient(s.kubeClient) + + s.addStartFunc(func(stop <-chan struct{}) error { + go leaderelection. + NewLeaderElection(args.Namespace, args.PodName, leaderelection.IORController, args.Revision, s.kubeClient). + AddRunFunction(func(stop <-chan struct{}) { + if err := ior.Register(iorKubeClient, routerClient, s.configController, args.Namespace, s.kubeClient.GetMemberRollController(), stop, nil); err != nil { + ior.IORLog.Error(err) + } + }).Run(stop) + return nil + }) +} + func (s *Server) initK8SConfigStore(args *PilotArgs) error { if s.kubeClient == nil { return nil diff --git a/pilot/pkg/config/kube/ior/client.go b/pilot/pkg/config/kube/ior/client.go new file mode 100644 index 00000000000..5314b06f652 --- /dev/null +++ b/pilot/pkg/config/kube/ior/client.go @@ -0,0 +1,59 @@ +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ior + +import ( + "strings" + "time" + + "istio.io/istio/pkg/kube" +) + +// KubeClient is an extension of `kube.Client` with auxiliary functions for IOR +type KubeClient interface { + IsRouteSupported() bool + GetActualClient() kube.Client + GetHandleEventTimeout() time.Duration +} + +type kubeClient struct { + client kube.Client +} + +// NewKubeClient creates the IOR version of KubeClient +func NewKubeClient(client kube.Client) KubeClient { + return &kubeClient{client: client} +} + +func (c *kubeClient) IsRouteSupported() bool { + _, s, _ := c.client.Kube().Discovery().ServerGroupsAndResources() + // This may fail if any api service is down, but the result will still be populated, so we skip the error + for _, res := range s { + for _, api := range res.APIResources { + if api.Kind == "Route" && strings.HasPrefix(res.GroupVersion, "route.openshift.io/") { + return true + } + } + } + return false +} + +func (c *kubeClient) GetActualClient() kube.Client { + return c.client +} + +func (c *kubeClient) GetHandleEventTimeout() time.Duration { + return 10 * time.Second +} diff --git a/pilot/pkg/config/kube/ior/fake.go b/pilot/pkg/config/kube/ior/fake.go new file mode 100644 index 00000000000..8c0f7990ad2 --- /dev/null +++ b/pilot/pkg/config/kube/ior/fake.go @@ -0,0 +1,254 @@ +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ior + +import ( + "fmt" + "strings" + "sync" + "time" + + v1 "github.com/openshift/api/route/v1" + routev1 "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1" + "golang.org/x/net/context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/rest" + + "istio.io/istio/pkg/kube" + "istio.io/istio/pkg/servicemesh/controller" +) + +// FakeRouter implements routev1.RouteInterface +type FakeRouter struct { + routes map[string]*v1.Route + routesLock sync.Mutex +} + +// FakeRouterClient implements routev1.RouteV1Interface +type FakeRouterClient struct { + routesByNamespace map[string]routev1.RouteInterface + routesByNamespaceLock sync.Mutex +} + +type fakeKubeClient struct { + client kube.Client +} + +// NewFakeKubeClient creates a new FakeKubeClient +func NewFakeKubeClient(client kube.Client) KubeClient { + return &fakeKubeClient{client: client} +} + +func (c *fakeKubeClient) IsRouteSupported() bool { + return true +} + +func (c *fakeKubeClient) GetActualClient() kube.Client { + return c.client +} + +func (c *fakeKubeClient) GetHandleEventTimeout() time.Duration { + return time.Millisecond +} + +// NewFakeRouterClient creates a new FakeRouterClient +func NewFakeRouterClient() routev1.RouteV1Interface { + return &FakeRouterClient{ + routesByNamespace: make(map[string]routev1.RouteInterface), + } +} + +// NewFakeRouter creates a new FakeRouter +func NewFakeRouter() routev1.RouteInterface { + return &FakeRouter{ + routes: make(map[string]*v1.Route), + } +} + +// RESTClient implements routev1.RouteV1Interface +func (rc *FakeRouterClient) RESTClient() rest.Interface { + panic("not implemented") +} + +// Routes implements routev1.RouteV1Interface +func (rc *FakeRouterClient) Routes(namespace string) routev1.RouteInterface { + rc.routesByNamespaceLock.Lock() + defer rc.routesByNamespaceLock.Unlock() + + if _, ok := rc.routesByNamespace[namespace]; !ok { + rc.routesByNamespace[namespace] = NewFakeRouter() + } + + countCallsIncrement("routes") + return rc.routesByNamespace[namespace] +} + +var generatedHostNumber int + +// Create implements routev1.RouteInterface +func (fk *FakeRouter) Create(ctx context.Context, route *v1.Route, opts metav1.CreateOptions) (*v1.Route, error) { + fk.routesLock.Lock() + defer fk.routesLock.Unlock() + + if strings.Contains(route.Spec.Host, "/") { + return nil, fmt.Errorf("invalid hostname") + } + + if route.Spec.Host == "" { + generatedHostNumber++ + route.Spec.Host = fmt.Sprintf("generated-host%d.com", generatedHostNumber) + } + + fk.routes[route.Name] = route + + countCallsIncrement("create") + return route, nil +} + +// Update implements routev1.RouteInterface +func (fk *FakeRouter) Update(ctx context.Context, route *v1.Route, opts metav1.UpdateOptions) (*v1.Route, error) { + panic("not implemented") +} + +// UpdateStatus implements routev1.RouteInterface +func (fk *FakeRouter) UpdateStatus(ctx context.Context, route *v1.Route, opts metav1.UpdateOptions) (*v1.Route, error) { + panic("not implemented") +} + +// Delete implements routev1.RouteInterface +func (fk *FakeRouter) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { + fk.routesLock.Lock() + defer fk.routesLock.Unlock() + + if _, ok := fk.routes[name]; !ok { + return fmt.Errorf("route %s not found", name) + } + + delete(fk.routes, name) + + countCallsIncrement("delete") + return nil +} + +// DeleteCollection implements routev1.RouteInterface +func (fk *FakeRouter) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error { + panic("not implemented") +} + +// Get implements routev1.RouteInterface +func (fk *FakeRouter) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Route, error) { + panic("not implemented") +} + +// List implements routev1.RouteInterface +func (fk *FakeRouter) List(ctx context.Context, opts metav1.ListOptions) (*v1.RouteList, error) { + fk.routesLock.Lock() + defer fk.routesLock.Unlock() + + var items []v1.Route + for _, route := range fk.routes { + items = append(items, *route) + } + result := &v1.RouteList{Items: items} + + countCallsIncrement("list") + return result, nil +} + +// Watch Create implements routev1.RouteInterface +func (fk *FakeRouter) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + panic("not implemented") +} + +// Patch implements routev1.RouteInterface +func (fk *FakeRouter) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, + subresources ...string, +) (result *v1.Route, err error) { + panic("not implemented") +} + +// fakeMemberRollController implements controller.MemberRollController +type fakeMemberRollController struct { + listeners []controller.MemberRollListener + namespaces []string + lock sync.Mutex +} + +func newFakeMemberRollController() *fakeMemberRollController { + return &fakeMemberRollController{} +} + +// Register implements controller.MemberRollController +func (fk *fakeMemberRollController) Register(listener controller.MemberRollListener, name string) { + fk.lock.Lock() + defer fk.lock.Unlock() + + if listener == nil { + return + } + + // ensure that listener has no namespaces until the smmrc initializes it with the actual list of namespaces in the member roll + listener.SetNamespaces(nil) + + fk.listeners = append(fk.listeners, listener) +} + +// Start implements controller.MemberRollController +func (fk *fakeMemberRollController) Start(stopCh <-chan struct{}) { + panic("not implemented") +} + +func (fk *fakeMemberRollController) setNamespaces(namespaces ...string) { + fk.namespaces = namespaces + fk.invokeListeners() +} + +func (fk *fakeMemberRollController) invokeListeners() { + fk.lock.Lock() + defer fk.lock.Unlock() + + for _, l := range fk.listeners { + l.SetNamespaces(fk.namespaces) + } +} + +var ( + countCalls = map[string]int{} + countCallsLock sync.Mutex +) + +func countCallsReset() { + countCallsLock.Lock() + defer countCallsLock.Unlock() + countCalls = map[string]int{} +} + +func countCallsGet(k string) int { + countCallsLock.Lock() + defer countCallsLock.Unlock() + v, ok := countCalls[k] + if !ok { + v = 0 + } + return v +} + +func countCallsIncrement(k string) { + countCallsLock.Lock() + defer countCallsLock.Unlock() + countCalls[k]++ +} diff --git a/pilot/pkg/config/kube/ior/ior.go b/pilot/pkg/config/kube/ior/ior.go new file mode 100644 index 00000000000..06332c511d0 --- /dev/null +++ b/pilot/pkg/config/kube/ior/ior.go @@ -0,0 +1,97 @@ +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ior + +import ( + "fmt" + "sync" + + routev1 "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1" + + networking "istio.io/api/networking/v1alpha3" + "istio.io/istio/pilot/pkg/model" + "istio.io/istio/pkg/config" + "istio.io/istio/pkg/config/schema/collections" + "istio.io/istio/pkg/servicemesh/controller" + "istio.io/pkg/log" +) + +// IORLog is IOR-scoped log +var IORLog = log.RegisterScope("ior", "IOR logging", 0) + +// Register configures IOR component to respond to Gateway creations and removals +func Register( + k8sClient KubeClient, + routerClient routev1.RouteV1Interface, + store model.ConfigStoreController, + pilotNamespace string, + mrc controller.MemberRollController, + stop <-chan struct{}, + errorChannel chan error, +) error { + IORLog.Info("Registering IOR component") + + r := newRoute(k8sClient, routerClient, store, pilotNamespace, mrc, stop) + r.errorChannel = errorChannel + + alive := true + var aliveLock sync.Mutex + go func(stop <-chan struct{}) { + // Stop responding to events when we are no longer a leader. + // Two notes here: + // (1) There's no such method "UnregisterEventHandler()" + // (2) It might take a few seconds to this channel to be closed. So, both pods might be leader for a few seconds. + <-stop + IORLog.Info("This pod is no longer a leader. IOR stopped responding") + aliveLock.Lock() + alive = false + aliveLock.Unlock() + }(stop) + + IORLog.Debugf("Registering IOR into Istio's Gateway broadcast") + kind := collections.Gateway.GroupVersionKind() + store.RegisterEventHandler(kind, func(old, curr config.Config, event model.Event) { + aliveLock.Lock() + defer aliveLock.Unlock() + if !alive { + return + } + + // encapsulate in goroutine to not slow down processing because of waiting for mutex + go func() { + _, ok := curr.Spec.(*networking.Gateway) + if !ok { + IORLog.Errorf("could not decode object as Gateway. Object = %v", curr) + return + } + + debugMessage := fmt.Sprintf("Event %v arrived:", event) + if event == model.EventUpdate { + debugMessage += fmt.Sprintf("\tOld object: %v", old) + } + debugMessage += fmt.Sprintf("\tNew object: %v", curr) + IORLog.Debug(debugMessage) + + if err := r.handleEvent(event, curr); err != nil { + IORLog.Error(err) + if r.errorChannel != nil { + r.errorChannel <- err + } + } + }() + }) + + return nil +} diff --git a/pilot/pkg/config/kube/ior/ior_test.go b/pilot/pkg/config/kube/ior/ior_test.go new file mode 100644 index 00000000000..4bb303ddff1 --- /dev/null +++ b/pilot/pkg/config/kube/ior/ior_test.go @@ -0,0 +1,774 @@ +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ior + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + routeapiv1 "github.com/openshift/api/route/v1" + routev1 "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1" + "github.com/stretchr/testify/assert" + k8sioapicorev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + + networking "istio.io/api/networking/v1alpha3" + "istio.io/istio/pilot/pkg/config/kube/crdclient" + "istio.io/istio/pilot/pkg/model" + "istio.io/istio/pkg/config" + "istio.io/istio/pkg/config/schema/collections" + "istio.io/istio/pkg/kube" + memberroll "istio.io/istio/pkg/servicemesh/controller" + "istio.io/istio/pkg/test/util/retry" + "istio.io/pkg/log" +) + +const prefixedLabel = maistraPrefix + "fake" + +func initClients(t *testing.T, + stop <-chan struct{}, + errorChannel chan error, + mrc memberroll.MemberRollController, + register bool, +) (model.ConfigStoreController, KubeClient, routev1.RouteV1Interface) { + t.Helper() + + k8sClient := kube.NewFakeClient() + iorKubeClient := NewFakeKubeClient(k8sClient) + routerClient := NewFakeRouterClient() + store, err := crdclient.New(k8sClient, crdclient.Option{}) + if err != nil { + t.Fatal(err) + } + + go store.Run(stop) + k8sClient.RunAndWait(stop) + cache.WaitForCacheSync(stop, store.HasSynced) + retry.UntilSuccessOrFail(t, func() error { + if !store.HasSynced() { + return fmt.Errorf("store has not synced yet") + } + return nil + }, retry.Timeout(time.Second)) + + if register { + if err := Register(iorKubeClient, routerClient, store, "istio-system", mrc, stop, errorChannel); err != nil { + t.Fatal(err) + } + } + + return store, iorKubeClient, routerClient +} + +func TestCreate(t *testing.T) { + cases := []struct { + testName string + ns string + hosts []string + gwSelector map[string]string + expectedRoutes int + expectedError string + tls bool + annotations map[string]string + }{ + { + "One host", + "istio-system", + []string{"one.org"}, + map[string]string{"istio": "ingressgateway"}, + 1, + "", + false, + nil, + }, + { + "Two hosts", + "istio-system", + []string{"two.org", "three.com"}, + map[string]string{"istio": "ingressgateway"}, + 2, + "", + false, + nil, + }, + { + "Wildcard 1", + "istio-system", + []string{"*"}, + map[string]string{"istio": "ingressgateway"}, + 1, + "", + false, + nil, + }, + { + "Wildcard 2", + "istio-system", + []string{"*.a.com"}, + map[string]string{"istio": "ingressgateway"}, + 1, + "", + false, + nil, + }, + { + "Invalid gateway", + "istio-system", + []string{"fail.com"}, + map[string]string{"istio": "nonexistent"}, + 0, + "could not find a service that matches the gateway selector `istio=nonexistent'", + false, + nil, + }, + { + "TLS 1", + "istio-system", + []string{"one.org"}, + map[string]string{"istio": "ingressgateway"}, + 1, + "", + true, + nil, + }, + { + "Non-existing namespace", + "non-existing", + []string{"fail.com"}, + map[string]string{"istio": "ingressgateway"}, + 0, + "could not handle the ADD event for non-existing", + false, + nil, + }, + { + "Gateway not managed", + "istio-system", + []string{"notmanaged.org"}, + map[string]string{"istio": "ingressgateway"}, + 0, + "", + false, + map[string]string{ShouldManageRouteAnnotation: "false", "foo": "bar"}, + }, + { + "Gateway explicitly managed", + "istio-system", + []string{"explicitlymanaged.org"}, + map[string]string{"istio": "ingressgateway"}, + 1, + "", + false, + map[string]string{ShouldManageRouteAnnotation: "TRUE", "foo": "bar"}, + }, + { + "Gateway explicitly managed with an invalid value", + "istio-system", + []string{"explicitlymanaged.org"}, + map[string]string{"istio": "ingressgateway"}, + 0, + fmt.Sprintf("could not parse annotation %q:", ShouldManageRouteAnnotation), + false, + map[string]string{ShouldManageRouteAnnotation: "ABC", "foo": "bar"}, + }, + { + "Egress gateway must be ignored", + "istio-system", + []string{"egress.org"}, + map[string]string{"istio": "egressgateway"}, + 0, + "", + false, + nil, + }, + { + "Host with all namespaces", + "istio-system", + []string{"*/one.org"}, + map[string]string{"istio": "ingressgateway"}, + 1, + "", + false, + nil, + }, + { + "Host with current namespace", + "istio-system", + []string{"./one.org"}, + map[string]string{"istio": "ingressgateway"}, + 1, + "", + false, + nil, + }, + { + "Host with a specific namespace", + "istio-system", + []string{"ns1/one.org"}, + map[string]string{"istio": "ingressgateway"}, + 1, + "", + false, + nil, + }, + { + "Host with a namespace and wildcard", + "istio-system", + []string{"*/*.one.org"}, + map[string]string{"istio": "ingressgateway"}, + 1, + "", + false, + nil, + }, + } + + IORLog.SetOutputLevel(log.DebugLevel) + + var stop chan struct{} + var errorChannel chan error + var store model.ConfigStoreController + var k8sClient KubeClient + var routerClient routev1.RouteV1Interface + var mrc *fakeMemberRollController + controlPlaneNs := "istio-system" + + for _, testType := range []string{"initialSync", "events"} { + if testType == "events" { + stop = make(chan struct{}) + defer func() { close(stop) }() + errorChannel = make(chan error) + mrc = newFakeMemberRollController() + store, k8sClient, routerClient = initClients(t, stop, errorChannel, mrc, true) + mrc.setNamespaces(controlPlaneNs) + + createIngressGateway(t, k8sClient.GetActualClient(), controlPlaneNs, map[string]string{"istio": "ingressgateway"}) + } + + for i, c := range cases { + t.Run(testType+"-"+c.testName, func(t *testing.T) { + if testType == "initialSync" { + stop = make(chan struct{}) + defer func() { close(stop) }() + errorChannel = make(chan error) + mrc = newFakeMemberRollController() + store, k8sClient, routerClient = initClients(t, stop, errorChannel, mrc, false) + createIngressGateway(t, k8sClient.GetActualClient(), controlPlaneNs, map[string]string{"istio": "ingressgateway"}) + if err := Register(k8sClient, routerClient, store, controlPlaneNs, mrc, stop, errorChannel); err != nil { + t.Fatal(err) + } + } + gatewayName := fmt.Sprintf("gw%d", i) + createGateway(t, store, c.ns, gatewayName, c.hosts, c.gwSelector, c.tls, c.annotations) + if testType == "initialSync" { + mrc.setNamespaces(controlPlaneNs) + } + list, _ := getRoutes(t, routerClient, controlPlaneNs, c.expectedRoutes, time.Second) + if err := getError(errorChannel); err != nil { + if c.expectedError == "" { + t.Fatal(err) + } + + if !strings.Contains(err.Error(), c.expectedError) { + t.Fatalf("expected error message containing `%s', got: %s", c.expectedError, err.Error()) + } + + // Error is expected and matches the golden string, nothing to do + } else { + if c.expectedError != "" { + t.Fatalf("expected error message containing `%s', got success", c.expectedError) + } + + // Only continue the validation if any route is expected to be created + if c.expectedRoutes > 0 { + validateRoutes(t, c.hosts, list, gatewayName, c.tls) + + // Remove the gateway and expect all routes get removed + deleteGateway(t, store, c.ns, gatewayName) + _, _ = getRoutes(t, routerClient, c.ns, 0, time.Second) + if err := getError(errorChannel); err != nil { + t.Fatal(err) + } + } + } + }) + } + } +} + +func validateRoutes(t *testing.T, hosts []string, list *routeapiv1.RouteList, gatewayName string, tls bool) { + for _, host := range hosts { + route := findRouteByHost(list, host) + if route == nil { + t.Fatalf("could not find a route with hostname %s", host) + } + + // Check metadata + if route.Labels[gatewayNameLabel] != gatewayName { + t.Fatalf("wrong label, expecting %s, got %s", gatewayName, route.Annotations[gatewayNameLabel]) + } + if route.Annotations["foo"] != "bar" { + t.Fatal("gateway annotations were not copied to route") + } + if _, found := route.Annotations[ShouldManageRouteAnnotation]; found { + t.Fatalf("annotation %q should not be copied to the route", ShouldManageRouteAnnotation) + } + if route.Labels["foo"] != "bar" { + t.Fatal("gateway labels were not copied to route") + } + if _, found := route.Labels[prefixedLabel]; found { + t.Fatalf("label %q should not be copied to the route", prefixedLabel) + } + + // Check hostname + if host == "*" && route.Spec.Host == "*" { + t.Fatal("Route's host wrongly set to *") + } + if strings.Contains(host, "*.") && !strings.Contains(route.Spec.Host, "wildcard.") { + t.Fatal("Route's host wrongly not set to wildcard.") + } + + // TLS + if tls { + if route.Spec.TLS.InsecureEdgeTerminationPolicy != routeapiv1.InsecureEdgeTerminationPolicyRedirect { + t.Fatalf("wrong InsecureEdgeTerminationPolicy: %v", route.Spec.TLS.InsecureEdgeTerminationPolicy) + } + if route.Spec.TLS.Termination != routeapiv1.TLSTerminationPassthrough { + t.Fatalf("wrong Termination: %v", route.Spec.TLS.Termination) + } + } + } +} + +func TestEdit(t *testing.T) { + cases := []struct { + testName string + ns string + hosts []string + gwSelector map[string]string + expectedRoutes int + expectedError string + tls bool + }{ + { + "One host", + "istio-system", + []string{"def.com"}, + map[string]string{"istio": "ingressgateway"}, + 1, + "", + false, + }, + { + "Two hosts", + "istio-system", + []string{"ghi.org", "jkl.com"}, + map[string]string{"istio": "ingressgateway"}, + 2, + "", + false, + }, + { + "Wildcard 1", + "istio-system", + []string{"*"}, + map[string]string{"istio": "ingressgateway"}, + 1, + "", + false, + }, + { + "Wildcard 2", + "istio-system", + []string{"*.a.com"}, + map[string]string{"istio": "ingressgateway"}, + 1, + "", + false, + }, + { + "TLS 1", + "istio-system", + []string{"one.org"}, + map[string]string{"istio": "ingressgateway"}, + 1, + "", + true, + }, + } + + stop := make(chan struct{}) + defer func() { close(stop) }() + errorChannel := make(chan error) + mrc := newFakeMemberRollController() + store, k8sClient, routerClient := initClients(t, stop, errorChannel, mrc, true) + + controlPlane := "istio-system" + createIngressGateway(t, k8sClient.GetActualClient(), controlPlane, map[string]string{"istio": "ingressgateway"}) + createGateway(t, store, controlPlane, "gw", []string{"abc.com"}, map[string]string{"istio": "ingressgateway"}, false, nil) + mrc.setNamespaces("istio-system") + + list, _ := getRoutes(t, routerClient, controlPlane, 1, time.Second) + if err := getError(errorChannel); err != nil { + t.Fatal(err) + } + + for i, c := range cases { + t.Run(c.testName, func(t *testing.T) { + editGateway(t, store, c.ns, "gw", c.hosts, c.gwSelector, c.tls, fmt.Sprintf("%d", i+2)) + list, _ = getRoutes(t, routerClient, controlPlane, c.expectedRoutes, time.Second) + if err := getError(errorChannel); err != nil { + t.Fatal(err) + } + + validateRoutes(t, c.hosts, list, "gw", c.tls) + }) + } +} + +// TestPerf makes sure we are not doing more API calls than necessary +func TestPerf(t *testing.T) { + IORLog.SetOutputLevel(log.DebugLevel) + countCallsReset() + + stop := make(chan struct{}) + defer func() { close(stop) }() + errorChannel := make(chan error) + mrc := newFakeMemberRollController() + store, k8sClient, routerClient := initClients(t, stop, errorChannel, mrc, true) + + // Create a bunch of namespaces and gateways, and make sure they don't take too long to be created + createIngressGateway(t, k8sClient.GetActualClient(), "istio-system", map[string]string{"istio": "ingressgateway"}) + qty := 100 + qtyNamespaces := qty + 1 + createGateways(t, store, 1, qty) + mrc.setNamespaces(generateNamespaces(qty)...) + + // It takes ~ 2s on my laptop, it's slower on prow + _, ignore := getRoutes(t, routerClient, "istio-system", qty, time.Minute) + if err := getError(errorChannel); err != nil { + t.Fatal(err) + } + assert.Equal(t, qty, countCallsGet("create"), "wrong number of calls to client.Routes().Create()") + assert.Equal(t, 0, countCallsGet("delete"), "wrong number of calls to client.Routes().Delete()") + assert.Equal(t, qtyNamespaces, countCallsGet("list")-ignore, "wrong number of calls to client.Routes().List()") + // qty=number of Create() calls; qtyNamespaces=number of List() calls + assert.Equal(t, qty+qtyNamespaces, countCallsGet("routes")-ignore, "wrong number of calls to client.Routes()") + + // Now we have a lot of routes created, let's create one more gateway. We don't expect a lot of new API calls + countCallsReset() + createGateway(t, store, "ns1", "gw-ns1-1", []string{"instant.com"}, map[string]string{"istio": "ingressgateway"}, false, nil) + _, ignore = getRoutes(t, routerClient, "istio-system", qty+1, time.Second) + if err := getError(errorChannel); err != nil { + t.Fatal(err) + } + assert.Equal(t, 1, countCallsGet("create"), "wrong number of calls to client.Routes().Create()") + assert.Equal(t, 0, countCallsGet("delete"), "wrong number of calls to client.Routes().Delete()") + assert.Equal(t, 0, countCallsGet("list")-ignore, "wrong number of calls to client.Routes().List()") + assert.Equal(t, 1, countCallsGet("routes")-ignore, "wrong number of calls to client.Routes()") + + // Editing. We don't expect a lot of new API calls + countCallsReset() + editGateway(t, store, "ns1", "gw-ns1-1", []string{"edited.com", "edited-other.com"}, map[string]string{"istio": "ingressgateway"}, false, "2") + _, ignore = getRoutes(t, routerClient, "istio-system", qty+2, time.Second) + if err := getError(errorChannel); err != nil { + t.Fatal(err) + } + assert.Equal(t, 2, countCallsGet("create"), "wrong number of calls to client.Routes().Create()") + assert.Equal(t, 1, countCallsGet("delete"), "wrong number of calls to client.Routes().Delete()") + assert.Equal(t, 0, countCallsGet("list")-ignore, "wrong number of calls to client.Routes().List()") + assert.Equal(t, 3, countCallsGet("routes")-ignore, "wrong number of calls to client.Routes()") + + // Same for deletion. We don't expect a lot of new API calls + countCallsReset() + deleteGateway(t, store, "ns1", "gw-ns1-1") + _, ignore = getRoutes(t, routerClient, "istio-system", qty, time.Second) + if err := getError(errorChannel); err != nil { + t.Fatal(err) + } + assert.Equal(t, 0, countCallsGet("create"), "wrong number of calls to client.Routes().Create()") + assert.Equal(t, 2, countCallsGet("delete"), "wrong number of calls to client.Routes().Delete()") + assert.Equal(t, 0, countCallsGet("list")-ignore, "wrong number of calls to client.Routes().List()") + assert.Equal(t, 2, countCallsGet("routes")-ignore, "wrong number of calls to client.Routes()") +} + +// TestConcurrency makes sure IOR can respond to events even when doing its initial sync +func TestConcurrency(t *testing.T) { + IORLog.SetOutputLevel(log.DebugLevel) + stop := make(chan struct{}) + defer func() { close(stop) }() + errorChannel := make(chan error) + mrc := newFakeMemberRollController() + store, k8sClient, routerClient := initClients(t, stop, errorChannel, mrc, true) + + // Create a bunch of namespaces and gateways + createIngressGateway(t, k8sClient.GetActualClient(), "istio-system", map[string]string{"istio": "ingressgateway"}) + qty := 50 + createGateways(t, store, 1, qty) + mrc.setNamespaces(generateNamespaces(qty)...) + + // At the same time, while IOR is processing those initial `qty` gateways, create `qty` more + go func() { + mrc.setNamespaces(generateNamespaces(qty * 2)...) + createGateways(t, store, qty+1, qty*2) + }() + + // And expect all `qty * 2` gateways to be created + _, _ = getRoutes(t, routerClient, "istio-system", (qty * 2), time.Minute) + if err := getError(errorChannel); err != nil { + t.Fatal(err) + } +} + +func TestDuplicateUpdateEvents(t *testing.T) { + IORLog.SetOutputLevel(log.DebugLevel) + stop := make(chan struct{}) + defer func() { close(stop) }() + errorChannel := make(chan error) + mrc := newFakeMemberRollController() + store, k8sClient, routerClient := initClients(t, stop, errorChannel, mrc, false) + + r := newRoute(k8sClient, routerClient, store, "istio-system", mrc, stop) + + mrc.setNamespaces("istio-system") + createIngressGateway(t, k8sClient.GetActualClient(), "istio-system", map[string]string{"istio": "ingressgateway"}) + + cfg := config.Config{ + Meta: config.Meta{ + GroupVersionKind: collections.Gateway.GroupVersionKind(), + Namespace: "istio-system", + Name: "a", + ResourceVersion: "1", + }, + Spec: &networking.Gateway{ + Servers: []*networking.Server{ + { + Hosts: []string{"a.com"}, + }, + }, + }, + } + + // Create the first router, should work just fine + err := r.handleEvent(model.EventAdd, cfg) + if err != nil { + t.Fatal(err) + } + func() { + r.gatewaysLock.Lock() + defer r.gatewaysLock.Unlock() + if len(r.gatewaysMap) != 1 { + t.Fatal("error creating the first route") + } + }() + + // Simulate an UPDATE event with the same data, should be ignored + err = r.handleEvent(model.EventUpdate, cfg) + if err == nil { + t.Fatalf("expecting the error: %q, but got nothing", eventDuplicatedMessage) + } + if msg := err.Error(); msg != eventDuplicatedMessage { + t.Fatalf("expecting the error: %q, but got %q", eventDuplicatedMessage, msg) + } +} + +func generateNamespaces(qty int) []string { + var result []string + + for i := 1; i <= qty; i++ { + result = append(result, fmt.Sprintf("ns%d", i)) + } + + return append(result, "istio-system") +} + +func createGateways(t *testing.T, store model.ConfigStoreController, begin, end int) { + for i := begin; i <= end; i++ { + createGateway(t, + store, + fmt.Sprintf("ns%d", i), + fmt.Sprintf("gw-ns%d", i), + []string{fmt.Sprintf("d%d.com", i)}, + map[string]string{"istio": "ingressgateway"}, + false, + nil) + } +} + +// getError tries to read an error from the error channel. +// It tries 3 times beforing returning nil, in case of there's no error in the channel, +// this is to give some time to async functions to run and fill the channel properly +func getError(errorChannel chan error) error { + for i := 1; i < 3; i++ { + select { + case err := <-errorChannel: + return err + default: + } + time.Sleep(10 * time.Millisecond) + } + return nil +} + +// getRoutes is a helper function that keeps trying getting a list of routes until it gets `size` items. +// It returns the list of routes itself and the number of retries it run +func getRoutes(t *testing.T, routerClient routev1.RouteV1Interface, ns string, size int, timeout time.Duration) (*routeapiv1.RouteList, int) { + var list *routeapiv1.RouteList + + t.Helper() + count := 0 + + retry.UntilSuccessOrFail(t, func() error { + var err error + + time.Sleep(time.Millisecond * 100) + list, err = routerClient.Routes(ns).List(context.TODO(), v1.ListOptions{}) + count++ + if err != nil { + return err + } + if len(list.Items) != size { + return fmt.Errorf("expected %d route(s), got %d", size, len(list.Items)) + } + return nil + }, retry.Timeout(timeout)) + + return list, count +} + +func findRouteByHost(list *routeapiv1.RouteList, host string) *routeapiv1.Route { + for _, route := range list.Items { + if route.Annotations[originalHostAnnotation] == host { + return &route + } + } + return nil +} + +func createIngressGateway(t *testing.T, client kube.Client, ns string, labels map[string]string) { + t.Helper() + createPod(t, client, ns, labels) + createService(t, client, ns, labels) +} + +func createPod(t *testing.T, client kube.Client, ns string, labels map[string]string) { + t.Helper() + + _, err := client.Kube().CoreV1().Pods(ns).Create(context.TODO(), &k8sioapicorev1.Pod{ + ObjectMeta: v1.ObjectMeta{ + Labels: labels, + }, + }, v1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } +} + +func createService(t *testing.T, client kube.Client, ns string, labels map[string]string) { + t.Helper() + + _, err := client.Kube().CoreV1().Services(ns).Create(context.TODO(), &k8sioapicorev1.Service{ + ObjectMeta: v1.ObjectMeta{ + Labels: labels, + }, + }, v1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } +} + +func createGateway(t *testing.T, store model.ConfigStoreController, ns string, name string, hosts []string, gwSelector map[string]string, + tls bool, annotations map[string]string, +) { + t.Helper() + + var tlsConfig *networking.ServerTLSSettings + if tls { + tlsConfig = &networking.ServerTLSSettings{HttpsRedirect: true} + } + + if annotations == nil { + annotations = map[string]string{"foo": "bar"} + } + + _, err := store.Create(config.Config{ + Meta: config.Meta{ + GroupVersionKind: collections.Gateway.GroupVersionKind(), + Namespace: ns, + Name: name, + Annotations: annotations, + Labels: map[string]string{"foo": "bar", prefixedLabel: "present"}, + ResourceVersion: "1", + }, + Spec: &networking.Gateway{ + Selector: gwSelector, + Servers: []*networking.Server{ + { + Hosts: hosts, + Tls: tlsConfig, + }, + }, + }, + }) + if err != nil { + t.Fatal(err) + } +} + +func editGateway(t *testing.T, store model.ConfigStoreController, ns string, name string, hosts []string, + gwSelector map[string]string, tls bool, resource string, +) { + t.Helper() + + var tlsConfig *networking.ServerTLSSettings + if tls { + tlsConfig = &networking.ServerTLSSettings{HttpsRedirect: true} + } + _, err := store.Update(config.Config{ + Meta: config.Meta{ + GroupVersionKind: collections.Gateway.GroupVersionKind(), + Namespace: ns, + Name: name, + Annotations: map[string]string{"foo": "bar"}, + Labels: map[string]string{"foo": "bar", prefixedLabel: "present"}, + ResourceVersion: resource, + }, + Spec: &networking.Gateway{ + Selector: gwSelector, + Servers: []*networking.Server{ + { + Hosts: hosts, + Tls: tlsConfig, + }, + }, + }, + }) + if err != nil { + t.Fatal(err) + } +} + +func deleteGateway(t *testing.T, store model.ConfigStoreController, ns string, name string) { + t.Helper() + + err := store.Delete(collections.Gateway.GroupVersionKind(), name, ns, nil) + if err != nil { + t.Fatal(err) + } +} diff --git a/pilot/pkg/config/kube/ior/route.go b/pilot/pkg/config/kube/ior/route.go new file mode 100644 index 00000000000..43b3680ecc7 --- /dev/null +++ b/pilot/pkg/config/kube/ior/route.go @@ -0,0 +1,627 @@ +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ior + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/hashicorp/go-multierror" + v1 "github.com/openshift/api/route/v1" + routev1 "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1" + "golang.org/x/net/context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + networking "istio.io/api/networking/v1alpha3" + "istio.io/istio/pilot/pkg/model" + "istio.io/istio/pkg/config" + "istio.io/istio/pkg/config/schema/collections" + "istio.io/istio/pkg/kube" + "istio.io/istio/pkg/servicemesh/controller" +) + +const ( + maistraPrefix = "maistra.io/" + generatedByLabel = maistraPrefix + "generated-by" + generatedByValue = "ior" + originalHostAnnotation = maistraPrefix + "original-host" + gatewayNameLabel = maistraPrefix + "gateway-name" + gatewayNamespaceLabel = maistraPrefix + "gateway-namespace" + gatewayResourceVersionLabel = maistraPrefix + "gateway-resourceVersion" + ShouldManageRouteAnnotation = maistraPrefix + "manageRoute" + + eventDuplicatedMessage = "event UPDATE arrived but resourceVersions are the same - ignoring" +) + +type syncRoutes struct { + metadata config.Meta + gateway *networking.Gateway + routes []*v1.Route +} + +// route manages the integration between Istio Gateways and OpenShift Routes +type route struct { + pilotNamespace string + routerClient routev1.RouteV1Interface + kubeClient kubernetes.Interface + store model.ConfigStoreController + gatewaysMap map[string]*syncRoutes + gatewaysLock sync.Mutex + initialSyncRun chan struct{} + alive bool + stop <-chan struct{} + handleEventTimeout time.Duration + errorChannel chan error + + // memberroll functionality + mrc controller.MemberRollController + namespaceLock sync.Mutex + namespaces []string + gotInitialUpdate bool +} + +// NewRouterClient returns an OpenShift client for Routers +func NewRouterClient() (routev1.RouteV1Interface, error) { + config, err := kube.BuildClientConfig("", "") + if err != nil { + return nil, err + } + + client, err := routev1.NewForConfig(config) + if err != nil { + return nil, err + } + + return client, nil +} + +// newRoute returns a new instance of Route object +func newRoute( + kubeClient KubeClient, + routerClient routev1.RouteV1Interface, + store model.ConfigStoreController, + pilotNamespace string, + mrc controller.MemberRollController, + stop <-chan struct{}, +) *route { + for !kubeClient.IsRouteSupported() { + IORLog.Infof("routes are not supported in this cluster; waiting for Route resource to become available...") + time.Sleep(10 * time.Second) + } + + r := &route{} + + r.kubeClient = kubeClient.GetActualClient().Kube() + r.routerClient = routerClient + r.pilotNamespace = pilotNamespace + r.store = store + r.mrc = mrc + r.namespaces = []string{pilotNamespace} + r.stop = stop + r.initialSyncRun = make(chan struct{}) + r.handleEventTimeout = kubeClient.GetHandleEventTimeout() + + if r.mrc != nil { + IORLog.Debugf("Registering IOR into SMMR broadcast") + r.alive = true + r.mrc.Register(r, "ior") + + go func(stop <-chan struct{}) { + <-stop + r.alive = false + IORLog.Debugf("Unregistering IOR from SMMR broadcast") + }(stop) + } + + return r +} + +// initialSync runs on initialization only. +// +// It lists all Istio Gateways (source of truth) and OpenShift Routes, compares them and makes the necessary adjustments +// (creation and/or removal of routes) so that gateways and routes be in sync. +func (r *route) initialSync(initialNamespaces []string) error { + var result *multierror.Error + r.gatewaysMap = make(map[string]*syncRoutes) + + r.gatewaysLock.Lock() + defer r.gatewaysLock.Unlock() + + // List the gateways and put them into the gatewaysMap + // The store must be synced otherwise we might get an empty list + // We enforce this before calling this function in UpdateNamespaces() + configs := r.store.List(collections.Gateway.GroupVersionKind(), model.NamespaceAll) + + IORLog.Debugf("initialSync() - Got %d Gateway(s)", len(configs)) + + for i, cfg := range configs { + if err := r.ensureNamespaceExists(cfg); err != nil { + result = multierror.Append(result, err) + continue + } + manageRoute, err := isManagedByIOR(cfg) + if err != nil { + result = multierror.Append(result, err) + continue + } + if !manageRoute { + IORLog.Debugf("initialSync() - Ignoring Gateway %s/%s as it is not managed by Istiod", cfg.Namespace, cfg.Name) + continue + } + + IORLog.Debugf("initialSync() - Parsing Gateway [%d] %s/%s", i+1, cfg.Namespace, cfg.Name) + r.addNewSyncRoute(cfg) + } + + // List the routes and put them into a map. Map key is the route object name + routes := map[string]v1.Route{} + for _, ns := range initialNamespaces { + IORLog.Debugf("initialSync() - Listing routes in ns %s", ns) + routeList, err := r.routerClient.Routes(ns).List(context.TODO(), metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", generatedByLabel, generatedByValue), + }) + if err != nil { + return fmt.Errorf("could not get list of Routes in namespace %s: %s", ns, err) + } + for _, route := range routeList.Items { + routes[route.Name] = route + } + } + IORLog.Debugf("initialSync() - Got %d route(s) across all %d namespace(s)", len(routes), len(initialNamespaces)) + + // Now that we have maps and routes mapped we can compare them (Gateways are the source of truth) + for _, syncRoute := range r.gatewaysMap { + for _, server := range syncRoute.gateway.Servers { + for _, host := range server.Hosts { + actualHost, _ := getActualHost(host, false) + routeName := getRouteName(syncRoute.metadata.Namespace, syncRoute.metadata.Name, actualHost) + route, ok := routes[routeName] + if ok { + // A route for this host was found, remove its entry in this map so that in the end only orphan routes are left + delete(routes, routeName) + + // Route matches, no need to create one. Put it in the gatewaysMap and move to the next one + if syncRoute.metadata.ResourceVersion == route.Labels[gatewayResourceVersionLabel] { + syncRoute.routes = append(syncRoute.routes, &route) + continue + } + + // Route does not match, remove it. + result = multierror.Append(result, r.deleteRoute(&route)) + } + + // Route is not found or was removed above because it didn't match. We need to create one now. + route2, err := r.createRoute(syncRoute.metadata, syncRoute.gateway, host, server.Tls) + if err != nil { + result = multierror.Append(result, err) + } else { + // Put it in the gatewaysMap and move to the next one + syncRoute.routes = append(syncRoute.routes, route2) + } + } + } + } + + // At this point there are routes for every hostname in every Gateway. + // The `routes` map should only contain "orphan" routes, i.e., routes that do not belong to any Gateway + // + for _, route := range routes { + result = multierror.Append(result, r.deleteRoute(&route)) + } + + return result.ErrorOrNil() +} + +func gatewaysMapKey(namespace, name string) string { + return namespace + "/" + name +} + +// addNewSyncRoute creates a new syncRoutes and adds it to the gatewaysMap +// Must be called with gatewaysLock locked +func (r *route) addNewSyncRoute(cfg config.Config) *syncRoutes { + gw := cfg.Spec.(*networking.Gateway) + syncRoute := &syncRoutes{ + metadata: cfg.Meta, + gateway: gw, + } + + r.gatewaysMap[gatewaysMapKey(cfg.Namespace, cfg.Name)] = syncRoute + return syncRoute +} + +// ensureNamespaceExists makes sure the gateway namespace is present in r.namespaces +// r.namespaces is updated by the SMMR controller, in SetNamespaces() +// This handles the case where an ADD event comes before SetNamespaces() is called and +// the unlikely case an ADD event arrives for a gateway whose namespace does not belong to the SMMR at all +func (r *route) ensureNamespaceExists(cfg config.Config) error { + timeout := time.After(r.handleEventTimeout) // production default is 10s, but test default is only 1ms + + for { + r.namespaceLock.Lock() + namespaces := r.namespaces + r.namespaceLock.Unlock() + + for _, ns := range namespaces { + if ns == cfg.Namespace { + IORLog.Debugf("Namespace %s found in SMMR", cfg.Namespace) + return nil + } + } + + select { + case <-timeout: + IORLog.Debugf("Namespace %s not found in SMMR. Aborting.", cfg.Namespace) + return fmt.Errorf("could not handle the ADD event for %s/%s: SMMR does not recognize this namespace", cfg.Namespace, cfg.Name) + default: + IORLog.Debugf("Namespace %s not found in SMMR, trying again", cfg.Namespace) + } + time.Sleep(r.handleEventTimeout / 100) + } +} + +func (r *route) handleAdd(cfg config.Config) error { + var result *multierror.Error + + if err := r.ensureNamespaceExists(cfg); err != nil { + return err + } + + r.gatewaysLock.Lock() + defer r.gatewaysLock.Unlock() + + if _, ok := r.gatewaysMap[gatewaysMapKey(cfg.Namespace, cfg.Name)]; ok { + IORLog.Infof("gateway %s/%s already exists, not creating route(s) for it", cfg.Namespace, cfg.Name) + return nil + } + + syncRoute := r.addNewSyncRoute(cfg) + + for _, server := range syncRoute.gateway.Servers { + for _, host := range server.Hosts { + route, err := r.createRoute(cfg.Meta, syncRoute.gateway, host, server.Tls) + if err != nil { + result = multierror.Append(result, err) + } else { + syncRoute.routes = append(syncRoute.routes, route) + } + } + } + + return result.ErrorOrNil() +} + +func isManagedByIOR(cfg config.Config) (bool, error) { + // We don't manage egress gateways, but we can only look for the default label here. + // Users can still use generic labels (e.g. "app: my-ingressgateway" as in the istio docs) to refer to the gateway pod + gw := cfg.Spec.(*networking.Gateway) + if istioLabel, ok := gw.Selector["istio"]; ok && istioLabel == "egressgateway" { + return false, nil + } + + manageRouteValue, ok := cfg.Annotations[ShouldManageRouteAnnotation] + if !ok { + // Manage routes by default, when annotation is not found. + return true, nil + } + + manageRoute, err := strconv.ParseBool(manageRouteValue) + if err != nil { + return false, fmt.Errorf("could not parse annotation %q: %s", ShouldManageRouteAnnotation, err) + } + + return manageRoute, nil +} + +func (r *route) handleDel(cfg config.Config) error { + var result *multierror.Error + + r.gatewaysLock.Lock() + defer r.gatewaysLock.Unlock() + + key := gatewaysMapKey(cfg.Namespace, cfg.Name) + syncRoute, ok := r.gatewaysMap[key] + if !ok { + return fmt.Errorf("could not find an internal reference to gateway %s/%s", cfg.Namespace, cfg.Name) + } + + IORLog.Debugf("The gateway %s/%s has %d route(s) associated with it. Removing them now.", cfg.Namespace, cfg.Name, len(syncRoute.routes)) + for _, route := range syncRoute.routes { + result = multierror.Append(result, r.deleteRoute(route)) + } + + delete(r.gatewaysMap, key) + + return result.ErrorOrNil() +} + +func (r *route) verifyResourceVersions(cfg config.Config) error { + r.gatewaysLock.Lock() + defer r.gatewaysLock.Unlock() + + key := gatewaysMapKey(cfg.Namespace, cfg.Name) + syncRoute, ok := r.gatewaysMap[key] + if !ok { + return fmt.Errorf("could not find an internal reference to gateway %s/%s", cfg.Namespace, cfg.Name) + } + + if syncRoute.metadata.ResourceVersion != cfg.ResourceVersion { + return nil + } + + return fmt.Errorf(eventDuplicatedMessage) +} + +func (r *route) handleEvent(event model.Event, cfg config.Config) error { + // Block until initial sync has finished + <-r.initialSyncRun + + manageRoute, err := isManagedByIOR(cfg) + if err != nil { + return err + } + if !manageRoute { + IORLog.Infof("Ignoring Gateway %s/%s as it is not managed by Istiod", cfg.Namespace, cfg.Name) + return nil + } + + switch event { + case model.EventAdd: + return r.handleAdd(cfg) + + case model.EventUpdate: + if err = r.verifyResourceVersions(cfg); err != nil { + return err + } + + var result *multierror.Error + result = multierror.Append(result, r.handleDel(cfg)) + result = multierror.Append(result, r.handleAdd(cfg)) + return result.ErrorOrNil() + + case model.EventDelete: + return r.handleDel(cfg) + } + + return fmt.Errorf("unknown event type %s", event) +} + +// Trigerred by SMMR controller when SMMR changes +func (r *route) SetNamespaces(namespaces []string) { + if !r.alive { + return + } + + if namespaces == nil { + return + } + + IORLog.Debugf("UpdateNamespaces(%v)", namespaces) + r.namespaceLock.Lock() + r.namespaces = namespaces + r.namespaceLock.Unlock() + + if r.gotInitialUpdate { + return + } + r.gotInitialUpdate = true + + // In the first update we perform an initial sync + go func() { + // But only after gateway store cache is synced + IORLog.Debug("Waiting for the Gateway store cache to sync before performing our initial sync") + if !cache.WaitForNamedCacheSync("Gateways", r.stop, r.store.HasSynced) { + IORLog.Infof("Failed to sync Gateway store cache. Not performing initial sync.") + return + } + IORLog.Debug("Gateway store cache synced. Performing our initial sync now") + + if err := r.initialSync(namespaces); err != nil { + IORLog.Error(err) + if r.errorChannel != nil { + r.errorChannel <- err + } + } + IORLog.Debug("Initial sync finished") + close(r.initialSyncRun) + }() +} + +func getHost(route v1.Route) string { + if host := route.ObjectMeta.Annotations[originalHostAnnotation]; host != "" { + return host + } + return route.Spec.Host +} + +func (r *route) deleteRoute(route *v1.Route) error { + var immediate int64 + host := getHost(*route) + err := r.routerClient.Routes(route.Namespace).Delete(context.TODO(), route.ObjectMeta.Name, metav1.DeleteOptions{GracePeriodSeconds: &immediate}) + if err != nil { + return fmt.Errorf("error deleting route %s/%s: %s", route.ObjectMeta.Namespace, route.ObjectMeta.Name, err) + } + + IORLog.Infof("Deleted route %s/%s (gateway hostname: %s)", route.ObjectMeta.Namespace, route.ObjectMeta.Name, host) + return nil +} + +func (r *route) createRoute(metadata config.Meta, gateway *networking.Gateway, originalHost string, tls *networking.ServerTLSSettings) (*v1.Route, error) { + IORLog.Debugf("Creating route for hostname %s", originalHost) + actualHost, wildcard := getActualHost(originalHost, true) + + var tlsConfig *v1.TLSConfig + targetPort := "http2" + if tls != nil { + tlsConfig = &v1.TLSConfig{Termination: v1.TLSTerminationPassthrough} + targetPort = "https" + if tls.HttpsRedirect { + tlsConfig.InsecureEdgeTerminationPolicy = v1.InsecureEdgeTerminationPolicyRedirect + } + } + + serviceNamespace, serviceName, err := r.findService(gateway) + if err != nil { + return nil, err + } + + // Copy annotations + annotations := map[string]string{ + originalHostAnnotation: originalHost, + } + for keyName, keyValue := range metadata.Annotations { + if !strings.HasPrefix(keyName, "kubectl.kubernetes.io") && keyName != ShouldManageRouteAnnotation { + annotations[keyName] = keyValue + } + } + + // Copy labels + labels := map[string]string{ + generatedByLabel: generatedByValue, + gatewayNamespaceLabel: metadata.Namespace, + gatewayNameLabel: metadata.Name, + gatewayResourceVersionLabel: metadata.ResourceVersion, + } + for keyName, keyValue := range metadata.Labels { + if !strings.HasPrefix(keyName, maistraPrefix) { + labels[keyName] = keyValue + } + } + + nr, err := r.routerClient.Routes(serviceNamespace).Create(context.TODO(), &v1.Route{ + ObjectMeta: metav1.ObjectMeta{ + Name: getRouteName(metadata.Namespace, metadata.Name, actualHost), + Namespace: serviceNamespace, + Labels: labels, + Annotations: annotations, + }, + Spec: v1.RouteSpec{ + Host: actualHost, + Port: &v1.RoutePort{ + TargetPort: intstr.IntOrString{ + Type: intstr.String, + StrVal: targetPort, + }, + }, + To: v1.RouteTargetReference{ + Name: serviceName, + }, + TLS: tlsConfig, + WildcardPolicy: wildcard, + }, + }, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("error creating a route for the host %s (gateway: %s/%s): %s", originalHost, metadata.Namespace, metadata.Name, err) + } + + IORLog.Infof("Created route %s/%s for hostname %s (gateway: %s/%s)", + nr.ObjectMeta.Namespace, nr.ObjectMeta.Name, + nr.Spec.Host, + metadata.Namespace, metadata.Name) + + return nr, nil +} + +// findService tries to find a service that matches with the given gateway selector +// Returns the namespace and service name that is a match, or an error +func (r *route) findService(gateway *networking.Gateway) (string, string, error) { + r.namespaceLock.Lock() + namespaces := r.namespaces + r.namespaceLock.Unlock() + + gwSelector := labels.SelectorFromSet(gateway.Selector) + + for _, ns := range namespaces { + // Get the list of pods that match the gateway selector + podList, err := r.kubeClient.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: gwSelector.String()}) + if err != nil { + return "", "", fmt.Errorf("could not get the list of pods in namespace %s: %v", ns, err) + } + + // Get the list of services in this namespace + svcList, err := r.kubeClient.CoreV1().Services(ns).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return "", "", fmt.Errorf("could not get the list of services in namespace %s: %v", ns, err) + } + + // Look for a service whose selector matches the pod labels + for _, pod := range podList.Items { + podLabels := labels.Set(pod.ObjectMeta.Labels) + + for _, svc := range svcList.Items { + svcSelector := labels.SelectorFromSet(svc.Spec.Selector) + if svcSelector.Matches(podLabels) { + return ns, svc.Name, nil + } + } + } + } + + return "", "", fmt.Errorf("could not find a service that matches the gateway selector `%s'. Namespaces where we looked at: %v", + gwSelector.String(), namespaces) +} + +func getRouteName(namespace, name, actualHost string) string { + return fmt.Sprintf("%s-%s-%s", namespace, name, hostHash(actualHost)) +} + +// getActualHost returns the actual hostname to be used in the route +// `emitWarning` should be false when this function is used internally, without user interaction +// It also returns the route's WildcardPolicy based on the hostname +func getActualHost(originalHost string, emitWarning bool) (string, v1.WildcardPolicyType) { + wildcard := v1.WildcardPolicyNone + + if strings.Contains(originalHost, "/") { + originalHost = strings.SplitN(originalHost, "/", 2)[1] + IORLog.Debugf("Hostname contains a namespace part. Ignoring it and considering the %q portion.", originalHost) + } + + actualHost := originalHost + + if originalHost == "*" { + actualHost = "" + if emitWarning { + IORLog.Warn("Hostname * is not supported at the moment. Letting OpenShift create it instead.") + } + } else if strings.HasPrefix(originalHost, "*.") { + // FIXME: Update link below to version 4.5 when it's out + // Wildcards are not enabled by default in OCP 3.x. + // See https://docs.openshift.com/container-platform/3.11/install_config/router/default_haproxy_router.html#using-wildcard-routes + // FIXME(2): Is there a way to check if OCP supports wildcard and print out a warning if not? + wildcard = v1.WildcardPolicySubdomain + actualHost = "wildcard." + strings.TrimPrefix(originalHost, "*.") + } + + return actualHost, wildcard +} + +// hostHash applies a sha256 on the host and truncate it to the first 8 bytes +// This gives enough uniqueness for a given hostname +func hostHash(name string) string { + if name == "" { + name = "star" + } + + hash := sha256.Sum256([]byte(name)) + return hex.EncodeToString(hash[:8]) +} diff --git a/pilot/pkg/features/pilot.go b/pilot/pkg/features/pilot.go index 2869dd827a4..1eb24fbd132 100644 --- a/pilot/pkg/features/pilot.go +++ b/pilot/pkg/features/pilot.go @@ -770,6 +770,9 @@ var ( "Metric scope rotation interval, set to 0 to disable the metric scope rotation").Get() MetricGracefulDeletionInterval = env.Register("METRIC_GRACEFUL_DELETION_INTERVAL", 5*time.Minute, "Metric expiry graceful deletion interval. No-op if METRIC_ROTATION_INTERVAL is disabled.").Get() + + EnableIOR = env.RegisterBoolVar("ENABLE_IOR", false, + "Whether to enable IOR component, which provides integration between Istio Gateways and OpenShift Routes").Get() ) // EnableEndpointSliceController returns the value of the feature flag and whether it was actually specified. diff --git a/pilot/pkg/leaderelection/leaderelection.go b/pilot/pkg/leaderelection/leaderelection.go index 0db6ce81874..e9595cc811d 100644 --- a/pilot/pkg/leaderelection/leaderelection.go +++ b/pilot/pkg/leaderelection/leaderelection.go @@ -54,6 +54,7 @@ const ( // * This type is per-revision, so it is higher cost. Leases are cheaper // * Other types use "prioritized leader election", which isn't implemented for Lease GatewayDeploymentController = "istio-gateway-deployment" + IORController = "ior-leader" ) // Leader election key prefix for remote istiod managed clusters diff --git a/tests/integration/servicemesh/maistra/maistra.go b/tests/integration/servicemesh/maistra/maistra.go index 00f2c10bd12..4e304ddc8f4 100644 --- a/tests/integration/servicemesh/maistra/maistra.go +++ b/tests/integration/servicemesh/maistra/maistra.go @@ -192,7 +192,7 @@ func DisableWebhooksAndRestart(istioNs namespace.Getter) resource.SetupFn { if err := waitForIstiod(kubeClient, istioNs.Get(), &lastSeenGeneration); err != nil { return err } - if err := patchIstiodArgs(kubeClient, istioNs.Get()); err != nil { + if err := patchIstiodArgs(kubeClient, istioNs.Get(), disableWebhookPatch); err != nil { return err } if err := waitForIstiod(kubeClient, istioNs.Get(), &lastSeenGeneration); err != nil { @@ -220,25 +220,7 @@ func waitForIstiod(kubeClient kubernetes.Interface, istioNs namespace.Instance, return err } -func patchIstiodArgs(kubeClient kubernetes.Interface, istioNs namespace.Instance) error { - patch := `[ - { - "op": "add", - "path": "/spec/template/spec/containers/0/env/1", - "value": { - "name": "INJECTION_WEBHOOK_CONFIG_NAME", - "value": "" - } - }, - { - "op": "add", - "path": "/spec/template/spec/containers/0/env/2", - "value": { - "name": "VALIDATION_WEBHOOK_CONFIG_NAME", - "value": "" - } - } -]` +func patchIstiodArgs(kubeClient kubernetes.Interface, istioNs namespace.Instance, patch string) error { return retry.UntilSuccess(func() error { _, err := kubeClient.AppsV1().Deployments(istioNs.Name()). Patch(context.TODO(), "istiod-"+istioNs.Prefix(), types.JSONPatchType, []byte(patch), metav1.PatchOptions{}) @@ -270,6 +252,21 @@ func ApplyServiceMeshMemberRoll(ctx framework.TestContext, istioNs namespace.Ins return updateServiceMeshMemberRollStatus(ctx.Clusters().Default(), istioNs.Name(), memberNamespaces...) } +func EnableIOR(ctx resource.Context, ns namespace.Instance) error { + kubeClient := ctx.Clusters().Default().Kube() + var lastSeenGeneration int64 + if err := waitForIstiod(kubeClient, ns, &lastSeenGeneration); err != nil { + return err + } + if err := patchIstiodArgs(kubeClient, ns, enableIOR); err != nil { + return err + } + if err := waitForIstiod(kubeClient, ns, &lastSeenGeneration); err != nil { + return err + } + return nil +} + func updateServiceMeshMemberRollStatus(c cluster.Cluster, istioNamespace string, memberNamespaces ...string) error { client, err := maistrav1.NewForConfig(c.RESTConfig()) if err != nil { @@ -298,3 +295,112 @@ func applyRolesToMemberNamespaces(c config.Factory, values map[string]string, na } return nil } + +const defaultMaistraSettings = `[ + { + "op": "add", + "path": "/spec/template/spec/containers/0/args/1", + "value": "--memberRollName=default" + }, + { + "op": "add", + "path": "/spec/template/spec/containers/0/args/2", + "value": "--enableCRDScan=false" + }, + { + "op": "add", + "path": "/spec/template/spec/containers/0/args/3", + "value": "--disableNodeAccess=true" + }, + { + "op": "add", + "path": "/spec/template/spec/containers/0/args/4", + "value": "--enableIngressClassName=false" + }, + { + "op": "add", + "path": "/spec/template/spec/containers/0/env/1", + "value": { + "name": "ENABLE_IOR", + "value": "false" + } + }, + { + "op": "add", + "path": "/spec/template/spec/containers/0/env/2", + "value": { + "name": "PILOT_ENABLE_GATEWAY_API", + "value": "false" + } + }, + { + "op": "add", + "path": "/spec/template/spec/containers/0/env/3", + "value": { + "name": "PILOT_ENABLE_GATEWAY_API_STATUS", + "value": "false" + } + }, + { + "op": "add", + "path": "/spec/template/spec/containers/0/env/4", + "value": { + "name": "PILOT_ENABLE_GATEWAY_API_DEPLOYMENT_CONTROLLER", + "value": "false" + } + }, + { + "op": "add", + "path": "/spec/template/spec/containers/0/env/5", + "value": { + "name": "PRIORITIZED_LEADER_ELECTION", + "value": "false" + } + }, + { + "op": "add", + "path": "/spec/template/spec/containers/0/env/6", + "value": { + "name": "INJECTION_WEBHOOK_CONFIG_NAME", + "value": "" + } + }, + { + "op": "add", + "path": "/spec/template/spec/containers/0/env/7", + "value": { + "name": "VALIDATION_WEBHOOK_CONFIG_NAME", + "value": "" + } + } +]` + +const disableWebhookPatch = `[ + { + "op": "add", + "path": "/spec/template/spec/containers/0/env/1", + "value": { + "name": "INJECTION_WEBHOOK_CONFIG_NAME", + "value": "" + } + }, + { + "op": "add", + "path": "/spec/template/spec/containers/0/env/2", + "value": { + "name": "VALIDATION_WEBHOOK_CONFIG_NAME", + "value": "" + } + } +]` + +const enableIOR = `[ + { + "op": "replace", + "path": "/spec/template/spec/containers/0/env/1", + "value": { + "name": "ENABLE_IOR", + "value": "true" + } + } +]` diff --git a/tests/integration/servicemesh/managingroutes/main_test.go b/tests/integration/servicemesh/managingroutes/main_test.go index 5f26cbc801b..322859092ba 100644 --- a/tests/integration/servicemesh/managingroutes/main_test.go +++ b/tests/integration/servicemesh/managingroutes/main_test.go @@ -22,9 +22,14 @@ import ( "encoding/json" "fmt" "path/filepath" + "strings" "testing" "time" + routeapiv1 "github.com/openshift/api/route/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "istio.io/istio/pilot/pkg/config/kube/ior" "istio.io/istio/pkg/test/env" "istio.io/istio/pkg/test/framework" "istio.io/istio/pkg/test/framework/components/istioctl" @@ -60,6 +65,8 @@ func TestMain(m *testing.M) { Run() } +const gatewayName = "common-gateway" + func TestManagingGateways(t *testing.T) { framework.NewTest(t). Run(func(ctx framework.TestContext) { @@ -84,6 +91,13 @@ func TestManagingGateways(t *testing.T) { ctx.Fatalf("failed to create ServiceMeshMemberRoll: %s", err) } verifyThatIngressHasVirtualHostForMember(ctx, istioNamespace.Name(), "b") + + ctx.NewSubTest("RouteCreation").Run(func(t framework.TestContext) { + if err := maistra.EnableIOR(t, istioNamespace); err != nil { + t.Fatalf("failed to enable IOR: %s", err) + } + verifyThatRouteExistsOrFail(t, namespaceGateway, gatewayName, "a.maistra.io") + }) }) } @@ -123,6 +137,35 @@ func verifyThatIngressHasVirtualHostForMember(ctx framework.TestContext, istioNa }, retry.Timeout(10*time.Second)) } +func verifyThatRouteExistsOrFail(ctx framework.TestContext, expectedGwNs, expectedGwName, expectedHost string) { + routerClient, err := ior.NewRouterClient() + if err != nil { + ctx.Fatalf("failed to create Router client: %s", err) + } + var routes *routeapiv1.RouteList + retry.UntilSuccessOrFail(ctx, func() error { + routes, err = routerClient.Routes("istio-system").List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to get Routes: %s", err) + } + if len(routes.Items) == 0 { + return fmt.Errorf("no Routes found") + } + return nil + }, retry.Timeout(10*time.Second)) + + found := false + for _, route := range routes.Items { + if route.Spec.Host == expectedHost && strings.HasPrefix(route.Name, fmt.Sprintf("%s-%s-", expectedGwNs, expectedGwName)) { + found = true + break + } + } + if !found { + ctx.Fatalf("failed to find Route for host %s", expectedHost) + } +} + type RouteConfig struct { Name string `json:"name"` VirtualHosts []*VirtualHost `json:"virtualHosts"` diff --git a/tests/integration/servicemesh/router/router.go b/tests/integration/servicemesh/router/router.go new file mode 100644 index 00000000000..fc1ec4e9627 --- /dev/null +++ b/tests/integration/servicemesh/router/router.go @@ -0,0 +1,52 @@ +//go:build integ +// +build integ + +// +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package router + +import ( + "context" + "path/filepath" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "istio.io/istio/pkg/test/env" + "istio.io/istio/pkg/test/framework/resource" +) + +func InstallOpenShiftRouter(ctx resource.Context) error { + c := ctx.Clusters().Default() + openshiftIngressNs := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "openshift-ingress", + }, + } + if _, err := c.Kube().CoreV1().Namespaces().Create(context.Background(), openshiftIngressNs, metav1.CreateOptions{}); err != nil { + return err + } + if err := c.ApplyYAMLFiles("", filepath.Join(env.IstioSrc, "tests/integration/servicemesh/router/testdata/route_crd.yaml")); err != nil { + return err + } + if err := c.ApplyYAMLFiles("", filepath.Join(env.IstioSrc, "tests/integration/servicemesh/router/testdata/router.yaml")); err != nil { + return err + } + if err := c.ApplyYAMLFiles("", filepath.Join(env.IstioSrc, "tests/integration/servicemesh/router/testdata/router_rbac.yaml")); err != nil { + return err + } + return nil +} diff --git a/tests/integration/servicemesh/router/testdata/route_crd.yaml b/tests/integration/servicemesh/router/testdata/route_crd.yaml new file mode 100644 index 00000000000..4d1c5e1f0b0 --- /dev/null +++ b/tests/integration/servicemesh/router/testdata/route_crd.yaml @@ -0,0 +1,44 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + # name must match the spec fields below, and be in the form: . + name: routes.route.openshift.io +spec: + # group name to use for REST API: /apis// + group: route.openshift.io + # list of versions supported by this CustomResourceDefinition + versions: + - name: v1 + # Each version can be enabled/disabled by Served flag. + served: true + # One and only one version must be marked as the storage version. + storage: true + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + additionalPrinterColumns: + - name: Host + type: string + jsonPath: .status.ingress[0].host + - name: Admitted + type: string + jsonPath: .status.ingress[0].conditions[?(@.type=="Admitted")].status + - name: Service + type: string + jsonPath: .spec.to.name + - name: TLS + type: string + jsonPath: .spec.tls.type + subresources: + # enable spec/status + status: {} + # either Namespaced or Cluster + scope: Namespaced + names: + # plural name to be used in the URL: /apis/// + plural: routes + # singular name to be used as an alias on the CLI and for display + singular: route + # kind is normally the CamelCased singular type. Your resource manifests use this. + kind: Route diff --git a/tests/integration/servicemesh/router/testdata/router.yaml b/tests/integration/servicemesh/router/testdata/router.yaml new file mode 100644 index 00000000000..80b31450c61 --- /dev/null +++ b/tests/integration/servicemesh/router/testdata/router.yaml @@ -0,0 +1,54 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ingress-router + namespace: openshift-ingress + labels: + k8s-app: ingress-router +spec: + selector: + matchLabels: + k8s-app: ingress-router + template: + metadata: + labels: + k8s-app: ingress-router + spec: + serviceAccountName: ingress-router + containers: + - env: + - name: ROUTER_LISTEN_ADDR + value: 0.0.0.0:1936 + - name: ROUTER_METRICS_TYPE + value: haproxy + - name: ROUTER_SERVICE_HTTPS_PORT + value: "443" + - name: ROUTER_SERVICE_HTTP_PORT + value: "80" + - name: ROUTER_THREADS + value: "4" + image: openshift/origin-haproxy-router:v4.0.0 + livenessProbe: + httpGet: + host: localhost + path: /healthz + port: 1936 + initialDelaySeconds: 10 + name: router + ports: + - containerPort: 80 + - containerPort: 443 + - containerPort: 1936 + name: stats + protocol: TCP + readinessProbe: + httpGet: + host: localhost + path: healthz/ready + port: 1936 + initialDelaySeconds: 10 + resources: + requests: + cpu: 100m + memory: 256Mi + hostNetwork: true \ No newline at end of file diff --git a/tests/integration/servicemesh/router/testdata/router_rbac.yaml b/tests/integration/servicemesh/router/testdata/router_rbac.yaml new file mode 100644 index 00000000000..ac28607f3fa --- /dev/null +++ b/tests/integration/servicemesh/router/testdata/router_rbac.yaml @@ -0,0 +1,68 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: openshift-ingress-router +rules: + - apiGroups: + - "" + resources: + - namespaces + - services + - endpoints + verbs: + - get + - list + - watch + - apiGroups: + - route.openshift.io + resources: + - routes + verbs: + - list + - watch + - apiGroups: + - route.openshift.io + resources: + - routes/status + verbs: + - get + - patch + - update +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: openshift-ingress-router +roleRef: + apiGroup: "" + kind: ClusterRole + name: openshift-ingress-router +subjects: + - kind: ServiceAccount + namespace: openshift-ingress + name: ingress-router +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: openshift-ingress-router-auth-delegator +roleRef: + apiGroup: "" + kind: ClusterRole + name: system:auth-delegator +subjects: + - kind: ServiceAccount + namespace: openshift-ingress + name: ingress-router + +--- +apiVersion: v1 +kind: Namespace +metadata: + name: openshift-ingress +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: ingress-router + namespace: openshift-ingress