Skip to content

Commit

Permalink
airflow, api: rest: Add support of parsing query parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
ish-hcc committed Oct 8, 2024
1 parent abd561d commit 27adb95
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 45 deletions.
18 changes: 14 additions & 4 deletions lib/airflow/gusty.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package airflow

import (
"errors"
"fmt"
"github.com/cloud-barista/cm-cicada/common"
"github.com/cloud-barista/cm-cicada/db"
"github.com/cloud-barista/cm-cicada/lib/config"
Expand Down Expand Up @@ -67,12 +68,21 @@ func isTaskExist(workflow *model.Workflow, taskID string) bool {
return false
}

func parseEndpoint(pathParams map[string]string, endpoint string) string {
keys := reflect.ValueOf(pathParams).MapKeys()
for _, key := range keys {
func parseEndpoint(pathParams map[string]string, queryParams map[string]string, endpoint string) string {
pathParamKeys := reflect.ValueOf(pathParams).MapKeys()
for _, key := range pathParamKeys {
endpoint = strings.ReplaceAll(endpoint, "{"+key.String()+"}", pathParams[key.String()])
}

queryParamKeys := reflect.ValueOf(pathParams).MapKeys()
if len(queryParamKeys) > 0 {
endpoint += "?"
for _, key := range queryParamKeys {
endpoint += fmt.Sprintf("%v=%v&", key.String(), queryParams[key.String()])
}
endpoint = strings.TrimRight(endpoint, "&")
}

return endpoint
}

Expand Down Expand Up @@ -176,7 +186,7 @@ func writeGustyYAMLs(workflow *model.Workflow) error {
}

taskOptions["http_conn_id"] = taskComponent.Data.Options.APIConnectionID
taskOptions["endpoint"] = parseEndpoint(t.PathParams, taskComponent.Data.Options.Endpoint)
taskOptions["endpoint"] = parseEndpoint(t.PathParams, t.QueryParams, taskComponent.Data.Options.Endpoint)
taskOptions["method"] = taskComponent.Data.Options.Method

filePath = dagDir + "/" + tg.Name + "/" + t.Name + ".yml"
Expand Down
83 changes: 42 additions & 41 deletions pkg/api/rest/model/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Task struct {
TaskComponent string `json:"task_component" mapstructure:"task_component" validate:"required"`
RequestBody string `json:"request_body" mapstructure:"request_body" validate:"required"`
PathParams map[string]string `json:"path_params" mapstructure:"path_params"`
QueryParams map[string]string `json:"query_params" mapstructure:"query_params"`
Dependencies []string `json:"dependencies" mapstructure:"dependencies"`
}

Expand Down Expand Up @@ -93,51 +94,51 @@ type CreateWorkflowReq struct {
}

type Monit struct {
WorkflowID string
WorkflowID string
WorkflowVersion string
Status string
startTime time.Time
endTime time.Time
Duration time.Time
WorkflowInput string
WorkflowResult string
Status string
startTime time.Time
endTime time.Time
Duration time.Time
WorkflowInput string
WorkflowResult string
}

type WorkflowRun struct {
WorkflowRunID string `json:"workflow_run_id,omitempty"`
WorkflowID *string `json:"workflow_id,omitempty"`
LogicalDate string `json:"logical_date,omitempty"`
ExecutionDate time.Time `json:"execution_date,omitempty"`
StartDate time.Time `json:"start_date,omitempty"`
EndDate time.Time `json:"end_date,omitempty"`
DurationDate float64 `json:"duration_date,omitempty"`
DataIntervalStart time.Time `json:"data_interval_start,omitempty"`
DataIntervalEnd time.Time `json:"data_interval_end,omitempty"`
LastSchedulingDecision time.Time `json:"last_scheduling_decision,omitempty"`
RunType string `json:"run_type,omitempty"`
State string `json:"state,omitempty"`
ExternalTrigger *bool `json:"external_trigger,omitempty"`
Conf map[string]interface{} `json:"conf,omitempty"`
Note string `json:"note,omitempty"`
WorkflowRunID string `json:"workflow_run_id,omitempty"`
WorkflowID *string `json:"workflow_id,omitempty"`
LogicalDate string `json:"logical_date,omitempty"`
ExecutionDate time.Time `json:"execution_date,omitempty"`
StartDate time.Time `json:"start_date,omitempty"`
EndDate time.Time `json:"end_date,omitempty"`
DurationDate float64 `json:"duration_date,omitempty"`
DataIntervalStart time.Time `json:"data_interval_start,omitempty"`
DataIntervalEnd time.Time `json:"data_interval_end,omitempty"`
LastSchedulingDecision time.Time `json:"last_scheduling_decision,omitempty"`
RunType string `json:"run_type,omitempty"`
State string `json:"state,omitempty"`
ExternalTrigger *bool `json:"external_trigger,omitempty"`
Conf map[string]interface{} `json:"conf,omitempty"`
Note string `json:"note,omitempty"`
}

type TaskInstance struct {
WorkflowRunID string `json:"workflow_run_id,omitempty"`
WorkflowID *string `json:"workflow_id,omitempty"`
TaskID string `json:"task_id,omitempty"`
TaskName string `json:"task_name,omitempty"`
State string `json:"state,omitempty"`
StartDate time.Time `json:"start_date,omitempty"`
EndDate time.Time `json:"end_date,omitempty"`
DurationDate float64 `json:"duration_date"`
WorkflowRunID string `json:"workflow_run_id,omitempty"`
WorkflowID *string `json:"workflow_id,omitempty"`
TaskID string `json:"task_id,omitempty"`
TaskName string `json:"task_name,omitempty"`
State string `json:"state,omitempty"`
StartDate time.Time `json:"start_date,omitempty"`
EndDate time.Time `json:"end_date,omitempty"`
DurationDate float64 `json:"duration_date"`
ExecutionDate time.Time `json:"execution_date,omitempty"`
TryNumber int `json:"try_number"`
TryNumber int `json:"try_number"`
}

type TaskInstanceReference struct {
// The task ID.
TaskId *string `json:"task_id,omitempty"`
TaskName string `json:"task_name,omitempty"`
TaskId *string `json:"task_id,omitempty"`
TaskName string `json:"task_name,omitempty"`
// The DAG ID.
WorkflowID *string `json:"workflow_id,omitempty"`
// The DAG run ID.
Expand All @@ -150,13 +151,13 @@ type TaskLog struct {
}

type EventLog struct {
WorkflowRunID string `json:"workflow_run_id,omitempty"`
WorkflowID string `json:"workflow_id"`
TaskID string `json:"task_id"`
TaskName string `json:"task_name"`
Event string `json:"event,omitempty"`
When time.Time `json:"start_date,omitempty"`
Extra string `json:"extra,omitempty"`
WorkflowRunID string `json:"workflow_run_id,omitempty"`
WorkflowID string `json:"workflow_id"`
TaskID string `json:"task_id"`
TaskName string `json:"task_name"`
Event string `json:"event,omitempty"`
When time.Time `json:"start_date,omitempty"`
Extra string `json:"extra,omitempty"`
}

func (d Data) Value() (driver.Value, error) {
Expand Down Expand Up @@ -187,4 +188,4 @@ func (d *CreateDataReq) Scan(value interface{}) error {
return errors.New("Invalid type for CreateDataReq")
}
return json.Unmarshal(bytes, d)
}
}

0 comments on commit 27adb95

Please sign in to comment.