Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable UI service flag for disabling UI service #1261

Merged
merged 3 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion charts/spark-operator-chart/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ All charts linted successfully
| image.repository | string | `"gcr.io/spark-operator/spark-operator"` | Image repository |
| image.tag | string | `""` | Overrides the image tag whose default is the chart appVersion. |
| imagePullSecrets | list | `[]` | Image pull secrets |
| ingressUrlFormat | string | `""` | Ingress URL format |
| uiService.enable | bool | `""` | Enable UI service creation for Spark application |
| ingressUrlFormat | string | `""` | Ingress URL format. Requires the UI service to be enabled by setting `uiService.enable` to true. |
| istio.enabled | bool | `false` | When using `istio`, spark jobs need to run without a sidecar to properly terminate |
| labelSelectorFilter | string | `""` | A comma-separated list of key=value, or key labels to filter resources during watch and list based on the specified labels. |
| leaderElection.lockName | string | `"spark-operator-lock"` | Leader election lock name. Ref: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/user-guide.md#enabling-leader-election-for-high-availability. |
Expand Down
1 change: 1 addition & 0 deletions charts/spark-operator-chart/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ spec:
- -v={{ .Values.logLevel }}
- -logtostderr
- -namespace={{ .Values.sparkJobNamespace }}
- -enable-ui-service={{ .Values.uiService.enable}}
- -ingress-url-format={{ .Values.ingressUrlFormat }}
- -controller-threads={{ .Values.controllerThreads }}
- -resync-interval={{ .Values.resyncInterval }}
Expand Down
7 changes: 6 additions & 1 deletion charts/spark-operator-chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ controllerThreads: 10
# unrelated to this setting
resyncInterval: 30

# -- Ingress URL format
uiService:
# -- Enable UI service creation for Spark application
enable: true

# -- Ingress URL format.
# Requires the UI service to be enabled by setting `uiService.enable` to true.
ingressUrlFormat: ""

# -- Set higher levels for more verbose logging
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var (
enableWebhook = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
enableResourceQuotaEnforcement = flag.Bool("enable-resource-quota-enforcement", false, "Whether to enable ResourceQuota enforcement for SparkApplication resources. Requires the webhook to be enabled.")
ingressURLFormat = flag.String("ingress-url-format", "", "Ingress URL format.")
enableUIService = flag.Bool("enable-ui-service", true, "Enable Spark service UI.")
enableLeaderElection = flag.Bool("leader-election", false, "Enable Spark operator leader election.")
leaderElectionLockNamespace = flag.String("leader-election-lock-namespace", "spark-operator", "Namespace in which to create the ConfigMap for leader election.")
leaderElectionLockName = flag.String("leader-election-lock-name", "spark-operator-lock", "Name of the ConfigMap for leader election.")
Expand Down Expand Up @@ -178,7 +179,7 @@ func main() {
}

applicationController := sparkapplication.NewController(
crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, batchSchedulerMgr)
crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, batchSchedulerMgr, *enableUIService)
scheduledApplicationController := scheduledsparkapplication.NewController(
crClient, kubeClient, apiExtensionsClient, crInformerFactory, clock.RealClock{})

Expand Down
42 changes: 24 additions & 18 deletions pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Controller struct {
podLister v1.PodLister
ingressURLFormat string
batchSchedulerMgr *batchscheduler.SchedulerManager
enableUIService bool
}

// NewController creates a new Controller.
Expand All @@ -88,7 +89,8 @@ func NewController(
metricsConfig *util.MetricConfig,
namespace string,
ingressURLFormat string,
batchSchedulerMgr *batchscheduler.SchedulerManager) *Controller {
batchSchedulerMgr *batchscheduler.SchedulerManager,
enableUIService bool) *Controller {
crdscheme.AddToScheme(scheme.Scheme)

eventBroadcaster := record.NewBroadcaster()
Expand All @@ -98,7 +100,7 @@ func NewController(
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "spark-operator"})

return newSparkApplicationController(crdClient, kubeClient, crdInformerFactory, podInformerFactory, recorder, metricsConfig, ingressURLFormat, batchSchedulerMgr)
return newSparkApplicationController(crdClient, kubeClient, crdInformerFactory, podInformerFactory, recorder, metricsConfig, ingressURLFormat, batchSchedulerMgr, enableUIService)
}

func newSparkApplicationController(
Expand All @@ -109,7 +111,8 @@ func newSparkApplicationController(
eventRecorder record.EventRecorder,
metricsConfig *util.MetricConfig,
ingressURLFormat string,
batchSchedulerMgr *batchscheduler.SchedulerManager) *Controller {
batchSchedulerMgr *batchscheduler.SchedulerManager,
enableUIService bool) *Controller {
queue := workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(queueTokenRefillRate), queueTokenBucketSize)},
"spark-application-controller")

Expand All @@ -120,6 +123,7 @@ func newSparkApplicationController(
queue: queue,
ingressURLFormat: ingressURLFormat,
batchSchedulerMgr: batchSchedulerMgr,
enableUIService: enableUIService,
}

if metricsConfig != nil {
Expand Down Expand Up @@ -699,21 +703,23 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be
}
c.recordSparkApplicationEvent(app)

service, err := createSparkUIService(app, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI service for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIServiceName = service.serviceName
app.Status.DriverInfo.WebUIPort = service.servicePort
app.Status.DriverInfo.WebUIAddress = fmt.Sprintf("%s:%d", service.serviceIP, app.Status.DriverInfo.WebUIPort)
// Create UI Ingress if ingress-format is set.
if c.ingressURLFormat != "" {
ingress, err := createSparkUIIngress(app, *service, c.ingressURLFormat, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL
app.Status.DriverInfo.WebUIIngressName = ingress.ingressName
if c.enableUIService {
service, err := createSparkUIService(app, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI service for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIServiceName = service.serviceName
app.Status.DriverInfo.WebUIPort = service.servicePort
app.Status.DriverInfo.WebUIAddress = fmt.Sprintf("%s:%d", service.serviceIP, app.Status.DriverInfo.WebUIPort)
// Create UI Ingress if ingress-format is set.
if c.ingressURLFormat != "" {
ingress, err := createSparkUIIngress(app, *service, c.ingressURLFormat, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL
app.Status.DriverInfo.WebUIIngressName = ingress.ingressName
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/sparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func newFakeController(app *v1beta2.SparkApplication, pods ...*apiv1.Pod) (*Cont

podInformerFactory := informers.NewSharedInformerFactory(kubeClient, 0*time.Second)
controller := newSparkApplicationController(crdClient, kubeClient, informerFactory, podInformerFactory, recorder,
&util.MetricConfig{}, "", nil)
&util.MetricConfig{}, "", nil, true)

informer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
if app != nil {
Expand Down