Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples: Update DAQmx and MI examples to use new session management API #442

Merged
merged 8 commits into from
Oct 13, 2023
47 changes: 15 additions & 32 deletions examples/nidaqmx_analog_input/measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,14 @@
import logging
import pathlib
import sys
from typing import Any, List, Optional, Tuple
from typing import List, Tuple

import click
import ni_measurementlink_service as nims
import nidaqmx
from _helpers import (
ServiceOptions,
configure_logging,
create_session_management_client,
get_grpc_device_channel,
get_service_options,
grpc_device_options,
verbosity_option,
)
from _nidaqmx_helpers import create_task
from nidaqmx.constants import TaskMode

script_or_exe = sys.executable if getattr(sys, "frozen", False) else __file__
Expand All @@ -27,7 +20,6 @@
version="0.1.0.0",
ui_file_paths=[service_directory / "NIDAQmxAnalogInput.measui"],
)
service_options = ServiceOptions()


@measurement_service.register_measurement
Expand All @@ -48,22 +40,18 @@ def measure(pin_name: str, sample_rate: float, number_of_samples: int) -> Tuple[
sample_rate,
number_of_samples,
)
session_management_client = create_session_management_client(measurement_service)
with session_management_client.reserve_session(
context=measurement_service.context.pin_map_context, pin_or_relay_names=[pin_name]
) as reservation:
task: Optional[nidaqmx.Task] = None

def cancel_callback() -> None:
logging.info("Canceling measurement")
task_to_abort = task
if task_to_abort is not None:
task_to_abort.control(TaskMode.TASK_ABORT)

measurement_service.context.add_cancel_callback(cancel_callback)

grpc_device_channel = get_grpc_device_channel(measurement_service, nidaqmx, service_options)
with create_task(reservation.session_info, grpc_device_channel) as task:
with measurement_service.context.reserve_session(pin_name) as reservation:
with reservation.create_nidaqmx_task() as session_info:
task = session_info.session

def cancel_callback() -> None:
logging.info("Canceling measurement")
if (task_to_abort := task) is not None:
task_to_abort.control(TaskMode.TASK_ABORT)

measurement_service.context.add_cancel_callback(cancel_callback)

# If we created a new DAQmx task, we must also add channels to it.
if not reservation.session_info.session_exists:
task.ai_channels.add_ai_voltage_chan(reservation.session_info.channel_list)

Expand All @@ -73,9 +61,7 @@ def cancel_callback() -> None:
)

timeout = min(measurement_service.context.time_remaining, 10.0)
voltage_values = task.read(
number_of_samples_per_channel=number_of_samples, timeout=timeout
)
voltage_values = task.read(number_of_samples, timeout)
task = None # Don't abort after this point

_log_measured_values(voltage_values)
Expand All @@ -97,12 +83,9 @@ def _log_measured_values(samples: List[float], max_samples_to_display: int = 5)

@click.command
@verbosity_option
@grpc_device_options
def main(verbosity: int, **kwargs: Any) -> None:
def main(verbosity: int) -> None:
"""Perform a finite analog input measurement with NI-DAQmx."""
configure_logging(verbosity)
global service_options
service_options = get_service_options(**kwargs)

with measurement_service.host_service():
input("Press enter to close the measurement service.\n")
Expand Down
184 changes: 98 additions & 86 deletions examples/nidcpower_source_dc_voltage/measurement.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,29 @@
"""Source and measure a DC voltage with an NI SMU."""

from __future__ import annotations

import logging
import pathlib
import sys
import threading
import time
from typing import Any, Iterable, List, Tuple
from typing import TYPE_CHECKING, Iterable, List, NamedTuple, Tuple

import click
import grpc
import hightime
import ni_measurementlink_service as nims
import nidcpower
from _constants import USE_SIMULATION
from _helpers import (
ServiceOptions,
configure_logging,
create_session_management_client,
get_grpc_device_channel,
get_service_options,
grpc_device_options,
use_simulation_option,
verbosity_option,
)
from _nidcpower_helpers import create_session
import nidcpower.session
from _helpers import configure_logging, verbosity_option
from ni_measurementlink_service.session_management import TypedConnection

NIDCPOWER_WAIT_FOR_EVENT_TIMEOUT_ERROR_CODE = -1074116059
NIDCPOWER_TIMEOUT_EXCEEDED_ERROR_CODE = -1074097933
_NIDCPOWER_WAIT_FOR_EVENT_TIMEOUT_ERROR_CODE = -1074116059
_NIDCPOWER_TIMEOUT_EXCEEDED_ERROR_CODE = -1074097933
_NIDCPOWER_TIMEOUT_ERROR_CODES = [
_NIDCPOWER_WAIT_FOR_EVENT_TIMEOUT_ERROR_CODE,
_NIDCPOWER_TIMEOUT_EXCEEDED_ERROR_CODE,
]

script_or_exe = sys.executable if getattr(sys, "frozen", False) else __file__
service_directory = pathlib.Path(script_or_exe).resolve().parent
Expand All @@ -38,7 +35,15 @@
service_directory / "NIDCPowerSourceDCVoltageUI.vi",
],
)
service_options = ServiceOptions()

if TYPE_CHECKING:
# The nidcpower Measurement named tuple doesn't support type annotations:
# https://github.com/ni/nimi-python/issues/1885
class _Measurement(NamedTuple):
voltage: float
current: float
in_compliance: bool
channel: str


@measurement_service.register_measurement
Expand Down Expand Up @@ -69,103 +74,110 @@ def measure(
"""Source and measure a DC voltage with an NI SMU."""
logging.info("Executing measurement: pin_names=%s voltage_level=%g", pin_names, voltage_level)

session_management_client = create_session_management_client(measurement_service)

with session_management_client.reserve_session(
context=measurement_service.context.pin_map_context, pin_or_relay_names=pin_names
) as reservation:
grpc_device_channel = get_grpc_device_channel(
measurement_service, nidcpower, service_options
)
with create_session(reservation.session_info, grpc_device_channel) as session:
channels = session.channels[reservation.session_info.channel_list]
channel_mappings = reservation.session_info.channel_mappings
cancellation_event = threading.Event()
measurement_service.context.add_cancel_callback(cancellation_event.set)

cancellation_event = threading.Event()
measurement_service.context.add_cancel_callback(cancellation_event.set)
time_remaining = measurement_service.context.time_remaining
with measurement_service.context.reserve_session(pin_names) as reservation:
with reservation.create_nidcpower_session() as session_info:
# Use connections to map pin names to channel names. This sets the
# channel order based on the pin order and allows mapping the
# resulting measurements back to the corresponding pins and sites.
connections = reservation.get_nidcpower_connections(pin_names)
channel_order = ",".join(connection.channel_name for connection in connections)
channels = session_info.session.channels[channel_order]

# Configure the same settings for all of the channels corresponding
# to the selected pins and sites.
channels.source_mode = nidcpower.SourceMode.SINGLE_POINT
channels.output_function = nidcpower.OutputFunction.DC_VOLTAGE
channels.current_limit = current_limit
channels.voltage_level_range = voltage_level_range
channels.current_limit_range = current_limit_range
channels.source_delay = hightime.timedelta(seconds=source_delay)
channels.voltage_level = voltage_level
# The Measurement named tuple doesn't support type annotations:
# https://github.com/ni/nimi-python/issues/1885
measured_values = []

# Initiate the channels to start sourcing the outputs. initiate()
# returns a context manager that aborts the measurement when the
# function returns or raises an exception.
with channels.initiate():
deadline = time.time() + time_remaining
while True:
if time.time() > deadline:
measurement_service.context.abort(
grpc.StatusCode.DEADLINE_EXCEEDED, "deadline exceeded"
)
if cancellation_event.is_set():
measurement_service.context.abort(
grpc.StatusCode.CANCELLED, "client requested cancellation"
)
try:
channels.wait_for_event(nidcpower.enums.Event.SOURCE_COMPLETE, timeout=0.1)
break
except nidcpower.errors.DriverError as e:
"""
There is no native way to support cancellation when taking a DCPower
measurement. To support cancellation, we will be calling WaitForEvent
until it succeeds or we have gone past the specified timeout. WaitForEvent
will throw an exception if it times out, which is why we are catching
and doing nothing.
"""
if (
e.code == NIDCPOWER_WAIT_FOR_EVENT_TIMEOUT_ERROR_CODE
or e.code == NIDCPOWER_TIMEOUT_EXCEEDED_ERROR_CODE
):
pass
else:
raise

measured_values = channels.measure_multiple()
for index, mapping in enumerate(channel_mappings):
measured_values[index] = measured_values[index]._replace(
in_compliance=session.channels[mapping.channel].query_in_compliance()
)
session = None # Don't abort after this point

_log_measured_values(channel_mappings, measured_values)
# Wait for the outputs to settle.
timeout = source_delay + 10.0
_wait_for_event(
channels, cancellation_event, nidcpower.enums.Event.SOURCE_COMPLETE, timeout
)

# Measure the voltage and current for each output.
measurements: List[_Measurement] = channels.measure_multiple()

# Determine whether the outputs are in compliance.
for index, connection in enumerate(connections):
channel = connection.session.channels[connection.channel_name]
in_compliance = channel.query_in_compliance()
measurements[index] = measurements[index]._replace(in_compliance=in_compliance)

_log_measurements(connections, measurements)
logging.info("Completed measurement")
return (
[m.site for m in channel_mappings],
[m.pin_or_relay_name for m in channel_mappings],
[m.voltage for m in measured_values],
[m.current for m in measured_values],
[m.in_compliance for m in measured_values],
[connection.site for connection in connections],
[connection.pin_or_relay_name for connection in connections],
[measurement.voltage for measurement in measurements],
[measurement.current for measurement in measurements],
[measurement.in_compliance for measurement in measurements],
)


def _log_measured_values(
channel_mappings: Iterable[nims.session_management.ChannelMapping],
measured_values: Iterable,
def _wait_for_event(
channels: nidcpower.session._SessionBase,
cancellation_event: threading.Event,
event_id: nidcpower.enums.Event,
timeout: float,
) -> None:
"""Wait for a NI-DCPower event or until error/cancellation occurs."""
grpc_deadline = time.time() + measurement_service.context.time_remaining
user_deadline = time.time() + timeout

while True:
if time.time() > user_deadline:
raise TimeoutError("User timeout expired.")
if time.time() > grpc_deadline:
measurement_service.context.abort(
grpc.StatusCode.DEADLINE_EXCEEDED, "Deadline exceeded."
)
if cancellation_event.is_set():
measurement_service.context.abort(
grpc.StatusCode.CANCELLED, "Client requested cancellation."
)

# Wait for the NI-DCPower event. If this takes more than 100 ms, check
# whether the measurement was canceled and try again. NI-DCPower does
# not support canceling a call to wait_for_event().
try:
channels.wait_for_event(event_id, timeout=100e-3)
break
except nidcpower.errors.DriverError as e:
if e.code in _NIDCPOWER_TIMEOUT_ERROR_CODES:
pass
raise


def _log_measurements(
connections: Iterable[TypedConnection[nidcpower.Session]],
measured_values: Iterable[_Measurement],
) -> None:
"""Log the measured values."""
for mapping, measurement in zip(channel_mappings, measured_values):
logging.info("site%s/%s:", mapping.site, mapping.pin_or_relay_name)
for connection, measurement in zip(connections, measured_values):
logging.info("site%s/%s:", connection.site, connection.pin_or_relay_name)
logging.info(" Voltage: %g V", measurement.voltage)
logging.info(" Current: %g A", measurement.current)
logging.info(" In compliance: %s", str(measurement.in_compliance))


@click.command
@verbosity_option
@grpc_device_options
@use_simulation_option(default=USE_SIMULATION)
def main(verbosity: int, **kwargs: Any) -> None:
def main(verbosity: int) -> None:
"""Source and measure a DC voltage with an NI SMU."""
configure_logging(verbosity)

global service_options
service_options = get_service_options(**kwargs)

with measurement_service.host_service():
input("Press enter to close the measurement service.\n")

Expand Down
Loading
Loading