Skip to content

Commit

Permalink
Update measurement link examples to match dcpower example changes. (#284
Browse files Browse the repository at this point in the history
)

* update ni fgen example

* update the scope example

* nidigital example updates

* update nidigital examples

* update niswitch helpers

* nidaqmx example update

* udpate the visa dmm example

* fix lint erros

---------

Co-authored-by: Subash Suresh <[email protected]>
  • Loading branch information
subash-suresh and Subash Suresh authored Jun 14, 2023
1 parent 07d6199 commit fe5e4ce
Show file tree
Hide file tree
Showing 30 changed files with 796 additions and 663 deletions.
38 changes: 36 additions & 2 deletions examples/nidaqmx_analog_input/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@

import logging
import pathlib
from typing import Any, Callable, Dict, NamedTuple, TypeVar
import types
from typing import Any, Callable, Dict, NamedTuple, TypeVar, Optional

import click
import grpc

import ni_measurementlink_service as nims
from ni_measurementlink_service import session_management
from ni_measurementlink_service._internal.discovery_client import DiscoveryClient
from ni_measurementlink_service._internal.stubs.ni.measurementlink.pinmap.v1 import (
pin_map_service_pb2,
pin_map_service_pb2_grpc,
)
from ni_measurementlink_service.measurement.service import GrpcChannelPool
from ni_measurementlink_service.measurement.service import (
GrpcChannelPool,
MeasurementService,
)


class ServiceOptions(NamedTuple):
Expand Down Expand Up @@ -231,3 +236,32 @@ def use_simulation_option(default: bool) -> Callable[[F], F]:
is_flag=True,
help="Use simulated instruments.",
)


def get_grpc_device_channel(
measurement_service: MeasurementService,
driver_module: types.ModuleType,
service_options: ServiceOptions,
) -> Optional[grpc.Channel]:
"""Returns driver specific grpc device channel."""
if service_options.use_grpc_device:
if service_options.grpc_device_address:
return measurement_service.channel_pool.get_channel(service_options.grpc_device_address)

return measurement_service.get_channel(
provided_interface=getattr(driver_module, "GRPC_SERVICE_INTERFACE_NAME"),
service_class="ni.measurementlink.v1.grpcdeviceserver",
)
return None


def create_session_management_client(
measurement_service: MeasurementService,
) -> nims.session_management.Client:
"""Return created session management client."""
return nims.session_management.Client(
grpc_channel=measurement_service.get_channel(
provided_interface=nims.session_management.GRPC_SERVICE_INTERFACE_NAME,
service_class=nims.session_management.GRPC_SERVICE_CLASS,
)
)
26 changes: 26 additions & 0 deletions examples/nidaqmx_analog_input/_nidaqmx_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""nidaqmx Helper classes and functions for MeasurementLink examples."""

from typing import Optional

import grpc
import nidaqmx

import ni_measurementlink_service as nims


def create_task(
session_info: nims.session_management.SessionInformation,
session_grpc_channel: Optional[grpc.Channel] = None,
initialization_behavior=nidaqmx.SessionInitializationBehavior.AUTO,
) -> nidaqmx.Task:
"""Create daqmx task based on reserved session and grpc channel."""
session_kwargs = {}

if session_grpc_channel is not None:
session_kwargs["grpc_options"] = nidaqmx.GrpcSessionOptions(
session_grpc_channel,
session_name=session_info.session_name,
initialization_behavior=initialization_behavior,
)

return nidaqmx.Task(new_task_name=session_info.session_name, **session_kwargs)
80 changes: 26 additions & 54 deletions examples/nidaqmx_analog_input/measurement.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Perform a finite analog input measurement with NI-DAQmx."""

import contextlib
import logging
import pathlib
from typing import Optional
Expand All @@ -11,10 +10,13 @@
from _helpers import (
ServiceOptions,
configure_logging,
create_session_management_client,
get_grpc_device_channel,
get_service_options,
grpc_device_options,
verbosity_option,
)
from _nidaqmx_helpers import create_task
from nidaqmx.constants import TaskMode

import ni_measurementlink_service as nims
Expand Down Expand Up @@ -46,24 +48,16 @@ def measure(pin_name, sample_rate, number_of_samples):
sample_rate,
number_of_samples,
)
session_management_client = nims.session_management.Client(
grpc_channel=measurement_service.get_channel(
provided_interface=nims.session_management.GRPC_SERVICE_INTERFACE_NAME,
service_class=nims.session_management.GRPC_SERVICE_CLASS,
)
)
with contextlib.ExitStack() as stack:
reservation = stack.enter_context(
session_management_client.reserve_sessions(
context=measurement_service.context.pin_map_context,
pin_or_relay_names=[pin_name],
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DAQMX,
# If another measurement is using the session, wait for it to complete.
# Specify a timeout to aid in debugging missed unreserve calls.
# Long measurements may require a longer timeout.
timeout=60,
)
)
session_management_client = create_session_management_client(measurement_service)
with session_management_client.reserve_sessions(
context=measurement_service.context.pin_map_context,
pin_or_relay_names=[pin_name],
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DAQMX,
# If another measurement is using the session, wait for it to complete.
# Specify a timeout to aid in debugging missed unreserve calls.
# Long measurements may require a longer timeout.
timeout=60,
) as reservation:
if len(reservation.session_info) != 1:
measurement_service.context.abort(
grpc.StatusCode.INVALID_ARGUMENT,
Expand All @@ -80,49 +74,27 @@ def cancel_callback():

measurement_service.context.add_cancel_callback(cancel_callback)

task = stack.enter_context(_create_nidaqmx_task(session_info))
if not session_info.session_exists:
task.ai_channels.add_ai_voltage_chan(session_info.channel_list)
grpc_device_channel = get_grpc_device_channel(measurement_service, nidaqmx, service_options)
with create_task(session_info, grpc_device_channel) as task:
if not session_info.session_exists:
task.ai_channels.add_ai_voltage_chan(session_info.channel_list)

task.timing.cfg_samp_clk_timing(
rate=sample_rate,
samps_per_chan=number_of_samples,
)
task.timing.cfg_samp_clk_timing(
rate=sample_rate,
samps_per_chan=number_of_samples,
)

timeout = min(measurement_service.context.time_remaining, 10.0)
voltage_values = task.read(number_of_samples_per_channel=number_of_samples, timeout=timeout)
task = None # Don't abort after this point
timeout = min(measurement_service.context.time_remaining, 10.0)
voltage_values = task.read(
number_of_samples_per_channel=number_of_samples, timeout=timeout
)
task = None # Don't abort after this point

_log_measured_values(voltage_values)
logging.info("Completed measurement")
return (voltage_values,)


def _create_nidaqmx_task(
session_info: nims.session_management.SessionInformation,
) -> nidaqmx.Task:
session_kwargs = {}
if service_options.use_grpc_device:
session_grpc_address = service_options.grpc_device_address

if not session_grpc_address:
session_grpc_channel = measurement_service.get_channel(
provided_interface=nidaqmx.GRPC_SERVICE_INTERFACE_NAME,
service_class="ni.measurementlink.v1.grpcdeviceserver",
)
else:
session_grpc_channel = measurement_service.channel_pool.get_channel(
target=session_grpc_address
)
session_kwargs["grpc_options"] = nidaqmx.GrpcSessionOptions(
session_grpc_channel,
session_name=session_info.session_name,
initialization_behavior=nidaqmx.SessionInitializationBehavior.AUTO,
)

return nidaqmx.Task(new_task_name=session_info.session_name, **session_kwargs)


def _log_measured_values(samples, max_samples_to_display=5):
"""Log the measured values."""
if len(samples) > max_samples_to_display:
Expand Down
38 changes: 15 additions & 23 deletions examples/nidaqmx_analog_input/teststand_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@

import nidaqmx
from _helpers import GrpcChannelPoolHelper, PinMapClient, TestStandSupport
from nidaqmx.grpc_session_options import (
GRPC_SERVICE_INTERFACE_NAME,
GrpcSessionOptions,
SessionInitializationBehavior,
)
from _nidaqmx_helpers import create_task

import ni_measurementlink_service as nims

Expand Down Expand Up @@ -47,27 +43,25 @@ def create_nidaqmx_tasks(sequence_context: Any) -> None:
session_management_client = nims.session_management.Client(
grpc_channel=grpc_channel_pool.session_management_channel
)
session_kwargs = {}

teststand_support = TestStandSupport(sequence_context)
pin_map_id = teststand_support.get_active_pin_map_id()

pin_map_context = nims.session_management.PinMapContext(pin_map_id=pin_map_id, sites=None)
grpc_device_channel = grpc_channel_pool.get_grpc_device_channel(
nidaqmx.GRPC_SERVICE_INTERFACE_NAME
)
with session_management_client.reserve_sessions(
context=pin_map_context,
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DAQMX,
# This code module sets up the sessions, so error immediately if they are in use.
timeout=0,
) as reservation:
for session_info in reservation.session_info:
session_kwargs["grpc_options"] = GrpcSessionOptions(
grpc_channel_pool.get_grpc_device_channel(GRPC_SERVICE_INTERFACE_NAME),
session_name=session_info.session_name,
initialization_behavior=SessionInitializationBehavior.INITIALIZE_SERVER_SESSION,
task = create_task(
session_info,
grpc_device_channel,
initialization_behavior=nidaqmx.SessionInitializationBehavior.INITIALIZE_SERVER_SESSION,
)

# Leave session open
task = nidaqmx.Task(new_task_name=session_info.session_name, **session_kwargs)
task.ai_channels.add_ai_voltage_chan(session_info.channel_list)

session_management_client.register_sessions(reservation.session_info)
Expand All @@ -79,7 +73,9 @@ def destroy_nidaqmx_tasks() -> None:
session_management_client = nims.session_management.Client(
grpc_channel=grpc_channel_pool.session_management_channel
)

grpc_device_channel = grpc_channel_pool.get_grpc_device_channel(
nidaqmx.GRPC_SERVICE_INTERFACE_NAME
)
with session_management_client.reserve_all_registered_sessions(
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DAQMX,
# This code module sets up the sessions, so error immediately if they are in use.
Expand All @@ -88,13 +84,9 @@ def destroy_nidaqmx_tasks() -> None:
session_management_client.unregister_sessions(reservation.session_info)

for session_info in reservation.session_info:
grpc_options = GrpcSessionOptions(
grpc_channel_pool.get_grpc_device_channel(GRPC_SERVICE_INTERFACE_NAME),
session_name=session_info.session_name,
initialization_behavior=SessionInitializationBehavior.ATTACH_TO_SERVER_SESSION,
)

task = nidaqmx.Task(
new_task_name=session_info.session_name, grpc_options=grpc_options
task = create_task(
session_info,
grpc_device_channel,
initialization_behavior=nidaqmx.SessionInitializationBehavior.ATTACH_TO_SERVER_SESSION,
)
task.close()
26 changes: 18 additions & 8 deletions examples/nidcpower_source_dc_voltage/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import pathlib
import types
from typing import Any, Callable, Dict, NamedTuple, TypeVar
from typing import Any, Callable, Dict, NamedTuple, TypeVar, Optional

import click
import grpc
Expand All @@ -15,7 +15,10 @@
pin_map_service_pb2,
pin_map_service_pb2_grpc,
)
from ni_measurementlink_service.measurement.service import GrpcChannelPool, MeasurementService
from ni_measurementlink_service.measurement.service import (
GrpcChannelPool,
MeasurementService,
)


class ServiceOptions(NamedTuple):
Expand Down Expand Up @@ -236,13 +239,20 @@ def use_simulation_option(default: bool) -> Callable[[F], F]:


def get_grpc_device_channel(
measurement_service: MeasurementService, driver_module: types.ModuleType
) -> grpc.Channel:
measurement_service: MeasurementService,
driver_module: types.ModuleType,
service_options: ServiceOptions,
) -> Optional[grpc.Channel]:
"""Returns driver specific grpc device channel."""
return measurement_service.get_channel(
provided_interface=getattr(driver_module, "GRPC_SERVICE_INTERFACE_NAME"),
service_class="ni.measurementlink.v1.grpcdeviceserver",
)
if service_options.use_grpc_device:
if service_options.grpc_device_address:
return measurement_service.channel_pool.get_channel(service_options.grpc_device_address)

return measurement_service.get_channel(
provided_interface=getattr(driver_module, "GRPC_SERVICE_INTERFACE_NAME"),
service_class="ni.measurementlink.v1.grpcdeviceserver",
)
return None


def create_session_management_client(
Expand Down
38 changes: 9 additions & 29 deletions examples/nidcpower_source_dc_voltage/_nidcpower_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""nidcpower Helper classes and functions for MeasurementLink examples."""

from typing import Any, Dict, Iterable, Optional
from typing import Any, Dict, Optional

import grpc
import nidcpower
Expand All @@ -14,10 +14,8 @@

def create_session(
session_info: nims.session_management.SessionInformation,
session_grpc_channel: grpc.Channel,
initialization_behavior: Optional[
nidcpower.SessionInitializationBehavior
] = nidcpower.SessionInitializationBehavior.AUTO,
session_grpc_channel: Optional[grpc.Channel] = None,
initialization_behavior=nidcpower.SessionInitializationBehavior.AUTO,
) -> nidcpower.Session:
"""Create driver session based on reserved session and grpc channel."""
options: Dict[str, Any] = {}
Expand All @@ -27,31 +25,13 @@ def create_session(

session_kwargs: Dict[str, Any] = {}

session_kwargs["grpc_options"] = nidcpower.GrpcSessionOptions(
session_grpc_channel,
session_name=session_info.session_name,
initialization_behavior=initialization_behavior,
)
if session_grpc_channel is not None:
session_kwargs["grpc_options"] = nidcpower.GrpcSessionOptions(
session_grpc_channel,
session_name=session_info.session_name,
initialization_behavior=initialization_behavior,
)

return nidcpower.Session(
resource_name=session_info.resource_name, options=options, **session_kwargs
)


def reserve_session(
session_management_client: nims.session_management.Client,
pin_map_context: nims.session_management.PinMapContext,
pin_names: Optional[Iterable[str]] = None,
timeout: Optional[float] = None,
) -> nims.session_management.Reservation:
"""Reserve session(s).
Reserve session(s) for the given pins and returns the
information needed to create or access the session.
"""
return session_management_client.reserve_sessions(
context=pin_map_context,
pin_or_relay_names=pin_names,
instrument_type_id=nims.session_management.INSTRUMENT_TYPE_NI_DCPOWER,
timeout=timeout,
)
Loading

0 comments on commit fe5e4ce

Please sign in to comment.