From 29f283440075f21dec600ccb52f50df4f395e7da Mon Sep 17 00:00:00 2001 From: yby654 Date: Fri, 1 Nov 2024 03:17:40 +0000 Subject: [PATCH 1/4] =?UTF-8?q?workflow=20version=20=EB=B0=8F=20email=20?= =?UTF-8?q?=EC=95=8C=EB=9E=8C=20=EA=B8=B0=EB=8A=A5=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- _airflow/airflow-home/dags/complex.py | 91 +++++++++++ _airflow/airflow-home/dags/dags.py | 17 +- _airflow/airflow-home/dags/example2.py | 61 +++++++ _airflow/airflow-home/dags/mail.py | 93 +++++++++++ _airflow/airflow-home/dags/utils.py | 13 ++ _airflow/docker-compose.yml | 6 + dao/workflow.go | 59 ++++++- db/sqlite.go | 8 +- lib/airflow/conneciton.go | 6 +- .../task_component/collect_failed_tasks.json | 12 ++ .../example/task_component/send_email.json | 13 ++ .../example/task_component/trigger_email.json | 20 +++ lib/airflow/gusty.go | 60 +++---- pkg/api/rest/controller/workflow.go | 65 ++++++++ pkg/api/rest/docs/docs.go | 149 +++++++++++++++++- pkg/api/rest/docs/swagger.json | 149 +++++++++++++++++- pkg/api/rest/docs/swagger.yaml | 101 +++++++++++- pkg/api/rest/model/connection.go | 1 + pkg/api/rest/model/taskComponent.go | 5 +- pkg/api/rest/model/workflow.go | 78 +++++++-- pkg/api/rest/route/workflow.go | 2 + 21 files changed, 954 insertions(+), 55 deletions(-) create mode 100644 _airflow/airflow-home/dags/complex.py create mode 100644 _airflow/airflow-home/dags/example2.py create mode 100644 _airflow/airflow-home/dags/mail.py create mode 100644 _airflow/airflow-home/dags/utils.py create mode 100644 lib/airflow/example/task_component/collect_failed_tasks.json create mode 100644 lib/airflow/example/task_component/send_email.json create mode 100644 lib/airflow/example/task_component/trigger_email.json diff --git a/_airflow/airflow-home/dags/complex.py b/_airflow/airflow-home/dags/complex.py new file mode 100644 index 0000000..6b91b1e --- /dev/null +++ b/_airflow/airflow-home/dags/complex.py @@ -0,0 +1,91 @@ +from __future__ import annotations +import pendulum +from airflow.models import DAG +from airflow.operators.empty import EmptyOperator +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python import PythonOperator +from airflow.operators.email import EmailOperator +from airflow.utils.state import State +from airflow.utils.task_group import TaskGroup +from airflow.utils.dates import days_ago + +def collect_failed_tasks(**context): + dag_run = context.get('dag_run') + ti = context.get('ti') + + failed_tasks = [] + for task_instance in dag_run.get_task_instances(): + if task_instance.state == State.FAILED: + failed_tasks.append(task_instance.task_id) + + # Push DAG state and failed task list to XCom + ti.xcom_push(key='dag_state', value=dag_run.get_state()) + ti.xcom_push(key='failed_tasks', value=failed_tasks) + +with DAG( + dag_id="example_task_group", + schedule=None, + start_date=days_ago(1), + catchup=False, + tags=["example"], +) as dag: + + with TaskGroup("main_flow", tooltip="Main workflow sequence") as main_flow: + start = EmptyOperator(task_id="start") + + with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1: + task_1 = EmptyOperator(task_id="task_1") + task_2 = BashOperator(task_id="task_2", bash_command="echo 1") + task_3 = EmptyOperator(task_id="task_3") + task_1 >> [task_2, task_3] + + with TaskGroup("section_2", tooltip="Tasks for section_2 (designed to fail)") as section_2: + fail_task_1 = BashOperator(task_id="fail_task_1", bash_command="exit 1") + fail_task_2 = BashOperator(task_id="fail_task_2", bash_command="exit 1") + fail_task_1 >> fail_task_2 + + with TaskGroup("section_3", tooltip="Tasks for section_3") as section_3: + task_1 = EmptyOperator(task_id="task_1") + + with TaskGroup("inner_section_3", tooltip="Tasks for inner_section3") as inner_section_3: + task_2 = BashOperator(task_id="task_2", bash_command="echo 1") + task_3 = EmptyOperator(task_id="task_3") + task_4 = EmptyOperator(task_id="task_4") + [task_2, task_3] >> task_4 + + # 모든 작업이 완료되면 end task로 연결되도록 설정 + end = EmptyOperator(task_id="end") + start >> section_1 >> section_2 >> section_3 >> end + + with TaskGroup("notification_flow", tooltip="Notification and Failure Collection") as notification_flow: + collect_failed = PythonOperator( + task_id="collect_failed_tasks", + python_callable=collect_failed_tasks, + provide_context=True, + trigger_rule="all_done" + ) + + notify_email = EmailOperator( + task_id="send_email", + to="yby987654@gmail.com", + subject="Workflow Execution Report : {{ dag.dag_id }} ", + html_content="""

Workflow Execution Complete

+

Workflow ID: {{ dag.dag_id }}

+

Workflow Run ID: {{ run_id }}

+

Workflow 상태: {{ ti.xcom_pull(task_ids='collect_failed_tasks', key='dag_state') }}

+ + {% set failed_tasks = ti.xcom_pull(task_ids='collect_failed_tasks', key='failed_tasks', default=[]) %} + + {% if failed_tasks %} +

실패한 Tasks: {{ failed_tasks }}

+

Task Try Number: {{ ti.try_number }}

+ {% else %} +

모든 Tasks가 성공적으로 완료되었습니다.

+ {% endif %}""", + trigger_rule="all_done" + ) + + collect_failed >> notify_email + + # TaskGroup 순서 설정 + main_flow >> notification_flow diff --git a/_airflow/airflow-home/dags/dags.py b/_airflow/airflow-home/dags/dags.py index 80f594b..f10fdc3 100644 --- a/_airflow/airflow-home/dags/dags.py +++ b/_airflow/airflow-home/dags/dags.py @@ -1,6 +1,8 @@ import os import airflow from gusty import create_dag +from utils import collect_failed_tasks +from airflow.utils.state import State ##################### ## DAG Directories ## @@ -23,4 +25,17 @@ tags = ['default', 'tags'], task_group_defaults={"tooltip": "default tooltip"}, wait_for_defaults={"retries": 10, "check_existence": True}, - latest_only=False) \ No newline at end of file + latest_only=False) + + +def collect_failed_tasks(**context): + dag_run = context['dag_run'] + failed_tasks = [] + + for task_instance in dag_run.get_task_instances(): + if task_instance.state == State.FAILED: + failed_tasks.append(task_instance.task_id) + + # DAG 상태 및 실패한 작업 목록을 XCom에 푸시 + context['ti'].xcom_push(key='dag_state', value=dag_run.get_state()) + context['ti'].xcom_push(key='failed_tasks', value=failed_tasks) \ No newline at end of file diff --git a/_airflow/airflow-home/dags/example2.py b/_airflow/airflow-home/dags/example2.py new file mode 100644 index 0000000..a80d4c7 --- /dev/null +++ b/_airflow/airflow-home/dags/example2.py @@ -0,0 +1,61 @@ +from __future__ import annotations +import pendulum +from airflow.models import DAG +from airflow.operators.empty import EmptyOperator +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python import PythonOperator +from airflow.operators.email import EmailOperator +from airflow.utils.state import State +from airflow.utils.task_group import TaskGroup +from airflow.utils.dates import days_ago +from airflow.operators.trigger_dagrun import TriggerDagRunOperator +with DAG( + dag_id="example_task_group2", + schedule=None, + start_date=days_ago(1), + catchup=False, + tags=["example"], +) as dag: + + start = EmptyOperator(task_id="start") + + with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1: + task_1 = EmptyOperator(task_id="task_1") + task_2 = BashOperator(task_id="task_2", bash_command="echo 1") + task_3 = EmptyOperator(task_id="task_3") + task_1 >> [task_2, task_3] + + with TaskGroup("section_2", tooltip="Tasks for section_2 (designed to fail)") as section_2: + fail_task_1 = BashOperator(task_id="fail_task_1", bash_command="exit 1") + fail_task_2 = BashOperator(task_id="fail_task_2", bash_command="exit 1") + fail_task_1 >> fail_task_2 + + with TaskGroup("section_3", tooltip="Tasks for section_3") as section_3: + task_1 = EmptyOperator(task_id="task_1") + + with TaskGroup("inner_section_3", tooltip="Tasks for inner_section3") as inner_section_3: + task_2 = BashOperator(task_id="task_2", bash_command="echo 1") + task_3 = EmptyOperator(task_id="task_3") + task_4 = EmptyOperator(task_id="task_4") + [task_2, task_3] >> task_4 + + # 모든 작업이 완료되면 end task로 연결되도록 설정 + end = EmptyOperator(task_id="end") + + trigger = TriggerDagRunOperator( + trigger_dag_id='monitor_dag', + task_id='trigger', + execution_date='{{ execution_date }}', + wait_for_completion=False, + conf= { + "source_dag_id": "{{ dag.dag_id }}", # 현재 DAG의 dag_id + "source_dag_run_id": "{{ dag_run.run_id }}" # 현재 DAG의 dag_run_id + }, + poke_interval=30, + reset_dag_run=True, + # trigger_rule="all_done" + trigger_rule="one_done" + ) + start >> section_1 >> section_2 >> section_3 >> end >> trigger + + diff --git a/_airflow/airflow-home/dags/mail.py b/_airflow/airflow-home/dags/mail.py new file mode 100644 index 0000000..3217a8d --- /dev/null +++ b/_airflow/airflow-home/dags/mail.py @@ -0,0 +1,93 @@ +from airflow.models import DAG, DagRun +from airflow.operators.python import PythonOperator +from airflow.operators.email import EmailOperator +from airflow.utils.dates import days_ago +from airflow.utils.state import State + +# 실패한 태스크 수집 함수 +def collect_failed_tasks(**context): + from airflow.utils.db import provide_session + + @provide_session + def _inner(session=None): + # conf에서 전달받은 dag_id와 dag_run_id + source_dag_id = context['dag_run'].conf.get('source_dag_id') + source_dag_run_id = context['dag_run'].conf.get('source_dag_run_id') + + if not source_dag_id or not source_dag_run_id: + raise ValueError("source_dag_id와 source_dag_run_id가 전달되지 않았습니다.") + + # source_dag_id와 source_dag_run_id를 이용해 DagRun 정보 가져오기 + source_dag_run = session.query(DagRun).filter_by( + dag_id=source_dag_id, + run_id=source_dag_run_id + ).first() + + if not source_dag_run: + raise ValueError("해당하는 DAG Run을 찾을 수 없습니다.") + + # 실패한 태스크 ID 목록 수집 + failed_tasks = [] + for task_instance in source_dag_run.get_task_instances(): + if task_instance.state != State.SUCCESS: + failed_tasks.append(task_instance.task_id) + + # 결과 반환 + return { + "dag_id": source_dag_id, + "dag_run_id": source_dag_run_id, + "dag_state": source_dag_run.state, + "failed_tasks": failed_tasks + } + + return _inner() + +# DAG 정의 +with DAG( + dag_id="monitor_dag", + default_args={'start_date': days_ago(1)}, + schedule_interval=None, +) as dag: + + # 실패한 태스크 수집 태스크 + collect_task = PythonOperator( + task_id='collect_failed_tasks', + python_callable=collect_failed_tasks, + provide_context=True, + ) + + # EmailOperator 설정 + email_task = EmailOperator( + task_id='send_email', + to='yby987654@gmail.com', + subject='DAG 상태 보고서', + html_content="""

Workflow Execution Complete

+

Workflow ID: {{ ti.xcom_pull(task_ids='collect_failed_tasks').get('dag_id') }}

+

Workflow Run ID: {{ ti.xcom_pull(task_ids='collect_failed_tasks').get('dag_run_id') }}

+

Workflow 상태: {{ ti.xcom_pull(task_ids='collect_failed_tasks').get('dag_state') }}

+ {% if ti.xcom_pull(task_ids='collect_failed_tasks').get('failed_tasks') | length == 0 %} +

모든 Tasks가 성공적으로 완료되었습니다.

+ {% else %} +

실패한 Tasks: {{ ti.xcom_pull(task_ids='collect_failed_tasks').get('failed_tasks') }}

+ {% endif %} + """ + #params={}, # Initial empty params + ) + + # collect_task의 반환 값을 email_task에 전달하기 위한 PythonOperator 후크 + def set_email_params(ti, **context): + # Pull the result from the previous task + task_result = ti.xcom_pull(task_ids='collect_failed_tasks') + if task_result: + # Update the email_task.params directly + email_task.params.update(task_result) + print( "params : " , email_task.params) + + update_email_params = PythonOperator( + task_id="update_email_params", + python_callable=set_email_params, + provide_context=True, + ) + + # 의존성 설정 + collect_task >> update_email_params >> email_task diff --git a/_airflow/airflow-home/dags/utils.py b/_airflow/airflow-home/dags/utils.py new file mode 100644 index 0000000..305664a --- /dev/null +++ b/_airflow/airflow-home/dags/utils.py @@ -0,0 +1,13 @@ +from airflow.utils.state import State + +def collect_failed_tasks(**context): + dag_run = context['dag_run'] + failed_tasks = [] + + for task_instance in dag_run.get_task_instances(): + if task_instance.state == State.FAILED: + failed_tasks.append(task_instance.task_id) + + # DAG 상태 및 실패한 작업 목록을 XCom에 푸시 + context['ti'].xcom_push(key='dag_state', value=dag_run.get_state()) + context['ti'].xcom_push(key='failed_tasks', value=failed_tasks) \ No newline at end of file diff --git a/_airflow/docker-compose.yml b/_airflow/docker-compose.yml index 2be5b04..be48a25 100644 --- a/_airflow/docker-compose.yml +++ b/_airflow/docker-compose.yml @@ -27,6 +27,12 @@ services: container_name: airflow-server image: cloudbaristaorg/airflow-server:edge restart: always + environment: + AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com' + AIRFLOW__SMTP__SMTP_USER: 'yby654321@gmail.com' + AIRFLOW__SMTP__SMTP_PASSWORD: 'wtknvaprkkwyaurd' + AIRFLOW__SMTP__SMTP_PORT: 587 + AIRFLOW__SMTP__SMTP_MAIL_FROM: 'yby654321@gmail.com' command: > /bin/bash -c " # Wait for MySQL diff --git a/dao/workflow.go b/dao/workflow.go index 952d26d..a080f99 100644 --- a/dao/workflow.go +++ b/dao/workflow.go @@ -2,10 +2,11 @@ package dao import ( "errors" + "time" + "github.com/cloud-barista/cm-cicada/db" "github.com/cloud-barista/cm-cicada/pkg/api/rest/model" "gorm.io/gorm" - "time" ) func WorkflowCreate(workflow *model.Workflow) (*model.Workflow, error) { @@ -118,3 +119,59 @@ func WorkflowDelete(workflow *model.Workflow) error { return nil } + +func WorkflowVersionGetList(workflowVersion *model.WorkflowVersion, page int, row int) (*[]model.WorkflowVersion, error) { + WorkflowVersionList := &[]model.WorkflowVersion{} + // Ensure db.DB is not nil to avoid runtime panics + if db.DB == nil { + return nil, errors.New("database connection is not initialized") + } + + result := db.DB.Scopes(func(d *gorm.DB) *gorm.DB { + var filtered = d + + if len(workflowVersion.WorkflowID) != 0 { + filtered = filtered.Where("workflowId LIKE ?", "%"+workflowVersion.WorkflowID+"%") + } + + if page != 0 && row != 0 { + offset := (page - 1) * row + + return filtered.Offset(offset).Limit(row) + } else if row != 0 && page == 0 { + filtered.Error = errors.New("row is not 0 but page is 0") + return filtered + } else if page != 0 && row == 0 { + filtered.Error = errors.New("page is not 0 but row is 0") + return filtered + } + return filtered + }).Find(WorkflowVersionList) + + err := result.Error + if err != nil { + return nil, err + } + + return WorkflowVersionList, nil +} + +func WorkflowVersionGet(id string, wkId string) (*model.WorkflowVersion, error) { + workflowVersion := &model.WorkflowVersion{} + + // Ensure db.DB is not nil to avoid runtime panics + if db.DB == nil { + return nil, errors.New("database connection is not initialized") + } + + result := db.DB.Where("id = ? and workflowId = ?", id, wkId).First(workflowVersion) + err := result.Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, errors.New("workflow not found with the provided id") + } + return nil, err + } + + return workflowVersion, nil +} diff --git a/db/sqlite.go b/db/sqlite.go index 31bf586..9742fab 100644 --- a/db/sqlite.go +++ b/db/sqlite.go @@ -1,13 +1,14 @@ package db import ( + "strconv" + "github.com/cloud-barista/cm-cicada/common" "github.com/cloud-barista/cm-cicada/lib/config" "github.com/cloud-barista/cm-cicada/pkg/api/rest/model" "github.com/glebarez/sqlite" "github.com/jollaman999/utils/logger" "gorm.io/gorm" - "strconv" ) var DB *gorm.DB @@ -40,6 +41,11 @@ func Open() error { logger.Panicln(logger.ERROR, true, err) } + err = DB.AutoMigrate(&model.WorkflowVersion{}) + if err != nil { + logger.Panicln(logger.ERROR, true, err) + } + err = DB.AutoMigrate(&model.Workflow{}) if err != nil { logger.Panicln(logger.ERROR, true, err) diff --git a/lib/airflow/conneciton.go b/lib/airflow/conneciton.go index f494993..5a040f1 100644 --- a/lib/airflow/conneciton.go +++ b/lib/airflow/conneciton.go @@ -2,6 +2,7 @@ package airflow import ( "errors" + "github.com/apache/airflow-client-go/airflow" "github.com/cloud-barista/cm-cicada/pkg/api/rest/model" "github.com/jollaman999/utils/logger" @@ -26,6 +27,9 @@ func (client *client) RegisterConnection(connection *model.Connection) error { port := airflow.NullableInt32{} port.Set(&connection.Port) + extra := airflow.NullableString{} + extra.Set(&connection.Extra) + conn := airflow.Connection{ ConnectionId: &connection.ID, ConnType: &connection.Type, @@ -35,7 +39,7 @@ func (client *client) RegisterConnection(connection *model.Connection) error { Schema: schema, Port: port, Password: &connection.Password, - Extra: airflow.NullableString{}, + Extra: extra, } _, _ = client.api.ConnectionApi.DeleteConnection(ctx, connection.ID).Execute() diff --git a/lib/airflow/example/task_component/collect_failed_tasks.json b/lib/airflow/example/task_component/collect_failed_tasks.json new file mode 100644 index 0000000..1f441e4 --- /dev/null +++ b/lib/airflow/example/task_component/collect_failed_tasks.json @@ -0,0 +1,12 @@ +{ + "name": "collect_failed_tasks", + "description": "collect failed tasks Info", + "data": { + "extra": { + "operator": "airflow.operators.python.PythonOperator", + "python_callable": "collect_failed_tasks", + "provide_context": true, + "trigger_rule": "all_done" + } + } +} \ No newline at end of file diff --git a/lib/airflow/example/task_component/send_email.json b/lib/airflow/example/task_component/send_email.json new file mode 100644 index 0000000..21bab67 --- /dev/null +++ b/lib/airflow/example/task_component/send_email.json @@ -0,0 +1,13 @@ +{ + "name": "send_email", + "description": "smtp send email", + "data": { + "extra": { + "operator": "airflow.operators.email.EmailOperator", + "to": "yby987654@gmail.com", + "subject": "Workflow Execution Report : {{ dag.dag_id }}", + "html_content": "

Workflow Execution Complete

Workflow ID: {{ dag.dag_id }}

Workflow Run ID: {{ run_id }}

Workflow 상태: {{ ti.xcom_pull(task_ids='collect_failed_tasks', key='dag_state') }}

{% if ti.xcom_pull(task_ids='collect_failed_tasks', key='failed_tasks') %}

실패한 Tasks: {{ ti.xcom_pull(task_ids='collect_failed_tasks', key='failed_tasks') }}

Task Try Number: {{ ti.try_number }}

{% else %}

모든 Tasks가 성공적으로 완료되었습니다.

{% endif %}", + "trigger_rule": "all_done" + } + } +} \ No newline at end of file diff --git a/lib/airflow/example/task_component/trigger_email.json b/lib/airflow/example/task_component/trigger_email.json new file mode 100644 index 0000000..8bd6ed0 --- /dev/null +++ b/lib/airflow/example/task_component/trigger_email.json @@ -0,0 +1,20 @@ +{ + "name": "trigger_email", + "description": "trigger email alert", + "data": { + "extra": { + "operator": "airflow.operators.trigger_dagrun.TriggerDagRunOperator", + "trigger_dag_id":"monitor_dag", + "task_id":"trigger", + "execution_date":"{{ execution_date }}", + "wait_for_completion":"False", + "conf": { + "source_dag_id": "{{ dag.dag_id }}", + "source_dag_run_id": "{{ dag_run.run_id }}" + }, + "poke_interval":30, + "reset_dag_run": "True", + "trigger_rule": "one_done" + } + } +} \ No newline at end of file diff --git a/lib/airflow/gusty.go b/lib/airflow/gusty.go index 47e1d82..84216cb 100644 --- a/lib/airflow/gusty.go +++ b/lib/airflow/gusty.go @@ -3,15 +3,16 @@ package airflow import ( "errors" "fmt" + "reflect" + "strings" + "time" + "github.com/cloud-barista/cm-cicada/common" "github.com/cloud-barista/cm-cicada/db" "github.com/cloud-barista/cm-cicada/lib/config" "github.com/cloud-barista/cm-cicada/pkg/api/rest/model" "github.com/jollaman999/utils/fileutil" "gopkg.in/yaml.v3" - "reflect" - "strings" - "time" ) func checkWorkflow(workflow *model.Workflow) error { @@ -159,38 +160,41 @@ func writeGustyYAMLs(workflow *model.Workflow) error { for _, t := range tg.Tasks { taskOptions := make(map[string]any) - - if isTaskExist(workflow, t.RequestBody) { - taskOptions["operator"] = "local.JsonHttpRequestOperator" - taskOptions["xcom_task"] = t.RequestBody - } else { - taskOptions["operator"] = "airflow.providers.http.operators.http.SimpleHttpOperator" - - type headers struct { - ContentType string `json:"Content-Type" yaml:"Content-Type"` - } - taskOptions["headers"] = headers{ - ContentType: "application/json", - } - - taskOptions["log_response"] = true - - taskOptions["data"] = t.RequestBody - } - - taskOptions["dependencies"] = t.Dependencies - - taskOptions["task_id"] = t.Name - taskComponent := db.TaskComponentGetByName(t.TaskComponent) + if taskComponent == nil { return errors.New("task component '" + t.TaskComponent + "' not found") } - + fmt.Println("0 : ", taskComponent ) + if taskComponent.Data.Extra != nil { + fmt.Println("1 : ", t.Name ) + taskOptions = taskComponent.Data.Extra + } else { + fmt.Println("2 : ", t.Name ) + if isTaskExist(workflow, t.RequestBody) { + taskOptions["operator"] = "local.JsonHttpRequestOperator" + taskOptions["xcom_task"] = t.RequestBody + } else { + taskOptions["operator"] = "airflow.providers.http.operators.http.SimpleHttpOperator" + + type headers struct { + ContentType string `json:"Content-Type" yaml:"Content-Type"` + } + taskOptions["headers"] = headers{ + ContentType: "application/json", + } + + taskOptions["log_response"] = true + + taskOptions["data"] = t.RequestBody + } + taskOptions["http_conn_id"] = taskComponent.Data.Options.APIConnectionID taskOptions["endpoint"] = parseEndpoint(t.PathParams, t.QueryParams, taskComponent.Data.Options.Endpoint) taskOptions["method"] = taskComponent.Data.Options.Method - + } + taskOptions["dependencies"] = t.Dependencies + taskOptions["task_id"] = t.Name filePath = dagDir + "/" + tg.Name + "/" + t.Name + ".yml" err = writeModelToYAMLFile(taskOptions, filePath) diff --git a/pkg/api/rest/controller/workflow.go b/pkg/api/rest/controller/workflow.go index 5792393..ee51e42 100644 --- a/pkg/api/rest/controller/workflow.go +++ b/pkg/api/rest/controller/workflow.go @@ -1202,3 +1202,68 @@ func GetImportErrors(c echo.Context) error { return c.JSONPretty(http.StatusOK, logs, " ") } + +// ListWorkflow godoc +// +// @ID list-workflowVersion +// @Summary List workflowVersion +// @Description Get a workflowVersion list. +// @Tags [Workflow] +// @Accept json +// @Produce json +// @Param wfId path string true "wfId of the workflow" +// @Param page query string false "Page of the workflowVersion list." +// @Param row query string false "Row of the workflowVersion list." +// @Success 200 {object} []model.WorkflowVersion "Successfully get a workflowVersion list." +// @Failure 400 {object} common.ErrorResponse "Sent bad request." +// @Failure 500 {object} common.ErrorResponse "Failed to get a workflowVersion list." +// @Router /workflow/{wfId}/version [get] +func ListWorkflowVersion(c echo.Context) error { + page, row, err := common.CheckPageRow(c) + if err != nil { + return common.ReturnErrorMsg(c, err.Error()) + } + + workflow := &model.WorkflowVersion{ + WorkflowID: c.QueryParam("wfId"), + } + + workflows, err := dao.WorkflowVersionGetList(workflow, page, row) + if err != nil { + return common.ReturnErrorMsg(c, err.Error()) + } + + return c.JSONPretty(http.StatusOK, workflows, " ") +} + +// GetWorkflowVersion godoc +// +// @ID get-WorkflowVersion +// @Summary Get WorkflowVersion +// @Description Get the WorkflowVersion. +// @Tags [Workflow] +// @Accept json +// @Produce json +// @Param wfId path string true "wfId of the workflow" +// @Param verId path string true "ID of the WorkflowVersion." +// @Success 200 {object} model.Workflow "Successfully get the WorkflowVersion." +// @Failure 400 {object} common.ErrorResponse "Sent bad request." +// @Failure 500 {object} common.ErrorResponse "Failed to get the WorkflowVersion." +// @Router /workflow/{wfId}/version/{verId} [get] +func GetWorkflowVersion(c echo.Context) error { + wfId := c.Param("wfId") + if wfId == "" { + return common.ReturnErrorMsg(c, "Please provide the wfId.") + } + verId := common.UrlDecode(c.Param("verId")) + if verId == "" { + return common.ReturnErrorMsg(c, "Please provide the verId.") + } + + workflow, err := dao.WorkflowVersionGet(verId, wfId) + if err != nil { + return common.ReturnErrorMsg(c, err.Error()) + } + + return c.JSONPretty(http.StatusOK, workflow, " ") +} \ No newline at end of file diff --git a/pkg/api/rest/docs/docs.go b/pkg/api/rest/docs/docs.go index 76c97de..d9cd7ae 100644 --- a/pkg/api/rest/docs/docs.go +++ b/pkg/api/rest/docs/docs.go @@ -1214,6 +1214,118 @@ const docTemplate = `{ } } }, + "/workflow/{wfId}/version": { + "get": { + "description": "Get a workflowVersion list.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "List workflowVersion", + "operationId": "list-workflowVersion", + "parameters": [ + { + "type": "string", + "description": "wfId of the workflow", + "name": "wfId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Page of the workflowVersion list.", + "name": "page", + "in": "query" + }, + { + "type": "string", + "description": "Row of the workflowVersion list.", + "name": "row", + "in": "query" + } + ], + "responses": { + "200": { + "description": "Successfully get a workflowVersion list.", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowVersion" + } + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get a workflowVersion list.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, + "/workflow/{wfId}/version/{verId}": { + "get": { + "description": "Get the WorkflowVersion.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get WorkflowVersion", + "operationId": "get-WorkflowVersion", + "parameters": [ + { + "type": "string", + "description": "wfId of the workflow", + "name": "wfId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the WorkflowVersion.", + "name": "verId", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully get the WorkflowVersion.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.Workflow" + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the WorkflowVersion.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, "/workflow/{wfId}/workflowRun/{wfRunId}/task/{taskId}/clear": { "post": { "description": "Clear the task Instance.", @@ -1833,6 +1945,10 @@ const docTemplate = `{ "type": "string" } }, + "extra": { + "type": "object", + "additionalProperties": true + }, "id": { "type": "string" }, @@ -1886,11 +2002,11 @@ const docTemplate = `{ }, "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskData": { "type": "object", - "required": [ - "options", - "param_option" - ], "properties": { + "extra": { + "type": "object", + "additionalProperties": true + }, "options": { "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.Options" }, @@ -2119,6 +2235,31 @@ const docTemplate = `{ "type": "string" } } + }, + "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowVersion": { + "type": "object", + "required": [ + "action", + "id", + "workflowId" + ], + "properties": { + "action": { + "type": "string" + }, + "created_at": { + "type": "string" + }, + "data": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.Workflow" + }, + "id": { + "type": "string" + }, + "workflowId": { + "type": "string" + } + } } } }` diff --git a/pkg/api/rest/docs/swagger.json b/pkg/api/rest/docs/swagger.json index 3313e4c..742cd3c 100644 --- a/pkg/api/rest/docs/swagger.json +++ b/pkg/api/rest/docs/swagger.json @@ -1207,6 +1207,118 @@ } } }, + "/workflow/{wfId}/version": { + "get": { + "description": "Get a workflowVersion list.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "List workflowVersion", + "operationId": "list-workflowVersion", + "parameters": [ + { + "type": "string", + "description": "wfId of the workflow", + "name": "wfId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "Page of the workflowVersion list.", + "name": "page", + "in": "query" + }, + { + "type": "string", + "description": "Row of the workflowVersion list.", + "name": "row", + "in": "query" + } + ], + "responses": { + "200": { + "description": "Successfully get a workflowVersion list.", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowVersion" + } + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get a workflowVersion list.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, + "/workflow/{wfId}/version/{verId}": { + "get": { + "description": "Get the WorkflowVersion.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get WorkflowVersion", + "operationId": "get-WorkflowVersion", + "parameters": [ + { + "type": "string", + "description": "wfId of the workflow", + "name": "wfId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the WorkflowVersion.", + "name": "verId", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully get the WorkflowVersion.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.Workflow" + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the WorkflowVersion.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, "/workflow/{wfId}/workflowRun/{wfRunId}/task/{taskId}/clear": { "post": { "description": "Clear the task Instance.", @@ -1826,6 +1938,10 @@ "type": "string" } }, + "extra": { + "type": "object", + "additionalProperties": true + }, "id": { "type": "string" }, @@ -1879,11 +1995,11 @@ }, "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskData": { "type": "object", - "required": [ - "options", - "param_option" - ], "properties": { + "extra": { + "type": "object", + "additionalProperties": true + }, "options": { "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.Options" }, @@ -2112,6 +2228,31 @@ "type": "string" } } + }, + "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowVersion": { + "type": "object", + "required": [ + "action", + "id", + "workflowId" + ], + "properties": { + "action": { + "type": "string" + }, + "created_at": { + "type": "string" + }, + "data": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.Workflow" + }, + "id": { + "type": "string" + }, + "workflowId": { + "type": "string" + } + } } } } \ No newline at end of file diff --git a/pkg/api/rest/docs/swagger.yaml b/pkg/api/rest/docs/swagger.yaml index 51ad0de..35a55b8 100644 --- a/pkg/api/rest/docs/swagger.yaml +++ b/pkg/api/rest/docs/swagger.yaml @@ -190,6 +190,9 @@ definitions: items: type: string type: array + extra: + additionalProperties: true + type: object id: type: string name: @@ -231,13 +234,13 @@ definitions: type: object github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskData: properties: + extra: + additionalProperties: true + type: object options: $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.Options' param_option: $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.ParamOption' - required: - - options - - param_option type: object github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskDirectly: properties: @@ -389,6 +392,23 @@ definitions: - id - name type: object + github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowVersion: + properties: + action: + type: string + created_at: + type: string + data: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.Workflow' + id: + type: string + workflowId: + type: string + required: + - action + - id + - workflowId + type: object info: contact: {} description: Workflow management module @@ -1167,6 +1187,81 @@ paths: summary: Get Task from Task Group tags: - '[Workflow]' + /workflow/{wfId}/version: + get: + consumes: + - application/json + description: Get a workflowVersion list. + operationId: list-workflowVersion + parameters: + - description: wfId of the workflow + in: path + name: wfId + required: true + type: string + - description: Page of the workflowVersion list. + in: query + name: page + type: string + - description: Row of the workflowVersion list. + in: query + name: row + type: string + produces: + - application/json + responses: + "200": + description: Successfully get a workflowVersion list. + schema: + items: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowVersion' + type: array + "400": + description: Sent bad request. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + "500": + description: Failed to get a workflowVersion list. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + summary: List workflowVersion + tags: + - '[Workflow]' + /workflow/{wfId}/version/{verId}: + get: + consumes: + - application/json + description: Get the WorkflowVersion. + operationId: get-WorkflowVersion + parameters: + - description: wfId of the workflow + in: path + name: wfId + required: true + type: string + - description: ID of the WorkflowVersion. + in: path + name: verId + required: true + type: string + produces: + - application/json + responses: + "200": + description: Successfully get the WorkflowVersion. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.Workflow' + "400": + description: Sent bad request. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + "500": + description: Failed to get the WorkflowVersion. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + summary: Get WorkflowVersion + tags: + - '[Workflow]' /workflow/{wfId}/workflowRun/{wfRunId}/task/{taskId}/clear: post: consumes: diff --git a/pkg/api/rest/model/connection.go b/pkg/api/rest/model/connection.go index 48eb875..d1f7ce8 100644 --- a/pkg/api/rest/model/connection.go +++ b/pkg/api/rest/model/connection.go @@ -9,4 +9,5 @@ type Connection struct { Schema string `json:"schema" yaml:"schema" mapstructure:"schema"` Login string `json:"login" yaml:"login" mapstructure:"login"` Password string `json:"password" yaml:"password" mapstructure:"password"` + Extra string `json:"extra" yaml:"extra" mapstructure:"extra"` } diff --git a/pkg/api/rest/model/taskComponent.go b/pkg/api/rest/model/taskComponent.go index 31b91f8..6fb8318 100644 --- a/pkg/api/rest/model/taskComponent.go +++ b/pkg/api/rest/model/taskComponent.go @@ -20,8 +20,9 @@ type ParamOption struct { } type TaskData struct { - Options Options `json:"options" mapstructure:"options" validate:"required"` - ParmaOption ParamOption `json:"param_option" mapstructure:"param_option" validate:"required"` + Options Options `json:"options" mapstructure:"options" ` + ParmaOption ParamOption `json:"param_option" mapstructure:"param_option"` + Extra map[string]interface{} `json:"extra,omitempty" mapstructure:"extra"` } type TaskComponent struct { diff --git a/pkg/api/rest/model/workflow.go b/pkg/api/rest/model/workflow.go index 1412428..623466f 100644 --- a/pkg/api/rest/model/workflow.go +++ b/pkg/api/rest/model/workflow.go @@ -5,6 +5,8 @@ import ( "encoding/json" "errors" "time" + + "gorm.io/gorm" ) type Task struct { @@ -14,6 +16,7 @@ type Task struct { RequestBody string `json:"request_body" mapstructure:"request_body" validate:"required"` PathParams map[string]string `json:"path_params" mapstructure:"path_params"` QueryParams map[string]string `json:"query_params" mapstructure:"query_params"` + Extra map[string]interface{} `json:"extra,omitempty" mapstructure:"extra"` Dependencies []string `json:"dependencies" mapstructure:"dependencies"` } @@ -88,21 +91,19 @@ type Workflow struct { UpdatedAt time.Time `gorm:"column:updated_at" json:"updated_at" mapstructure:"updated_at"` } +type WorkflowVersion struct { + ID string `gorm:"primaryKey" json:"id" mapstructure:"id" validate:"required"` + WorkflowID string `gorm:"column:workflowId" json:"workflowId" mapstructure:"workflowId" validate:"required"` + Data Workflow `gorm:"column:data" json:"data" mapstructure:"data"` + Action string `gorm:"column:action" json:"action" mapstructure:"action" validate:"required"` + CreatedAt time.Time `gorm:"column:created_at" json:"created_at" mapstructure:"created_at"` +} + type CreateWorkflowReq struct { Name string `gorm:"column:name" json:"name" mapstructure:"name" validate:"required"` Data CreateDataReq `gorm:"column:data" json:"data" mapstructure:"data" validate:"required"` } -type Monit struct { - WorkflowID string - WorkflowVersion string - Status string - startTime time.Time - endTime time.Time - Duration time.Time - WorkflowInput string - WorkflowResult string -} type WorkflowRun struct { WorkflowRunID string `json:"workflow_run_id,omitempty"` @@ -194,3 +195,60 @@ func (d *CreateDataReq) Scan(value interface{}) error { } return json.Unmarshal(bytes, d) } +func (d Workflow) Value() (driver.Value, error) { + return json.Marshal(d) +} + +func (d *Workflow) Scan(value interface{}) error { + if value == nil { + return nil + } + bytes, ok := value.([]byte) + if !ok { + return errors.New("Invalid type for Data") + } + return json.Unmarshal(bytes, d) +} + +// AfterCreate Hook for Workflow to add WorkflowVersion on create +func (w *Workflow) AfterCreate(tx *gorm.DB) (err error) { + workflowVersion := WorkflowVersion{ + ID: "create_" + time.Now().String(), + WorkflowID: w.ID, + // Version: "", // 기본 버전 + Data: *w, + Action: "create", + CreatedAt: time.Now(), + } + + if err := tx.Create(&workflowVersion).Error; err != nil { + return err + } + return nil +} + +// AfterUpdate Hook for Workflow to add WorkflowVersion on update +func (w *Workflow) AfterUpdate(tx *gorm.DB) (err error) { + workflowVersion := WorkflowVersion{ + ID: "update_" + time.Now().String(), + // ID: uuid.New().String(), + WorkflowID: w.ID, + // Version: "new_version", // 새로운 버전 설정 로직 추가 가능 + Data: *w, + Action: "update", + CreatedAt: time.Now(), + } + + if err := tx.Create(&workflowVersion).Error; err != nil { + return err + } + return nil +} + +// BeforeDelete Hook for Workflow to delete WorkflowVersion on delete +func (w *Workflow) BeforeDelete(tx *gorm.DB) (err error) { + if err := tx.Where("workflowId = ?", w.ID).Delete(&WorkflowVersion{}).Error; err != nil { + return err + } + return nil +} \ No newline at end of file diff --git a/pkg/api/rest/route/workflow.go b/pkg/api/rest/route/workflow.go index 7943427..5dfa79c 100644 --- a/pkg/api/rest/route/workflow.go +++ b/pkg/api/rest/route/workflow.go @@ -34,4 +34,6 @@ func Workflow(e *echo.Echo) { e.POST("/"+strings.ToLower(common.ShortModuleName)+"/workflow/:wfId/workflowRun/:wfRunId/task/:taskId/clear", controller.ClearTaskInstances) e.GET("/"+strings.ToLower(common.ShortModuleName)+"/workflow/:wfId/eventlogs", controller.GetEventLogs) e.GET("/"+strings.ToLower(common.ShortModuleName)+"/importErrors", controller.GetImportErrors) + e.GET("/"+strings.ToLower(common.ShortModuleName)+"/workflow/:wfId/version", controller.ListWorkflowVersion) + e.GET("/"+strings.ToLower(common.ShortModuleName)+"/workflow/:wfId/version/:verId", controller.GetWorkflowVersion) } From 1576afb502570ad0133bbe0de1f0b418960976ad Mon Sep 17 00:00:00 2001 From: yby654 Date: Fri, 1 Nov 2024 03:25:04 +0000 Subject: [PATCH 2/4] =?UTF-8?q?email=20=EA=B0=9C=EC=9D=B8=EC=A0=95?= =?UTF-8?q?=EB=B3=B4=20=EC=82=AD=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- _airflow/airflow-home/dags/mail.py | 2 +- _airflow/docker-compose.yml | 4 ++-- lib/airflow/example/task_component/send_email.json | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/_airflow/airflow-home/dags/mail.py b/_airflow/airflow-home/dags/mail.py index 3217a8d..84c1ab6 100644 --- a/_airflow/airflow-home/dags/mail.py +++ b/_airflow/airflow-home/dags/mail.py @@ -59,7 +59,7 @@ def _inner(session=None): # EmailOperator 설정 email_task = EmailOperator( task_id='send_email', - to='yby987654@gmail.com', + to='yourEmail@gmail.com', subject='DAG 상태 보고서', html_content="""

Workflow Execution Complete

Workflow ID: {{ ti.xcom_pull(task_ids='collect_failed_tasks').get('dag_id') }}

diff --git a/_airflow/docker-compose.yml b/_airflow/docker-compose.yml index be48a25..b360164 100644 --- a/_airflow/docker-compose.yml +++ b/_airflow/docker-compose.yml @@ -29,10 +29,10 @@ services: restart: always environment: AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com' - AIRFLOW__SMTP__SMTP_USER: 'yby654321@gmail.com' + AIRFLOW__SMTP__SMTP_USER: 'yourEmail@gmail.com' AIRFLOW__SMTP__SMTP_PASSWORD: 'wtknvaprkkwyaurd' AIRFLOW__SMTP__SMTP_PORT: 587 - AIRFLOW__SMTP__SMTP_MAIL_FROM: 'yby654321@gmail.com' + AIRFLOW__SMTP__SMTP_MAIL_FROM: 'yourEmail@gmail.com' command: > /bin/bash -c " # Wait for MySQL diff --git a/lib/airflow/example/task_component/send_email.json b/lib/airflow/example/task_component/send_email.json index 21bab67..959178c 100644 --- a/lib/airflow/example/task_component/send_email.json +++ b/lib/airflow/example/task_component/send_email.json @@ -4,7 +4,7 @@ "data": { "extra": { "operator": "airflow.operators.email.EmailOperator", - "to": "yby987654@gmail.com", + "to": "yourEmail@gmail.com", "subject": "Workflow Execution Report : {{ dag.dag_id }}", "html_content": "

Workflow Execution Complete

Workflow ID: {{ dag.dag_id }}

Workflow Run ID: {{ run_id }}

Workflow 상태: {{ ti.xcom_pull(task_ids='collect_failed_tasks', key='dag_state') }}

{% if ti.xcom_pull(task_ids='collect_failed_tasks', key='failed_tasks') %}

실패한 Tasks: {{ ti.xcom_pull(task_ids='collect_failed_tasks', key='failed_tasks') }}

Task Try Number: {{ ti.try_number }}

{% else %}

모든 Tasks가 성공적으로 완료되었습니다.

{% endif %}", "trigger_rule": "all_done" From 0d3e5a95951840503733b1802fd52d24d7a40c66 Mon Sep 17 00:00:00 2001 From: yby654 Date: Tue, 12 Nov 2024 02:37:45 +0000 Subject: [PATCH 3/4] gusty + Email alert --- _airflow/airflow-home/dags/complex.py | 91 ------------------- _airflow/airflow-home/dags/dags.py | 3 +- _airflow/airflow-home/dags/example2.py | 61 ------------- _airflow/airflow-home/dags/mail.py | 9 +- _airflow/airflow-home/dags/utils.py | 13 --- cmd/cm-cicada/main.go | 15 +-- db/taskComponent.go | 55 ++++++----- .../task_component/collect_failed_tasks.json | 12 --- .../example/task_component/send_email.json | 13 --- .../example/task_component/trigger_email.json | 28 +++--- lib/airflow/gusty.go | 30 +++--- pkg/api/rest/docs/docs.go | 8 +- pkg/api/rest/docs/swagger.json | 8 +- pkg/api/rest/docs/swagger.yaml | 6 +- pkg/api/rest/model/taskComponent.go | 4 +- 15 files changed, 89 insertions(+), 267 deletions(-) delete mode 100644 _airflow/airflow-home/dags/complex.py delete mode 100644 _airflow/airflow-home/dags/example2.py delete mode 100644 _airflow/airflow-home/dags/utils.py delete mode 100644 lib/airflow/example/task_component/collect_failed_tasks.json delete mode 100644 lib/airflow/example/task_component/send_email.json diff --git a/_airflow/airflow-home/dags/complex.py b/_airflow/airflow-home/dags/complex.py deleted file mode 100644 index 6b91b1e..0000000 --- a/_airflow/airflow-home/dags/complex.py +++ /dev/null @@ -1,91 +0,0 @@ -from __future__ import annotations -import pendulum -from airflow.models import DAG -from airflow.operators.empty import EmptyOperator -from airflow.operators.bash_operator import BashOperator -from airflow.operators.python import PythonOperator -from airflow.operators.email import EmailOperator -from airflow.utils.state import State -from airflow.utils.task_group import TaskGroup -from airflow.utils.dates import days_ago - -def collect_failed_tasks(**context): - dag_run = context.get('dag_run') - ti = context.get('ti') - - failed_tasks = [] - for task_instance in dag_run.get_task_instances(): - if task_instance.state == State.FAILED: - failed_tasks.append(task_instance.task_id) - - # Push DAG state and failed task list to XCom - ti.xcom_push(key='dag_state', value=dag_run.get_state()) - ti.xcom_push(key='failed_tasks', value=failed_tasks) - -with DAG( - dag_id="example_task_group", - schedule=None, - start_date=days_ago(1), - catchup=False, - tags=["example"], -) as dag: - - with TaskGroup("main_flow", tooltip="Main workflow sequence") as main_flow: - start = EmptyOperator(task_id="start") - - with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1: - task_1 = EmptyOperator(task_id="task_1") - task_2 = BashOperator(task_id="task_2", bash_command="echo 1") - task_3 = EmptyOperator(task_id="task_3") - task_1 >> [task_2, task_3] - - with TaskGroup("section_2", tooltip="Tasks for section_2 (designed to fail)") as section_2: - fail_task_1 = BashOperator(task_id="fail_task_1", bash_command="exit 1") - fail_task_2 = BashOperator(task_id="fail_task_2", bash_command="exit 1") - fail_task_1 >> fail_task_2 - - with TaskGroup("section_3", tooltip="Tasks for section_3") as section_3: - task_1 = EmptyOperator(task_id="task_1") - - with TaskGroup("inner_section_3", tooltip="Tasks for inner_section3") as inner_section_3: - task_2 = BashOperator(task_id="task_2", bash_command="echo 1") - task_3 = EmptyOperator(task_id="task_3") - task_4 = EmptyOperator(task_id="task_4") - [task_2, task_3] >> task_4 - - # 모든 작업이 완료되면 end task로 연결되도록 설정 - end = EmptyOperator(task_id="end") - start >> section_1 >> section_2 >> section_3 >> end - - with TaskGroup("notification_flow", tooltip="Notification and Failure Collection") as notification_flow: - collect_failed = PythonOperator( - task_id="collect_failed_tasks", - python_callable=collect_failed_tasks, - provide_context=True, - trigger_rule="all_done" - ) - - notify_email = EmailOperator( - task_id="send_email", - to="yby987654@gmail.com", - subject="Workflow Execution Report : {{ dag.dag_id }} ", - html_content="""

Workflow Execution Complete

-

Workflow ID: {{ dag.dag_id }}

-

Workflow Run ID: {{ run_id }}

-

Workflow 상태: {{ ti.xcom_pull(task_ids='collect_failed_tasks', key='dag_state') }}

- - {% set failed_tasks = ti.xcom_pull(task_ids='collect_failed_tasks', key='failed_tasks', default=[]) %} - - {% if failed_tasks %} -

실패한 Tasks: {{ failed_tasks }}

-

Task Try Number: {{ ti.try_number }}

- {% else %} -

모든 Tasks가 성공적으로 완료되었습니다.

- {% endif %}""", - trigger_rule="all_done" - ) - - collect_failed >> notify_email - - # TaskGroup 순서 설정 - main_flow >> notification_flow diff --git a/_airflow/airflow-home/dags/dags.py b/_airflow/airflow-home/dags/dags.py index f10fdc3..a0b9435 100644 --- a/_airflow/airflow-home/dags/dags.py +++ b/_airflow/airflow-home/dags/dags.py @@ -1,7 +1,6 @@ import os import airflow from gusty import create_dag -from utils import collect_failed_tasks from airflow.utils.state import State ##################### @@ -38,4 +37,4 @@ def collect_failed_tasks(**context): # DAG 상태 및 실패한 작업 목록을 XCom에 푸시 context['ti'].xcom_push(key='dag_state', value=dag_run.get_state()) - context['ti'].xcom_push(key='failed_tasks', value=failed_tasks) \ No newline at end of file + context['ti'].xcom_push(key='failed_tasks', value=failed_tasks) diff --git a/_airflow/airflow-home/dags/example2.py b/_airflow/airflow-home/dags/example2.py deleted file mode 100644 index a80d4c7..0000000 --- a/_airflow/airflow-home/dags/example2.py +++ /dev/null @@ -1,61 +0,0 @@ -from __future__ import annotations -import pendulum -from airflow.models import DAG -from airflow.operators.empty import EmptyOperator -from airflow.operators.bash_operator import BashOperator -from airflow.operators.python import PythonOperator -from airflow.operators.email import EmailOperator -from airflow.utils.state import State -from airflow.utils.task_group import TaskGroup -from airflow.utils.dates import days_ago -from airflow.operators.trigger_dagrun import TriggerDagRunOperator -with DAG( - dag_id="example_task_group2", - schedule=None, - start_date=days_ago(1), - catchup=False, - tags=["example"], -) as dag: - - start = EmptyOperator(task_id="start") - - with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1: - task_1 = EmptyOperator(task_id="task_1") - task_2 = BashOperator(task_id="task_2", bash_command="echo 1") - task_3 = EmptyOperator(task_id="task_3") - task_1 >> [task_2, task_3] - - with TaskGroup("section_2", tooltip="Tasks for section_2 (designed to fail)") as section_2: - fail_task_1 = BashOperator(task_id="fail_task_1", bash_command="exit 1") - fail_task_2 = BashOperator(task_id="fail_task_2", bash_command="exit 1") - fail_task_1 >> fail_task_2 - - with TaskGroup("section_3", tooltip="Tasks for section_3") as section_3: - task_1 = EmptyOperator(task_id="task_1") - - with TaskGroup("inner_section_3", tooltip="Tasks for inner_section3") as inner_section_3: - task_2 = BashOperator(task_id="task_2", bash_command="echo 1") - task_3 = EmptyOperator(task_id="task_3") - task_4 = EmptyOperator(task_id="task_4") - [task_2, task_3] >> task_4 - - # 모든 작업이 완료되면 end task로 연결되도록 설정 - end = EmptyOperator(task_id="end") - - trigger = TriggerDagRunOperator( - trigger_dag_id='monitor_dag', - task_id='trigger', - execution_date='{{ execution_date }}', - wait_for_completion=False, - conf= { - "source_dag_id": "{{ dag.dag_id }}", # 현재 DAG의 dag_id - "source_dag_run_id": "{{ dag_run.run_id }}" # 현재 DAG의 dag_run_id - }, - poke_interval=30, - reset_dag_run=True, - # trigger_rule="all_done" - trigger_rule="one_done" - ) - start >> section_1 >> section_2 >> section_3 >> end >> trigger - - diff --git a/_airflow/airflow-home/dags/mail.py b/_airflow/airflow-home/dags/mail.py index 84c1ab6..49f2da5 100644 --- a/_airflow/airflow-home/dags/mail.py +++ b/_airflow/airflow-home/dags/mail.py @@ -32,11 +32,14 @@ def _inner(session=None): if task_instance.state != State.SUCCESS: failed_tasks.append(task_instance.task_id) + # DAG 상태를 실패한 태스크가 1개 이상인 경우 "failed"로 설정 + dag_state = "failed" if failed_tasks else "success" + # 결과 반환 return { "dag_id": source_dag_id, "dag_run_id": source_dag_run_id, - "dag_state": source_dag_run.state, + "dag_state": dag_state, "failed_tasks": failed_tasks } @@ -59,7 +62,7 @@ def _inner(session=None): # EmailOperator 설정 email_task = EmailOperator( task_id='send_email', - to='yourEmail@gmail.com', + to='Your Email', subject='DAG 상태 보고서', html_content="""

Workflow Execution Complete

Workflow ID: {{ ti.xcom_pull(task_ids='collect_failed_tasks').get('dag_id') }}

@@ -90,4 +93,4 @@ def set_email_params(ti, **context): ) # 의존성 설정 - collect_task >> update_email_params >> email_task + collect_task >> update_email_params >> email_task \ No newline at end of file diff --git a/_airflow/airflow-home/dags/utils.py b/_airflow/airflow-home/dags/utils.py deleted file mode 100644 index 305664a..0000000 --- a/_airflow/airflow-home/dags/utils.py +++ /dev/null @@ -1,13 +0,0 @@ -from airflow.utils.state import State - -def collect_failed_tasks(**context): - dag_run = context['dag_run'] - failed_tasks = [] - - for task_instance in dag_run.get_task_instances(): - if task_instance.state == State.FAILED: - failed_tasks.append(task_instance.task_id) - - # DAG 상태 및 실패한 작업 목록을 XCom에 푸시 - context['ti'].xcom_push(key='dag_state', value=dag_run.get_state()) - context['ti'].xcom_push(key='failed_tasks', value=failed_tasks) \ No newline at end of file diff --git a/cmd/cm-cicada/main.go b/cmd/cm-cicada/main.go index 62912f9..ef9eda4 100644 --- a/cmd/cm-cicada/main.go +++ b/cmd/cm-cicada/main.go @@ -1,6 +1,13 @@ package main import ( + "log" + "os" + "os/signal" + "strings" + "sync" + "syscall" + "github.com/cloud-barista/cm-cicada/common" "github.com/cloud-barista/cm-cicada/db" "github.com/cloud-barista/cm-cicada/lib/airflow" @@ -9,12 +16,6 @@ import ( "github.com/cloud-barista/cm-cicada/pkg/api/rest/server" "github.com/jollaman999/utils/logger" "github.com/jollaman999/utils/syscheck" - "log" - "os" - "os/signal" - "strings" - "sync" - "syscall" ) func init() { @@ -74,4 +75,4 @@ func main() { end() os.Exit(0) }() -} +} \ No newline at end of file diff --git a/db/taskComponent.go b/db/taskComponent.go index 5972378..06d686c 100644 --- a/db/taskComponent.go +++ b/db/taskComponent.go @@ -4,8 +4,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/jollaman999/utils/logger" - "gorm.io/gorm" "io" "net/http" "os" @@ -15,6 +13,9 @@ import ( "strings" "time" + "github.com/jollaman999/utils/logger" + "gorm.io/gorm" + "github.com/cloud-barista/cm-cicada/lib/config" "github.com/cloud-barista/cm-cicada/pkg/api/rest/model" "github.com/google/uuid" @@ -27,6 +28,7 @@ type ConfigFile struct { APIConnectionID string `json:"api_connection_id"` SwaggerYAMLEndpoint string `json:"swagger_yaml_endpoint"` Endpoint string `json:"endpoint"` + Extra map[string]interface{} `json:"extra"` } type SwaggerSpec struct { @@ -161,29 +163,39 @@ func TaskComponentInit() error { if err := json.Unmarshal(configData, &configFile); err != nil { return fmt.Errorf("failed to parse config file %s: %v", file, err) } + var taskComponent *model.TaskComponent + if configFile.Extra != nil { + taskComponent = &model.TaskComponent{} + taskComponent.Data.Options.Extra = configFile.Extra - var connectionFound bool - var connection model.Connection - for _, connection = range config.CMCicadaConfig.CMCicada.AirflowServer.Connections { - if connection.ID == configFile.APIConnectionID { - connectionFound = true - break + } else { + var connectionFound bool + var connection model.Connection + for _, connection = range config.CMCicadaConfig.CMCicada.AirflowServer.Connections { + if connection.ID == configFile.APIConnectionID { + connectionFound = true + break + } + } + if !connectionFound { + logger.Println(logger.WARN, true, fmt.Sprintf("failed to find connection with ID %s", configFile.APIConnectionID)) + continue + // return fmt.Errorf("failed to find connection with ID %s", configFile.APIConnectionID) } - } - if !connectionFound { - return fmt.Errorf("failed to find connection with ID %s", configFile.APIConnectionID) - } - spec, err := fetchAndParseYAML(connection, configFile.SwaggerYAMLEndpoint) - if err != nil { - logger.Println(logger.WARN, true, fmt.Sprintf("failed to fetch and parse swagger spec: %v", err)) - continue - } + spec, err := fetchAndParseYAML(connection, configFile.SwaggerYAMLEndpoint) + if err != nil { + logger.Println(logger.WARN, true, fmt.Sprintf("failed to fetch and parse swagger spec: %v", err)) + continue + } - endpoint := strings.TrimPrefix(configFile.Endpoint, spec.BasePath) - taskComponent, err := processEndpoint(connection.ID, spec, endpoint) - if err != nil { - return fmt.Errorf("failed to process endpoint: %v", err) + endpoint := strings.TrimPrefix(configFile.Endpoint, spec.BasePath) + taskComponent, err = processEndpoint(connection.ID, spec, endpoint ) + if err != nil { + logger.Println(logger.WARN, true, fmt.Sprintf("failed to process endpoint: %v", err)) + continue + // return fmt.Errorf("failed to process endpoint: %v", err) + } } taskComponent.Name = configFile.Name taskComponent.Description = configFile.Description @@ -404,7 +416,6 @@ func generateRequestBodyExample(schema SchemaModel) string { func processEndpoint(connectionID string, spec *SwaggerSpec, targetEndpoint string) (*model.TaskComponent, error) { targetEndpoint = normalizePath(targetEndpoint) - for path, pathItem := range spec.Paths { if normalizePath(path) == targetEndpoint { for method, operation := range pathItem { diff --git a/lib/airflow/example/task_component/collect_failed_tasks.json b/lib/airflow/example/task_component/collect_failed_tasks.json deleted file mode 100644 index 1f441e4..0000000 --- a/lib/airflow/example/task_component/collect_failed_tasks.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "name": "collect_failed_tasks", - "description": "collect failed tasks Info", - "data": { - "extra": { - "operator": "airflow.operators.python.PythonOperator", - "python_callable": "collect_failed_tasks", - "provide_context": true, - "trigger_rule": "all_done" - } - } -} \ No newline at end of file diff --git a/lib/airflow/example/task_component/send_email.json b/lib/airflow/example/task_component/send_email.json deleted file mode 100644 index 959178c..0000000 --- a/lib/airflow/example/task_component/send_email.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "name": "send_email", - "description": "smtp send email", - "data": { - "extra": { - "operator": "airflow.operators.email.EmailOperator", - "to": "yourEmail@gmail.com", - "subject": "Workflow Execution Report : {{ dag.dag_id }}", - "html_content": "

Workflow Execution Complete

Workflow ID: {{ dag.dag_id }}

Workflow Run ID: {{ run_id }}

Workflow 상태: {{ ti.xcom_pull(task_ids='collect_failed_tasks', key='dag_state') }}

{% if ti.xcom_pull(task_ids='collect_failed_tasks', key='failed_tasks') %}

실패한 Tasks: {{ ti.xcom_pull(task_ids='collect_failed_tasks', key='failed_tasks') }}

Task Try Number: {{ ti.try_number }}

{% else %}

모든 Tasks가 성공적으로 완료되었습니다.

{% endif %}", - "trigger_rule": "all_done" - } - } -} \ No newline at end of file diff --git a/lib/airflow/example/task_component/trigger_email.json b/lib/airflow/example/task_component/trigger_email.json index 8bd6ed0..dc13277 100644 --- a/lib/airflow/example/task_component/trigger_email.json +++ b/lib/airflow/example/task_component/trigger_email.json @@ -1,20 +1,18 @@ { "name": "trigger_email", "description": "trigger email alert", - "data": { - "extra": { - "operator": "airflow.operators.trigger_dagrun.TriggerDagRunOperator", - "trigger_dag_id":"monitor_dag", - "task_id":"trigger", - "execution_date":"{{ execution_date }}", - "wait_for_completion":"False", - "conf": { - "source_dag_id": "{{ dag.dag_id }}", - "source_dag_run_id": "{{ dag_run.run_id }}" - }, - "poke_interval":30, - "reset_dag_run": "True", - "trigger_rule": "one_done" - } + "extra": { + "operator": "airflow.operators.trigger_dagrun.TriggerDagRunOperator", + "trigger_dag_id":"monitor_dag", + "task_id":"trigger", + "execution_date":"{{ execution_date }}", + "wait_for_completion":false, + "conf": { + "source_dag_id": "{{ dag.dag_id }}", + "source_dag_run_id": "{{ dag_run.run_id }}" + }, + "poke_interval":30, + "reset_dag_run": true, + "trigger_rule": "all_done" } } \ No newline at end of file diff --git a/lib/airflow/gusty.go b/lib/airflow/gusty.go index 84216cb..2351bfa 100644 --- a/lib/airflow/gusty.go +++ b/lib/airflow/gusty.go @@ -161,40 +161,40 @@ func writeGustyYAMLs(workflow *model.Workflow) error { for _, t := range tg.Tasks { taskOptions := make(map[string]any) taskComponent := db.TaskComponentGetByName(t.TaskComponent) - if taskComponent == nil { return errors.New("task component '" + t.TaskComponent + "' not found") } - fmt.Println("0 : ", taskComponent ) - if taskComponent.Data.Extra != nil { - fmt.Println("1 : ", t.Name ) - taskOptions = taskComponent.Data.Extra + if taskComponent.Data.Options.Extra != nil { + taskOptions = taskComponent.Data.Options.Extra + } else { - fmt.Println("2 : ", t.Name ) if isTaskExist(workflow, t.RequestBody) { taskOptions["operator"] = "local.JsonHttpRequestOperator" taskOptions["xcom_task"] = t.RequestBody } else { taskOptions["operator"] = "airflow.providers.http.operators.http.SimpleHttpOperator" - + type headers struct { ContentType string `json:"Content-Type" yaml:"Content-Type"` } taskOptions["headers"] = headers{ ContentType: "application/json", } - + taskOptions["log_response"] = true - + taskOptions["data"] = t.RequestBody - } - - taskOptions["http_conn_id"] = taskComponent.Data.Options.APIConnectionID - taskOptions["endpoint"] = parseEndpoint(t.PathParams, t.QueryParams, taskComponent.Data.Options.Endpoint) - taskOptions["method"] = taskComponent.Data.Options.Method - } + } + + taskOptions["http_conn_id"] = taskComponent.Data.Options.APIConnectionID + taskOptions["endpoint"] = parseEndpoint(t.PathParams, t.QueryParams, taskComponent.Data.Options.Endpoint) + taskOptions["method"] = taskComponent.Data.Options.Method + } + taskOptions["dependencies"] = t.Dependencies + taskOptions["task_id"] = t.Name + filePath = dagDir + "/" + tg.Name + "/" + t.Name + ".yml" err = writeModelToYAMLFile(taskOptions, filePath) diff --git a/pkg/api/rest/docs/docs.go b/pkg/api/rest/docs/docs.go index 8ce0fb2..0b8a1e1 100644 --- a/pkg/api/rest/docs/docs.go +++ b/pkg/api/rest/docs/docs.go @@ -2010,10 +2010,6 @@ const docTemplate = `{ "body_params": { "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.ParameterStructure" }, - "extra": { - "type": "object", - "additionalProperties": true - }, "options": { "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskComponentOptions" }, @@ -2034,6 +2030,10 @@ const docTemplate = `{ "endpoint": { "type": "string" }, + "extra": { + "type": "object", + "additionalProperties": true + }, "method": { "type": "string" }, diff --git a/pkg/api/rest/docs/swagger.json b/pkg/api/rest/docs/swagger.json index 810a3be..134708f 100644 --- a/pkg/api/rest/docs/swagger.json +++ b/pkg/api/rest/docs/swagger.json @@ -2003,10 +2003,6 @@ "body_params": { "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.ParameterStructure" }, - "extra": { - "type": "object", - "additionalProperties": true - }, "options": { "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskComponentOptions" }, @@ -2027,6 +2023,10 @@ "endpoint": { "type": "string" }, + "extra": { + "type": "object", + "additionalProperties": true + }, "method": { "type": "string" }, diff --git a/pkg/api/rest/docs/swagger.yaml b/pkg/api/rest/docs/swagger.yaml index f83e830..149db97 100644 --- a/pkg/api/rest/docs/swagger.yaml +++ b/pkg/api/rest/docs/swagger.yaml @@ -238,9 +238,6 @@ definitions: properties: body_params: $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.ParameterStructure' - extra: - additionalProperties: true - type: object options: $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskComponentOptions' path_params: @@ -254,6 +251,9 @@ definitions: type: string endpoint: type: string + extra: + additionalProperties: true + type: object method: type: string request_body: diff --git a/pkg/api/rest/model/taskComponent.go b/pkg/api/rest/model/taskComponent.go index a83c1b9..80dac9c 100644 --- a/pkg/api/rest/model/taskComponent.go +++ b/pkg/api/rest/model/taskComponent.go @@ -28,11 +28,11 @@ type TaskComponentOptions struct { Endpoint string `json:"endpoint"` Method string `json:"method"` RequestBody string `json:"request_body"` + Extra map[string]interface{} `json:"extra,omitempty"` } type TaskComponentData struct { Options TaskComponentOptions `json:"options"` - Extra map[string]interface{} `json:"extra,omitempty"` BodyParams ParameterStructure `json:"body_params,omitempty"` PathParams ParameterStructure `json:"path_params,omitempty"` QueryParams ParameterStructure `json:"query_params,omitempty"` @@ -71,4 +71,4 @@ func (d *TaskComponentData) Scan(value interface{}) error { return errors.New("Invalid type for Data") } return json.Unmarshal(bytes, d) -} +} \ No newline at end of file From 3f742b0da55fa6be16adea351b35e4305996fd41 Mon Sep 17 00:00:00 2001 From: yby654 Date: Tue, 12 Nov 2024 05:34:20 +0000 Subject: [PATCH 4/4] =?UTF-8?q?Workflow=20version=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- _airflow/airflow-home/dags/mail.py | 2 +- dao/workflow.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/_airflow/airflow-home/dags/mail.py b/_airflow/airflow-home/dags/mail.py index 49f2da5..94d83e8 100644 --- a/_airflow/airflow-home/dags/mail.py +++ b/_airflow/airflow-home/dags/mail.py @@ -62,7 +62,7 @@ def _inner(session=None): # EmailOperator 설정 email_task = EmailOperator( task_id='send_email', - to='Your Email', + to='Your Email@example.com', subject='DAG 상태 보고서', html_content="""

Workflow Execution Complete

Workflow ID: {{ ti.xcom_pull(task_ids='collect_failed_tasks').get('dag_id') }}

diff --git a/dao/workflow.go b/dao/workflow.go index 5731f2d..6747c17 100644 --- a/dao/workflow.go +++ b/dao/workflow.go @@ -2,6 +2,7 @@ package dao import ( "errors" + "fmt" "time" "github.com/cloud-barista/cm-cicada/db" @@ -15,7 +16,8 @@ func WorkflowCreate(workflow *model.Workflow) (*model.Workflow, error) { workflow.CreatedAt = now workflow.UpdatedAt = now - result := db.DB.Session(&gorm.Session{SkipHooks: true}).Create(workflow) + result := db.DB.Create(workflow) + // result := db.DB.Session(&gorm.Session{SkipHooks: true}).Create(workflow) err := result.Error if err != nil { return nil, err @@ -167,6 +169,7 @@ func WorkflowVersionGet(id string, wkId string) (*model.WorkflowVersion, error) } result := db.DB.Where("id = ? and workflowId = ?", id, wkId).First(workflowVersion) + fmt.Println("result : ", result) err := result.Error if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) {