Merge pull request #10 from chnacib/feat/logs
feat: start query for log insight
chnacib authored Apr 29, 2024
2 parents 14a2429 + 23e3812 commit 2cf4809
Showing 2 changed files with 126 additions and 51 deletions.
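
Both files replace the synchronous GetLogEvents call with a CloudWatch Logs Insights query: build a query string, start it asynchronously with StartQuery, then poll GetQueryResults until the status reaches Complete. Below is a minimal sketch of that pattern against aws-sdk-go v1 for reference; the helper name, the one-hour window, the example log group, and the error returns are assumptions of the sketch, not taken from the commit (the committed code prints errors and exits, and passes different StartTime/EndTime values).

package main

import (
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)

// runInsightsQuery starts a Logs Insights query over the last hour and polls
// until CloudWatch reports it Complete. Illustrative helper, not the commit's code.
func runInsightsQuery(client *cloudwatchlogs.CloudWatchLogs, logGroup, query string) (*cloudwatchlogs.GetQueryResultsOutput, error) {
	now := time.Now()
	started, err := client.StartQuery(&cloudwatchlogs.StartQueryInput{
		LogGroupName: aws.String(logGroup),
		QueryString:  aws.String(query),
		// StartQuery takes the query window as epoch seconds.
		StartTime: aws.Int64(now.Add(-time.Hour).Unix()),
		EndTime:   aws.Int64(now.Unix()),
	})
	if err != nil {
		return nil, err
	}

	for {
		out, err := client.GetQueryResults(&cloudwatchlogs.GetQueryResultsInput{
			QueryId: started.QueryId,
		})
		if err != nil {
			return nil, err
		}
		if aws.StringValue(out.Status) == "Complete" {
			return out, nil
		}
		// The committed code also polls once per second; a production version
		// would additionally bail out on Failed/Cancelled or after a deadline.
		time.Sleep(1 * time.Second)
	}
}

func main() {
	sess := session.Must(session.NewSession())
	client := cloudwatchlogs.New(sess)

	// "/ecs/my-service" is a hypothetical log group name.
	out, err := runInsightsQuery(client, "/ecs/my-service",
		"fields @timestamp, @message | sort @timestamp desc | limit 200")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("rows:", len(out.Results))
}
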
109 changes: 68 additions & 41 deletions pkg/ecs/servicelogs.go
@@ -3,6 +3,8 @@ package ecs
import (
"fmt"
"os"
"sort"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
@@ -13,6 +15,9 @@

func ServiceLogs() *cobra.Command {
var cluster string
var limit int64
var GetQueryResultOutput *cloudwatchlogs.GetQueryResultsOutput
var logs []Message

cmd := &cobra.Command{
Use: "service",
@@ -26,68 +31,90 @@
client := ecs.New(sess)
client_cw := cloudwatchlogs.New(sess)

input_list_tasks := &ecs.ListTasksInput{
ServiceName: aws.String(service),
Cluster: aws.String(cluster),
InputDescribeService := &ecs.DescribeServicesInput{
Services: []*string{
aws.String(service),
},
Cluster: aws.String(cluster),
}

list_tasks, err := client.ListTasks(input_list_tasks)
OutputDescribeService, err := client.DescribeServices(InputDescribeService)
if err != nil {
fmt.Println(err)
os.Exit(0)
}

for _, tasks := range list_tasks.TaskArns {
task := NameArn(*tasks)
fmt.Println(task)
input := &ecs.DescribeTasksInput{
Tasks: []*string{
aws.String(*tasks),
},
Cluster: aws.String(cluster),
}
task_def := aws.String(*OutputDescribeService.Services[0].TaskDefinition)

response, err := client.DescribeTasks(input)
if err != nil {
fmt.Println(err)
os.Exit(0)
}
container := aws.StringValue(response.Tasks[0].Containers[0].Name)
InputDescribeTaskDefinition := &ecs.DescribeTaskDefinitionInput{
TaskDefinition: aws.String(*task_def),
}

task_def := aws.StringValue(response.Tasks[0].TaskDefinitionArn)
OutputDescribeTaskDefinition, err := client.DescribeTaskDefinition(InputDescribeTaskDefinition)
if err != nil {
fmt.Println(err)
os.Exit(0)
}

task_input := &ecs.DescribeTaskDefinitionInput{
TaskDefinition: aws.String(task_def),
}
result, err_task := client.DescribeTaskDefinition(task_input)
if err != nil {
fmt.Println(err_task)
os.Exit(0)
}
log_group := aws.StringValue(result.TaskDefinition.ContainerDefinitions[0].LogConfiguration.Options["awslogs-group"])
log_prefix := aws.StringValue(result.TaskDefinition.ContainerDefinitions[0].LogConfiguration.Options["awslogs-stream-prefix"])
log_stream := fmt.Sprintf("%s/%s/%s", log_prefix, container, task)

log_input := &cloudwatchlogs.GetLogEventsInput{
LogGroupName: aws.String(log_group),
LogStreamName: aws.String(log_stream),
Limit: aws.Int64(100),
}
output, err := client_cw.GetLogEvents(log_input)
log_group := aws.String(*OutputDescribeTaskDefinition.TaskDefinition.ContainerDefinitions[0].LogConfiguration.Options["awslogs-group"])

query_string := fmt.Sprintf("fields @timestamp, @message | sort @timestamp desc | limit %d", limit)

StartQueryInput := &cloudwatchlogs.StartQueryInput{
QueryString: aws.String(query_string),
LogGroupName: aws.String(*log_group),
EndTime: aws.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
StartTime: aws.Int64(5),
}
StartQueryOutput, err := client_cw.StartQuery(StartQueryInput)
if err != nil {
fmt.Println(err)
os.Exit(0)
}
query_id := aws.String(*StartQueryOutput.QueryId)

GetQueryResultInput := &cloudwatchlogs.GetQueryResultsInput{
QueryId: aws.String(*query_id),
}

for {
GetQueryResultOutput, err = client_cw.GetQueryResults(GetQueryResultInput)
if err != nil {
fmt.Println(err)
os.Exit(0)
}
for _, outputs := range output.Events {
message := *outputs.Message
fmt.Println(message)
status := aws.String(*GetQueryResultOutput.Status)
if *status == "Complete" {
break
}
time.Sleep(1 * time.Second)
}

for _, result := range GetQueryResultOutput.Results {
timestamp := aws.String(*result[0].Value)
message := aws.String(*result[1].Value)
logs = append(logs, Message{
Timestamp: *timestamp,
Message: *message,
})

}

sort.SliceStable(logs, func(i, j int) bool {
t1, _ := time.Parse(time.RFC3339Nano, logs[i].Timestamp)
t2, _ := time.Parse(time.RFC3339Nano, logs[j].Timestamp)
return t1.After(t2)
})

for i := len(logs) - 1; i >= 0; i-- {
fmt.Printf("%s %s\n", logs[i].Timestamp, logs[i].Message)
}

},
}

cmd.Flags().StringVarP(&cluster, "cluster", "c", "string", "ECS Cluster name")
cmd.Flags().Int64VarP(&limit, "limit", "l", 200, "Logs max result")
cmd.MarkFlagRequired("cluster")

return cmd
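
In servicelogs.go the log group is now resolved from the service's registered task definition (DescribeServices, then DescribeTaskDefinition, then the awslogs-group option) rather than from a running task. A condensed sketch of that lookup follows, assuming the same aws-sdk-go v1 ECS client; the helper name and error returns are illustrative, and, like the committed code, it assumes the first container definition carries the awslogs configuration.

package ecs

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/ecs"
)

// lookupServiceLogGroup resolves an ECS service to the awslogs-group configured
// on the first container of its task definition. Illustrative helper only.
func lookupServiceLogGroup(client *ecs.ECS, cluster, service string) (string, error) {
	svc, err := client.DescribeServices(&ecs.DescribeServicesInput{
		Cluster:  aws.String(cluster),
		Services: []*string{aws.String(service)},
	})
	if err != nil {
		return "", err
	}
	if len(svc.Services) == 0 {
		return "", fmt.Errorf("service %q not found in cluster %q", service, cluster)
	}

	td, err := client.DescribeTaskDefinition(&ecs.DescribeTaskDefinitionInput{
		TaskDefinition: svc.Services[0].TaskDefinition,
	})
	if err != nil {
		return "", err
	}

	// Assumes container 0 uses the awslogs log driver, as the committed code does.
	opts := td.TaskDefinition.ContainerDefinitions[0].LogConfiguration.Options
	return aws.StringValue(opts["awslogs-group"]), nil
}
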
68 changes: 58 additions & 10 deletions pkg/ecs/tasklogs.go
@@ -3,6 +3,8 @@ package ecs
import (
"fmt"
"os"
"sort"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
@@ -11,9 +13,16 @@
"github.com/spf13/cobra"
)

type Message struct {
Timestamp string
Message string
}

func TaskLogs() *cobra.Command {
var cluster string
var limit int64
var GetQueryResultOutput *cloudwatchlogs.GetQueryResultsOutput
var logs []Message

cmd := &cobra.Command{
Use: "task",
@@ -39,7 +48,9 @@
os.Exit(0)
}
container := aws.StringValue(response.Tasks[0].Containers[0].Name)

if container == "xray-daemon" {
container = aws.StringValue(response.Tasks[0].Containers[1].Name)
}
task_def := aws.StringValue(response.Tasks[0].TaskDefinitionArn)

task_input := &ecs.DescribeTaskDefinitionInput{
@@ -54,25 +65,62 @@
log_prefix := aws.StringValue(result.TaskDefinition.ContainerDefinitions[0].LogConfiguration.Options["awslogs-stream-prefix"])
log_stream := fmt.Sprintf("%s/%s/%s", log_prefix, container, task)

log_input := &cloudwatchlogs.GetLogEventsInput{
LogGroupName: aws.String(log_group),
LogStreamName: aws.String(log_stream),
Limit: aws.Int64(limit),
query_string := fmt.Sprintf("filter @logStream = '%s' | fields @timestamp, @message | sort @timestamp desc | limit %d", log_stream, limit)

StartQueryInput := &cloudwatchlogs.StartQueryInput{
QueryString: aws.String(query_string),
LogGroupName: aws.String(log_group),
EndTime: aws.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
StartTime: aws.Int64(5),
}
output, err := client_cw.GetLogEvents(log_input)
StartQueryOutput, err := client_cw.StartQuery(StartQueryInput)
if err != nil {
fmt.Println(err)
os.Exit(0)
}
for _, outputs := range output.Events {
message := *outputs.Message
fmt.Println(message)
query_id := aws.String(*StartQueryOutput.QueryId)

GetQueryResultInput := &cloudwatchlogs.GetQueryResultsInput{
QueryId: aws.String(*query_id),
}

for {
GetQueryResultOutput, err = client_cw.GetQueryResults(GetQueryResultInput)
if err != nil {
fmt.Println(err)
os.Exit(0)
}
status := aws.String(*GetQueryResultOutput.Status)
if *status == "Complete" {
break
}
time.Sleep(1 * time.Second)
}

for _, result := range GetQueryResultOutput.Results {
timestamp := aws.String(*result[0].Value)
message := aws.String(*result[1].Value)
logs = append(logs, Message{
Timestamp: *timestamp,
Message: *message,
})

}

sort.SliceStable(logs, func(i, j int) bool {
t1, _ := time.Parse(time.RFC3339Nano, logs[i].Timestamp)
t2, _ := time.Parse(time.RFC3339Nano, logs[j].Timestamp)
return t1.After(t2)
})

for i := len(logs) - 1; i >= 0; i-- {
fmt.Printf("%s %s\n", logs[i].Timestamp, logs[i].Message)
}

},
}

cmd.Flags().Int64VarP(&limit, "limit", "l", 100, "Logs max result")
cmd.Flags().Int64VarP(&limit, "limit", "l", 200, "Logs max result")
cmd.Flags().StringVarP(&cluster, "cluster", "c", "string", "ECS Cluster name")
cmd.MarkFlagRequired("cluster")

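
Both commands read result columns by position (result[0] as @timestamp, result[1] as @message), sort newest-first, and then print the slice backwards to end up in chronological order. An equivalent step is sketched below that matches columns by field name and sorts ascending directly; the helper is illustrative, reuses the Message type added above, and relies on the zero-padded @timestamp strings sorting chronologically as plain strings.

package ecs

import (
	"sort"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)

// collectMessages converts Logs Insights result rows into Messages by matching
// field names, then orders them oldest-first. Illustrative alternative to the
// index-based access used in the commit.
func collectMessages(results [][]*cloudwatchlogs.ResultField) []Message {
	logs := make([]Message, 0, len(results))
	for _, row := range results {
		var m Message
		for _, field := range row {
			switch aws.StringValue(field.Field) {
			case "@timestamp":
				m.Timestamp = aws.StringValue(field.Value)
			case "@message":
				m.Message = aws.StringValue(field.Value)
			}
		}
		logs = append(logs, m)
	}

	sort.SliceStable(logs, func(i, j int) bool {
		// Zero-padded timestamps mean string order is chronological order.
		return logs[i].Timestamp < logs[j].Timestamp
	})
	return logs
}

With the rows already oldest-first, the print step becomes a plain forward range over the returned slice instead of the reverse index loop used in the diff.
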