Skip to content

Commit

Permalink
Rename uri and update some json attributes
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Dec 17, 2024
1 parent d79c042 commit fe35d45
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 21 deletions.
41 changes: 24 additions & 17 deletions client/bulkwriter/bulk_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"net/http"
)

// ResponseBase is the common milvus restful response struct.
type ResponseBase struct {
Status int `json:"status"`
Message string `json:"message"`
}

// CheckStatus checks the response status and return error if not ok.
func (b ResponseBase) CheckStatus() error {
if b.Status != 0 {
return fmt.Errorf("bulk import return error, status: %d, message: %s", b.Status, b.Message)
Expand All @@ -39,9 +41,10 @@ func (b ResponseBase) CheckStatus() error {

type BulkImportOption struct {
// milvus params
URI string `json:"-"`
CollectionName string `json:"collectionName"`
Files [][]string `json:"files"`
URL string `json:"-"`
CollectionName string `json:"collectionName"`
// optional in cloud api, use object url instead
Files [][]string `json:"files,omitempty"`
// optional params
PartitionName string `json:"partitionName,omitempty"`
APIKey string `json:"-"`
Expand Down Expand Up @@ -77,12 +80,13 @@ func (opt *BulkImportOption) WithOption(key, value string) *BulkImportOption {
return opt
}

// NewBulkImportOption returns BulkImportOption for Milvus bulk import API.
func NewBulkImportOption(uri string,
collectionName string,
files [][]string,
) *BulkImportOption {
return &BulkImportOption{
URI: uri,
URL: uri,
CollectionName: collectionName,
Files: files,
}
Expand All @@ -91,17 +95,15 @@ func NewBulkImportOption(uri string,
// NewCloudBulkImportOption returns import option for cloud import API.
func NewCloudBulkImportOption(uri string,
collectionName string,
files [][]string,
apiKey string,
objectURL string,
clusterID string,
accessKey string,
secretKey string,
) *BulkImportOption {
return &BulkImportOption{
URI: uri,
URL: uri,
CollectionName: collectionName,
Files: files,
APIKey: apiKey,
ObjectURL: objectURL,
ClusterID: clusterID,
Expand All @@ -119,7 +121,7 @@ type BulkImportResponse struct {

// BulkImport is the API wrapper for restful import API.
func BulkImport(ctx context.Context, option *BulkImportOption) (*BulkImportResponse, error) {
url := option.URI + "/v2/vectordb/jobs/import/create"
url := option.URL + "/v2/vectordb/jobs/import/create"
bs, err := option.GetRequest()
if err != nil {
return nil, err
Expand All @@ -142,12 +144,17 @@ func BulkImport(ctx context.Context, option *BulkImportOption) (*BulkImportRespo
}

type ListImportJobsOption struct {
URI string `json:"-"`
URL string `json:"-"`
CollectionName string `json:"collectionName"`
ClusterID string `json:"clusterId,omitempty"`
APIKey string `json:"-"`
PageSize int `json:"pageSize"`
CurrentPage int `json:"currentPage"`
PageSize int `json:"pageSize,omitempty"`
CurrentPage int `json:"currentPage,omitempty"`
}

func (opt *ListImportJobsOption) WithAPIKey(key string) *ListImportJobsOption {
opt.APIKey = key
return opt
}

func (opt *ListImportJobsOption) WithPageSize(pageSize int) *ListImportJobsOption {
Expand All @@ -166,7 +173,7 @@ func (opt *ListImportJobsOption) GetRequest() ([]byte, error) {

func NewListImportJobsOption(uri string, collectionName string) *ListImportJobsOption {
return &ListImportJobsOption{
URI: uri,
URL: uri,
CollectionName: collectionName,
CurrentPage: 1,
PageSize: 10,
Expand All @@ -191,7 +198,7 @@ type ImportJobRecord struct {
}

func ListImportJobs(ctx context.Context, option *ListImportJobsOption) (*ListImportJobsResponse, error) {
url := option.URI + "/v2/vectordb/jobs/import/list"
url := option.URL + "/v2/vectordb/jobs/import/list"
bs, err := option.GetRequest()
if err != nil {
return nil, err
Expand All @@ -214,7 +221,7 @@ func ListImportJobs(ctx context.Context, option *ListImportJobsOption) (*ListImp
}

type GetImportProgressOption struct {
URI string `json:"-"`
URL string `json:"-"`
JobID string `json:"jobId"`
// optional
ClusterID string `json:"clusterId"`
Expand All @@ -232,14 +239,14 @@ func (opt *GetImportProgressOption) WithAPIKey(key string) *GetImportProgressOpt

func NewGetImportProgressOption(uri string, jobID string) *GetImportProgressOption {
return &GetImportProgressOption{
URI: uri,
URL: uri,
JobID: jobID,
}
}

func NewCloudGetImportProgressOption(uri string, jobID string, apiKey string, clusterID string) *GetImportProgressOption {
return &GetImportProgressOption{
URI: uri,
URL: uri,
JobID: jobID,
APIKey: apiKey,
ClusterID: clusterID,
Expand Down Expand Up @@ -275,7 +282,7 @@ type ImportProgressDetail struct {
}

func GetImportProgress(ctx context.Context, option *GetImportProgressOption) (*GetImportProgressResponse, error) {
url := option.URI + "/v2/vectordb/jobs/import/describe"
url := option.URL + "/v2/vectordb/jobs/import/describe"

bs, err := option.GetRequest()
if err != nil {
Expand Down
35 changes: 31 additions & 4 deletions client/bulkwriter/bulk_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,27 @@ type BulkImportSuite struct {
func (s *BulkImportSuite) TestBulkImport() {
s.Run("normal_case", func() {
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
authHeader := req.Header.Get("Authorization")
s.Equal("Bearer root:Milvus", authHeader)
s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/create"))
rw.Write([]byte(`{"status":0, "data":{"jobId": "123"}}`))
}))
defer svr.Close()

resp, err := BulkImport(context.Background(), NewBulkImportOption(svr.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}}))
resp, err := BulkImport(context.Background(),
NewBulkImportOption(svr.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}}).
WithPartition("_default").
WithOption("backup", "true").
WithAPIKey("root:Milvus"),
)
s.NoError(err)
s.EqualValues(0, resp.Status)
s.Equal("123", resp.Data.JobID)
})

s.Run("svr_error", func() {
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// rw.
rw.WriteHeader(http.StatusInternalServerError)
rw.Write([]byte(`interal server error`))
}))
Expand All @@ -58,24 +66,38 @@ func (s *BulkImportSuite) TestBulkImport() {
s.Run("status_error", func() {
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/create"))
rw.Write([]byte(`{"status":1100, "message": "import job failed"`))
rw.Write([]byte(`{"status":1100, "message": "import job failed"}`))
}))
defer svr.Close()

_, err := BulkImport(context.Background(), NewBulkImportOption(svr.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}}))
s.Error(err)
})

s.Run("server_closed", func() {
svr2 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}))
svr2.Close()
_, err := BulkImport(context.Background(), NewBulkImportOption(svr2.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}}))
s.Error(err)
})
}

func (s *BulkImportSuite) TestListImportJobs() {
s.Run("normal_case", func() {
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
authHeader := req.Header.Get("Authorization")
s.Equal("Bearer root:Milvus", authHeader)
s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/list"))
rw.Write([]byte(`{"status":0, "data":{"records": [{"jobID": "abc", "collectionName": "hello_milvus", "state":"Importing", "progress": 50}]}}`))
}))
defer svr.Close()

resp, err := ListImportJobs(context.Background(), NewListImportJobsOption(svr.URL, "hello_milvus"))
resp, err := ListImportJobs(context.Background(),
NewListImportJobsOption(svr.URL, "hello_milvus").
WithPageSize(10).
WithCurrentPage(1).
WithAPIKey("root:Milvus"),
)
s.NoError(err)
s.EqualValues(0, resp.Status)
if s.Len(resp.Data.Records, 1) {
Expand All @@ -101,12 +123,17 @@ func (s *BulkImportSuite) TestListImportJobs() {
func (s *BulkImportSuite) TestGetImportProgress() {
s.Run("normal_case", func() {
svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
authHeader := req.Header.Get("Authorization")
s.Equal("Bearer root:Milvus", authHeader)
s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/describe"))
rw.Write([]byte(`{"status":0, "data":{"collectionName": "hello_milvus","jobId":"abc", "state":"Importing", "progress": 50, "importedRows": 20000,"totalRows": 40000, "details":[{"fileName": "files/a.json", "fileSize": 64312, "progress": 100, "state": "Completed"}, {"fileName":"files/b.json", "fileSize":52912, "progress":0, "state":"Importing"}]}}`))
}))
defer svr.Close()

resp, err := GetImportProgress(context.Background(), NewGetImportProgressOption(svr.URL, "abc"))
resp, err := GetImportProgress(context.Background(),
NewGetImportProgressOption(svr.URL, "abc").
WithAPIKey("root:Milvus"),
)
s.NoError(err)
s.EqualValues(0, resp.Status)
s.Equal("hello_milvus", resp.Data.CollectionName)
Expand Down

0 comments on commit fe35d45

Please sign in to comment.