Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: rest: workflow version get & Email alert #20

Merged
merged 6 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion _airflow/airflow-home/dags/dags.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import airflow
from gusty import create_dag
from airflow.utils.state import State

#####################
## DAG Directories ##
Expand All @@ -23,4 +24,17 @@
tags = ['default', 'tags'],
task_group_defaults={"tooltip": "default tooltip"},
wait_for_defaults={"retries": 10, "check_existence": True},
latest_only=False)
latest_only=False)


def collect_failed_tasks(**context):
    """Collect the IDs of failed tasks in the current DAG run and publish
    the run state plus the failed-task list via XCom."""
    run = context['dag_run']

    # Gather every task instance that ended in the FAILED state.
    failed = [
        instance.task_id
        for instance in run.get_task_instances()
        if instance.state == State.FAILED
    ]

    # Push the DAG run state and the failed-task list to XCom.
    context['ti'].xcom_push(key='dag_state', value=run.get_state())
    context['ti'].xcom_push(key='failed_tasks', value=failed)
96 changes: 96 additions & 0 deletions _airflow/airflow-home/dags/mail.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from airflow.models import DAG, DagRun
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago
from airflow.utils.state import State

# Collects the outcome of another DAG run, identified via this run's conf.
def collect_failed_tasks(**context):
    """Look up the DAG run named in ``dag_run.conf`` and report its outcome.

    Expects ``source_dag_id`` and ``source_dag_run_id`` keys in the
    triggering conf. Returns a dict with the source dag id / run id, an
    overall state string, and the list of task ids that are not SUCCESS.
    """
    from airflow.utils.db import provide_session

    @provide_session
    def _inner(session=None):
        # dag_id and dag_run_id handed over via the triggering conf
        source_dag_id = context['dag_run'].conf.get('source_dag_id')
        source_dag_run_id = context['dag_run'].conf.get('source_dag_run_id')

        if not source_dag_id or not source_dag_run_id:
            raise ValueError("source_dag_id와 source_dag_run_id가 전달되지 않았습니다.")

        # Fetch the DagRun row matching the given dag_id / run_id pair.
        source_dag_run = session.query(DagRun).filter_by(
            dag_id=source_dag_id,
            run_id=source_dag_run_id
        ).first()

        if not source_dag_run:
            raise ValueError("해당하는 DAG Run을 찾을 수 없습니다.")

        # Collect the ids of all tasks that did not finish in SUCCESS.
        # NOTE(review): this also counts running/queued/skipped tasks as
        # "failed" — confirm the source run is finished before triggering,
        # or compare against State.FAILED explicitly if that is the intent.
        failed_tasks = []
        for task_instance in source_dag_run.get_task_instances():
            if task_instance.state != State.SUCCESS:
                failed_tasks.append(task_instance.task_id)

        # Overall state: "failed" as soon as any task is not SUCCESS.
        dag_state = "failed" if failed_tasks else "success"

        # Returned value is stored in XCom by the PythonOperator and
        # templated into the notification email downstream.
        return {
            "dag_id": source_dag_id,
            "dag_run_id": source_dag_run_id,
            "dag_state": dag_state,
            "failed_tasks": failed_tasks
        }

    return _inner()

# DAG definition: a manually-triggered monitor (schedule_interval=None) that
# inspects another DAG run passed in via conf and emails a status report.
with DAG(
    dag_id="monitor_dag",
    default_args={'start_date': days_ago(1)},
    schedule_interval=None,
) as dag:

    # Collect the source run's outcome; the returned dict is stored in XCom.
    # NOTE: provide_context=True was removed — it is a no-op in Airflow 2's
    # airflow.operators.python.PythonOperator, which always passes context.
    collect_task = PythonOperator(
        task_id='collect_failed_tasks',
        python_callable=collect_failed_tasks,
    )

    # Email the report. Every value is pulled straight from the XCom of
    # collect_failed_tasks at template-render time, so no intermediate
    # params-mutation task is needed. (The previous update_email_params
    # task mutated email_task.params from a worker process at run time,
    # which cannot affect the operator instance the executor renders —
    # and the template never read params anyway.)
    email_task = EmailOperator(
        task_id='send_email',
        to='Your [email protected]',  # TODO(review): placeholder recipient — set a real address
        subject='DAG 상태 보고서',
        html_content="""<h3>Workflow Execution Complete</h3>
        <p><strong>Workflow ID:</strong> {{ ti.xcom_pull(task_ids='collect_failed_tasks').get('dag_id') }}</p>
        <p><strong>Workflow Run ID:</strong> {{ ti.xcom_pull(task_ids='collect_failed_tasks').get('dag_run_id') }}</p>
        <p><strong>Workflow 상태:</strong> {{ ti.xcom_pull(task_ids='collect_failed_tasks').get('dag_state') }}</p>
        {% if ti.xcom_pull(task_ids='collect_failed_tasks').get('failed_tasks') | length == 0 %}
        <p>모든 Tasks가 성공적으로 완료되었습니다.</p>
        {% else %}
        <p><strong>실패한 Tasks:</strong> {{ ti.xcom_pull(task_ids='collect_failed_tasks').get('failed_tasks') }}</p>
        {% endif %}
        """,
    )

    # Dependency: collect first, then send the report.
    collect_task >> email_task
6 changes: 6 additions & 0 deletions _airflow/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ services:
container_name: airflow-server
image: cloudbaristaorg/airflow-server:edge
restart: always
environment:
AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com'
AIRFLOW__SMTP__SMTP_USER: '[email protected]'
AIRFLOW__SMTP__SMTP_PASSWORD: '${SMTP_PASSWORD}'  # SECURITY: a real Gmail app password was committed here in plaintext — revoke that credential and inject the value via the environment instead
AIRFLOW__SMTP__SMTP_PORT: 587
AIRFLOW__SMTP__SMTP_MAIL_FROM: '[email protected]'
command: >
/bin/bash -c "
# Wait for MySQL
Expand Down
15 changes: 8 additions & 7 deletions cmd/cm-cicada/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package main

import (
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"

"github.com/cloud-barista/cm-cicada/common"
"github.com/cloud-barista/cm-cicada/db"
"github.com/cloud-barista/cm-cicada/lib/airflow"
Expand All @@ -9,12 +16,6 @@ import (
"github.com/cloud-barista/cm-cicada/pkg/api/rest/server"
"github.com/jollaman999/utils/logger"
"github.com/jollaman999/utils/syscheck"
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
)

func init() {
Expand Down Expand Up @@ -74,4 +75,4 @@ func main() {
end()
os.Exit(0)
}()
}
}
64 changes: 62 additions & 2 deletions dao/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package dao

import (
"errors"
"fmt"
"time"

"github.com/cloud-barista/cm-cicada/db"
"github.com/cloud-barista/cm-cicada/pkg/api/rest/model"
"gorm.io/gorm"
"time"
)

func WorkflowCreate(workflow *model.Workflow) (*model.Workflow, error) {
Expand All @@ -14,7 +16,8 @@ func WorkflowCreate(workflow *model.Workflow) (*model.Workflow, error) {
workflow.CreatedAt = now
workflow.UpdatedAt = now

result := db.DB.Session(&gorm.Session{SkipHooks: true}).Create(workflow)
result := db.DB.Create(workflow)
// result := db.DB.Session(&gorm.Session{SkipHooks: true}).Create(workflow)
err := result.Error
if err != nil {
return nil, err
Expand Down Expand Up @@ -120,3 +123,60 @@ func WorkflowDelete(workflow *model.Workflow) error {

return nil
}

// WorkflowVersionGetList returns workflow versions matching the optional
// WorkflowID substring filter, optionally paginated. page and row must both
// be zero (no pagination) or both non-zero; a mixed pair yields an error.
func WorkflowVersionGetList(workflowVersion *model.WorkflowVersion, page int, row int) (*[]model.WorkflowVersion, error) {
	versions := &[]model.WorkflowVersion{}

	// Guard against an uninitialized database handle to avoid a panic.
	if db.DB == nil {
		return nil, errors.New("database connection is not initialized")
	}

	query := db.DB.Scopes(func(tx *gorm.DB) *gorm.DB {
		scoped := tx

		// Substring match on the workflow ID when a filter value is set.
		if len(workflowVersion.WorkflowID) != 0 {
			scoped = scoped.Where("workflowId LIKE ?", "%"+workflowVersion.WorkflowID+"%")
		}

		switch {
		case page != 0 && row != 0:
			// Both paging values supplied: apply offset/limit.
			return scoped.Offset((page - 1) * row).Limit(row)
		case row != 0 && page == 0:
			scoped.Error = errors.New("row is not 0 but page is 0")
		case page != 0 && row == 0:
			scoped.Error = errors.New("page is not 0 but row is 0")
		}
		return scoped
	}).Find(versions)

	if query.Error != nil {
		return nil, query.Error
	}

	return versions, nil
}

// WorkflowVersionGet looks up a single workflow version by its id and the
// owning workflow's id (wkId). Returns a caller-friendly error when no row
// matches, and the raw database error otherwise.
func WorkflowVersionGet(id string, wkId string) (*model.WorkflowVersion, error) {
	workflowVersion := &model.WorkflowVersion{}

	// Ensure db.DB is not nil to avoid runtime panics
	if db.DB == nil {
		return nil, errors.New("database connection is not initialized")
	}

	result := db.DB.Where("id = ? and workflowId = ?", id, wkId).First(workflowVersion)
	// NOTE(review): leftover debug print — consider removing it or routing
	// through the project's logger (removing it would also strand the `fmt`
	// import if nothing else in this file uses it).
	fmt.Println("result : ", result)
	err := result.Error
	if err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			// Map gorm's not-found sentinel to a descriptive message.
			return nil, errors.New("workflow not found with the provided id")
		}
		return nil, err
	}

	return workflowVersion, nil
}
8 changes: 7 additions & 1 deletion db/sqlite.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package db

import (
"strconv"

"github.com/cloud-barista/cm-cicada/common"
"github.com/cloud-barista/cm-cicada/lib/config"
"github.com/cloud-barista/cm-cicada/pkg/api/rest/model"
"github.com/glebarez/sqlite"
"github.com/jollaman999/utils/logger"
"gorm.io/gorm"
"strconv"
)

var DB *gorm.DB
Expand Down Expand Up @@ -40,6 +41,11 @@ func Open() error {
logger.Panicln(logger.ERROR, true, err)
}

err = DB.AutoMigrate(&model.WorkflowVersion{})
if err != nil {
logger.Panicln(logger.ERROR, true, err)
}

err = DB.AutoMigrate(&model.Workflow{})
if err != nil {
logger.Panicln(logger.ERROR, true, err)
Expand Down
55 changes: 33 additions & 22 deletions db/taskComponent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/jollaman999/utils/logger"
"gorm.io/gorm"
"io"
"net/http"
"os"
Expand All @@ -15,6 +13,9 @@ import (
"strings"
"time"

"github.com/jollaman999/utils/logger"
"gorm.io/gorm"

"github.com/cloud-barista/cm-cicada/lib/config"
"github.com/cloud-barista/cm-cicada/pkg/api/rest/model"
"github.com/google/uuid"
Expand All @@ -27,6 +28,7 @@ type ConfigFile struct {
APIConnectionID string `json:"api_connection_id"`
SwaggerYAMLEndpoint string `json:"swagger_yaml_endpoint"`
Endpoint string `json:"endpoint"`
Extra map[string]interface{} `json:"extra"`
}

type SwaggerSpec struct {
Expand Down Expand Up @@ -161,29 +163,39 @@ func TaskComponentInit() error {
if err := json.Unmarshal(configData, &configFile); err != nil {
return fmt.Errorf("failed to parse config file %s: %v", file, err)
}
var taskComponent *model.TaskComponent
if configFile.Extra != nil {
taskComponent = &model.TaskComponent{}
taskComponent.Data.Options.Extra = configFile.Extra

var connectionFound bool
var connection model.Connection
for _, connection = range config.CMCicadaConfig.CMCicada.AirflowServer.Connections {
if connection.ID == configFile.APIConnectionID {
connectionFound = true
break
} else {
var connectionFound bool
var connection model.Connection
for _, connection = range config.CMCicadaConfig.CMCicada.AirflowServer.Connections {
if connection.ID == configFile.APIConnectionID {
connectionFound = true
break
}
}
if !connectionFound {
logger.Println(logger.WARN, true, fmt.Sprintf("failed to find connection with ID %s", configFile.APIConnectionID))
continue
// return fmt.Errorf("failed to find connection with ID %s", configFile.APIConnectionID)
}
}
if !connectionFound {
return fmt.Errorf("failed to find connection with ID %s", configFile.APIConnectionID)
}

spec, err := fetchAndParseYAML(connection, configFile.SwaggerYAMLEndpoint)
if err != nil {
logger.Println(logger.WARN, true, fmt.Sprintf("failed to fetch and parse swagger spec: %v", err))
continue
}
spec, err := fetchAndParseYAML(connection, configFile.SwaggerYAMLEndpoint)
if err != nil {
logger.Println(logger.WARN, true, fmt.Sprintf("failed to fetch and parse swagger spec: %v", err))
continue
}

endpoint := strings.TrimPrefix(configFile.Endpoint, spec.BasePath)
taskComponent, err := processEndpoint(connection.ID, spec, endpoint)
if err != nil {
return fmt.Errorf("failed to process endpoint: %v", err)
endpoint := strings.TrimPrefix(configFile.Endpoint, spec.BasePath)
taskComponent, err = processEndpoint(connection.ID, spec, endpoint )
if err != nil {
logger.Println(logger.WARN, true, fmt.Sprintf("failed to process endpoint: %v", err))
continue
// return fmt.Errorf("failed to process endpoint: %v", err)
}
}
taskComponent.Name = configFile.Name
taskComponent.Description = configFile.Description
Expand Down Expand Up @@ -404,7 +416,6 @@ func generateRequestBodyExample(schema SchemaModel) string {

func processEndpoint(connectionID string, spec *SwaggerSpec, targetEndpoint string) (*model.TaskComponent, error) {
targetEndpoint = normalizePath(targetEndpoint)

for path, pathItem := range spec.Paths {
if normalizePath(path) == targetEndpoint {
for method, operation := range pathItem {
Expand Down
Loading