From 27adb956379c1c21d3536527f17d9eac46d7ad49 Mon Sep 17 00:00:00 2001 From: ish Date: Tue, 8 Oct 2024 11:16:03 +0900 Subject: [PATCH] airflow, api: rest: Add support of parsing query parameters --- lib/airflow/gusty.go | 18 ++++++-- pkg/api/rest/model/workflow.go | 83 +++++++++++++++++----------------- 2 files changed, 56 insertions(+), 45 deletions(-) diff --git a/lib/airflow/gusty.go b/lib/airflow/gusty.go index c524b6a..86da654 100644 --- a/lib/airflow/gusty.go +++ b/lib/airflow/gusty.go @@ -2,6 +2,7 @@ package airflow import ( "errors" + "fmt" "github.com/cloud-barista/cm-cicada/common" "github.com/cloud-barista/cm-cicada/db" "github.com/cloud-barista/cm-cicada/lib/config" @@ -67,12 +68,21 @@ func isTaskExist(workflow *model.Workflow, taskID string) bool { return false } -func parseEndpoint(pathParams map[string]string, endpoint string) string { - keys := reflect.ValueOf(pathParams).MapKeys() - for _, key := range keys { +func parseEndpoint(pathParams map[string]string, queryParams map[string]string, endpoint string) string { + pathParamKeys := reflect.ValueOf(pathParams).MapKeys() + for _, key := range pathParamKeys { endpoint = strings.ReplaceAll(endpoint, "{"+key.String()+"}", pathParams[key.String()]) } + queryParamKeys := reflect.ValueOf(pathParams).MapKeys() + if len(queryParamKeys) > 0 { + endpoint += "?" + for _, key := range queryParamKeys { + endpoint += fmt.Sprintf("%v=%v&", key.String(), queryParams[key.String()]) + } + endpoint = strings.TrimRight(endpoint, "&") + } + return endpoint } @@ -176,7 +186,7 @@ func writeGustyYAMLs(workflow *model.Workflow) error { } taskOptions["http_conn_id"] = taskComponent.Data.Options.APIConnectionID - taskOptions["endpoint"] = parseEndpoint(t.PathParams, taskComponent.Data.Options.Endpoint) + taskOptions["endpoint"] = parseEndpoint(t.PathParams, t.QueryParams, taskComponent.Data.Options.Endpoint) taskOptions["method"] = taskComponent.Data.Options.Method filePath = dagDir + "/" + tg.Name + "/" + t.Name + ".yml" diff --git a/pkg/api/rest/model/workflow.go b/pkg/api/rest/model/workflow.go index b7ae2f0..1712a13 100644 --- a/pkg/api/rest/model/workflow.go +++ b/pkg/api/rest/model/workflow.go @@ -13,6 +13,7 @@ type Task struct { TaskComponent string `json:"task_component" mapstructure:"task_component" validate:"required"` RequestBody string `json:"request_body" mapstructure:"request_body" validate:"required"` PathParams map[string]string `json:"path_params" mapstructure:"path_params"` + QueryParams map[string]string `json:"query_params" mapstructure:"query_params"` Dependencies []string `json:"dependencies" mapstructure:"dependencies"` } @@ -93,51 +94,51 @@ type CreateWorkflowReq struct { } type Monit struct { - WorkflowID string + WorkflowID string WorkflowVersion string - Status string - startTime time.Time - endTime time.Time - Duration time.Time - WorkflowInput string - WorkflowResult string + Status string + startTime time.Time + endTime time.Time + Duration time.Time + WorkflowInput string + WorkflowResult string } type WorkflowRun struct { - WorkflowRunID string `json:"workflow_run_id,omitempty"` - WorkflowID *string `json:"workflow_id,omitempty"` - LogicalDate string `json:"logical_date,omitempty"` - ExecutionDate time.Time `json:"execution_date,omitempty"` - StartDate time.Time `json:"start_date,omitempty"` - EndDate time.Time `json:"end_date,omitempty"` - DurationDate float64 `json:"duration_date,omitempty"` - DataIntervalStart time.Time `json:"data_interval_start,omitempty"` - DataIntervalEnd time.Time `json:"data_interval_end,omitempty"` - LastSchedulingDecision time.Time `json:"last_scheduling_decision,omitempty"` - RunType string `json:"run_type,omitempty"` - State string `json:"state,omitempty"` - ExternalTrigger *bool `json:"external_trigger,omitempty"` - Conf map[string]interface{} `json:"conf,omitempty"` - Note string `json:"note,omitempty"` + WorkflowRunID string `json:"workflow_run_id,omitempty"` + WorkflowID *string `json:"workflow_id,omitempty"` + LogicalDate string `json:"logical_date,omitempty"` + ExecutionDate time.Time `json:"execution_date,omitempty"` + StartDate time.Time `json:"start_date,omitempty"` + EndDate time.Time `json:"end_date,omitempty"` + DurationDate float64 `json:"duration_date,omitempty"` + DataIntervalStart time.Time `json:"data_interval_start,omitempty"` + DataIntervalEnd time.Time `json:"data_interval_end,omitempty"` + LastSchedulingDecision time.Time `json:"last_scheduling_decision,omitempty"` + RunType string `json:"run_type,omitempty"` + State string `json:"state,omitempty"` + ExternalTrigger *bool `json:"external_trigger,omitempty"` + Conf map[string]interface{} `json:"conf,omitempty"` + Note string `json:"note,omitempty"` } type TaskInstance struct { - WorkflowRunID string `json:"workflow_run_id,omitempty"` - WorkflowID *string `json:"workflow_id,omitempty"` - TaskID string `json:"task_id,omitempty"` - TaskName string `json:"task_name,omitempty"` - State string `json:"state,omitempty"` - StartDate time.Time `json:"start_date,omitempty"` - EndDate time.Time `json:"end_date,omitempty"` - DurationDate float64 `json:"duration_date"` + WorkflowRunID string `json:"workflow_run_id,omitempty"` + WorkflowID *string `json:"workflow_id,omitempty"` + TaskID string `json:"task_id,omitempty"` + TaskName string `json:"task_name,omitempty"` + State string `json:"state,omitempty"` + StartDate time.Time `json:"start_date,omitempty"` + EndDate time.Time `json:"end_date,omitempty"` + DurationDate float64 `json:"duration_date"` ExecutionDate time.Time `json:"execution_date,omitempty"` - TryNumber int `json:"try_number"` + TryNumber int `json:"try_number"` } type TaskInstanceReference struct { // The task ID. - TaskId *string `json:"task_id,omitempty"` - TaskName string `json:"task_name,omitempty"` + TaskId *string `json:"task_id,omitempty"` + TaskName string `json:"task_name,omitempty"` // The DAG ID. WorkflowID *string `json:"workflow_id,omitempty"` // The DAG run ID. @@ -150,13 +151,13 @@ type TaskLog struct { } type EventLog struct { - WorkflowRunID string `json:"workflow_run_id,omitempty"` - WorkflowID string `json:"workflow_id"` - TaskID string `json:"task_id"` - TaskName string `json:"task_name"` - Event string `json:"event,omitempty"` - When time.Time `json:"start_date,omitempty"` - Extra string `json:"extra,omitempty"` + WorkflowRunID string `json:"workflow_run_id,omitempty"` + WorkflowID string `json:"workflow_id"` + TaskID string `json:"task_id"` + TaskName string `json:"task_name"` + Event string `json:"event,omitempty"` + When time.Time `json:"start_date,omitempty"` + Extra string `json:"extra,omitempty"` } func (d Data) Value() (driver.Value, error) { @@ -187,4 +188,4 @@ func (d *CreateDataReq) Scan(value interface{}) error { return errors.New("Invalid type for CreateDataReq") } return json.Unmarshal(bytes, d) -} \ No newline at end of file +}