From 0bf52f39485150555ac2023292daec4103c7ef82 Mon Sep 17 00:00:00 2001
From: Allen Zhong
Date: Thu, 16 Jul 2020 15:38:55 +0800
Subject: [PATCH] cluster/spec: remove tispark_{config,env} from worker spec

---
 examples/topology.example.yaml | 26 --------------------------
 pkg/cluster/spec/tispark.go    | 40 ++++++++++------------------------------
 2 files changed, 10 insertions(+), 56 deletions(-)

diff --git a/examples/topology.example.yaml b/examples/topology.example.yaml
index 6dc6342bd6..a302d795ea 100644
--- a/examples/topology.example.yaml
+++ b/examples/topology.example.yaml
@@ -231,33 +231,7 @@ tispark_masters:
 # NOTE: multiple worker nodes on the same host is not supported
 tispark_workers:
   - host: 10.0.1.22
-    #spark_config:
-    #  spark.driver.memory: "2g"
-    #  spark.eventLog.enabled: "False"
-    #  spark.tispark.grpc.framesize: 268435456
-    #  spark.tispark.grpc.timeout_in_sec: 100
-    #  spark.tispark.meta.reload_period_in_sec: 60
-    #  spark.tispark.request.command.priority: "Low"
-    #  spark.tispark.table.scan_concurrency: 256
-    #spark_env:
-    #  SPARK_EXECUTOR_CORES: 5
-    #  SPARK_EXECUTOR_MEMORY: "10g"
-    #  SPARK_WORKER_CORES: 5
-    #  SPARK_WORKER_MEMORY: "10g"
   - host: 10.0.1.23
-    #spark_config:
-    #  spark.driver.memory: "2g"
-    #  spark.eventLog.enabled: "False"
-    #  spark.tispark.grpc.framesize: 268435456
-    #  spark.tispark.grpc.timeout_in_sec: 100
-    #  spark.tispark.meta.reload_period_in_sec: 60
-    #  spark.tispark.request.command.priority: "Low"
-    #  spark.tispark.table.scan_concurrency: 256
-    #spark_env:
-    #  SPARK_EXECUTOR_CORES: 5
-    #  SPARK_EXECUTOR_MEMORY: "10g"
-    #  SPARK_WORKER_CORES: 5
-    #  SPARK_WORKER_MEMORY: "10g"
 
 monitoring_servers:
   - host: 10.0.1.11

diff --git a/pkg/cluster/spec/tispark.go b/pkg/cluster/spec/tispark.go
index 6143bf63c7..7cabca5ec9 100644
--- a/pkg/cluster/spec/tispark.go
+++ b/pkg/cluster/spec/tispark.go
@@ -71,16 +71,14 @@ func (s TiSparkMasterSpec) Status(pdList ...string) string {
 
 // TiSparkWorkerSpec is the topology specification for TiSpark slave nodes
 type TiSparkWorkerSpec struct {
-	Host         string                 `yaml:"host"`
-	SSHPort      int                    `yaml:"ssh_port,omitempty"`
-	Imported     bool                   `yaml:"imported,omitempty"`
-	Port         int                    `yaml:"port" default:"7078"`
-	WebPort      int                    `yaml:"web_port" default:"8081"`
-	DeployDir    string                 `yaml:"deploy_dir,omitempty"`
-	SparkConfigs map[string]interface{} `yaml:"spark_config,omitempty"`
-	SparkEnvs    map[string]string      `yaml:"spark_env,omitempty"`
-	Arch         string                 `yaml:"arch,omitempty"`
-	OS           string                 `yaml:"os,omitempty"`
+	Host      string `yaml:"host"`
+	SSHPort   int    `yaml:"ssh_port,omitempty"`
+	Imported  bool   `yaml:"imported,omitempty"`
+	Port      int    `yaml:"port" default:"7078"`
+	WebPort   int    `yaml:"web_port" default:"8081"`
+	DeployDir string `yaml:"deploy_dir,omitempty"`
+	Arch      string `yaml:"arch,omitempty"`
+	OS        string `yaml:"os,omitempty"`
 }
 
 // Role returns the component role of the instance
@@ -287,24 +285,6 @@ type TiSparkWorkerInstance struct {
 	instance
 }
 
-// GetCustomFields get custom spark configs of the instance
-func (i *TiSparkWorkerInstance) GetCustomFields() map[string]interface{} {
-	v := reflect.ValueOf(i.InstanceSpec).FieldByName("SparkConfigs")
-	if !v.IsValid() {
-		return nil
-	}
-	return v.Interface().(map[string]interface{})
-}
-
-// GetCustomEnvs get custom spark envionment variables of the instance
-func (i *TiSparkWorkerInstance) GetCustomEnvs() map[string]string {
-	v := reflect.ValueOf(i.InstanceSpec).FieldByName("SparkEnvs")
-	if !v.IsValid() {
-		return nil
-	}
-	return v.Interface().(map[string]string)
-}
-
 // InitConfig implement Instance interface
 func (i *TiSparkWorkerInstance) InitConfig(e executor.Executor, clusterName, clusterVersion, deployUser string, paths meta.DirPaths) error {
 	// generate systemd service to invoke spark's start/stop scripts
@@ -338,7 +318,7 @@ func (i *TiSparkWorkerInstance) InitConfig(e executor.Executor, clusterName, clu
 	}
 
 	cfg := config.NewTiSparkConfig(pdList).WithMasters(strings.Join(masterList, ",")).
-		WithCustomFields(i.GetCustomFields())
+		WithCustomFields(i.instance.topo.TiSparkMasters[0].SparkConfigs)
 	// transfer spark-defaults.conf
 	fp := filepath.Join(paths.Cache, fmt.Sprintf("spark-defaults-%s-%d.conf", host, port))
 	if err := cfg.ConfigToFile(fp); err != nil {
@@ -352,7 +332,7 @@ func (i *TiSparkWorkerInstance) InitConfig(e executor.Executor, clusterName, clu
 	env := scripts.NewTiSparkEnv(i.topo.TiSparkMasters[0].Host).
 		WithMasterPorts(i.topo.TiSparkMasters[0].Port, i.topo.TiSparkMasters[0].WebPort).
 		WithWorkerPorts(i.usedPorts[0], i.usedPorts[1]).
-		WithCustomEnv(i.GetCustomEnvs())
+		WithCustomEnv(i.instance.topo.TiSparkMasters[0].SparkEnvs)
 	// transfer spark-env.sh file
 	fp = filepath.Join(paths.Cache, fmt.Sprintf("spark-env-%s-%d.sh", host, port))
 	if err := env.ScriptToFile(fp); err != nil {
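
Note: with this patch the per-worker spark_config/spark_env blocks are removed, and
TiSparkWorkerInstance.InitConfig takes SparkConfigs and SparkEnvs from the first entry
of tispark_masters instead. Below is a minimal topology sketch of how custom Spark
settings would be declared after this change; it assumes the master spec keeps the
spark_config/spark_env YAML keys, and the hosts and values are illustrative only:

    tispark_masters:
      # hosts below are illustrative, not taken from this patch
      - host: 10.0.1.21
        # declared once on the master; per this patch, workers reuse it
        spark_config:
          spark.driver.memory: "2g"
        spark_env:
          SPARK_EXECUTOR_MEMORY: "10g"
    tispark_workers:
      # worker entries no longer accept spark_config/spark_env
      - host: 10.0.1.22
      - host: 10.0.1.23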