Further Flink Kafka optimizations #408

Merged
7 changes: 4 additions & 3 deletions FlinkSqlGateway/Dockerfile
@@ -11,7 +11,7 @@ ADD .eslintrc.json .eslintrc.json
WORKDIR /
RUN npm install && npm run lint && npm run test

FROM flink:1.16.1
FROM flink:1.16.2

RUN curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.1/install.sh | bash
ENV NVM_DIR="$HOME/.nvm"
@@ -28,7 +28,8 @@ COPY --from=nodelint gateway.js gateway.js
ADD package.json package.json
ADD lib/ lib
COPY --from=plint submitjob submitjob
RUN mkdir jars && cd jars && wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.1/flink-sql-connector-kafka-1.16.1.jar
RUN mkdir jars && cd jars && wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.2/flink-sql-connector-kafka-1.16.2.jar
RUN mkdir -p jars && cd jars && wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.1/flink-sql-connector-kafka-1.16.1.jar
RUN . $HOME/.nvm/nvm.sh && npm install --production
ADD sql-client-defaults.yaml /opt/flink/conf/

@@ -48,7 +49,7 @@ apt-get clean && \
rm -rf /var/lib/apt/lists/*

# install PyFlink
RUN pip3 install apache-flink==1.16.0
RUN pip3 install apache-flink==1.16.2


USER 9999
18 changes: 15 additions & 3 deletions FlinkSqlGateway/gateway.js
@@ -24,6 +24,7 @@ const bodyParser = require('body-parser');

const logger = require('./lib/logger.js');
const port = process.env.SIMPLE_FLINK_SQL_GATEWAY_PORT || 9000;
const debug = process.env.DEBUG || 'false';
const flinksubmit = '/opt/flink/bin/flink run';
const runningAsMain = require.main === module;

@@ -102,16 +103,27 @@ function apppost (request, response) {

    const command = createCommand(dirname);
    submitJob(command, response).then(
      () => { fs.rmSync(dirname, { recursive: true, force: true }); }
      () => {
        if (debug === 'false') {
          logger.info('Removing ' + dirname + ': debug = ' + debug);
          fs.rmSync(dirname, { recursive: true, force: true });
        }
      }
    ).catch(
      (e) => {
        logger.error(e.stack || e);
        fs.rmSync(dirname, { recursive: true, force: true });
        if (debug === 'false') {
          logger.info('Removing ' + dirname + ': debug = ' + debug);
          fs.rmSync(dirname, { recursive: true, force: true });
        }
      }
    );
  } catch (e) {
    logger.error('Could not submit job: ' + e.stack || e);
    fs.rmSync(dirname, { recursive: true, force: true });
    if (debug === 'false') {
      logger.info('Removing ' + dirname + ': debug = ' + debug);
      fs.rmSync(dirname, { recursive: true, force: true });
    }
  }
}

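The gateway.js change above introduces a DEBUG environment variable: the temporary job directory is now removed only when DEBUG is 'false' (the default), so setting it to any other value keeps the generated artifacts on disk for post-mortem inspection. A minimal sketch of how the flag could be passed to the gateway container is shown below; the container name, image, and the idea of wiring it through a pod spec are assumptions for illustration, not part of this PR.

```yaml
# Hypothetical container-spec fragment for the Flink SQL gateway.
# Names and image are placeholders; only the env variable semantics
# come from gateway.js above.
containers:
  - name: flink-sql-gateway            # placeholder container name
    image: flink-sql-gateway:latest    # placeholder image
    env:
      - name: DEBUG
        value: "true"                  # any value other than 'false' keeps job directories
      - name: SIMPLE_FLINK_SQL_GATEWAY_PORT
        value: "9000"                  # default port used by gateway.js
```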
3 changes: 2 additions & 1 deletion FlinkSqlGateway/test/testGateway.js
@@ -25,7 +25,8 @@ const flinkVersion = '1.14.3';

const logger = {
  debug: function () {},
  error: function () {}
  error: function () {},
  info: function () {}
};

describe('Test health path', function () {
50 changes: 43 additions & 7 deletions helm/charts/flink/templates/flink-configuration-configmap.yaml
@@ -17,7 +17,6 @@ data:
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
rest.flamegraph.enabled: true

queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 1000m
@@ -33,13 +32,51 @@ data:
{{ else }}
s3.secret-key: {{ .Values.s3.userSecretKey }}
{{ end }}
state.backend: rocksdb
state.backend.rocksdb.localdir: /tmp/rocksdb
state.backend.rocksdb.log.level: INFO_LEVEL
state.backend.rocksdb.log.max-file-size: 10MB
state.backend.rocksdb.log.file-num: 10
state.backend.rocksdb.log.dir: /tmp/rocksdb/logs
state.backend.incremental: false
state.backend.rocksdb.writebuffer.size: 64 kb
state.backend.rocksdb.compaction.level.target-file-size-base: 64 kb
state.backend.rocksdb.use-bloom-filter: true
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED

# RocksDB metrics and Flamegraph. Enable these for debugging and troubleshooting
# rest.flamegraph.enabled: true
# state.backend.rocksdb.metrics.actual-delayed-write-rate: true
# state.backend.rocksdb.metrics.background-errors: true
# state.backend.rocksdb.metrics.block-cache-capacity: true
# state.backend.rocksdb.metrics.block-cache-hit: true
# state.backend.rocksdb.metrics.block-cache-miss: true
# state.backend.rocksdb.metrics.block-cache-pinned-usage: true
# state.backend.rocksdb.metrics.block-cache-usage: true
# state.backend.rocksdb.metrics.bytes-read: true
# state.backend.rocksdb.metrics.bytes-written: true
# state.backend.rocksdb.metrics.column-family-as-variable: true
# state.backend.rocksdb.metrics.compaction-pending: true
# state.backend.rocksdb.metrics.compaction-read-bytes: true
# state.backend.rocksdb.metrics.compaction-write-bytes: true
# state.backend.rocksdb.metrics.cur-size-active-mem-table: true
# state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
# state.backend.rocksdb.metrics.estimate-live-data-size: true
# state.backend.rocksdb.metrics.estimate-num-keys: true
# state.backend.rocksdb.metrics.estimate-pending-compaction-bytes: true
# state.backend.rocksdb.metrics.estimate-table-readers-mem: true
# state.backend.rocksdb.metrics.is-write-stopped: true
# state.backend.rocksdb.metrics.iter-bytes-read: true
# state.backend.rocksdb.metrics.live-sst-files-size: true
# state.backend.rocksdb.metrics.mem-table-flush-pending: true
# state.backend.rocksdb.metrics.num-deletes-active-mem-table: true
# state.backend.rocksdb.metrics.num-deletes-imm-mem-tables: true
# state.backend.rocksdb.metrics.num-entries-active-mem-table: true
# state.backend.rocksdb.metrics.num-entries-imm-mem-tables: true
# state.backend.rocksdb.metrics.num-immutable-mem-table: true
# state.backend.rocksdb.metrics.num-live-versions: true
# state.backend.rocksdb.metrics.num-running-compactions: true
# state.backend.rocksdb.metrics.num-running-flushes: true
# state.backend.rocksdb.metrics.num-snapshots: true
# state.backend.rocksdb.metrics.size-all-mem-tables: true
# state.backend.rocksdb.metrics.stall-micros: true
# state.backend.rocksdb.metrics.total-sst-files-size: true

state.checkpoints.dir: s3://{{ .Values.flink.bucket }}/{{ .Values.flink.checkpointDir }}
state.savepoints.dir: s3://{{ .Values.flink.bucket }}/{{ .Values.flink.savepointDir }}
kubernetes.cluster-id: {{ .Values.flink.clusterId }}
@@ -48,7 +85,6 @@ data:
kubernetes.namespace: {{ .Release.Namespace }}
restart-strategy: exponential-delay
restart-strategy.exponential-delay.max-backoff: 2 min
execution.checkpointing.interval: {{ .Values.flink.checkpointInterval }}
process.working-dir: /tmp/process
log4j-console.properties: |+
# This affects logging for both user code and Flink
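The flamegraph and RocksDB native-metric switches above are shipped commented out, since enabling them can add overhead; for a troubleshooting session they can be re-enabled by uncommenting the relevant keys in the same flink-conf.yaml block. A minimal sketch, picking only a few of the options listed above:

```yaml
# Troubleshooting-only overrides inside flink-conf.yaml; these mirror a
# subset of the commented keys above and should be removed again afterwards.
rest.flamegraph.enabled: true
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.stall-micros: true
```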
1 change: 1 addition & 0 deletions helm/charts/kafka-bridges/templates/bridge-topics.yaml
@@ -10,3 +10,4 @@ spec:
replicas: 1
config:
retention.ms: {{.Values.kafkaBridge.debezium.listenTopicRetention}}
compression.type: {{.Values.kafkaBridge.debezium.listenTopicCompression}}
14 changes: 6 additions & 8 deletions helm/charts/kafka/templates/KafkaCluster.yaml
@@ -16,15 +16,13 @@ spec:
type: internal
tls: true
resources:
requests:
memory: 800Mi
cpu: 500m
limits:
memory: 800Mi
cpu: 500m
{{ if .Values.kafka.resources }}
{{ .Values.kafka.resources | toYaml | indent 6 }}
{{ end }}
{{ if .Values.kafka.jvmOptions }}
jvmOptions:
"-Xmx": "400m"
"-Xms": "300m"
{{ .Values.kafka.jvmOptions | toYaml | indent 6 }}
{{ end }}
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
1 change: 1 addition & 0 deletions helm/charts/sql-core/templates/core-kafka-topics.yaml
@@ -46,6 +46,7 @@ spec:
replicas: 1
config:
retention.ms: {{.Values.kafkaBridge.debezium.attributesTopicRetention}}
compression.type: {{.Values.kafkaBridge.debezium.attributesTopicCompression}}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
54 changes: 21 additions & 33 deletions helm/charts/sql-core/templates/core-statementsets.yaml
@@ -10,42 +10,30 @@ metadata:
{{ include (print $.Template.BasePath "/core-kafka-topics.yaml") . | sha256sum }}
spec:
sqlsettings:
- table.exec.state.ttl: '{{.Values.kafkaBridge.debezium.attributesTopicRetention}} ms'
- execution.savepoint.ignore-unclaimed-state: 'true'
- pipeline.object-reuse: 'true'
sqlstatements:
- |-
/* validates changes in alerts_bulk and only forward if there are specific changes
This is part of the core service */
insert into alerts
select resource, event, environment, service, severity, customer, `text` from
/* validates changes in alerts_bulk and only forward if there are specific changes*/
insert into alerts select resource, event, environment, service, severity, customer, `text` from
(
select resource, event,
last_value(environment) as environment,
environment,
service,
rand() as rnd,
last_value(A.customer) as customer,
last_value(`text`) as `text`,
LAG(A.severity) as last_severity,
last_value(A.severity) as severity,
LAG(A.customer) as last_customer,
LAG(A.environment) as last_environment
customer,
last_value(`text`) AS `text`,
last_value(severity) as severity
from (
select window_time as ts, customer, environment, service, resource, event, window_start, window_end, last_value(severity) as severity, last_value(`text`) as `text`
from TABLE(
TUMBLE(TABLE alerts_filter, DESCRIPTOR(`ts`), INTERVAL {{.Values.flink.alertWindow|squote}} SECOND))
GROUP BY window_time, window_start, window_end, resource, event, service, customer, environment
) as A
GROUP BY A.resource, A.event, A.service, A.customer, A.environment
WINDOW w as (
PARTITION BY resource, event
ORDER BY A.ts ASC
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
)
WHERE last_severity <> severity OR
last_severity IS NULL OR
last_environment <> environment OR
last_customer <> customer;
select `resource`, `event`,
`environment`,
service,
last_value(severity) as `severity`,
`customer`,
last_value(`text`) as `text`
from alerts_bulk
group by `resource`, `event`, service, customer, environment, TUMBLE(ts, INTERVAL {{.Values.flink.alertWindow | squote}} SECOND)
) as A
GROUP BY A.resource, A.event, A.service, A.customer, A.environment
);
- |-
/* aggregates entities from the attributes topic and inserts them every 2 seconds into the update channel
part of CORE SERVICES */
@@ -81,14 +69,14 @@ spec:
(select id, last_value(entityId) as entityId, last_value(name) as name, last_value(nodeType) as nodeType, last_value(valueType) as valueType , `index`, last_value(`type`) as `type`,
last_value(`https://uri.etsi.org/ngsi-ld/hasValue`) as `value`,
last_value(`https://uri.etsi.org/ngsi-ld/hasObject`) as `object`,
TUMBLE_START(ts, INTERVAL '0.001' SECOND),
TUMBLE_END(ts, INTERVAL '0.001' SECOND)
TUMBLE_START(ts, INTERVAL {{ .Values.flink.attributeInsertWindow |squote }} SECOND),
TUMBLE_END(ts, INTERVAL {{ .Values.flink.attributeInsertWindow |squote }} SECOND)
from attributes_insert
group by id, `index`, TUMBLE(ts, INTERVAL '0.001' SECOND));
group by id, `index`, TUMBLE(ts, INTERVAL {{ .Values.flink.attributeInsertWindow| squote }} SECOND));

tables:
- alerts
- alerts-filter
- alerts-bulk
- attributes
- attributes-writeback
- attributes-insert
31 changes: 2 additions & 29 deletions helm/charts/sql-core/templates/core-tables.yaml
@@ -40,8 +40,8 @@ spec:
- 'severity': STRING
- 'customer': STRING
- 'text': STRING
- 'watermark': FOR `timestamp` AS `timestamp`
- 'timestamp': TIMESTAMP(3) METADATA VIRTUAL
- 'watermark': FOR ts AS ts - INTERVAL {{.Values.flink.alertDelay | squote}} SECONDS
- 'ts': TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL
kafka:
topic: {{.Values.kafkaBridge.alerta.bulkTopic}}
properties:
@@ -55,33 +55,6 @@
---
apiVersion: industry-fusion.com/v1alpha2
kind: BeamSqlTable
metadata:
name: alerts-filter
spec:
name: alerts_filter
connector: kafka
fields:
- 'resource': STRING
- 'event': STRING
- 'environment': STRING
- 'service': ARRAY<STRING>
- 'severity': STRING
- 'customer': STRING
- 'text': STRING
- 'watermark': FOR ts AS ts
- 'ts': TIMESTAMP(3) METADATA FROM 'timestamp'
kafka:
topic: {{.Values.kafkaBridge.alerta.bulkTopic}}
properties:
bootstrap.servers: {{.Values.kafka.bootstrapServer}}
scan.startup.mode: latest-offset
value:
format: json
json.fail-on-missing-field: false
json.ignore-parse-errors: true
---
apiVersion: industry-fusion.com/v1alpha2
kind: BeamSqlTable
metadata:
name: ngsild-updates
spec:
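The core templates above now take the alert watermark delay (flink.alertDelay) and the attribute insert window (flink.attributeInsertWindow) from Helm values instead of hard-coding them; the removed SQL used INTERVAL '0.001' SECOND for the insert window. The environment files changed in this PR do not show these keys, so the sketch below only illustrates the shape such an override could take; the numbers are placeholders, not values defined by this change.

```yaml
# Hypothetical helm/environment override; key names follow the template
# references above, numeric values are placeholders.
flink:
  alertWindow: 0.5             # seconds, tumble window for the alerts statement
  alertDelay: 5                # seconds subtracted from ts for the alerts watermark
  attributeInsertWindow: 0.001 # seconds, matches the previously hard-coded interval
```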
20 changes: 15 additions & 5 deletions helm/environment/default.yaml
@@ -66,21 +66,31 @@ kafka:
storage:
type: persistent-claim
size: 1Gi
resources:
requests:
memory: 400Mi
cpu: 200m
limits:
memory: 800Mi
cpu: 500m
jvmOptions:
"-Xmx": "400m"
name: my-cluster

kafkaBridge:
debezium:
replicaCount: 1
listenTopicRetention: 86400000
entityTopicRetention: 604800000
attributesTopicRetention: 86400000
listenTopicRetention: 3600000
entityTopicRetention: 3600000
attributesTopicRetention: 3600000
alerta:
replicaCount: 1
bulkTopicRetention: 300000
listenTopicRetention: 86400000
listenTopicRetention: 3600000
ngsildUpdates:
replicaCount: 1
tokenRefreshInterval: 200
listenTopicRetention: 86400000
listenTopicRetention: 3600000

velero:
backupBucket: velero-backup
25 changes: 18 additions & 7 deletions helm/environment/production.yaml
@@ -57,31 +57,42 @@ minio:
enabled: true
storageSize: 10G


kafka:
storage:
type: persistent-claim
size: 5Gi
size: 20Gi
zookeeper:
replicas: 1
storage:
type: persistent-claim
size: 1Gi
resources:
requests:
memory: 400Mi
cpu: 200m
limits:
memory: 1000Mi
cpu: 800m
jvmOptions:
"-Xmx": "500m"
name: my-cluster

kafkaBridge:
debezium:
replicaCount: 1
listenTopicRetention: 86400000
entityTopicRetention: 604800000
attributesTopicRetention: 86400000
listenTopicRetention: 28800000
listenTopicCompression: gzip
entityTopicRetention: 28800000
attributesTopicRetention: 28800000
attributesTopicCompression: gzip
alerta:
replicaCount: 1
bulkTopicRetention: 300000
listenTopicRetention: 86400000
listenTopicRetention: 28800000
ngsildUpdates:
replicaCount: 1
tokenRefreshInterval: 200
listenTopicRetention: 86400000
listenTopicRetention: 28800000

velero:
backupBucket: velero-backup