Skip to content

Commit

Permalink
Use workqueue (#182)
Browse files Browse the repository at this point in the history
  • Loading branch information
tamalsaha authored Oct 8, 2017
1 parent b3ee076 commit 9c1cc43
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 20 deletions.
3 changes: 3 additions & 0 deletions root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"strings"

v "github.com/appscode/go/version"
"github.com/appscode/stash/client/scheme"
"github.com/jpillora/go-ogle-analytics"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
)

const (
Expand All @@ -34,6 +36,7 @@ func NewCmdStash(version string) *cobra.Command {
client.Send(ga.NewEvent(parts[0], strings.Join(parts[1:], "/")).Label(version))
}
}
scheme.AddToScheme(clientsetscheme.Scheme)
},
}
rootCmd.PersistentFlags().AddGoFlagSet(flag.CommandLine)
Expand Down
34 changes: 20 additions & 14 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ import (
stringz "github.com/appscode/go/strings"
"github.com/appscode/pat"
sapi "github.com/appscode/stash/apis/stash"
scs "github.com/appscode/stash/client/typed/stash/v1alpha1"
cs "github.com/appscode/stash/client/typed/stash/v1alpha1"
"github.com/appscode/stash/pkg/controller"
"github.com/appscode/stash/pkg/docker"
"github.com/appscode/stash/pkg/migrator"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

var (
kubeClient clientset.Interface
stashClient scs.ResticsGetter
kubeClient kubernetes.Interface
stashClient cs.StashV1alpha1Interface

scratchDir string = "/tmp"
)
Expand All @@ -31,29 +31,32 @@ func NewCmdRun(version string) *cobra.Command {
var (
masterURL string
kubeconfigPath string
tag string = stringz.Val(version, "canary")
address string = ":56790"
resyncPeriod time.Duration = 5 * time.Minute
address string = ":56790"
opts = controller.Options{
SidecarImageTag: stringz.Val(version, "canary"),
ResyncPeriod: 5 * time.Minute,
MaxNumRequeues: 5,
}
)

cmd := &cobra.Command{
Use: "run",
Short: "Run Stash operator",
DisableAutoGenTag: true,
Run: func(cmd *cobra.Command, args []string) {
if err := docker.CheckDockerImageVersion(docker.ImageOperator, tag); err != nil {
log.Fatalf(`Image %v:%v not found.`, docker.ImageOperator, tag)
if err := docker.CheckDockerImageVersion(docker.ImageOperator, opts.SidecarImageTag); err != nil {
log.Fatalf(`Image %v:%v not found.`, docker.ImageOperator, opts.SidecarImageTag)
}

config, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfigPath)
if err != nil {
log.Fatalln(err)
}
kubeClient = clientset.NewForConfigOrDie(config)
stashClient = scs.NewForConfigOrDie(config)
kubeClient = kubernetes.NewForConfigOrDie(config)
stashClient = cs.NewForConfigOrDie(config)
crdClient := apiextensionsclient.NewForConfigOrDie(config)

ctrl := controller.New(kubeClient, crdClient, stashClient, tag, resyncPeriod)
ctrl := controller.New(kubeClient, crdClient, stashClient, opts)
err = ctrl.Setup()
if err != nil {
log.Fatalln(err)
Expand All @@ -64,7 +67,10 @@ func NewCmdRun(version string) *cobra.Command {
}

log.Infoln("Starting operator...")
ctrl.Run()
// Now let's start the controller
stop := make(chan struct{})
defer close(stop)
go ctrl.Run(1, stop)

m := pat.New()
m.Get("/metrics", promhttp.Handler())
Expand All @@ -82,7 +88,7 @@ func NewCmdRun(version string) *cobra.Command {
cmd.Flags().StringVar(&kubeconfigPath, "kubeconfig", kubeconfigPath, "Path to kubeconfig file with authorization information (the master location is set by the master flag).")
cmd.Flags().StringVar(&address, "address", address, "Address to listen on for web interface and telemetry.")
cmd.Flags().StringVar(&scratchDir, "scratch-dir", scratchDir, "Directory used to store temporary files. Use an `emptyDir` in Kubernetes.")
cmd.Flags().DurationVar(&resyncPeriod, "resync-period", resyncPeriod, "If non-zero, will re-list this often. Otherwise, re-list will be delayed aslong as possible (until the upstream source closes the watch or times out.")
cmd.Flags().DurationVar(&opts.ResyncPeriod, "resync-period", opts.ResyncPeriod, "If non-zero, will re-list this often. Otherwise, re-list will be delayed aslong as possible (until the upstream source closes the watch or times out.")

return cmd
}
8 changes: 4 additions & 4 deletions schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (

"github.com/appscode/go/log"
"github.com/appscode/kutil"
scs "github.com/appscode/stash/client/typed/stash/v1alpha1"
cs "github.com/appscode/stash/client/typed/stash/v1alpha1"
"github.com/appscode/stash/pkg/scheduler"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

Expand Down Expand Up @@ -40,8 +40,8 @@ func NewCmdSchedule() *cobra.Command {
if err != nil {
log.Fatalf("Could not get Kubernetes config: %s", err)
}
kubeClient = clientset.NewForConfigOrDie(config)
stashClient = scs.NewForConfigOrDie(config)
kubeClient = kubernetes.NewForConfigOrDie(config)
stashClient = cs.NewForConfigOrDie(config)

opt.NodeName = os.Getenv("NODE_NAME")
if opt.NodeName == "" {
Expand Down
4 changes: 2 additions & 2 deletions snapshot_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
_ "net/http/pprof"

"github.com/appscode/pat"
sapi "github.com/appscode/stash/apis/stash/v1alpha1"
api "github.com/appscode/stash/apis/stash/v1alpha1"
"github.com/appscode/stash/pkg/cli"
kerr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -37,7 +37,7 @@ func ExportSnapshots(w http.ResponseWriter, r *http.Request) {
}
resticCLI := cli.New(scratchDir)

var resource *sapi.Restic
var resource *api.Restic
resource, err := stashClient.Restics(namespace).Get(name, metav1.GetOptions{})
if kerr.IsNotFound(err) {
http.Error(w, err.Error(), http.StatusNotFound)
Expand Down

0 comments on commit 9c1cc43

Please sign in to comment.