Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update measurement link examples to match dcpower example changes. #284

Merged
merged 23 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion examples/nidaqmx_analog_input/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@

import logging
import pathlib
import types
from typing import Any, Callable, Dict, NamedTuple, TypeVar

import click
import grpc

import ni_measurementlink_service as nims
from ni_measurementlink_service import session_management
from ni_measurementlink_service._internal.discovery_client import DiscoveryClient
from ni_measurementlink_service._internal.stubs.ni.measurementlink.pinmap.v1 import (
pin_map_service_pb2,
pin_map_service_pb2_grpc,
)
from ni_measurementlink_service.measurement.service import GrpcChannelPool
from ni_measurementlink_service.measurement.service import (
GrpcChannelPool,
MeasurementService,
)


class ServiceOptions(NamedTuple):
Expand Down Expand Up @@ -231,3 +236,30 @@ def use_simulation_option(default: bool) -> Callable[[F], F]:
is_flag=True,
help="Use simulated instruments.",
)


def get_grpc_device_channel(
measurement_service: MeasurementService,
driver_module: types.ModuleType,
service_options: ServiceOptions,
) -> grpc.Channel:
"""Returns driver specific grpc device channel."""
if service_options.use_grpc_device and service_options.grpc_device_address:
return measurement_service.get_channel(service_options.grpc_device_address)

return measurement_service.get_channel(
provided_interface=getattr(driver_module, "GRPC_SERVICE_INTERFACE_NAME"),
service_class="ni.measurementlink.v1.grpcdeviceserver",
subash-suresh marked this conversation as resolved.
Show resolved Hide resolved
)


def create_session_management_client(
measurement_service: MeasurementService,
) -> nims.session_management.Client:
"""Return created session management client."""
return nims.session_management.Client(
grpc_channel=measurement_service.get_channel(
provided_interface=nims.session_management.GRPC_SERVICE_INTERFACE_NAME,
service_class=nims.session_management.GRPC_SERVICE_CLASS,
)
)
26 changes: 26 additions & 0 deletions examples/nidaqmx_analog_input/_nidaqmx_helpers.py
subash-suresh marked this conversation as resolved.
Show resolved Hide resolved
subash-suresh marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""nidaqmx Helper classes and functions for MeasurementLink examples."""

from typing import Optional

import grpc
import nidaqmx

import ni_measurementlink_service as nims


def create_task(
session_info: nims.session_management.SessionInformation,
session_grpc_channel: Optional[grpc.Channel] = None,
initialization_behavior=nidaqmx.SessionInitializationBehavior.AUTO,
) -> nidaqmx.Task:
"""Create daqmx task based on reserved session and grpc channel."""
session_kwargs = {}

if session_grpc_channel is not None:
session_kwargs["grpc_options"] = nidaqmx.GrpcSessionOptions(
session_grpc_channel,
session_name=session_info.session_name,
initialization_behavior=initialization_behavior,
)

return nidaqmx.Task(new_task_name=session_info.session_name, **session_kwargs)
80 changes: 26 additions & 54 deletions examples/nidaqmx_analog_input/measurement.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Perform a finite analog input measurement with NI-DAQmx."""

import contextlib
import logging
import pathlib
from typing import Optional
Expand All @@ -11,10 +10,13 @@
from _helpers import (
ServiceOptions,
configure_logging,
create_session_management_client,
get_grpc_device_channel,
get_service_options,
grpc_device_options,
verbosity_option,
)
from _nidaqmx_helpers import create_task
from nidaqmx.constants import TaskMode

import ni_measurementlink_service as nims
Expand Down Expand Up @@ -46,24 +48,16 @@ def measure(pin_name, sample_rate, number_of_samples):
sample_rate,
subash-suresh marked this conversation as resolved.
Show resolved Hide resolved
number_of_samples,
)
session_management_client = nims.session_management.Client(
grpc_channel=measurement_service.get_channel(
provided_interface=nims.session_management.GRPC_SERVICE_INTERFACE_NAME,
service_class=nims.session_management.GRPC_SERVICE_CLASS,
)
)
with contextlib.ExitStack() as stack:
reservation = stack.enter_context(
session_management_client.reserve_sessions(
context=measurement_service.context.pin_map_context,
pin_or_relay_names=[pin_name],
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DAQMX,
# If another measurement is using the session, wait for it to complete.
# Specify a timeout to aid in debugging missed unreserve calls.
# Long measurements may require a longer timeout.
timeout=60,
)
)
session_management_client = create_session_management_client(measurement_service)
with session_management_client.reserve_sessions(
context=measurement_service.context.pin_map_context,
pin_or_relay_names=[pin_name],
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DAQMX,
# If another measurement is using the session, wait for it to complete.
# Specify a timeout to aid in debugging missed unreserve calls.
# Long measurements may require a longer timeout.
timeout=60,
) as reservation:
if len(reservation.session_info) != 1:
measurement_service.context.abort(
grpc.StatusCode.INVALID_ARGUMENT,
Expand All @@ -80,49 +74,27 @@ def cancel_callback():

measurement_service.context.add_cancel_callback(cancel_callback)

task = stack.enter_context(_create_nidaqmx_task(session_info))
if not session_info.session_exists:
task.ai_channels.add_ai_voltage_chan(session_info.channel_list)
grpc_device_channel = get_grpc_device_channel(measurement_service, nidaqmx, service_options)
with create_task(session_info, grpc_device_channel) as task:
if not session_info.session_exists:
task.ai_channels.add_ai_voltage_chan(session_info.channel_list)

task.timing.cfg_samp_clk_timing(
rate=sample_rate,
samps_per_chan=number_of_samples,
)
task.timing.cfg_samp_clk_timing(
rate=sample_rate,
samps_per_chan=number_of_samples,
)

timeout = min(measurement_service.context.time_remaining, 10.0)
voltage_values = task.read(number_of_samples_per_channel=number_of_samples, timeout=timeout)
task = None # Don't abort after this point
timeout = min(measurement_service.context.time_remaining, 10.0)
voltage_values = task.read(
number_of_samples_per_channel=number_of_samples, timeout=timeout
)
task = None # Don't abort after this point

_log_measured_values(voltage_values)
logging.info("Completed measurement")
return (voltage_values,)


def _create_nidaqmx_task(
session_info: nims.session_management.SessionInformation,
) -> nidaqmx.Task:
session_kwargs = {}
if service_options.use_grpc_device:
session_grpc_address = service_options.grpc_device_address

if not session_grpc_address:
session_grpc_channel = measurement_service.get_channel(
provided_interface=nidaqmx.GRPC_SERVICE_INTERFACE_NAME,
service_class="ni.measurementlink.v1.grpcdeviceserver",
)
else:
session_grpc_channel = measurement_service.channel_pool.get_channel(
target=session_grpc_address
)
session_kwargs["grpc_options"] = nidaqmx.GrpcSessionOptions(
session_grpc_channel,
session_name=session_info.session_name,
initialization_behavior=nidaqmx.SessionInitializationBehavior.AUTO,
)

return nidaqmx.Task(new_task_name=session_info.session_name, **session_kwargs)


def _log_measured_values(samples, max_samples_to_display=5):
"""Log the measured values."""
if len(samples) > max_samples_to_display:
Expand Down
12 changes: 10 additions & 2 deletions examples/nidcpower_source_dc_voltage/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
pin_map_service_pb2,
pin_map_service_pb2_grpc,
)
from ni_measurementlink_service.measurement.service import GrpcChannelPool, MeasurementService
from ni_measurementlink_service.measurement.service import (
GrpcChannelPool,
MeasurementService,
)


class ServiceOptions(NamedTuple):
Expand Down Expand Up @@ -236,9 +239,14 @@ def use_simulation_option(default: bool) -> Callable[[F], F]:


def get_grpc_device_channel(
measurement_service: MeasurementService, driver_module: types.ModuleType
measurement_service: MeasurementService,
driver_module: types.ModuleType,
service_options: ServiceOptions,
) -> grpc.Channel:
"""Returns driver specific grpc device channel."""
if service_options.use_grpc_device and service_options.grpc_device_address:
return measurement_service.get_channel(service_options.grpc_device_address)

return measurement_service.get_channel(
provided_interface=getattr(driver_module, "GRPC_SERVICE_INTERFACE_NAME"),
service_class="ni.measurementlink.v1.grpcdeviceserver",
Expand Down
38 changes: 9 additions & 29 deletions examples/nidcpower_source_dc_voltage/_nidcpower_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""nidcpower Helper classes and functions for MeasurementLink examples."""

from typing import Any, Dict, Iterable, Optional
from typing import Any, Dict, Optional

import grpc
import nidcpower
Expand All @@ -14,10 +14,8 @@

def create_session(
session_info: nims.session_management.SessionInformation,
session_grpc_channel: grpc.Channel,
initialization_behavior: Optional[
nidcpower.SessionInitializationBehavior
] = nidcpower.SessionInitializationBehavior.AUTO,
session_grpc_channel: Optional[grpc.Channel] = None,
initialization_behavior=nidcpower.SessionInitializationBehavior.AUTO,
) -> nidcpower.Session:
"""Create driver session based on reserved session and grpc channel."""
options: Dict[str, Any] = {}
Expand All @@ -27,31 +25,13 @@ def create_session(

session_kwargs: Dict[str, Any] = {}

session_kwargs["grpc_options"] = nidcpower.GrpcSessionOptions(
session_grpc_channel,
session_name=session_info.session_name,
initialization_behavior=initialization_behavior,
)
if session_grpc_channel is not None:
session_kwargs["grpc_options"] = nidcpower.GrpcSessionOptions(
session_grpc_channel,
session_name=session_info.session_name,
initialization_behavior=initialization_behavior,
)

return nidcpower.Session(
resource_name=session_info.resource_name, options=options, **session_kwargs
)


def reserve_session(
session_management_client: nims.session_management.Client,
pin_map_context: nims.session_management.PinMapContext,
pin_names: Optional[Iterable[str]] = None,
timeout: Optional[float] = None,
) -> nims.session_management.Reservation:
"""Reserve session(s).

Reserve session(s) for the given pins and returns the
information needed to create or access the session.
"""
return session_management_client.reserve_sessions(
context=pin_map_context,
pin_or_relay_names=pin_names,
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DCPOWER,
timeout=timeout,
)
19 changes: 10 additions & 9 deletions examples/nidcpower_source_dc_voltage/measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@
ServiceOptions,
configure_logging,
create_session_management_client,
get_service_options,
get_grpc_device_channel,
get_service_options,
grpc_device_options,
use_simulation_option,
verbosity_option,
)
from _nidcpower_helpers import create_session, reserve_session, USE_SIMULATION
from _nidcpower_helpers import USE_SIMULATION, create_session

import ni_measurementlink_service as nims


NIDCPOWER_WAIT_FOR_EVENT_TIMEOUT_ERROR_CODE = -1074116059
NIDCPOWER_TIMEOUT_EXCEEDED_ERROR_CODE = -1074097933

Expand Down Expand Up @@ -69,10 +68,10 @@ def measure(

session_management_client = create_session_management_client(measurement_service)

with reserve_session(
session_management_client,
measurement_service.context.pin_map_context,
pin_names=pin_names,
with session_management_client.reserve_sessions(
context=measurement_service.context.pin_map_context,
pin_or_relay_names=pin_names,
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DCPOWER,
# If another measurement is using the session, wait for it to complete.
# Specify a timeout to aid in debugging missed unreserve calls.
# Long measurements may require a longer timeout.
Expand All @@ -85,10 +84,12 @@ def measure(
)

session_info = reservation.session_info[0]

grpc_device_channel = get_grpc_device_channel(
measurement_service, nidcpower, service_options
)
with create_session(
session_info,
get_grpc_device_channel(measurement_service, nidcpower),
grpc_device_channel,
) as session:
channels = session.channels[session_info.channel_list]
channel_mappings = session_info.channel_mappings
Expand Down
14 changes: 8 additions & 6 deletions examples/nidcpower_source_dc_voltage/teststand_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import nidcpower
from _helpers import GrpcChannelPoolHelper, PinMapClient, TestStandSupport
from _nidcpower_helpers import create_session, reserve_session
from _nidcpower_helpers import create_session

import ni_measurementlink_service as nims

Expand Down Expand Up @@ -48,11 +48,13 @@ def create_nidcpower_sessions(sequence_context: Any) -> None:
pin_map_id = teststand_support.get_active_pin_map_id()

pin_map_context = nims.session_management.PinMapContext(pin_map_id=pin_map_id, sites=None)
with reserve_session(
session_management_client,
pin_map_context,
# This code module sets up the sessions, so error immediately if they are in use.
timeout=0,
with session_management_client.reserve_sessions(
context=pin_map_context,
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DCPOWER,
# If another measurement is using the session, wait for it to complete.
# Specify a timeout to aid in debugging missed unreserve calls.
# Long measurements may require a longer timeout.
timeout=60,
) as reservation:
for session_info in reservation.session_info:
grpc_device_channel = grpc_channel_pool.get_grpc_device_channel(
Expand Down
Loading