Skip to content

Commit

Permalink
Merge branch 'main' into scheduler-split
Browse files Browse the repository at this point in the history
  • Loading branch information
Little-Wallace authored Jun 16, 2023
2 parents 9ac9ed4 + d26f4bb commit 1059c15
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 17 deletions.
10 changes: 5 additions & 5 deletions ci/scripts/gen-flamegraph.sh
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ install_all() {
promql --version

echo ">>> Installing Kafka"
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -zxvf kafka_2.13-3.4.0.tgz
wget https://downloads.apache.org/kafka/3.4.1/kafka_2.13-3.4.1.tgz
tar -zxvf kafka_2.13-3.4.1.tgz

echo ">>> Installing nexmark bench"
buildkite-agent artifact download nexmark-server /usr/local/bin
Expand Down Expand Up @@ -182,8 +182,8 @@ start_nperf() {
}

start_kafka() {
./kafka_2.13-3.4.0/bin/zookeeper-server-start.sh ./kafka_2.13-3.4.0/config/zookeeper.properties > zookeeper.log 2>&1 &
./kafka_2.13-3.4.0/bin/kafka-server-start.sh ./kafka_2.13-3.4.0/config/server.properties --override num.partitions=8 > kafka.log 2>&1 &
./kafka_2.13-3.4.1/bin/zookeeper-server-start.sh ./kafka_2.13-3.4.1/config/zookeeper.properties > zookeeper.log 2>&1 &
./kafka_2.13-3.4.1/bin/kafka-server-start.sh ./kafka_2.13-3.4.1/config/server.properties --override num.partitions=8 > kafka.log 2>&1 &
sleep 10
# TODO(kwannoel): `trap ERR` and upload these logs.
# buildkite-agent artifact upload ./zookeeper.log
Expand All @@ -207,7 +207,7 @@ gen_events() {
}

show_kafka_topics() {
./kafka_2.13-3.4.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic nexmark --bootstrap-server localhost:9092
./kafka_2.13-3.4.1/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic nexmark --bootstrap-server localhost:9092
}

gen_cpu_flamegraph() {
Expand Down
4 changes: 2 additions & 2 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM ubuntu:22.04 as base

ENV LANG en_US.utf8

RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install make build-essential cmake protobuf-compiler curl bash lld maven unzip
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install make build-essential cmake protobuf-compiler curl bash lld maven unzip libsasl2-dev

FROM base as builder

Expand Down Expand Up @@ -43,7 +43,7 @@ RUN cd /risingwave/java && mvn -B package -Dmaven.test.skip=true -Djava.binding.
tar -zxvf /risingwave/java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz -C /risingwave/bin/connector-node

FROM ubuntu:22.04 as image-base
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates openjdk-11-jdk && rm -rf /var/lib/{apt,dpkg,cache,log}/
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates openjdk-11-jdk libsasl2-dev && rm -rf /var/lib/{apt,dpkg,cache,log}/

FROM image-base as risingwave
LABEL org.opencontainers.image.source https://github.com/risingwavelabs/risingwave
Expand Down
4 changes: 2 additions & 2 deletions docker/Dockerfile.hdfs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM ubuntu:22.04 as base

ENV LANG en_US.utf8

RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install make build-essential cmake protobuf-compiler curl pkg-config bash lld maven unzip wget openjdk-11-jdk
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install make build-essential cmake protobuf-compiler curl pkg-config bash lld maven unzip wget openjdk-11-jdk libsasl2-dev

FROM base as builder

Expand Down Expand Up @@ -48,7 +48,7 @@ RUN cd /risingwave/java && mvn -B package -Dmaven.test.skip=true -Djava.binding.
tar -zxvf /risingwave/java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz -C /risingwave/bin/connector-node

FROM ubuntu:22.04 as image-base
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates openjdk-11-jdk wget && rm -rf /var/lib/{apt,dpkg,cache,log}/
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates openjdk-11-jdk wget libsasl2-dev && rm -rf /var/lib/{apt,dpkg,cache,log}/

FROM image-base as risingwave
LABEL org.opencontainers.image.source https://github.com/risingwavelabs/risingwave
Expand Down
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2617,6 +2617,16 @@ def section_memory_manager(outer_panels):
),
],
),
panels.timeseries_ms(
"LRU manager diff between current watermark and evicted watermark time (ms) for actors",
"",
[
panels.target(
f"{metric('lru_evicted_watermark_time_diff_ms')}",
"table {{table_id}} actor {{actor_id}} desc: {{desc}}",
),
],
),
],
),
]
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use super::catalog::SinkCatalog;
use crate::sink::{record_to_json, Result, Sink, SinkError, TimestampHandlingMode};
use crate::ConnectorParams;

pub const VALID_REMOTE_SINKS: [&str; 3] = ["jdbc", "file", "iceberg"];
pub const VALID_REMOTE_SINKS: [&str; 3] = ["jdbc", "iceberg", "deltalake"];

pub fn is_valid_remote_sink(connector_type: &str) -> bool {
VALID_REMOTE_SINKS.contains(&connector_type)
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl StreamSink {
Distribution::Single => RequiredDist::single(),
_ => {
match properties.get("connector") {
Some(s) if s == "iceberg" => {
Some(s) if s == "iceberg" || s == "deltalake" => {
// iceberg with multiple parallelism will fail easily with concurrent commit
// on metadata
// TODO: reset iceberg sink to have multiple parallelism
Expand Down
36 changes: 32 additions & 4 deletions src/stream/src/cache/managed_lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::Arc;
use lru::{DefaultHasher, KeyRef, LruCache};
use prometheus::IntGauge;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::util::epoch::Epoch;

use crate::common::metrics::MetricsInfo;

Expand All @@ -40,6 +41,8 @@ pub struct ManagedLruCache<K, V, S = DefaultHasher, A: Clone + Allocator = Globa
kv_heap_size: usize,
/// The metrics of memory usage
memory_usage_metrics: IntGauge,
// The metrics of evicted watermark time
lru_evicted_watermark_time_diff_ms: IntGauge,
// Metrics info
metrics_info: MetricsInfo,
/// The size reported last time
Expand All @@ -55,6 +58,10 @@ impl<K, V, S, A: Clone + Allocator> Drop for ManagedLruCache<K, V, S, A> {
.stream_memory_usage
.remove_label_values(&[&info.table_id, &info.actor_id, &info.desc])
.unwrap();
info.metrics
.lru_evicted_watermark_time_diff_ms
.remove_label_values(&[&info.table_id, &info.actor_id, &info.desc])
.unwrap();
}
}

Expand All @@ -74,27 +81,37 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
&metrics_info.actor_id,
&metrics_info.desc,
]);
memory_usage_metrics.set(0.into());

let lru_evicted_watermark_time_diff_ms = metrics_info
.metrics
.lru_evicted_watermark_time_diff_ms
.with_label_values(&[
&metrics_info.table_id,
&metrics_info.actor_id,
&metrics_info.desc,
]);
lru_evicted_watermark_time_diff_ms.set(watermark_epoch.load(Ordering::Relaxed) as _);

Self {
inner,
watermark_epoch,
kv_heap_size: 0,
memory_usage_metrics,
lru_evicted_watermark_time_diff_ms,
metrics_info,
last_reported_size_bytes: 0,
}
}

/// Evict epochs lower than the watermark
pub fn evict(&mut self) {
let epoch = self.watermark_epoch.load(Ordering::Relaxed);
self.evict_by_epoch(epoch);
self.evict_by_epoch(self.load_cur_epoch());
}

/// Evict epochs lower than the watermark, except those entries which were touched in this epoch
pub fn evict_except_cur_epoch(&mut self) {
let epoch = self.watermark_epoch.load(Ordering::Relaxed);
let epoch = min(epoch, self.inner.current_epoch());
let epoch = min(self.load_cur_epoch(), self.inner.current_epoch());
self.evict_by_epoch(epoch);
}

Expand All @@ -103,6 +120,7 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
while let Some((key, value)) = self.inner.pop_lru_by_epoch(epoch) {
self.kv_heap_size_dec(key.estimated_size() + value.estimated_size());
}
self.report_evicted_watermark_time(epoch);
}

pub fn update_epoch(&mut self, epoch: u64) {
Expand Down Expand Up @@ -225,6 +243,16 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
false
}
}

/// Publishes how far the evicted `epoch` trails the current watermark epoch.
///
/// The watermark is re-read through `load_cur_epoch`, both epochs are converted with
/// `Epoch::physical_time`, and their difference is written to the
/// `lru_evicted_watermark_time_diff_ms` gauge (presumably milliseconds, per the gauge name).
///
/// The difference is computed with `saturating_sub`, so an `epoch` that is ahead of the
/// watermark reports `0` instead of overflowing the subtraction (which would panic in debug
/// builds and publish a wrapped value in release builds).
fn report_evicted_watermark_time(&self, epoch: u64) {
    let watermark_time = Epoch(self.load_cur_epoch()).physical_time();
    let evicted_time = Epoch(epoch).physical_time();
    self.lru_evicted_watermark_time_diff_ms
        .set(watermark_time.saturating_sub(evicted_time) as _);
}

/// Reads the current value of `watermark_epoch` using `Ordering::Relaxed`.
fn load_cur_epoch(&self) -> u64 {
self.watermark_epoch.load(Ordering::Relaxed)
}
}

pub fn new_unbounded<K: Hash + Eq + EstimateSize, V: EstimateSize>(
Expand Down
10 changes: 10 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ pub struct StreamingMetrics {
pub lru_physical_now_ms: IntGauge,
pub lru_runtime_loop_count: IntCounter,
pub lru_watermark_step: IntGauge,
pub lru_evicted_watermark_time_diff_ms: GenericGaugeVec<AtomicI64>,
pub jemalloc_allocated_bytes: IntGauge,
pub jemalloc_active_bytes: IntGauge,

Expand Down Expand Up @@ -588,6 +589,14 @@ impl StreamingMetrics {
)
.unwrap();

let lru_evicted_watermark_time_diff_ms = register_int_gauge_vec_with_registry!(
"lru_evicted_watermark_time_diff_ms",
"The diff between current watermark and latest evicted watermark time by actors",
&["table_id", "actor_id", "desc"],
registry
)
.unwrap();

let jemalloc_allocated_bytes = register_int_gauge_with_registry!(
"jemalloc_allocated_bytes",
"The allocated memory jemalloc, got from jemalloc_ctl",
Expand Down Expand Up @@ -695,6 +704,7 @@ impl StreamingMetrics {
lru_physical_now_ms,
lru_runtime_loop_count,
lru_watermark_step,
lru_evicted_watermark_time_diff_ms,
jemalloc_allocated_bytes,
jemalloc_active_bytes,
user_compute_error_count,
Expand Down

0 comments on commit 1059c15

Please sign in to comment.