Skip to content

Commit

Permalink
Merge #3856 into 3.7.0-M5
Browse files Browse the repository at this point in the history
  • Loading branch information
chemicL committed Jul 25, 2024
2 parents 64815f3 + 56ebc88 commit 46cd006
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package reactor.core.observability.micrometer;

import java.util.Collection;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.LongTaskTimer;
Expand All @@ -27,8 +29,10 @@
import io.micrometer.core.instrument.Timer;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.observability.micrometer.TimedSchedulerMeterDocumentation.SubmittedTags;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;

import static reactor.core.observability.micrometer.TimedSchedulerMeterDocumentation.*;

Expand Down Expand Up @@ -136,21 +140,32 @@ static final class TimedWorker implements Worker {
final TimedScheduler parent;
final Worker delegate;

/**
* As this Worker creates {@link TimedRunnable} instances which are {@link Disposable}
* it needs to keep track of them to be able to dispose them when this instance
* is {@link #dispose() disposed}.
*/
final Composite disposables;

TimedWorker(TimedScheduler parent, Worker delegate) {
this.parent = parent;
this.delegate = delegate;
this.disposables = Disposables.composite();
}

TimedRunnable wrap(Runnable task) {
return new WorkerBackedTimedRunnable(parent.registry, parent, delegate, task);
return new WorkerBackedTimedRunnable(parent.registry, parent, delegate,
task, disposables);
}

TimedRunnable wrapPeriodic(Runnable task) {
return new WorkerBackedTimedRunnable(parent.registry, parent, delegate, task, true);
return new WorkerBackedTimedRunnable(parent.registry, parent, delegate,
task, disposables, true);
}

@Override
public void dispose() {
disposables.dispose();
delegate.dispose();
}

Expand All @@ -162,51 +177,67 @@ public boolean isDisposed() {
@Override
public Disposable schedule(Runnable task) {
TimedRunnable timedTask = wrap(task);
disposables.add(timedTask);

return timedTask.schedule();
}

@Override
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
TimedRunnable timedTask = wrap(task);
disposables.add(timedTask);

return timedTask.schedule(delay, unit);
}

@Override
public Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) {
TimedRunnable timedTask = wrapPeriodic(task);
disposables.add(timedTask);

return timedTask.schedulePeriodically(initialDelay, period, unit);
}
}

private static abstract class TimedRunnable implements Runnable, Disposable {
final MeterRegistry registry;
final TimedScheduler parent;
final Runnable task;
/** marker that the Worker was disposed and the parent got notified */
static final Composite DISPOSED = new EmptyCompositeDisposable();
/** marker that the Worker has completed, for the PARENT field */
static final Composite DONE = new EmptyCompositeDisposable();

final MeterRegistry registry;
final TimedScheduler timedScheduler;
final Runnable task;

final LongTaskTimer.Sample pendingSample;

boolean isRerun;

Disposable disposable;

TimedRunnable(MeterRegistry registry, TimedScheduler parent, Runnable task) {
this(registry, parent, task, false);
volatile Composite parent;
static final AtomicReferenceFieldUpdater<TimedRunnable, Composite> PARENT =
AtomicReferenceFieldUpdater.newUpdater(TimedRunnable.class, Composite.class, "parent");

TimedRunnable(MeterRegistry registry, TimedScheduler timedScheduler, Runnable task,
@Nullable Composite parent) {
this(registry, timedScheduler, task, parent, false);
}

TimedRunnable(MeterRegistry registry, TimedScheduler parent, Runnable task, boolean periodic) {
TimedRunnable(MeterRegistry registry, TimedScheduler timedScheduler, Runnable task,
@Nullable Composite parent, boolean periodic) {
this.registry = registry;
this.parent = parent;
this.timedScheduler = timedScheduler;
this.task = task;

if (periodic) {
this.pendingSample = null;
}
else {
this.pendingSample = parent.pendingTasks.start();
this.pendingSample = timedScheduler.pendingTasks.start();
}
this.isRerun = false; //will be ignored if not periodic
PARENT.lazySet(this, parent);
}

@Override
Expand All @@ -220,16 +251,23 @@ public void run() {
this.isRerun = true;
}
else {
parent.submittedPeriodicIteration.increment();
timedScheduler.submittedPeriodicIteration.increment();
}
}

Runnable completionTrackingTask = parent.completedTasks.wrap(this.task);
this.parent.activeTasks.record(completionTrackingTask);
try {
Runnable completionTrackingTask = timedScheduler.completedTasks.wrap(this.task);
this.timedScheduler.activeTasks.record(completionTrackingTask);
} finally {
Composite o = parent;
if (o != DISPOSED && PARENT.compareAndSet(this, o, DONE) && o != null) {
o.remove(this);
}
}
}

public Disposable schedule() {
parent.submittedDirect.increment();
timedScheduler.submittedDirect.increment();

try {
disposable = this.internalSchedule();
Expand All @@ -241,7 +279,7 @@ public Disposable schedule() {
}

public Disposable schedule(long delay, TimeUnit unit) {
parent.submittedDelayed.increment();
timedScheduler.submittedDelayed.increment();

try {
disposable = this.internalSchedule(delay, unit);
Expand All @@ -253,7 +291,7 @@ public Disposable schedule(long delay, TimeUnit unit) {
}

public Disposable schedulePeriodically(long initialDelay, long period, TimeUnit unit) {
parent.submittedPeriodicInitial.increment();
timedScheduler.submittedPeriodicInitial.increment();
return this.internalSchedulePeriodically(initialDelay, period, unit);
}

Expand All @@ -266,6 +304,23 @@ public void dispose() {
if (pendingSample != null) {
pendingSample.stop();
}

for (;;) {
Composite o = parent;
if (o == DONE || o == DISPOSED || o == null) {
return;
}
if (PARENT.compareAndSet(this, o, DISPOSED)) {
o.remove(this);
return;
}
}
}

@Override
public boolean isDisposed() {
Composite o = PARENT.get(this);
return o == DISPOSED || o == DONE;
}

abstract Disposable internalSchedule();
Expand All @@ -279,13 +334,15 @@ static final class WorkerBackedTimedRunnable extends TimedRunnable {

final Worker worker;

WorkerBackedTimedRunnable(MeterRegistry registry, TimedScheduler parent, Worker worker, Runnable task) {
super(registry, parent, task);
WorkerBackedTimedRunnable(MeterRegistry registry, TimedScheduler timedScheduler,
Worker worker, Runnable task, Composite parent) {
super(registry, timedScheduler, task, parent);
this.worker = worker;
}

WorkerBackedTimedRunnable(MeterRegistry registry, TimedScheduler parent, Worker worker, Runnable task, boolean periodic) {
super(registry, parent, task, periodic);
WorkerBackedTimedRunnable(MeterRegistry registry, TimedScheduler timedScheduler,
Worker worker, Runnable task, Composite parent, boolean periodic) {
super(registry, timedScheduler, task, parent, periodic);
this.worker = worker;
}

Expand All @@ -309,13 +366,15 @@ static final class SchedulerBackedTimedRunnable extends TimedRunnable {

final Scheduler scheduler;

SchedulerBackedTimedRunnable(MeterRegistry registry, TimedScheduler parent, Scheduler scheduler, Runnable task) {
super(registry, parent, task);
SchedulerBackedTimedRunnable(MeterRegistry registry, TimedScheduler timedScheduler,
Scheduler scheduler, Runnable task) {
super(registry, timedScheduler, task, null);
this.scheduler = scheduler;
}

SchedulerBackedTimedRunnable(MeterRegistry registry, TimedScheduler parent, Scheduler scheduler, Runnable task, boolean periodic) {
super(registry, parent, task, periodic);
SchedulerBackedTimedRunnable(MeterRegistry registry, TimedScheduler timedScheduler,
Scheduler scheduler, Runnable task, boolean periodic) {
super(registry, timedScheduler, task, null, periodic);
this.scheduler = scheduler;
}

Expand All @@ -334,4 +393,40 @@ Disposable internalSchedulePeriodically(long initialDelay, long period, TimeUnit
return scheduler.schedulePeriodically(this, initialDelay, period, unit);
}
}

/**
* Copy of reactor.core.scheduler.EmptyCompositeDisposable for internal use.
*/
static final class EmptyCompositeDisposable implements Disposable.Composite {

@Override
public boolean add(Disposable d) {
return false;
}

@Override
public boolean addAll(Collection<? extends Disposable> ds) {
return false;
}

@Override
public boolean remove(Disposable d) {
return false;
}

@Override
public int size() {
return 0;
}

@Override
public void dispose() {
}

@Override
public boolean isDisposed() {
return false;
}

}
}
Loading

0 comments on commit 46cd006

Please sign in to comment.