diff --git a/charts/fluid-dataloader/README.md b/charts/fluid-dataloader/README.md deleted file mode 100644 index adcba47925e..00000000000 --- a/charts/fluid-dataloader/README.md +++ /dev/null @@ -1,86 +0,0 @@ -# fluid-dataloader - -## Prerequisite -- Dataset deployed -- Alluxio Runtime deployed -- Dataset mountPoint mounted -- Dataset-related PV, PVC created - -## Install -1. get the dataset-related PVC name -```shell script -$ kubectl get pvc -NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE -<pvc-name> Bound <pv-name> 100Gi RWX 4h5m -``` -Say `<pvc-name>` is the name of your dataset-related PVC; usually it's the same name as your dataset. - -2. get the number of Alluxio workers -```shell script -kubectl get pod -l release=<pvc-name> | grep -c "worker" -``` - -3. Install fluid-dataloader - -```shell script -helm install \ - --set dataloader.numWorker=<num-of-workers> \ - --set dataloader.threads=2 \ - <pvc-name>-load charts/fluid-dataloader -``` - -You will see something like this: -``` -helm install hbase-load charts/fluid-dataloader/ -NAME: hbase-load -LAST DEPLOYED: Fri Jul 31 19:52:11 2020 -NAMESPACE: default -STATUS: deployed -REVISION: 1 -TEST SUITE: None -``` - -Some dataloader jobs will be launched. You will see multiple jobs running on different nodes: -```shell script -kubectl get pod -o wide -l role=alluxio-dataloader -``` - -Once a job completes, you can check the time consumed by the data prefetch: -```shell script -kubectl logs <pvc-name>-load-loader-xxxxx -``` -and see something like this: -``` -THREADS=2 -DATAPATH=/data/* -python multithread_read_benchmark.py --threads=2 --path=/data/* -/data/* contains 15 items -/data/* processing 15 items with 2 threads uses 32.6712441444s, avg 0.459119338513/s, avg 8743748.5924B/s, avg 8.33868846169MiB/s -``` - -Now all data should be cached; reinstall it: -```shell script -helm del <pvc-name>-load - -helm install \ - --set dataloader.numWorker=<num-of-workers> \ - --set dataloader.threads=2 \ - <pvc-name>-load charts/fluid-dataloader -``` - -Check again; this time it should be much faster: -```shell script -kubectl logs <pvc-name>-load-loader-yyyyy -``` -``` -THREADS=2 -DATAPATH=/data/* -python multithread_read_benchmark.py --threads=2 --path=/data/* -/data/* contains 15 items -/data/* processing 15 items with 2 threads uses 0.308158159256s, avg 48.6763032211/s, avg 927021194.862B/s, avg 884.076304304MiB/s -``` - -## Uninstall -``` -helm del <pvc-name>-load -``` diff --git a/charts/fluid-dataloader/CHANGELOG.md b/charts/fluid-dataloader/alluxio/CHANGELOG.md similarity index 100% rename from charts/fluid-dataloader/CHANGELOG.md rename to charts/fluid-dataloader/alluxio/CHANGELOG.md diff --git a/charts/fluid-dataloader/Chart.yaml b/charts/fluid-dataloader/alluxio/Chart.yaml similarity index 100% rename from charts/fluid-dataloader/Chart.yaml rename to charts/fluid-dataloader/alluxio/Chart.yaml diff --git a/charts/fluid-dataloader/templates/configmap.yaml b/charts/fluid-dataloader/alluxio/templates/configmap.yaml similarity index 100% rename from charts/fluid-dataloader/templates/configmap.yaml rename to charts/fluid-dataloader/alluxio/templates/configmap.yaml diff --git a/charts/fluid-dataloader/templates/dataloader.yaml b/charts/fluid-dataloader/alluxio/templates/dataloader.yaml similarity index 100% rename from charts/fluid-dataloader/templates/dataloader.yaml rename to charts/fluid-dataloader/alluxio/templates/dataloader.yaml diff --git a/charts/fluid-dataloader/values.yaml b/charts/fluid-dataloader/alluxio/values.yaml similarity index 100% rename from charts/fluid-dataloader/values.yaml rename to charts/fluid-dataloader/alluxio/values.yaml
diff --git a/charts/fluid-dataloader/jindo/CHANGELOG.md b/charts/fluid-dataloader/jindo/CHANGELOG.md new file mode 100644 index 00000000000..56b2247a4c1 --- /dev/null +++ b/charts/fluid-dataloader/jindo/CHANGELOG.md @@ -0,0 +1,4 @@ +### 0.1.0 + +- Support parallel prefetch job +- Support configurations by setting values diff --git a/charts/fluid-dataloader/jindo/Chart.yaml b/charts/fluid-dataloader/jindo/Chart.yaml new file mode 100644 index 00000000000..7a1564448f3 --- /dev/null +++ b/charts/fluid-dataloader/jindo/Chart.yaml @@ -0,0 +1,23 @@ +apiVersion: v2 +name: fluid-dataloader +description: A Helm chart for Fluid to prefetch data + +# A chart can be either an 'application' or a 'library' chart. +# +# Application charts are a collection of templates that can be packaged into versioned archives +# to be deployed. +# +# Library charts provide useful utilities or functions for the chart developer. They're included as +# a dependency of application charts to inject those utilities and functions into the rendering +# pipeline. Library charts do not define any templates and therefore cannot be deployed. +type: application + +# This is the chart version. This version number should be incremented each time you make changes +# to the chart and its templates, including the app version. +# Versions are expected to follow Semantic Versioning (https://semver.org/) +version: 0.1.0 + +# This is the version number of the application being deployed. This version number should be +# incremented each time you make changes to the application. Versions are not expected to +# follow Semantic Versioning. They should reflect the version the application is using. +appVersion: 0.1.0 diff --git a/charts/fluid-dataloader/jindo/templates/configmap.yaml b/charts/fluid-dataloader/jindo/templates/configmap.yaml new file mode 100644 index 00000000000..68033623160 --- /dev/null +++ b/charts/fluid-dataloader/jindo/templates/configmap.yaml @@ -0,0 +1,79 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ printf "%s-data-load-script" .Release.Name }} + labels: + release: {{ .Release.Name }} + role: dataload-job +data: + dataloader.jindo.init: | + #!/usr/bin/env bash + set -xe + jindo_env_vars=( + STORAGE_ADDRESS + ) + function public::jindo::init_conf() { + local IFS=$'\n' # split by line instead of space + for keyvaluepair in $(env); do + # split around the first "=" + key=$(echo ${keyvaluepair} | cut -d= -f1) + value=$(echo ${keyvaluepair} | cut -d= -f2-) + if [[ "${jindo_env_vars[*]}" =~ "${key}" ]]; then + export ${key}=\"${value}\" + fi + done + } + main() { + public::jindo::init_conf + } + main + dataloader.distributedLoad: | + #!/usr/bin/env bash + set -xe + + function distributedLoad() { + local path=$1 + local replica=$2 + local default=$3 + + if [[ $needLoadMetadata == 'true' ]]; then + #echo -e "metadata cache start $default$path" + time jindo jfs -metaSync -R $default$path + else + echo -e "$default$path no need to cache metadata" + fi + + if [[ $loadMemoryData == 'true' ]]; then + #echo -e "data cache start $default$path" + time jindo jfs -cache -s -m -r $replica $default$path + else + time jindo jfs -cache -s -r $replica $default$path + fi + + #echo -e "distributedLoad and sleep start now" + #sleep 10m + } + + function main() { + needLoadMetadata="$NEED_LOAD_METADATA" + loadMemoryData="$LOAD_MEMORY_DATA" + default="jfs://jindo" + paths="$DATA_PATH" + paths=(${paths//:/ }) + replicas="$PATH_REPLICAS" + replicas=(${replicas//:/ }) + for((i=0;i<${#paths[@]};i++)) do + local path="${paths[i]}" + local replica="${replicas[i]}" + echo -e "distributedLoad on $path starts" + distributedLoad ${path} ${replica} ${default} + #echo -e "distributedLoad on $path ends" + done + } + + main "$@" + + + + +
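Editor's note: the `dataloader.distributedLoad` script above splits `DATA_PATH` and `PATH_REPLICAS` on `:`. The producing side is the Job template in the next file, which joins `targetPaths` with `cat`/`nospace`/`trimSuffix`. A minimal Go sketch of that join contract, assuming the `TargetPath` shape from `pkg/dataload/value.go` (the `joinTargets` helper itself is hypothetical):

```go
// Illustrative producer for the DATA_PATH / PATH_REPLICAS contract parsed above.
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// TargetPath mirrors the fields the chart consumes (path, replicas).
type TargetPath struct {
	Path     string
	Replicas int32
}

// joinTargets reproduces the template's cat/nospace/trimSuffix pipeline:
// values joined by ":" with no trailing separator.
func joinTargets(targets []TargetPath) (dataPath, pathReplicas string) {
	paths := make([]string, 0, len(targets))
	replicas := make([]string, 0, len(targets))
	for _, t := range targets {
		r := t.Replicas
		if r == 0 {
			r = 1 // matches `default 1 .replicas` in the template
		}
		paths = append(paths, t.Path)
		replicas = append(replicas, strconv.Itoa(int(r)))
	}
	return strings.Join(paths, ":"), strings.Join(replicas, ":")
}

func main() {
	dp, pr := joinTargets([]TargetPath{{Path: "/train", Replicas: 2}, {Path: "/val"}})
	fmt.Println(dp, pr) // Output: /train:/val 2:1
}
```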
replica="${replicas[i]}" + echo -e "distributedLoad on $path starts" + distributedLoad ${paths[i]} ${replicas[i]} ${dafault} + #echo -e "distributedLoad on $path ends" + done + } + + main "$@" + + + + + diff --git a/charts/fluid-dataloader/jindo/templates/dataloader.yaml b/charts/fluid-dataloader/jindo/templates/dataloader.yaml new file mode 100644 index 00000000000..5cb849942bd --- /dev/null +++ b/charts/fluid-dataloader/jindo/templates/dataloader.yaml @@ -0,0 +1,121 @@ +# .Release.Name will be used to decide which dataset will be preload +# .Release.Name should be like `-load`(e.g. hbase-load for a PersistentVolumeClaim named `hbase`) +# TODO: the length of .Release.Name won't exceed 53(limited by Helm), which means length of `` can't exceed 48. This might be a problem. + {{/* {{ $datasetName := "" -}}*/}} + {{/* {{- $randomSuffix := "" -}}*/}} + {{/* {{- if regexMatch "^[A-Za-z0-9._-]+-load-[A-Za-z0-9]{5}$" .Release.Name -}}*/}} + {{/* {{- $arr := regexSplit "-load-" .Release.Name -1 -}}*/}} + {{/* {{- $datasetName = first $arr -}}*/}} + {{/* {{- $randomSuffix = last $arr -}}*/}} + {{/* {{- else -}}*/}} + {{/* {{- printf "Illegal release name. Should be like -load-. Current name: %s" .Release.Name | fail -}}*/}} + {{/* {{- end }}*/}} +apiVersion: batch/v1 +kind: Job +metadata: + name: {{ printf "%s-job" .Release.Name }} + labels: + release: {{ .Release.Name }} + role: dataload-job + targetDataset: {{ required "targetDataset should be set" .Values.dataloader.targetDataset }} +spec: + backoffLimit: {{ .Values.dataloader.backoffLimit | default "3" }} + completions: 1 + parallelism: 1 + template: + metadata: + name: {{ printf "%s-loader" .Release.Name }} + labels: + release: {{ .Release.Name }} + role: dataload-pod + targetDataset: {{ required "targetDataset should be set" .Values.dataloader.targetDataset }} + spec: + restartPolicy: OnFailure + containers: + - name: dataloader + image: {{ required "Dataloader image should be set" .Values.dataloader.image }} + imagePullPolicy: IfNotPresent + command: ["/bin/sh", "-c"] + args: ["/scripts/jindo_env_init.sh && /scripts/jindo_dataload.sh"] + {{- $targetPaths := "" }} + {{- range .Values.dataloader.targetPaths }} + {{- $targetPaths = cat $targetPaths (required "Path must be set" .path) ":" }} + {{- end }} + {{- $targetPaths = $targetPaths | nospace | trimSuffix ":" }} + + {{- $pathReplicas := ""}} + {{- range .Values.dataloader.targetPaths }} + {{- $pathReplicas = cat $pathReplicas ( default 1 .replicas ) ":"}} + {{- end }} + {{- $pathReplicas = $pathReplicas | nospace | trimSuffix ":"}} + env: + - name: STORAGE_ADDRESS + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: NEED_LOAD_METADATA + value: {{ default false .Values.dataloader.loadMetadata | quote }} + {{- range $key, $val := .Values.dataloader.options }} + {{- if eq $key "loadMemoryData" }} + - name: LOAD_MEMORY_DATA + value: {{ default false $val | quote }} + {{- end }} + {{- end }} + - name: DATA_PATH + value: {{ $targetPaths | quote }} + - name: PATH_REPLICAS + value: {{ $pathReplicas | quote }} + envFrom: + - configMapRef: + name: {{ required "targetDataset should be set" .Values.dataloader.targetDataset }}-jindofs-client-config + volumeMounts: + - name: bigboot-config + mountPath: /bigboot.cfg + subPath: bigboot.cfg + - name: bigboot-config + mountPath: /hdfs-3.2.1/etc/hadoop/core-site.xml + subPath: core-site.xml + {{- range $key, $val := .Values.dataloader.options }} + {{- if eq $key "hdfsConfig" }} + - name: hdfs-confs + mountPath: /hdfs-site.xml + subPath: hdfs-site.xml 
+ {{- end }} + {{- end }} + - mountPath: /scripts + name: data-load-script + {{- range .Values.dataloader.targetPaths }} + {{- if .fluidNative }} + - mountPath: {{ .path | trimAll "/" | replace "/" "-" | printf "/data/%s"}} + name: {{ .path | trimAll "/" | replace "/" "-" | printf "native-%s"}} + {{- end }} + {{- end }} + volumes: + - name: bigboot-config + configMap: + name: {{ required "targetDataset should be set" .Values.dataloader.targetDataset }}-jindofs-config + {{- range $key, $val := .Values.dataloader.options }} + {{- if eq $key "hdfsConfig" }} + - name: hdfs-confs + configMap: + name: {{ $val }} + {{- end }} + {{- end }} + - name: data-load-script + configMap: + name: {{ printf "%s-data-load-script" .Release.Name }} + items: + - key: dataloader.jindo.init + path: jindo_env_init.sh + mode: 365 + - key: dataloader.distributedLoad + path: jindo_dataload.sh + mode: 365 + {{- range .Values.dataloader.targetPaths }} + {{- if .fluidNative }} + - name: {{ .path | trimAll "/" | replace "/" "-" | printf "native-%s"}} + hostPath: + path: {{ .path }} + {{- end }} + {{- end }} diff --git a/charts/fluid-dataloader/jindo/values.yaml b/charts/fluid-dataloader/jindo/values.yaml new file mode 100644 index 00000000000..611a9ec2d65 --- /dev/null +++ b/charts/fluid-dataloader/jindo/values.yaml @@ -0,0 +1,30 @@ +# Default values for fluid-dataloader. +# This is a YAML-formatted file. +# Declare variables to be passed into your templates. + +dataloader: + # Optional + # Default: 3 + # Description: how many times the prefetch job can fail, i.e. `Job.spec.backoffLimit` + backoffLimit: 3 + + # Required + # Description: the dataset that this DataLoad targets + targetDataset: #imagenet + + # Optional + # Default: false + # Description: whether to load metadata from the UFS during data load + loadMetadata: false + + # Optional + # Default: (path: "/", replicas: 1, fluidNative: false) + # Description: the paths that the DataLoad should load + targetPaths: + - path: "/" + replicas: 1 + fluidNative: false + + # Required + # Description: the image that the DataLoad job uses + image: #
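For reference, the `dataloader.*` values above are not hand-written in practice: the controller marshals a `DataLoadValue` into a temporary values file (see `dataload_jindo.go` later in this diff). A sketch of that round trip, assuming the yaml tags in `pkg/dataload/value.go` and the top-level `dataloader` key implied by this values.yaml:

```go
// Sketch: marshal a DataLoadValue into the values consumed by this chart.
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Field names and yaml tags are assumed to mirror pkg/dataload/value.go.
type TargetPath struct {
	Path        string `yaml:"path,omitempty"`
	Replicas    int32  `yaml:"replicas,omitempty"`
	FluidNative bool   `yaml:"fluidNative,omitempty"`
}

type DataLoadInfo struct {
	BackoffLimit  int32             `yaml:"backoffLimit,omitempty"`
	TargetDataset string            `yaml:"targetDataset,omitempty"`
	LoadMetadata  bool              `yaml:"loadMetadata,omitempty"`
	TargetPaths   []TargetPath      `yaml:"targetPaths,omitempty"`
	Image         string            `yaml:"image,omitempty"`
	Options       map[string]string `yaml:"options,omitempty"`
}

type DataLoadValue struct {
	DataLoadInfo DataLoadInfo `yaml:"dataloader"`
}

func main() {
	v := DataLoadValue{DataLoadInfo: DataLoadInfo{
		BackoffLimit:  3,
		TargetDataset: "hbase",
		TargetPaths:   []TargetPath{{Path: "/", Replicas: 1}},
		Image:         "registry.cn-shanghai.aliyuncs.com/jindofs/smartdata:3.5.0",
		Options:       map[string]string{"loadMemoryData": "true"},
	}}
	out, err := yaml.Marshal(v)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out)) // yields a `dataloader:` block shaped like the values.yaml above
}
```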
diff --git a/charts/fluid/fluid/values.yaml b/charts/fluid/fluid/values.yaml index b6c9270778f..534001df0a9 100644 --- a/charts/fluid/fluid/values.yaml +++ b/charts/fluid/fluid/values.yaml @@ -33,8 +33,8 @@ runtime: portRange: 18000-19999 enabled: false smartdata: - image: registry.cn-shanghai.aliyuncs.com/jindofs/smartdata:3.5.2 + image: registry.cn-shanghai.aliyuncs.com/jindofs/smartdata:3.5.0 fuse: - image: registry.cn-shanghai.aliyuncs.com/jindofs/jindo-fuse:3.5.2 + image: registry.cn-shanghai.aliyuncs.com/jindofs/jindo-fuse:3.5.0 controller: image: registry.aliyuncs.com/fluid/jindoruntime-controller:v0.6.0-ed9b1be diff --git a/charts/jindofs/Chart.yaml b/charts/jindofs/Chart.yaml index cb4f8508636..39fe6bdbed3 100755 --- a/charts/jindofs/Chart.yaml +++ b/charts/jindofs/Chart.yaml @@ -1,5 +1,5 @@ apiVersion: v1 -appVersion: 3.5.2 +appVersion: 3.5.0 description: FileSystem on the cloud based on Aliyun Object Storage aimed for data acceleration. home: https://help.aliyun.com/document_detail/164207.html @@ -14,4 +14,4 @@ maintainers: - email: cheyang@163.com name: Yang Che name: jindofs -version: 3.5.2 +version: 3.5.0 diff --git a/charts/jindofs/templates/config/jindofs-client-conf.yaml b/charts/jindofs/templates/config/jindofs-client-conf.yaml new file mode 100755 index 00000000000..26aad163a21 --- /dev/null +++ b/charts/jindofs/templates/config/jindofs-client-conf.yaml @@ -0,0 +1,16 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + annotations: + "helm.sh/hook": "pre-install" + "helm.sh/hook-delete-policy": before-hook-creation + name: {{ template "jindofs.fullname" . }}-client-config + labels: + name: {{ template "jindofs.fullname" . }}-client-config + app: {{ template "jindofs.name" . }}-client + chart: {{ template "jindofs.chart" . }}-client + release: {{ .Release.Name }} + heritage: {{ .Release.Service }} +data: + STORAGE_NAMESPACE_RPC_ADDRESS: {{ template "jindofs.fullname" . }}-master:{{ .Values.master.ports.rpc }} + CLIENT_NAMESPACE_RPC_ADDRESS: {{ template "jindofs.fullname" . }}-master:{{ .Values.master.ports.rpc }} diff --git a/charts/jindofs/templates/config/jindofs-conf.yaml b/charts/jindofs/templates/config/jindofs-conf.yaml index 18907921bc2..2456228b7dd 100755 --- a/charts/jindofs/templates/config/jindofs-conf.yaml +++ b/charts/jindofs/templates/config/jindofs-conf.yaml @@ -37,3 +37,15 @@ data: {{ $key }} = {{ $val }} {{- end}} client.read.storagepolicy.ignore = true + core-site.xml: | + <configuration> + <property> + <name>jindo.common.accessKeyId</name> + <value>{{ .Values.master.osskey }}</value> + </property> + <property> + <name>jindo.common.accessKeySecret</name> + <value>{{ .Values.master.osssecret }}</value> + </property> + </configuration> diff --git a/charts/jindofs/templates/fuse/daemonset.yaml b/charts/jindofs/templates/fuse/daemonset.yaml index c64c6d60959..8268fd46554 100755 --- a/charts/jindofs/templates/fuse/daemonset.yaml +++ b/charts/jindofs/templates/fuse/daemonset.yaml @@ -74,6 +74,9 @@ spec: - name: bigboot-config mountPath: /bigboot.cfg subPath: bigboot.cfg + - name: bigboot-config + mountPath: /hdfs-3.2.1/etc/hadoop/core-site.xml + subPath: core-site.xml {{- range $name, $path := .Values.mounts.workersAndClients }} - name: datavolume-{{ $name }} mountPath: "{{ $path }}" diff --git a/charts/jindofs/templates/master/statefulset.yaml b/charts/jindofs/templates/master/statefulset.yaml index 910ee90d94c..bd200d5022c 100755 --- a/charts/jindofs/templates/master/statefulset.yaml +++ b/charts/jindofs/templates/master/statefulset.yaml @@ -60,6 +60,9 @@ spec: - name: bigboot-config mountPath: /bigboot.cfg subPath: bigboot.cfg + - name: bigboot-config + mountPath: /hdfs-3.2.1/etc/hadoop/core-site.xml + subPath: core-site.xml {{- range $name, $path := .Values.mounts.master }} - name: datavolume-{{ $name }} mountPath: "{{ $path }}" diff --git a/charts/jindofs/templates/worker/daemonset.yaml b/charts/jindofs/templates/worker/daemonset.yaml index 066f119c0f0..4812e87c0e2 100755 --- a/charts/jindofs/templates/worker/daemonset.yaml +++ b/charts/jindofs/templates/worker/daemonset.yaml @@ -62,6 +62,9 @@ spec: - name: bigboot-config mountPath: /bigboot.cfg subPath: bigboot.cfg + - name: bigboot-config + mountPath: /hdfs-3.2.1/etc/hadoop/core-site.xml + subPath: core-site.xml {{- range $name, $path := .Values.mounts.workersAndClients }} - name: datavolume-{{ $name }} mountPath: "{{ $path }}" diff --git a/charts/jindofs/values.yaml b/charts/jindofs/values.yaml index 3ede82c54ab..48ab3d1b51c 100644 --- a/charts/jindofs/values.yaml +++ b/charts/jindofs/values.yaml @@ -4,11 +4,11 @@ image: registry-vpc.__ACK_REGION_ID__.aliyuncs.com/jindo/smartdata 
-imageTag: "3.5.2" +imageTag: "3.5.0" imagePullPolicy: Always fuseImage: registry-vpc.__ACK_REGION_ID__.aliyuncs.com/jindo/jindo-fuse -fuseImageTag: "3.5.2" +fuseImageTag: "3.5.0" user: 0 group: 0 diff --git a/pkg/controllers/v1alpha1/dataload/dataload_jindo.go b/pkg/controllers/v1alpha1/dataload/dataload_jindo.go new file mode 100644 index 00000000000..4c992606c93 --- /dev/null +++ b/pkg/controllers/v1alpha1/dataload/dataload_jindo.go @@ -0,0 +1,85 @@ +package dataload + +import ( + "fmt" + "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + cdataload "github.com/fluid-cloudnative/fluid/pkg/dataload" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/docker" + "gopkg.in/yaml.v2" + "io/ioutil" + "os" +) + +// generateDataLoadValueFile builds a DataLoadValue by extracted specifications from the given DataLoad, and +// marshals the DataLoadValue to a temporary yaml file where stores values that'll be used by fluid dataloader helm chart +func (r *DataLoadReconcilerImplement) generateJindoDataLoadValueFile(dataload v1alpha1.DataLoad) (valueFileName string, err error) { + targetDataset, err := utils.GetDataset(r.Client, dataload.Spec.Dataset.Name, dataload.Spec.Dataset.Namespace) + if err != nil { + return "", err + } + + _, boundedRuntime := utils.GetRuntimeByCategory(targetDataset.Status.Runtimes, common.AccelerateCategory) + + imageName, imageTag := docker.GetWorkerImage(r.Client, dataload.Spec.Dataset.Name, boundedRuntime.Type, dataload.Spec.Dataset.Namespace) + image := fmt.Sprintf("%s:%s", imageName, imageTag) + + runtime, err := utils.GetJindoRuntime(r.Client, boundedRuntime.Name, boundedRuntime.Namespace) + if err != nil { + return + } + hadoopConfig := runtime.Spec.HadoopConfig + loadMemoryData := false + if len(runtime.Spec.Tieredstore.Levels) == 0 { + err = fmt.Errorf("the Tieredstore is null") + return + } + if runtime.Spec.Tieredstore.Levels[0].MediumType == "MEM" { + loadMemoryData = true + } + + dataloadInfo := cdataload.DataLoadInfo{ + BackoffLimit: 3, + TargetDataset: dataload.Spec.Dataset.Name, + LoadMetadata: dataload.Spec.LoadMetadata, + Image: image, + } + + targetPaths := []cdataload.TargetPath{} + for _, target := range dataload.Spec.Target { + fluidNative := isTargetPathUnderFluidNativeMounts(target.Path, *targetDataset) + targetPaths = append(targetPaths, cdataload.TargetPath{ + Path: target.Path, + Replicas: target.Replicas, + FluidNative: fluidNative, + }) + } + dataloadInfo.TargetPaths = targetPaths + options := map[string]string{} + if loadMemoryData { + options["loadMemoryData"] = "true" + } else { + options["loadMemoryData"] = "false" + } + if hadoopConfig != "" { + options["hdfsConfig"] = hadoopConfig + } + dataloadInfo.Options = options + + dataLoadValue := cdataload.DataLoadValue{DataLoadInfo: dataloadInfo} + data, err := yaml.Marshal(dataLoadValue) + if err != nil { + return + } + + valueFile, err := ioutil.TempFile(os.TempDir(), fmt.Sprintf("%s-%s-loader-values.yaml", dataload.Namespace, dataload.Name)) + if err != nil { + return + } + err = ioutil.WriteFile(valueFile.Name(), data, 0400) + if err != nil { + return + } + return valueFile.Name(), nil +} diff --git a/pkg/controllers/v1alpha1/dataload/implement.go b/pkg/controllers/v1alpha1/dataload/implement.go index a242dd6d8a6..f1a25bdfac5 100644 --- a/pkg/controllers/v1alpha1/dataload/implement.go +++ b/pkg/controllers/v1alpha1/dataload/implement.go @@ -12,7 +12,8 @@ import ( 
"github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" cdataload "github.com/fluid-cloudnative/fluid/pkg/dataload" - "github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio/operations" + alluxioOperations "github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio/operations" + jindoOperations "github.com/fluid-cloudnative/fluid/pkg/ddc/jindo/operations" "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/docker" "github.com/fluid-cloudnative/fluid/pkg/utils/helm" @@ -209,7 +210,12 @@ func (r *DataLoadReconcilerImplement) reconcilePendingDataLoad(ctx reconcileRequ case common.ALLUXIO_RUNTIME: podName := fmt.Sprintf("%s-master-0", targetDataset.Name) containerName := "alluxio-master" - fileUtils := operations.NewAlluxioFileUtils(podName, containerName, targetDataset.Namespace, ctx.Log) + fileUtils := alluxioOperations.NewAlluxioFileUtils(podName, containerName, targetDataset.Namespace, ctx.Log) + ready = fileUtils.Ready() + case common.JINDO_RUNTIME: + podName := fmt.Sprintf("%s-jindofs-master-0", targetDataset.Name) + containerName := "jindofs-master" + fileUtils := jindoOperations.NewJindoFileUtils(podName, containerName, targetDataset.Namespace, ctx.Log) ready = fileUtils.Ready() default: log.Error(fmt.Errorf("RuntimeNotSupported"), "The runtime is not supported yet", "runtime", boundedRuntime) @@ -234,7 +240,20 @@ func (r *DataLoadReconcilerImplement) reconcilePendingDataLoad(ctx reconcileRequ case common.ALLUXIO_RUNTIME: podName := fmt.Sprintf("%s-master-0", targetDataset.Name) containerName := "alluxio-master" - fileUtils := operations.NewAlluxioFileUtils(podName, containerName, targetDataset.Namespace, ctx.Log) + fileUtils := alluxioOperations.NewAlluxioFileUtils(podName, containerName, targetDataset.Namespace, ctx.Log) + for _, target := range ctx.DataLoad.Spec.Target { + isExist, err := fileUtils.IsExist(target.Path) + if err != nil { + return utils.RequeueAfterInterval(20 * time.Second) + } + if !isExist { + notExisted = true + } + } + case common.JINDO_RUNTIME: + podName := fmt.Sprintf("%s-jindofs-master-0", targetDataset.Name) + containerName := "jindofs-master" + fileUtils := jindoOperations.NewJindoFileUtils(podName, containerName, targetDataset.Namespace, ctx.Log) for _, target := range ctx.DataLoad.Spec.Target { isExist, err := fileUtils.IsExist(target.Path) if err != nil { @@ -325,12 +344,38 @@ func (r *DataLoadReconcilerImplement) reconcileExecutingDataLoad(ctx reconcileRe // 2. 
install the helm chart if it does not exist and requeue if !existed { log.Info("DataLoad job helm chart not installed yet, will install") - valueFileName, err := r.generateDataLoadValueFile(ctx.DataLoad) + + targetDataset, err := utils.GetDataset(r.Client, ctx.DataLoad.Spec.Dataset.Name, ctx.DataLoad.Spec.Dataset.Namespace) if err != nil { - log.Error(err, "failed to generate dataload chart's value file") - return utils.RequeueIfError(err) + log.Error(err, "targetDataset does not exist", "targetDataset", ctx.DataLoad.Spec.Dataset.Name, "namespace", ctx.DataLoad.Spec.Dataset.Namespace) + } + + _, boundedRuntime := utils.GetRuntimeByCategory(targetDataset.Status.Runtimes, common.AccelerateCategory) + + chartName := "" + valueFileName := "" + switch boundedRuntime.Type { + case common.ALLUXIO_RUNTIME: + valueFileName, err = r.generateDataLoadValueFile(ctx.DataLoad) + if err != nil { + log.Error(err, "failed to generate dataload chart's value file") + return utils.RequeueIfError(err) + } + chartName = utils.GetChartsDirectory() + "/" + cdataload.DATALOAD_CHART + "/" + common.ALLUXIO_RUNTIME + case common.JINDO_RUNTIME: + valueFileName, err = r.generateJindoDataLoadValueFile(ctx.DataLoad) + if err != nil { + log.Error(err, "failed to generate dataload chart's value file") + return utils.RequeueIfError(err) + } + chartName = utils.GetChartsDirectory() + "/" + cdataload.DATALOAD_CHART + "/" + common.JINDO_RUNTIME + default: + log.Error(fmt.Errorf("RuntimeNotSupported"), "The runtime is not supported yet", "runtime", boundedRuntime) + r.Recorder.Eventf(&ctx.DataLoad, + v1.EventTypeNormal, + common.RuntimeNotReady, + "Bounded accelerate runtime not supported") } - chartName := utils.GetChartsDirectory() + "/" + cdataload.DATALOAD_CHART err = helm.InstallRelease(releaseName, ctx.Namespace, valueFileName, chartName) if err != nil { log.Error(err, "failed to install dataload chart") @@ -486,7 +531,7 @@ func (r *DataLoadReconcilerImplement) generateDataLoadValueFile(dataload v1alpha } // isTargetPathUnderFluidNativeMounts checks if targetPath is a subpath under some given native mount point. -// We check this for the reason that native mount points need extra metadata sync operations. +// We check this because native mount points need extra metadata sync operations. func isTargetPathUnderFluidNativeMounts(targetPath string, dataset v1alpha1.Dataset) bool { for _, mount := range dataset.Spec.Mounts { mountPointOnDDCEngine := fmt.Sprintf("/%s", mount.Name) diff --git a/pkg/dataload/value.go b/pkg/dataload/value.go index 17302bd5849..4c256585494 100644 --- a/pkg/dataload/value.go +++ b/pkg/dataload/value.go @@ -21,6 +21,9 @@ type DataLoadInfo struct { // Image specifies the image that the DataLoad job uses Image string `yaml:"image,omitempty"` + + // Options specifies the extra dataload properties for the runtime + Options map[string]string `yaml:"options,omitempty"` } type TargetPath struct { 
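The switch above is also why the dataloader charts at the top of this diff moved into per-runtime subdirectories: the chart path is now derived from the bounded runtime's type. A condensed, hypothetical sketch, assuming `cdataload.DATALOAD_CHART` resolves to `fluid-dataloader` and the runtime type constants are the lowercase runtime names:

```go
// Hypothetical condensation of the chart-path selection above.
package main

import "fmt"

// chartPathFor picks the per-runtime dataload chart directory, e.g.
// <chartsDir>/fluid-dataloader/alluxio or <chartsDir>/fluid-dataloader/jindo.
func chartPathFor(chartsDir, runtimeType string) (string, error) {
	switch runtimeType {
	case "alluxio", "jindo": // assumed values of common.ALLUXIO_RUNTIME / common.JINDO_RUNTIME
		return fmt.Sprintf("%s/fluid-dataloader/%s", chartsDir, runtimeType), nil
	default:
		return "", fmt.Errorf("runtime %q is not supported yet", runtimeType)
	}
}

func main() {
	path, _ := chartPathFor("/charts", "jindo")
	fmt.Println(path) // /charts/fluid-dataloader/jindo
}
```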
diff --git a/pkg/ddc/jindo/cache.go b/pkg/ddc/jindo/cache.go index 076187d7131..2b22f6a0ffc 100644 --- a/pkg/ddc/jindo/cache.go +++ b/pkg/ddc/jindo/cache.go @@ -2,6 +2,7 @@ package jindo import ( "fmt" + "github.com/fluid-cloudnative/fluid/pkg/ddc/jindo/operations" "github.com/fluid-cloudnative/fluid/pkg/utils" "strings" ) @@ -56,7 +57,7 @@ func (e *JindoEngine) queryCacheStatus() (states cacheStates, err error) { } // clean cache -/*func (e *JindoEngine) invokeCleanCache() (err error) { +func (e *JindoEngine) invokeCleanCache() (err error) { // 1. Check if master is ready, if not, just return masterName := e.getMasterStatefulsetName() master, err := e.getMasterStatefulset(masterName, e.namespace) @@ -76,4 +77,4 @@ func (e *JindoEngine) queryCacheStatus() (states cacheStates, err error) { fileUtils := operations.NewJindoFileUtils(podName, containerName, e.namespace, e.Log) e.Log.Info("cleaning cache and waiting for a while") return fileUtils.CleanCache() -}*/ +} diff --git a/pkg/ddc/jindo/master_internal.go b/pkg/ddc/jindo/master_internal.go index 97b6759c0f6..87ce3d3f884 100644 --- a/pkg/ddc/jindo/master_internal.go +++ b/pkg/ddc/jindo/master_internal.go @@ -5,6 +5,7 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/helm" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubectl" "gopkg.in/yaml.v2" "io/ioutil" "os" @@ -55,6 +56,11 @@ func (e *JindoEngine) generateJindoValueFile() (valueFileName string, err error) if err != nil { return } + + err = kubectl.CreateConfigMapFromFile(e.getConfigmapName(), "data", valueFileName, e.namespace) + if err != nil { + return + } return valueFileName, err } diff --git a/pkg/ddc/jindo/operations/base.go b/pkg/ddc/jindo/operations/base.go index 1b0770bd3a9..07deffc3d96 100644 --- a/pkg/ddc/jindo/operations/base.go +++ b/pkg/ddc/jindo/operations/base.go @@ -107,3 +107,40 @@ func (a JindoFileUtils) GetUfsTotalSize(url string, useStsSecret bool) (summary } return stdout, err } + +// Ready checks if the Jindo master is ready by running the `jindo jfs -report` command +func (a JindoFileUtils) Ready() (ready bool) { + var ( + command = []string{"/sdk/bin/jindo", "jfs", "-report"} + ) + + _, _, err := a.exec(command, true) + if err == nil { + ready = true + } + + return ready +} + +// IsExist checks if the given jindoPath exists in the Jindo filesystem +func (a JindoFileUtils) IsExist(jindoPath string) (found bool, err error) { + var ( + command = []string{"hadoop", "fs", "-ls", "jfs://jindo" + jindoPath} + stdout string + stderr string + ) + + stdout, stderr, err = a.exec(command, true) + if err != nil { + if strings.Contains(stdout, "No such file or directory") { + err = nil + } else { + err = fmt.Errorf("failed to execute command %v: %v, stdout: %s, stderr: %s", command, err, stdout, stderr) + return + } + } else { + found = true + } + + return +}
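A sketch of how a caller might combine `Ready` and `IsExist` above, in the spirit of the readiness and existence checks in `reconcilePendingDataLoad`; the `checkTargets` helper is illustrative, not part of this PR:

```go
// Illustrative caller for the helpers above; not part of this PR.
package jindo

import "github.com/fluid-cloudnative/fluid/pkg/ddc/jindo/operations"

// checkTargets reports true only when the Jindo master answers
// `jindo jfs -report` and every target path exists under jfs://jindo.
func checkTargets(fileUtils operations.JindoFileUtils, paths []string) (bool, error) {
	if !fileUtils.Ready() {
		return false, nil // master not ready yet; a reconciler would requeue here
	}
	for _, path := range paths {
		// IsExist maps "No such file or directory" to (false, nil) rather than an error
		exists, err := fileUtils.IsExist(path)
		if err != nil {
			return false, err
		}
		if !exists {
			return false, nil
		}
	}
	return true, nil
}
```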
diff --git a/pkg/ddc/jindo/runtime_info.go b/pkg/ddc/jindo/runtime_info.go index d48d73b2148..95dc8f9a2ab 100644 --- a/pkg/ddc/jindo/runtime_info.go +++ b/pkg/ddc/jindo/runtime_info.go @@ -18,6 +18,7 @@ func (e *JindoEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) { return e.runtimeInfo, err } + // Setup Fuse Deploy Mode if runtime.Spec.Fuse.Global { e.runtimeInfo.SetupFuseDeployMode(runtime.Spec.Fuse.Global, runtime.Spec.Fuse.NodeSelector) e.Log.Info("Enable global mode for fuse") diff --git a/pkg/ddc/jindo/shutdown.go b/pkg/ddc/jindo/shutdown.go index f45544e791a..9c424c9d223 100644 --- a/pkg/ddc/jindo/shutdown.go +++ b/pkg/ddc/jindo/shutdown.go @@ -17,10 +17,10 @@ import ( // shut down the Jindo engine func (e *JindoEngine) Shutdown() (err error) { - /*err = e.invokeCleanCache() + err = e.invokeCleanCache() if err != nil { return - }*/ + } _, err = e.destroyWorkers(-1) if err != nil { diff --git a/pkg/ddc/jindo/transform.go b/pkg/ddc/jindo/transform.go index bd09bd17777..d57b8c0075c 100644 --- a/pkg/ddc/jindo/transform.go +++ b/pkg/ddc/jindo/transform.go @@ -138,17 +138,24 @@ func (e *JindoEngine) transformMaster(runtime *datav1alpha1.JindoRuntime, metaPa if err != nil { return err } - jfsNamespace := "" + jfsNamespace := "jindo" + mode := "oss" for _, mount := range dataset.Spec.Mounts { - jfsNamespace = jfsNamespace + mount.Name + "," + //jfsNamespace = jfsNamespace + mount.Name + "," if !strings.HasSuffix(mount.MountPoint, "/") { mount.MountPoint = mount.MountPoint + "/" } // transform mountpoint for oss or hdfs format if strings.HasPrefix(mount.MountPoint, "hdfs://") { - properties["jfs.namespaces."+mount.Name+".hdfs.uri"] = mount.MountPoint + properties["jfs.namespaces.jindo.hdfs.uri"] = mount.MountPoint + mode = "hdfs" + } else if strings.HasPrefix(mount.MountPoint, "s3://") { + properties["jfs.namespaces.jindo.s3.uri"] = mount.MountPoint + properties["jfs.namespaces.jindo.s3.access.key"] = mount.Options["fs.s3.accessKeyId"] + properties["jfs.namespaces.jindo.s3.access.secret"] = mount.Options["fs.s3.accessKeySecret"] + mode = "s3" } else { if !strings.HasPrefix(mount.MountPoint, "oss://") { continue @@ -160,12 +167,12 @@ func (e *JindoEngine) transformMaster(runtime *datav1alpha1.JindoRuntime, metaPa e.Log.Info("incorrect mountpath", "mount.MountPoint", mount.MountPoint) } mount.MountPoint = strings.Replace(mount.MountPoint, rm[1], rm[1]+"."+mount.Options["fs.oss.endpoint"], 1) - properties["jfs.namespaces."+mount.Name+".oss.uri"] = mount.MountPoint - properties["jfs.namespaces."+mount.Name+".oss.access.key"] = mount.Options["fs.oss.accessKeyId"] - properties["jfs.namespaces."+mount.Name+".oss.access.secret"] = mount.Options["fs.oss.accessKeySecret"] - properties["jfs.namespaces."+mount.Name+".oss.access.endpoint"] = mount.Options["fs.oss.endpoint"] + properties["jfs.namespaces.jindo.oss.uri"] = mount.MountPoint + properties["jfs.namespaces.jindo.oss.access.key"] = mount.Options["fs.oss.accessKeyId"] + properties["jfs.namespaces.jindo.oss.access.secret"] = mount.Options["fs.oss.accessKeySecret"] + properties["jfs.namespaces.jindo.oss.access.endpoint"] = mount.Options["fs.oss.endpoint"] } - properties["jfs.namespaces."+mount.Name+".mode"] = "cache" + properties["jfs.namespaces.jindo.mode"] = "cache" // to check whether encryptOptions exist for _, encryptOption := range mount.EncryptOptions { key := encryptOption.Name @@ -179,11 +186,11 @@ func (e *JindoEngine) transformMaster(runtime *datav1alpha1.JindoRuntime, metaPa if err != nil { e.Log.Info("decode value failed") } - if key == "fs.oss.accessKeyId" { - properties["jfs.namespaces."+mount.Name+".oss.access.key"] = string(value) + if key == "fs."+mode+".accessKeyId" { + properties["jfs.namespaces.jindo."+mode+".access.key"] = string(value) } - if key == "fs.oss.accessKeySecret" { - properties["jfs.namespaces."+mount.Name+".oss.access.secret"] = string(value) + if key == "fs."+mode+".accessKeySecret" { + properties["jfs.namespaces.jindo."+mode+".access.secret"] = string(value) } e.Log.Info("get from secret") } @@ -199,7 +206,12 @@ func (e *JindoEngine) transformMaster(runtime *datav1alpha1.JindoRuntime, metaPa } } + if mode == "oss" || mode == "s3" { + value.Master.OssKey = properties["jfs.namespaces.jindo."+mode+".access.key"] + value.Master.OssSecret = properties["jfs.namespaces.jindo."+mode+".access.secret"] + } value.Master.MasterProperties = properties + return nil } @@ -266,7 +278,7 @@ func (e *JindoEngine) transformFuse(runtime *datav1alpha1.JindoRuntime, value *J "client.oss.connection.timeout.millisecond": "3000", "jfs.cache.meta-cache.enable": "0", "jfs.cache.data-cache.enable": "1", - "jfs.cache.data-cache.slicecache.enable": "1", + "jfs.cache.data-cache.slicecache.enable": "0", } // 
"client.storage.rpc.port": "6101", @@ -347,7 +359,7 @@ func (e *JindoEngine) transformFuseArg(runtime *datav1alpha1.JindoRuntime) []str var rootArg = "" var secretArg = "" if len(dataset.Spec.Mounts) > 0 && dataset.Spec.Mounts[0].Path != "" { - rootArg = "-oroot_ns=" + dataset.Spec.Mounts[0].Name + rootArg = "-oroot_ns=jindo" baseArg = rootArg + " " + baseArg } if len(runtime.Spec.Secret) != 0 { @@ -371,7 +383,7 @@ func (e *JindoEngine) transformFuseArg(runtime *datav1alpha1.JindoRuntime) []str func (e *JindoEngine) parseSmartDataImage() (image, tag string) { var ( defaultImage = "registry.cn-shanghai.aliyuncs.com/jindofs/smartdata" - defaultTag = "3.5.2" + defaultTag = "3.5.0" ) image, tag = docker.GetImageRepoTagFromEnv(common.JINDO_SMARTDATA_IMAGE_ENV, defaultImage, defaultTag) @@ -383,7 +395,7 @@ func (e *JindoEngine) parseSmartDataImage() (image, tag string) { func (e *JindoEngine) parseFuseImage() (image, tag string) { var ( defaultImage = "registry.cn-shanghai.aliyuncs.com/jindofs/jindo-fuse" - defaultTag = "3.5.2" + defaultTag = "3.5.0" ) image, tag = docker.GetImageRepoTagFromEnv(common.JINDO_FUSE_IMAGE_ENV, defaultImage, defaultTag) diff --git a/pkg/ddc/jindo/types.go b/pkg/ddc/jindo/types.go index 12c0df63d12..98afb5c5de1 100644 --- a/pkg/ddc/jindo/types.go +++ b/pkg/ddc/jindo/types.go @@ -33,6 +33,8 @@ type Master struct { MasterProperties map[string]string `yaml:"properties"` TokenProperties map[string]string `yaml:"secretProperties"` Port Ports `yaml:"ports,omitempty"` + OssKey string `yaml:"osskey,omitempty"` + OssSecret string `yaml:"osssecret,omitempty"` } type Worker struct { diff --git a/pkg/ddc/jindo/utils.go b/pkg/ddc/jindo/utils.go index afc373f8777..30222e74ab8 100644 --- a/pkg/ddc/jindo/utils.go +++ b/pkg/ddc/jindo/utils.go @@ -97,7 +97,7 @@ func (e *JindoEngine) getMasterPodInfo() (podName string, containerName string) func (e *JindoEngine) TotalJindoStorageBytes(name string, useStsSecret bool) (value int64, err error) { podName, containerName := e.getMasterPodInfo() fileUtils := operations.NewJindoFileUtils(podName, containerName, e.namespace, e.Log) - url := "jfs://" + name + "/" + url := "jfs://jindo/" ufsSize, err := fileUtils.GetUfsTotalSize(url, useStsSecret) e.Log.Info("jindo storage ufsSize", "ufsSize", ufsSize) if err != nil { diff --git a/pkg/utils/docker/image.go b/pkg/utils/docker/image.go index 26257633439..45250dfb649 100644 --- a/pkg/utils/docker/image.go +++ b/pkg/utils/docker/image.go @@ -68,10 +68,20 @@ func GetWorkerImage(client client.Client, datasetName string, runtimeType string } if imageName == "" { - imageName = "registry.cn-huhehaote.aliyuncs.com/alluxio/alluxio" + if runtimeType == common.ALLUXIO_RUNTIME { + imageName = "registry.cn-huhehaote.aliyuncs.com/alluxio/alluxio" + } + if runtimeType == common.JINDO_RUNTIME { + imageName = "registry.cn-shanghai.aliyuncs.com/jindofs/smartdata" + } } if imageTag == "" { - imageTag = "2.3.0-SNAPSHOT-238b7eb" + if runtimeType == common.ALLUXIO_RUNTIME { + imageTag = "2.3.0-SNAPSHOT-238b7eb" + } + if runtimeType == common.JINDO_RUNTIME { + imageTag = "3.5.0" + } } return } diff --git a/pkg/utils/runtimes.go b/pkg/utils/runtimes.go index e94c2658d0a..20674e19e26 100644 --- a/pkg/utils/runtimes.go +++ b/pkg/utils/runtimes.go @@ -65,3 +65,17 @@ func GetAlluxioRuntime(client client.Client, name, namespace string) (*data.Allu } return &runtime, nil } + +// GetJindoRuntime gets Jindo Runtime object with the given name and namespace +func GetJindoRuntime(client client.Client, name, namespace string) 
(*data.JindoRuntime, error) { + + key := types.NamespacedName{ + Namespace: namespace, + Name: name, + } + var runtime data.JindoRuntime + if err := client.Get(context.TODO(), key, &runtime); err != nil { + return nil, err + } + return &runtime, nil +}
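Taken together, a hedged sketch of how the new helpers compose when resolving the dataloader image for a Jindo-backed dataset (mirroring `generateJindoDataLoadValueFile`; the wrapper function and its error handling are illustrative, and it assumes the bounded runtime shares the dataset's name):

```go
// Hypothetical wrapper composing GetJindoRuntime and GetWorkerImage;
// error handling and tiered-store checks trimmed for brevity.
package dataload

import (
	"fmt"

	"github.com/fluid-cloudnative/fluid/pkg/common"
	"github.com/fluid-cloudnative/fluid/pkg/utils"
	"github.com/fluid-cloudnative/fluid/pkg/utils/docker"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func jindoDataloaderImage(c client.Client, datasetName, namespace string) (string, error) {
	// Assumes the bounded JindoRuntime shares the dataset's name, as in this PR.
	if _, err := utils.GetJindoRuntime(c, datasetName, namespace); err != nil {
		return "", err // no bounded JindoRuntime for this dataset
	}
	name, tag := docker.GetWorkerImage(c, datasetName, common.JINDO_RUNTIME, namespace)
	// GetWorkerImage now falls back to jindofs/smartdata:3.5.0 when the
	// dataset does not pin an image (see pkg/utils/docker/image.go above).
	return fmt.Sprintf("%s:%s", name, tag), nil
}
```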