Skip to content

Commit

Permalink
Added modelName parameter to be considered for testing finetune demo
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijeet-dhumal authored and openshift-merge-bot[bot] committed Aug 28, 2024
1 parent 2a0386c commit 7f2dbf7
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 90 deletions.
1 change: 0 additions & 1 deletion tests/odh/mnist_ray_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func mnistRay(t *testing.T, numGpus int) {

// Fetch created raycluster
rayClusterName := "mnisttest"
// Wait until raycluster is up and running
rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{})
test.Expect(err).ToNot(HaveOccurred())

Expand Down
41 changes: 0 additions & 41 deletions tests/odh/mnist_raytune_hpo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,53 +129,12 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) {
ContainElement(WithTransform(KueueWorkloadAdmitted, BeTrueBecause("Workload failed to be admitted"))),
),
)
time.Sleep(30 * time.Second)

// Fetch created raycluster
rayClusterName := "mnisthpotest"
rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{})
test.Expect(err).ToNot(HaveOccurred())

// Initialise raycluster client to interact with raycluster to get rayjob details using REST-API
dashboardUrl := GetDashboardUrl(test, namespace, rayCluster)
rayClusterClientConfig := RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true}
rayClient, err := NewRayClusterClient(rayClusterClientConfig, test.Config().BearerToken)
if err != nil {
test.T().Errorf("%s", err)
}

jobID := GetTestJobId(test, rayClient, dashboardUrl.Host)
test.Expect(jobID).ToNot(Equal(nil))

// Wait for the job to be succeeded or failed
var rayJobStatus string
fmt.Printf("Waiting for job to be Succeeded...\n")
test.Eventually(func() string {
resp, err := rayClient.GetJobDetails(jobID)
test.Expect(err).ToNot(HaveOccurred())
rayJobStatusVal := resp.Status
if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" {
fmt.Printf("JobStatus : %s\n", rayJobStatusVal)
rayJobStatus = rayJobStatusVal
return rayJobStatus
}
if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" {
fmt.Printf("JobStatus : %s...\n", rayJobStatusVal)
rayJobStatus = rayJobStatusVal
}
return rayJobStatus
}, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time")
test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !")

// Store job logs in output directory
WriteRayJobAPILogs(test, rayClient, jobID)

// Fetch created raycluster
rayClusterName := "mnisthpotest"
// Wait until raycluster is up and running
rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{})
test.Expect(err).ToNot(HaveOccurred())

// Initialise raycluster client to interact with raycluster to get rayjob details using REST-API
dashboardUrl := GetDashboardUrl(test, namespace, rayCluster)
rayClusterClientConfig := RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true}
Expand Down
2 changes: 0 additions & 2 deletions tests/odh/notebook.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"k8s.io/apimachinery/pkg/util/yaml"
)

const recommendedTagAnnotation = "opendatahub.io/workbench-image-recommended"

var notebookResource = schema.GroupVersionResource{Group: "kubeflow.org", Version: "v1", Resource: "notebooks"}

type NotebookProps struct {
Expand Down
71 changes: 38 additions & 33 deletions tests/odh/ray_finetune_llm_deepspeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ import (
)

func TestRayFinetuneLlmDeepspeedDemoLlama_2_7b(t *testing.T) {
rayFinetuneLlmDeepspeed(t, 1, "zero_3_llama_2_7b.json")
rayFinetuneLlmDeepspeed(t, 1, "meta-llama/Llama-2-7b-chat-hf", "zero_3_llama_2_7b.json")
}
func TestRayFinetuneLlmDeepspeedDemoLlama_31_8b(t *testing.T) {
rayFinetuneLlmDeepspeed(t, 1, "zero_3_offload_optim_param.json")
rayFinetuneLlmDeepspeed(t, 1, "meta-llama/Meta-Llama-3.1-8B", "zero_3_offload_optim_param.json")
}

func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelConfigFile string) {
func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelName string, modelConfigFile string) {
test := With(t)

// Create a namespace
Expand All @@ -56,21 +56,22 @@ func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelConfigFile string)
"import os": "import os,time,sys",
"import sys": "!cp /opt/app-root/notebooks/* ./\\n\",\n\t\"!ls",
"from codeflare_sdk.cluster.auth import TokenAuthentication": "from codeflare_sdk.cluster.auth import TokenAuthentication\\n\",\n\t\"from codeflare_sdk.job import RayJobClient",
"token = ''": fmt.Sprintf("token = '%s'", userToken),
"server = ''": fmt.Sprintf("server = '%s'", GetOpenShiftApiUrl(test)),
"namespace='ray-finetune-llm-deepspeed'": fmt.Sprintf("namespace='%s'", namespace.Name),
"head_cpus=16": "head_cpus=2",
"head_extended_resource_requests=1": "head_extended_resource_requests=0",
"num_workers=7": "num_workers=1",
"worker_cpu_requests=16": "worker_cpu_requests=4",
"worker_cpu_limits=16": "worker_cpu_limits=4",
"worker_memory_requests=128": "worker_memory_requests=64",
"worker_memory_limits=256": "worker_memory_limits=128",
"head_memory=128": "head_memory=48",
"client = cluster.job_client": "ray_dashboard = cluster.cluster_dashboard_uri()\\n\",\n\t\"header = {\\\"Authorization\\\": \\\"Bearer " + userToken + "\\\"}\\n\",\n\t\"client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\\n",
"--num-devices=8": fmt.Sprintf("--num-devices=%d", numGpus),
"--num-epochs=3": fmt.Sprintf("--num-epochs=%d", 1),
"--ds-config=./deepspeed_configs/zero_3_offload_optim+param.json": fmt.Sprintf("--ds-config=./%s \\\"\\n\",\n\t\" \\\"--lora-config=./lora.json \\\"\\n\",\n\t\" \\\"--as-test", modelConfigFile),
"token = ''": fmt.Sprintf("token = '%s'", userToken),
"server = ''": fmt.Sprintf("server = '%s'", GetOpenShiftApiUrl(test)),
"namespace='ray-finetune-llm-deepspeed'": fmt.Sprintf("namespace='%s'", namespace.Name),
"head_cpus=16": "head_cpus=2",
"head_extended_resource_requests=1": "head_extended_resource_requests=0",
"num_workers=7": "num_workers=1",
"worker_cpu_requests=16": "worker_cpu_requests=4",
"worker_cpu_limits=16": "worker_cpu_limits=4",
"worker_memory_requests=128": "worker_memory_requests=64",
"worker_memory_limits=256": "worker_memory_limits=128",
"head_memory=128": "head_memory=48",
"client = cluster.job_client": "ray_dashboard = cluster.cluster_dashboard_uri()\\n\",\n\t\"header = {\\\"Authorization\\\": \\\"Bearer " + userToken + "\\\"}\\n\",\n\t\"client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\\n",
"--num-devices=8": fmt.Sprintf("--num-devices=%d", numGpus),
"--num-epochs=3": fmt.Sprintf("--num-epochs=%d", 1),
"--model-name=meta-llama/Meta-Llama-3.1-8B": fmt.Sprintf("--model-name=%s", modelName),
"--ds-config=./deepspeed_configs/zero_3_offload_optim_param.json": fmt.Sprintf("--ds-config=./%s \\\"\\n\",\n\t\" \\\"--lora-config=./lora.json \\\"\\n\",\n\t\" \\\"--as-test", modelConfigFile),
"--batch-size-per-device=32": "--batch-size-per-device=6",
"--eval-batch-size-per-device=32": "--eval-batch-size-per-device=6",
"'pip': 'requirements.txt'": "'pip': '/opt/app-root/src/requirements.txt'",
Expand All @@ -83,7 +84,6 @@ func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelConfigFile string)
updatedNotebookContent = strings.Replace(updatedNotebookContent, oldValue, newValue, -1)
}
updatedNotebook := []byte(updatedNotebookContent)
os.WriteFile("demo.ipynb", updatedNotebook, 0644)

// Test configuration
jupyterNotebookConfigMapFileName := "ray_finetune_llm_deepspeed.ipynb"
Expand Down Expand Up @@ -117,8 +117,6 @@ func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelConfigFile string)
),
)

time.Sleep(30 * time.Second)

// Fetch created raycluster
rayClusterName := "ray"
rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{})
Expand All @@ -128,37 +126,44 @@ func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelConfigFile string)
dashboardUrl := GetDashboardUrl(test, namespace, rayCluster)
rayClusterClientConfig := RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true}
rayClient, err := NewRayClusterClient(rayClusterClientConfig, test.Config().BearerToken)
if err != nil {
test.T().Errorf("%s", err)
}
test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to create new raycluster client: %s", err))

// wait until rayjob exists
test.Eventually(func() []RayJobDetailsResponse {
rayJobs, err := rayClient.GetJobs()
test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to fetch ray-jobs : %s", err))
return *rayJobs
}, TestTimeoutMedium, 1*time.Second).Should(HaveLen(1), "Ray job not found")

// Get test job-id
jobID := GetTestJobId(test, rayClient, dashboardUrl.Host)
test.Expect(jobID).ToNot(Equal(nil))
test.Expect(jobID).ToNot(BeEmpty())

// Wait for the job to be succeeded or failed
var rayJobStatus string
fmt.Printf("Waiting for job to be Succeeded...\n")
test.T().Logf("Waiting for job to be Succeeded...\n")
test.Eventually(func() string {
resp, err := rayClient.GetJobDetails(jobID)
test.Expect(err).ToNot(HaveOccurred())
test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to get job details :%s", err))
rayJobStatusVal := resp.Status
if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" {
fmt.Printf("JobStatus : %s\n", rayJobStatusVal)
test.T().Logf("JobStatus - %s\n", rayJobStatusVal)
rayJobStatus = rayJobStatusVal
WriteRayJobAPILogs(test, rayClient, jobID)
return rayJobStatus
}
if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" {
fmt.Printf("JobStatus : %s...\n", rayJobStatusVal)
test.T().Logf("JobStatus - %s...\n", rayJobStatusVal)
rayJobStatus = rayJobStatusVal
}
return rayJobStatus
}, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time")
}, TestTimeoutDouble, 1*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time")
// Store job logs in output directory
WriteRayJobAPILogs(test, rayClient, jobID)

// Assert ray-job status after job execution
test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !")

// Make sure the RayCluster finishes and is deleted
test.Eventually(RayClusters(test, namespace.Name), TestTimeoutMedium).
Should(HaveLen(0))
test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong).
Should(BeEmpty())
}
16 changes: 3 additions & 13 deletions tests/odh/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@ package odh

import (
"embed"
"net/http"
"net/url"
"os"

. "github.com/onsi/gomega"
gomega "github.com/onsi/gomega"
"github.com/project-codeflare/codeflare-common/support"
. "github.com/project-codeflare/codeflare-common/support"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
v1 "k8s.io/api/core/v1"
)
Expand All @@ -49,24 +46,17 @@ func ReadFileExt(t support.Test, fileName string) []byte {

func GetDashboardUrl(test support.Test, namespace *v1.Namespace, rayCluster *rayv1.RayCluster) *url.URL {
dashboardName := "ray-dashboard-" + rayCluster.Name
test.T().Logf("Raycluster created : %s\n", rayCluster.Name)
route := GetRoute(test, namespace.Name, dashboardName)
route := support.GetRoute(test, namespace.Name, dashboardName)
hostname := route.Status.Ingress[0].Host
dashboardUrl, _ := url.Parse("https://" + hostname)
test.T().Logf("Ray-dashboard route : %s\n", dashboardUrl.String())

return dashboardUrl
}

func GetTestJobId(test Test, rayClient RayClusterClient, hostName string) string {
listJobsReq, err := http.NewRequest("GET", "https://"+hostName+"/api/jobs/", nil)
if err != nil {
test.T().Errorf("failed to do get request: %s\n", err)
}
listJobsReq.Header.Add("Authorization", "Bearer "+test.Config().BearerToken)

func GetTestJobId(test support.Test, rayClient support.RayClusterClient, hostName string) string {
allJobsData, err := rayClient.GetJobs()
test.Expect(err).ToNot(HaveOccurred())
test.Expect(err).ToNot(gomega.HaveOccurred())

jobID := (*allJobsData)[0].SubmissionID
if len(*allJobsData) > 0 {
Expand Down

0 comments on commit 7f2dbf7

Please sign in to comment.