Add Flink logging receiver #632

Merged · 17 commits · Jun 18, 2022
112 changes: 112 additions & 0 deletions apps/flink.go
@@ -0,0 +1,112 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apps

import (
"github.com/GoogleCloudPlatform/ops-agent/confgenerator"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
)

type LoggingProcessorFlink struct {
confgenerator.ConfigComponent `yaml:",inline"`
}

func (LoggingProcessorFlink) Type() string {
return "flink"
}

func (p LoggingProcessorFlink) Components(tag string, uid string) []fluentbit.Component {
c := confgenerator.LoggingProcessorParseMultilineRegex{
LoggingProcessorParseRegexComplex: confgenerator.LoggingProcessorParseRegexComplex{
Parsers: []confgenerator.RegexParser{
{
// Standalone session example
// Sample line: 2022-04-22 11:51:35,718 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection 668abb5d496646a153262b5896fd935d: Stopping JobMaster for job 'Streaming WordCount' (2538c8dff66c8cf6ec58ad32b149e23f).

// TaskExecutor example
// Sample line: 2022-04-23 16:13:05,459 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*.

// Client example
// Sample line: 2022-04-22 11:51:32,901 INFO org.apache.flink.client.program.rest.RestClusterClient [] - Submitting job 'Streaming WordCount' (2538c8dff66c8cf6ec58ad32b149e23f).

Regex: `^(?<time>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d+)\s+(?<level>[A-Z]+)\s+(?<source>[^ ]*)(?<message>[\s\S]*)`,
Parser: confgenerator.ParserShared{
TimeKey: "time",
TimeFormat: "%Y-%m-%d %H:%M:%S,%L",
},
},
},
},
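// Multiline reassembly: a record starts at a line that begins with a
// timestamp; any following line that does not (for example a Java stack
// trace) is appended to the current record.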
Rules: []confgenerator.MultilineRule{
{
StateName: "start_state",
NextState: "cont",
Regex: `\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d+`,
},
{
StateName: "cont",
NextState: "cont",
Regex: `^(?!\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d+)`,
},
},
}.Components(tag, uid)

// Flink log levels are standard log4j log levels
// https://logging.apache.org/log4j/2.x/log4j-api/apidocs/org/apache/logging/log4j/Level.html
c = append(c,
confgenerator.LoggingProcessorModifyFields{
Fields: map[string]*confgenerator.ModifyField{
"severity": {
CopyFrom: "jsonPayload.level",
MapValues: map[string]string{
"TRACE": "TRACE",
"DEBUG": "DEBUG",
"INFO": "INFO",
"ERROR": "ERROR",
"WARN": "WARNING",
"FATAL": "CRITICAL",
},
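// With MapValuesExclusive, any level not listed above (e.g. log4j's
// OFF/ALL markers or custom levels) leaves the severity unset instead
// of passing the raw value through.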
MapValuesExclusive: true,
},
InstrumentationSourceLabel: instrumentationSourceValue(p.Type()),
},
}.Components(tag, uid)...,
)

return c
}

type LoggingReceiverFlink struct {
LoggingProcessorFlink `yaml:",inline"`
confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"`
}

func (r LoggingReceiverFlink) Components(tag string) []fluentbit.Component {
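// When the user does not set include_paths, default to the standalone
// session, taskexecutor, and client logs under the conventional
// /opt/flink install.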
if len(r.IncludePaths) == 0 {
r.IncludePaths = []string{
"/opt/flink/log/flink-*-standalonesession-*.log",
"/opt/flink/log/flink-*-taskexecutor-*.log",
"/opt/flink/log/flink-*-client-*.log",
}
}
c := r.LoggingReceiverFilesMixin.Components(tag)
c = append(c, r.LoggingProcessorFlink.Components(tag, "flink")...)
return c
}

func init() {
confgenerator.LoggingProcessorTypes.RegisterType(func() confgenerator.Component { return &LoggingProcessorFlink{} })
confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.Component { return &LoggingReceiverFlink{} })
}
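As a quick sanity check, the parser pattern can be exercised outside fluent-bit. The following is a hypothetical standalone snippet, not part of the PR: Go's regexp needs (?P<name>...) for named groups, and its RE2 engine differs from the Onigmo engine fluent-bit uses (notably, RE2 lacks the negative lookahead used in the multiline "cont" rule), so this only approximates the production parser.

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// The parser regex from apps/flink.go above, with (?<name>...)
	// rewritten to Go's (?P<name>...) named-group syntax.
	re := regexp.MustCompile(`^(?P<time>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2},\d+)\s+(?P<level>[A-Z]+)\s+(?P<source>[^ ]*)(?P<message>[\s\S]*)`)

	line := "2022-04-22 11:51:35,718 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection 668abb5d496646a153262b5896fd935d"
	m := re.FindStringSubmatch(line)
	if m == nil {
		fmt.Println("no match")
		return
	}
	for i, name := range re.SubexpNames() {
		if i > 0 && name != "" {
			fmt.Printf("%s: %q\n", name, m[i])
		}
	}
	// Prints:
	//   time: "2022-04-22 11:51:35,718"
	//   level: "INFO"
	//   source: "org.apache.flink.runtime.jobmaster.JobMaster"
	//   message: " [] - Close ResourceManager connection 668abb5d496646a153262b5896fd935d"
}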
@@ -1 +1 @@
logging processor with type "unsupported_type" is not supported. Supported logging processor types: [apache_access, apache_error, cassandra_debug, cassandra_gc, cassandra_system, couchdb, exclude_logs, hbase_system, jetty_access, kafka, modify_fields, mysql_error, mysql_general, mysql_slow, nginx_access, nginx_error, parse_json, parse_multiline, parse_regex, postgresql_general, redis, solr_system, tomcat_access, tomcat_system, varnish, wildfly_system].
logging processor with type "unsupported_type" is not supported. Supported logging processor types: [apache_access, apache_error, cassandra_debug, cassandra_gc, cassandra_system, couchdb, exclude_logs, flink, hbase_system, jetty_access, kafka, modify_fields, mysql_error, mysql_general, mysql_slow, nginx_access, nginx_error, parse_json, parse_multiline, parse_regex, postgresql_general, redis, solr_system, tomcat_access, tomcat_system, varnish, wildfly_system].
@@ -1 +1 @@
logging receiver with type "active_directory_ds" is not supported. Supported logging receiver types: [apache_access, apache_error, cassandra_debug, cassandra_gc, cassandra_system, couchdb, elasticsearch_gc, elasticsearch_json, files, fluent_forward, hadoop, hbase_system, jetty_access, kafka, mongodb, mysql_error, mysql_general, mysql_slow, nginx_access, nginx_error, postgresql_general, rabbitmq, redis, solr_system, syslog, systemd_journald, tcp, tomcat_access, tomcat_system, varnish, wildfly_system, zookeeper_general].
logging receiver with type "active_directory_ds" is not supported. Supported logging receiver types: [apache_access, apache_error, cassandra_debug, cassandra_gc, cassandra_system, couchdb, elasticsearch_gc, elasticsearch_json, files, flink, fluent_forward, hadoop, hbase_system, jetty_access, kafka, mongodb, mysql_error, mysql_general, mysql_slow, nginx_access, nginx_error, postgresql_general, rabbitmq, redis, solr_system, syslog, systemd_journald, tcp, tomcat_access, tomcat_system, varnish, wildfly_system, zookeeper_general].
@@ -1 +1 @@
logging receiver with type "windows_event_log" is not supported. Supported logging receiver types: [apache_access, apache_error, cassandra_debug, cassandra_gc, cassandra_system, couchdb, elasticsearch_gc, elasticsearch_json, files, fluent_forward, hadoop, hbase_system, jetty_access, kafka, mongodb, mysql_error, mysql_general, mysql_slow, nginx_access, nginx_error, postgresql_general, rabbitmq, redis, solr_system, syslog, systemd_journald, tcp, tomcat_access, tomcat_system, varnish, wildfly_system, zookeeper_general].
logging receiver with type "windows_event_log" is not supported. Supported logging receiver types: [apache_access, apache_error, cassandra_debug, cassandra_gc, cassandra_system, couchdb, elasticsearch_gc, elasticsearch_json, files, flink, fluent_forward, hadoop, hbase_system, jetty_access, kafka, mongodb, mysql_error, mysql_general, mysql_slow, nginx_access, nginx_error, postgresql_general, rabbitmq, redis, solr_system, syslog, systemd_journald, tcp, tomcat_access, tomcat_system, varnish, wildfly_system, zookeeper_general].
@@ -1 +1 @@
logging receiver with type "unsupported_type" is not supported. Supported logging receiver types: [apache_access, apache_error, cassandra_debug, cassandra_gc, cassandra_system, couchdb, elasticsearch_gc, elasticsearch_json, files, fluent_forward, hadoop, hbase_system, jetty_access, kafka, mongodb, mysql_error, mysql_general, mysql_slow, nginx_access, nginx_error, postgresql_general, rabbitmq, redis, solr_system, syslog, systemd_journald, tcp, tomcat_access, tomcat_system, varnish, wildfly_system, zookeeper_general].
logging receiver with type "unsupported_type" is not supported. Supported logging receiver types: [apache_access, apache_error, cassandra_debug, cassandra_gc, cassandra_system, couchdb, elasticsearch_gc, elasticsearch_json, files, flink, fluent_forward, hadoop, hbase_system, jetty_access, kafka, mongodb, mysql_error, mysql_general, mysql_slow, nginx_access, nginx_error, postgresql_general, rabbitmq, redis, solr_system, syslog, systemd_journald, tcp, tomcat_access, tomcat_system, varnish, wildfly_system, zookeeper_general].
@@ -1 +1 @@
logging receiver with type "systemd" is not supported. Supported logging receiver types: [active_directory_ds, apache_access, apache_error, cassandra_debug, cassandra_gc, cassandra_system, couchdb, elasticsearch_gc, elasticsearch_json, files, fluent_forward, hadoop, hbase_system, iis_access, jetty_access, kafka, mongodb, mysql_error, mysql_general, mysql_slow, nginx_access, nginx_error, postgresql_general, rabbitmq, redis, solr_system, syslog, tcp, tomcat_access, tomcat_system, varnish, wildfly_system, windows_event_log, zookeeper_general].
logging receiver with type "systemd" is not supported. Supported logging receiver types: [active_directory_ds, apache_access, apache_error, cassandra_debug, cassandra_gc, cassandra_system, couchdb, elasticsearch_gc, elasticsearch_json, files, flink, fluent_forward, hadoop, hbase_system, iis_access, jetty_access, kafka, mongodb, mysql_error, mysql_general, mysql_slow, nginx_access, nginx_error, postgresql_general, rabbitmq, redis, solr_system, syslog, tcp, tomcat_access, tomcat_system, varnish, wildfly_system, windows_event_log, zookeeper_general].
@@ -0,0 +1,28 @@
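-- Fluent Bit Lua filter for the flink pipeline: sets the
-- compute.googleapis.com/resource_name label to its current value or "",
-- and logName to its current value or "flink".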

function process(tag, timestamp, record)
local __field_0 = (function()
if record["logging.googleapis.com/labels"] == nil
then
return nil
end
return record["logging.googleapis.com/labels"]["compute.googleapis.com/resource_name"]
end)();
local __field_1 = (function()
return record["logging.googleapis.com/logName"]
end)();
local v = __field_0;
if v == nil then v = "" end;
(function(value)
if record["logging.googleapis.com/labels"] == nil
then
record["logging.googleapis.com/labels"] = {}
end
record["logging.googleapis.com/labels"]["compute.googleapis.com/resource_name"] = value
end)(v)
local v = __field_1;
if v == nil then v = "flink" end;
(function(value)
record["logging.googleapis.com/logName"] = value
end)(v)
return 2, timestamp, record
end
@@ -0,0 +1,49 @@

function shallow_merge(record, parsedRecord)
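-- Copies every top-level key of parsedRecord into record, e.g.
-- shallow_merge({a="x"}, {a="y", b="z"}) -> {a="y", b="z"}.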
-- If there is no existing record, return the parsed record as-is
if (record == nil) then
return parsedRecord
end

for k, v in pairs(parsedRecord) do
record[k] = v
end

return record
end

function merge(record, parsedRecord)
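-- Merges parsedRecord into record: logName is never overwritten, labels
-- are shallow-merged, and every other key is replaced.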
-- If there is no existing record, return the parsed record as-is
if record == nil then
return parsedRecord
end

-- Potentially overwrite or merge the original records.
for k, v in pairs(parsedRecord) do
if k == "logging.googleapis.com/logName" then
-- Ignore the parsed value since the logName is controlled
-- by the Ops Agent.
elseif k == "logging.googleapis.com/labels" then
-- LogEntry.labels is a map[string]string, so a shallow
-- (one-level-deep) merge suffices.
record[k] = shallow_merge(record[k], v)
else
record[k] = v
end
end

return record
end

function parser_merge_record(tag, timestamp, record)
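-- Recombine the fields stashed by parser_nest (under
-- logging.googleapis.com/__tmp) with the output of the regex parser.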
local originalPayload = record["logging.googleapis.com/__tmp"]
if originalPayload == nil then
return 0, timestamp, record
end

-- Remove original payload
record["logging.googleapis.com/__tmp"] = nil
record = merge(originalPayload, record)
return 2, timestamp, record
end
@@ -0,0 +1,28 @@
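-- Fluent Bit Lua filter for the syslog pipeline: sets the
-- compute.googleapis.com/resource_name label to its current value or "",
-- and logName to its current value or "syslog".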

function process(tag, timestamp, record)
local __field_0 = (function()
if record["logging.googleapis.com/labels"] == nil
then
return nil
end
return record["logging.googleapis.com/labels"]["compute.googleapis.com/resource_name"]
end)();
local __field_1 = (function()
return record["logging.googleapis.com/logName"]
end)();
local v = __field_0;
if v == nil then v = "" end;
(function(value)
if record["logging.googleapis.com/labels"] == nil
then
record["logging.googleapis.com/labels"] = {}
end
record["logging.googleapis.com/labels"]["compute.googleapis.com/resource_name"] = value
end)(v)
local v = __field_1;
if v == nil then v = "syslog" end;
(function(value)
record["logging.googleapis.com/logName"] = value
end)(v)
return 2, timestamp, record
end
@@ -0,0 +1,17 @@
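-- Moves every field except "message" into logging.googleapis.com/__tmp so
-- the regex parser can run on "message" alone; parser_merge_record later
-- restores the stashed fields.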

function parser_nest(tag, timestamp, record)
local nestedRecord = {}
local parseKey = "message"
for k, v in pairs(record) do
if k ~= parseKey then
nestedRecord[k] = v
end
end

local result = {}
result[parseKey] = record[parseKey]
result["logging.googleapis.com/__tmp"] = nestedRecord

return 2, timestamp, result
end

@@ -0,0 +1,27 @@
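-- Tags records with the flink instrumentation_source label and maps the
-- parsed log4j level onto a LogEntry severity; unknown levels unset it.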

function process(tag, timestamp, record)
local __field_1 = (function()
return record["level"]
end)();
local v = "agent.googleapis.com/flink";
(function(value)
if record["logging.googleapis.com/labels"] == nil
then
record["logging.googleapis.com/labels"] = {}
end
record["logging.googleapis.com/labels"]["logging.googleapis.com/instrumentation_source"] = value
end)(v)
local v = __field_1;
if v == "DEBUG" then v = "DEBUG"
elseif v == "ERROR" then v = "ERROR"
elseif v == "FATAL" then v = "CRITICAL"
elseif v == "INFO" then v = "INFO"
elseif v == "TRACE" then v = "TRACE"
elseif v == "WARN" then v = "WARNING"
else v = nil
end
(function(value)
record["logging.googleapis.com/severity"] = value
end)(v)
return 2, timestamp, record
end