Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
haitao127 committed Nov 20, 2020
2 parents cde7fc4 + 36d8a5d commit 8dd0a09
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.csstudio.display.builder.model.DisplayModel;
import org.csstudio.display.builder.model.Widget;
import org.csstudio.display.builder.model.WidgetFactory;
import org.csstudio.display.builder.model.properties.CommonWidgetProperties;
import org.csstudio.display.builder.model.properties.StringWidgetProperty;

/** Helper for naming widgets
* @author Kay Kasemir
Expand Down Expand Up @@ -71,7 +71,10 @@ public void setDefaultName(final DisplayModel model, final Widget widget)
if (widget_model == model)
throw new IllegalStateException(widget + " already in model " + model);

String name = widget.getName();
final StringWidgetProperty propName = (StringWidgetProperty) widget.propName();
String name = propName.getValue();
final String specification = propName.getSpecification();
final boolean has_macro = ! name.contentEquals(specification);

// Default to human-readable widget type
if (name.isEmpty())
Expand Down Expand Up @@ -110,7 +113,17 @@ public void setDefaultName(final DisplayModel model, final Widget widget)
}
max_used_instance.put(base, number);
}
widget.setPropertyValue(CommonWidgetProperties.propName, name);
// Preserve the macro specification if there is a macro in the name
if (has_macro)
{
final Matcher spec_matcher = pattern.matcher(specification);
if (spec_matcher.matches())
propName.setSpecification(spec_matcher.group(1) + "_" + number);
else
propName.setSpecification(specification + "_" + number);
}
else
propName.setSpecification(name);

final ChildrenProperty children = ChildrenProperty.getChildren(widget);
if (children != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ can also write to their respective array element.</text>
<y>190</y>
<width>250</width>
<height>200</height>
<pv_name>loc://&lt;VStringArray&gt;("Monitor 1", "Monitor 2", "Monitor 3", "Monitor 4", "Monitor 5", "Monitor 6")</pv_name>
<pv_name>loc://textarray&lt;VStringArray&gt;("Monitor 1", "Monitor 2", "Monitor 3", "Monitor 4", "Monitor 5", "Monitor 6")</pv_name>
<widget type="textupdate" version="2.0.0">
<name>Text Update</name>
<width>230</width>
Expand Down Expand Up @@ -121,7 +121,7 @@ all child widgets.
<y>190</y>
<width>250</width>
<height>200</height>
<pv_name>loc://&lt;VStringArray&gt;("Monitor 1", "Monitor 2", "Monitor 3", "Monitor 4", "Monitor 5", "Monitor 6")</pv_name>
<pv_name>loc://textarray&lt;VStringArray&gt;("Monitor 1", "Monitor 2", "Monitor 3", "Monitor 4", "Monitor 5", "Monitor 6")</pv_name>
<widget type="textentry" version="3.0.0">
<name>Text Entry_1</name>
<width>230</width>
Expand Down
100 changes: 86 additions & 14 deletions core/pv/src/main/java/org/phoebus/pv/PV.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2017-2018 Oak Ridge National Laboratory.
* Copyright (c) 2017-2020 Oak Ridge National Laboratory.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
Expand All @@ -11,6 +11,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -44,6 +47,29 @@ public class PV

final private String name;

/** Lock for value notifications
*
* A value could arrive while in addSubscription().
* We don't want to miss it,
* nor do we want to notify twice,
* so we lock both in there and in notifyListenersOfValue().
*
* This does raise the possibility of deadlocks
* if a client adds PVs or cancels subscriptions inside
* the value notification handler.
*
* To avoid a catastrophic deadlock, use a try-lock.
*
* An alternative to strict locking would be locking,
* but only queuing notifications to be performed by
* a new thread. That solves the deadlock, but
* changes the behavior of "loc://" PVs which would
* no longer react to write access via immediate notification
* in the calling thread, which impacts unit tests and
* other code that depends on this long standing behavior.
*/
final private Lock value_notification_lock = new ReentrantLock();

final private List<ValueEventHandler.Subscription> value_subs = new CopyOnWriteArrayList<>();

final private List<AccessRightsEventHandler.Subscription> access_subs = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -85,11 +111,39 @@ public String getName()
*/
void addSubscription(final ValueEventHandler.Subscription value_sub)
{
// If there is a known value, perform initial update
final VType value = last_value;
if (value != null)
value_sub.update(value);
value_subs.add(value_sub);
try
{
if (! value_notification_lock.tryLock(20, TimeUnit.SECONDS))
throw new Exception("Timeout");
}
catch (Exception ex)
{
logger.log(Level.SEVERE, "Cannot lock " + name, ex);
return;
}

try
{
// Register subscription so we get notified of value updates
value_subs.add(value_sub);

// Lock prevents notifications right now,
// avoiding double updates for an initial value

// If there is a known value, perform initial update
final VType value = last_value;
if (value != null)
value_sub.update(value);

// Lock also asserts that this initial update completes
// before another update happens,
// avoiding a possible delay of this initial update
// which could then be delivered _after_ what's really a new value.
}
finally
{
value_notification_lock.unlock();
}
}

/** @param value_sub Listener that will no longer receive value updates */
Expand Down Expand Up @@ -236,18 +290,36 @@ public Future<?> asyncWrite(final Object new_value) throws Exception
*/
protected void notifyListenersOfValue(final VType value)
{
last_value = value;
for (ValueEventHandler.Subscription sub : value_subs)
try
{
try
{
sub.update(value);
}
catch (Throwable ex)
if (! value_notification_lock.tryLock(20, TimeUnit.SECONDS))
throw new Exception("Timeout");
}
catch (Exception ex)
{
logger.log(Level.SEVERE, "Cannot lock " + name, ex);
return;
}

try
{
last_value = value;
for (ValueEventHandler.Subscription sub : value_subs)
{
logger.log(Level.WARNING, name + " value update error", ex);
try
{
sub.update(value);
}
catch (Throwable ex)
{
logger.log(Level.WARNING, name + " value update error", ex);
}
}
}
finally
{
value_notification_lock.unlock();
}
}

/** Helper for PV implementation to notify listeners */
Expand Down
50 changes: 50 additions & 0 deletions core/pv/src/test/java/org/phoebus/pv/FormulaTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -173,4 +174,53 @@ public void initialDisconnect() throws Exception
assertTrue(PV.isDisconnected(value));
PVPool.releasePV(pv);
}

// Demo code for race condition
//
// Original PV.addSubscription
// 1) first checked for an initial value,
// 2) then added the new subscriber.
// When initial update arrived between those steps,
// it was lost because the subscriber was not known
// in PV.notifyListenersOfValue.
//
// Swapping 1&2:
// 1) Add the new subscriber,
// 2) check for an initial value .. and send it.
// results in two initial updates when the value arrives
// between those steps.
// Worse, this could happen:
// 1) addSubscription adds the new subscriber,
// 2a) finds an initial value and is about to send it to subscriber,
// .. but just before doing that, new value arrives, is sent to subscriber
// 2b) ... the _old_ initial value is sent
//
// https://github.com/ControlSystemStudio/phoebus/issues/1387

// Skip:
// This demo requires an actual PV,
// and either delays or breakpoints inside PV.addSubscription
// to demonstrate the original issue
//
// @Test
public void raceDemo() throws Exception
{
final PV pv = PVPool.getPV("=`DTL_LLRF:FCM1:cavAmpSet`");
final AtomicInteger updates = new AtomicInteger();

final Disposable sub = pv.onValueEvent().subscribe(value ->
{
updates.incrementAndGet();
System.out.println(pv.getName() + " = " + value);
});

TimeUnit.SECONDS.sleep(5);

sub.dispose();
PVPool.releasePV(pv);

// With formula, typically get initial DISCONNECTED then first value update.
// With plain but real PV, often get the initial value right away.
assertEquals(2, updates.get());
}
}
47 changes: 47 additions & 0 deletions core/pv/src/test/java/org/phoebus/pv/LocalPVTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.epics.vtype.VDouble;
import org.epics.vtype.VType;
import org.junit.Test;
import org.phoebus.pv.loc.LocalPVFactory;
import org.phoebus.pv.loc.ValueHelper;

import io.reactivex.disposables.Disposable;

/** @author Kay Kasemir */
@SuppressWarnings("nls")
public class LocalPVTest
Expand Down Expand Up @@ -115,4 +120,46 @@ public void testLocalPV() throws Exception

pv1.close();
}

/** For local PVs, assert that notifications happen right away
* inside the calling thread.
*
* Some other tests as well as the 'ArrayPVDispatcher' used in the display runtime
* depend on 'write' to a local PV having an immediate effect,
* not decoupled to another thread.
*/
@Test
public void testLocalPVNotifications() throws Exception
{
Thread.currentThread().setName("TestThread");
final PV pv = PVPool.getPV("loc://event_demo(1)");

System.out.println("Running local PV test on " + Thread.currentThread());

// Count received values and track on which thread we receive them
final AtomicReference<String> update_thread_name = new AtomicReference<>("none");
final AtomicInteger updates = new AtomicInteger();
final Disposable sub = pv.onValueEvent().subscribe(value ->
{
System.out.println(pv + " = " + value + " on " + Thread.currentThread());
updates.incrementAndGet();
update_thread_name.set(Thread.currentThread().getName());
});

// Expect initial value update right away on calling thread
assertThat(updates.get(), equalTo(1));
assertThat(update_thread_name.get(), equalTo("TestThread"));

pv.write(42);
// Expect update for newly written value right away on calling thread
assertThat(updates.get(), equalTo(2));
assertThat(update_thread_name.get(), equalTo("TestThread"));
final VType value = pv.read();
assertThat(value, instanceOf(VDouble.class));
assertThat(((VDouble)value).getValue(), equalTo(42.0));

sub.dispose();
PVPool.releasePV(pv);
}

}

0 comments on commit 8dd0a09

Please sign in to comment.