Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADD Dataload-Manager Model #16

Merged
merged 3 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
77 changes: 77 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,83 @@ release_datamanager:
${MUILT_ARCH_PUSH_CMD} -i ${DATAMANAGER_IMAGE_NAME}:${RELEASE_TAG}



#### for DATALOAD-MANAGER #########
DATALOAD_MANAGER_MODULE_NAME = dataload-manager
DATALOAD_MANAGER_BUILD_INPUT = ${CMDS_DIR}/${DATALOAD_MANAGER_MODULE_NAME}/main.go
.PHONY: run_dataload_manager
run_dataload_manager:
go run ${BUILD_OPTIONS} ${DATALOAD_MANAGER_BUILD_INPUT} --isTrainMaster=false --isInitRole=true --baseModelLocalDir=/Users/liangsun/Workspace/projects/golang/src/github.com/hwameistor/datastore/_build/models --checkpointLocalDir=/Users/liangsun/Workspace/projects/golang/src/github.com/hwameistor/datastore/_build/checkpoints --checkpointLocalDirOnHost=/Users/liangsun/Downloads/checkpoints --trainingdataLocalDir=/Users/liangsun/Workspace/projects/golang/src/github.com/hwameistor/datastore/_build/training --trainingdataLocalDirOnHost=/Users/liangsun/Downloads/training

.PHONY: compile_dataload_manager
compile_dataload_manager:
GOARCH=amd64 ${BUILD_ENVS} ${BUILD_CMD} ${BUILD_OPTIONS} -o ${DATALOAD_MANAGER_BUILD_OUTPUT} ${DATALOAD_MANAGER_BUILD_INPUT}

.PHONY: compile_dataload_manager_arm64
compile_dataload_manager_arm64:
GOARCH=arm64 ${BUILD_ENVS} ${BUILD_CMD} ${BUILD_OPTIONS} -o ${DATALOAD_MANAGER_BUILD_OUTPUT} ${DATALOAD_MANAGER_BUILD_INPUT}

.PHONY: build_dataload_manager_image
build_dataload_manager_image:
@echo "Build dataload_manager image ${DATALOAD_MANAGER_IMAGE_NAME}:${IMAGE_TAG}"
${DOCKER_MAKE_CMD} make compile_dataload_manager
${DOCKER_BUILDX_CMD_AMD64} -t ${DATALOAD_MANAGER_IMAGE_NAME}:${IMAGE_TAG} -f ${DATALOAD_MANAGER_IMAGE_DOCKERFILE}.amd64 ${PROJECT_SOURCE_CODE_DIR}

.PHONY: build_dataload_manager_image_arm64
build_dataload_manager_image_arm64:
@echo "Build dataload_manager image ${DATALOAD_MANAGER_IMAGE_NAME}:${IMAGE_TAG}"
${DOCKER_MAKE_CMD} make compile_dataload_manager_arm64
${DOCKER_BUILDX_CMD_ARM64} -t ${DATALOAD_MANAGER_IMAGE_NAME}:${IMAGE_TAG} -f ${DATALOAD_MANAGER_IMAGE_DOCKERFILE}.arm64 ${PROJECT_SOURCE_CODE_DIR}

.PHONY: release_dataload_manager
release_dataload_manager:
# build for amd64 version
${DOCKER_MAKE_CMD} make compile_dataload_manager
${DOCKER_BUILDX_CMD_AMD64} -t ${DATALOAD_MANAGER_IMAGE_NAME}:${RELEASE_TAG}-amd64 -f ${DATALOAD_MANAGER_IMAGE_DOCKERFILE}.amd64 ${PROJECT_SOURCE_CODE_DIR}
# build for arm64 version
${DOCKER_MAKE_CMD} make compile_dataload_manager_arm64
${DOCKER_BUILDX_CMD_ARM64} -t ${DATALOAD_MANAGER_IMAGE_NAME}:${RELEASE_TAG}-arm64 -f ${DATALOAD_MANAGER_IMAGE_DOCKERFILE}.arm64 ${PROJECT_SOURCE_CODE_DIR}
# push to a public registry
${MUILT_ARCH_PUSH_CMD} -i ${DATALOAD_MANAGER_IMAGE_NAME}:${RELEASE_TAG}

#### for DATALOAD-INIT #########
DATALOAD_INIT_MODULE_NAME = dataload-init
DATALOAD_INIT_BUILD_INPUT = ${CMDS_DIR}/${DATALOAD_INIT_MODULE_NAME}/main.go
.PHONY: run_dataload_init
run_dataload_init:
go run ${BUILD_OPTIONS} ${DATALOAD_INIT_BUILD_INPUT} --isTrainMaster=false --isInitRole=true --baseModelLocalDir=/Users/liangsun/Workspace/projects/golang/src/github.com/hwameistor/datastore/_build/models --checkpointLocalDir=/Users/liangsun/Workspace/projects/golang/src/github.com/hwameistor/datastore/_build/checkpoints --checkpointLocalDirOnHost=/Users/liangsun/Downloads/checkpoints --trainingdataLocalDir=/Users/liangsun/Workspace/projects/golang/src/github.com/hwameistor/datastore/_build/training --trainingdataLocalDirOnHost=/Users/liangsun/Downloads/training

.PHONY: compile_dataload_init
compile_dataload_init:
GOARCH=amd64 ${BUILD_ENVS} ${BUILD_CMD} ${BUILD_OPTIONS} -o ${DATALOAD_INIT_BUILD_OUTPUT} ${DATALOAD_INIT_BUILD_INPUT}

.PHONY: compile_dataload_init_arm64
compile_dataload_init_arm64:
GOARCH=arm64 ${BUILD_ENVS} ${BUILD_CMD} ${BUILD_OPTIONS} -o ${DATALOAD_INIT_BUILD_OUTPUT} ${DATALOAD_INIT_BUILD_INPUT}

.PHONY: build_dataload_init_image
build_dataload_init_image:
@echo "Build dataload_init image ${DATALOAD_INIT_IMAGE_NAME}:${IMAGE_TAG}"
${DOCKER_MAKE_CMD} make compile_dataload_init
${DOCKER_BUILDX_CMD_AMD64} -t ${DATALOAD_INIT_IMAGE_NAME}:${IMAGE_TAG} -f ${DATALOAD_INIT_IMAGE_DOCKERFILE}.amd64 ${PROJECT_SOURCE_CODE_DIR}

.PHONY: build_dataload_init_image_arm64
build_dataload_init_image_arm64:
@echo "Build dataload_init_arm64 image ${DATALOAD_INIT_IMAGE_NAME}:${IMAGE_TAG}"
${DOCKER_MAKE_CMD} make compile_dataload_init_arm64
${DOCKER_BUILDX_CMD_ARM64} -t ${DATALOAD_INIT_IMAGE_NAME}:${IMAGE_TAG} -f ${DATALOAD_INIT_IMAGE_DOCKERFILE}.arm64 ${PROJECT_SOURCE_CODE_DIR}

.PHONY: release_dataload_init
release_dataload_init:
# build for amd64 version
${DOCKER_MAKE_CMD} make compile_dataload_init
${DOCKER_BUILDX_CMD_AMD64} -t ${DATALOAD_INIT_IMAGE_NAME}:${RELEASE_TAG}-amd64 -f ${DATALOAD_INIT_IMAGE_DOCKERFILE}.amd64 ${PROJECT_SOURCE_CODE_DIR}
# build for arm64 version
${DOCKER_MAKE_CMD} make compile_dataload_init_arm64
${DOCKER_BUILDX_CMD_ARM64} -t ${DATALOAD_INIT_IMAGE_NAME}:${RELEASE_TAG}-arm64 -f ${DATALOAD_INIT_IMAGE_DOCKERFILE}.arm64 ${PROJECT_SOURCE_CODE_DIR}
# push to a public registry
${MUILT_ARCH_PUSH_CMD} -i ${DATALOAD_INIT_IMAGE_NAME}:${RELEASE_TAG}

.PHONY: apis
apis:
${DOCKER_MAKE_CMD} make _gen-apis
Expand Down
9 changes: 9 additions & 0 deletions Makefile.variables
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ DATAMANAGER_IMAGE_NAME = ${IMAGE_REGISTRY}/${DATAMANAGER_MODULE_NAME}
DATAMANAGER_IMAGE_DOCKERFILE = ${PROJECT_SOURCE_CODE_DIR}/build/${DATAMANAGER_MODULE_NAME}/Dockerfile
DATAMANAGER_BUILD_OUTPUT = ${BINS_DIR}/${DATAMANAGER_MODULE_NAME}

# [ IMAGE/DATALOAD_MANAGER ]
DATALOAD_MANAGER_IMAGE_NAME = ${IMAGE_REGISTRY}/${DATALOAD_MANAGER_MODULE_NAME}
DATALOAD_MANAGER_IMAGE_DOCKERFILE = ${PROJECT_SOURCE_CODE_DIR}/build/${DATALOAD_MANAGER_MODULE_NAME}/Dockerfile
DATALOAD_MANAGER_BUILD_OUTPUT = ${BINS_DIR}/${DATALOAD_MANAGER_MODULE_NAME}

# [ IMAGE/DATALOAD_INIT ]
DATALOAD_INIT_IMAGE_NAME = ${IMAGE_REGISTRY}/${DATALOAD_INIT_MODULE_NAME}
DATALOAD_INIT_IMAGE_DOCKERFILE = ${PROJECT_SOURCE_CODE_DIR}/build/${DATALOAD_INIT_MODULE_NAME}/Dockerfile
DATALOAD_INIT_BUILD_OUTPUT = ${BINS_DIR}/${DATALOAD_INIT_MODULE_NAME}

# [ BUILD ]
#----------------
Expand Down
5 changes: 5 additions & 0 deletions build/dataload-init/Dockerfile.amd64
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM alpine:latest

COPY ./_build/dataload-init /

ENTRYPOINT [ "/dataload-init" ]
5 changes: 5 additions & 0 deletions build/dataload-init/Dockerfile.arm64
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM alpine:latest

COPY ./_build/datamanager /

ENTRYPOINT [ "/dataload-init" ]
9 changes: 9 additions & 0 deletions build/dataload-manager/Dockerfile.amd64
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM centos:7

RUN yum install -y wget
RUN wget https://github.com/juicedata/juicesync/releases/download/v1.1.1/juicesync-1.1.1-linux-arm64.tar.gz
RUN tar zxf juicesync-1.1.1-linux-arm64.tar.gz && mv juicesync /usr/bin && chmod +x /usr/bin/juicesync

COPY ./_build/dataload-manager /

ENTRYPOINT [ "/dataload-manager" ]
8 changes: 8 additions & 0 deletions build/dataload-manager/Dockerfile.arm64
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM alpine:latest

RUN wget https://github.com/juicedata/juicesync/releases/download/v1.1.1/juicesync-1.1.1-linux-arm64.tar.gz
RUN tar zxf juicesync-1.1.1-linux-arm64.tar.gz && mv juicesync /usr/bin && chmod +x /usr/bin/juicesync

COPY ./_build/dataload-manager /

ENTRYPOINT [ "/dataload-manager" ]
3 changes: 2 additions & 1 deletion build/datamanager/Dockerfile.amd64
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
FROM alpine:latest
FROM centos:7
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alpine is a better choice for base-image than centos :-)


RUN yum install -y wget
RUN wget https://github.com/juicedata/juicesync/releases/download/v1.1.1/juicesync-1.1.1-linux-arm64.tar.gz
RUN tar zxf juicesync-1.1.1-linux-arm64.tar.gz && mv juicesync /usr/bin && chmod +x /usr/bin/juicesync

Expand Down
140 changes: 140 additions & 0 deletions cmd/dataload-init/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package main

import (
"context"
"flag"
"fmt"
datastorev1alpha1 "github.com/hwameistor/datastore/pkg/apis/client/clientset/versioned/typed/datastore/v1alpha1"
datastore "github.com/hwameistor/datastore/pkg/apis/datastore/v1alpha1"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"os"
"time"
)

var (
nodeName = flag.String("nodename", "", "Node name")
subDir = flag.String("subdir", "", "subdir")
)

const (
NameSpaceEnvVar = "NAMESPACE"
pvcNameEnvVar = "PVC_NAME"
apiGroup = "example.com"
version = "v1alpha1"
)

func main() {
flag.Parse()
if *nodeName == "" {
log.WithFields(log.Fields{"nodename": *nodeName}).Error("Invalid node name")
os.Exit(1)
}
if *subDir == "" {
log.WithFields(log.Fields{"subdir": *subDir}).Error("Invalid subdir path")
os.Exit(1)
}
namespace := os.Getenv(NameSpaceEnvVar)
pvcName := os.Getenv(pvcNameEnvVar)

if namespace == "" || pvcName == "" {
log.Fatal("Namespace or PVC Name environment variables are not set.")
}

config, err := getConfig()
if err != nil {
log.WithError(err).Fatal("Failed to get Kubernetes configuration")
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.WithError(err).Fatal("Failed to create Kubernetes clientset")
}

pvcClient := clientset.CoreV1().PersistentVolumeClaims(namespace)
pvc, err := getPersistentVolumeClaim(pvcClient, pvcName)
if err != nil {
log.WithError(err).Error("Failed to get PVC")
return
}

dataSetName := pvc.Spec.VolumeName
dataLoadRequest := createDataLoadRequest(dataSetName, *subDir)
dsClient, err := datastorev1alpha1.NewForConfig(config)
watcher, err := watchCustomResource(dsClient, namespace, dataSetName)
if err != nil {
log.WithError(err).Error("Failed to start watching custom resource")
return
}
defer watcher.Stop()
// 开始计时
peng9808 marked this conversation as resolved.
Show resolved Hide resolved
start := time.Now()
if err := createCustomResource(dsClient, dataLoadRequest, namespace); err != nil {
log.WithError(err).Error("Failed to create custom resource")
return
}
fmt.Println("Created custom resource")
peng9808 marked this conversation as resolved.
Show resolved Hide resolved
for event := range watcher.ResultChan() {
if event.Type == watch.Deleted {
fmt.Println("Custom resource deleted, exiting")
//计时结束
end := time.Now()
duration := end.Sub(start)
fmt.Printf("DataLoad execution time: %s\n", duration)
return
}
}
}

func getConfig() (*rest.Config, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to get in-cluster config: %w", err)
}
return config, nil
}

func getPersistentVolumeClaim(pvcClient v1.PersistentVolumeClaimInterface, pvcName string) (*corev1.PersistentVolumeClaim, error) {
return pvcClient.Get(context.TODO(), pvcName, metav1.GetOptions{})
}

func createDataLoadRequest(dataSetName, subDir string) *datastore.DataLoadRequest {
return &datastore.DataLoadRequest{
TypeMeta: metav1.TypeMeta{
APIVersion: fmt.Sprintf("%s/%s", apiGroup, version),
Kind: "DataLoadRequest",
},
ObjectMeta: metav1.ObjectMeta{
Name: dataSetName,
},
Spec: datastore.DataLoadRequestSpec{
IsGlobal: true,
Node: *nodeName,
DataSet: dataSetName,
SubDir: subDir,
},
Status: datastore.DataLoadRequestStatus{
State: datastore.OperationStateStart,
},
}
}

func createCustomResource(dsClient datastorev1alpha1.DatastoreV1alpha1Interface, dataLoadRequest *datastore.DataLoadRequest, namespace string) error {
_, err := dsClient.DataLoadRequests(namespace).Create(context.Background(), dataLoadRequest, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create custom resource: %w", err)
}
return nil
}

func watchCustomResource(dsClient datastorev1alpha1.DatastoreV1alpha1Interface, namespace, resourceName string) (watch.Interface, error) {
return dsClient.DataLoadRequests(namespace).Watch(context.TODO(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", resourceName),
})
}
105 changes: 105 additions & 0 deletions cmd/dataload-manager/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"context"
"flag"
"fmt"
dataloadManager "github.com/hwameistor/datastore/pkg/dataload-manager"
"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/informers"
"strings"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"os"
"time"

dsclientset "github.com/hwameistor/datastore/pkg/apis/client/clientset/versioned"
dsinformers "github.com/hwameistor/datastore/pkg/apis/client/informers/externalversions"
)

var (
showVersion = flag.Bool("version", false, "Show version.")
enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
rsync = flag.Duration("rsync", 10*time.Minute, "Rsync interval of the controller.")
nodeName = flag.String("nodename", "", "Node name")
version = "unknown"
)

func main() {
klog.InitFlags(nil)
flag.Set("logtostderr", "true")
flag.Parse()
klog.Infof("args: %s", strings.Join(os.Args, " "))

if *showVersion {
fmt.Println(os.Args[0], version)
return
}
klog.Infof("Version: %s", version)

if *nodeName == "" {
log.WithFields(log.Fields{"nodename": *nodeName}).Error("Invalid node name")
os.Exit(1)
}

// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
config, err := buildConfig(*kubeconfig)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}

// Create the kubeClientset for in-cluster resources
kubeClientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}

// Create the kubeClientset for datastore resources
dsClient, err := dsclientset.NewForConfig(config)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}

// Create the kubeClientset for datastore resources
coreFactory := informers.NewSharedInformerFactory(kubeClientset, *rsync)
dsFactory := dsinformers.NewSharedInformerFactory(dsClient, *rsync)
dataLoadRequestInformer := dsFactory.Datastore().V1alpha1().DataLoadRequests()

ctr := dataloadManager.New(*nodeName, kubeClientset, dsClient, dataLoadRequestInformer)
run := func(ctx context.Context) {
stopCh := ctx.Done()
dsFactory.Start(stopCh)
coreFactory.Start(stopCh)
ctr.Run(stopCh)
}

leClientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create leaderelection client: %v", err)
}

if *enableLeaderElection {
lockName := "hwameistor-dataload-manager-node"
le := leaderelection.NewLeaderElection(leClientset, lockName, run)
if err = le.Run(); err != nil {
klog.Fatalf("Failed to initialize leader election: %v", err)
}
} else {
run(context.TODO())
}
}

func buildConfig(kubeconfig string) (*rest.Config, error) {
if kubeconfig != "" {
return clientcmd.BuildConfigFromFlags("", kubeconfig)
}
return rest.InClusterConfig()
}
Loading