[RSDK-3639] Create Location Client (#372)
8ashar authored Aug 1, 2023
1 parent e6c39f6 commit c928bbe
Showing 11 changed files with 1,697 additions and 271 deletions.
390 changes: 195 additions & 195 deletions docs/examples/example.ipynb

Large diffs are not rendered by default.

754 changes: 754 additions & 0 deletions src/viam/app/app_client.py

Large diffs are not rendered by default.

Empty file removed src/viam/app/data/__init__.py
94 changes: 37 additions & 57 deletions src/viam/app/data/client.py → src/viam/app/data_client.py
@@ -3,15 +3,12 @@
from typing import Any, List, Mapping, Optional, Tuple

from google.protobuf.struct_pb2 import Struct
from google.protobuf.timestamp_pb2 import Timestamp
from grpclib.client import Channel

from viam import logging
from viam.proto.app.data import (
AddTagsToBinaryDataByFilterRequest,
AddTagsToBinaryDataByFilterResponse,
AddTagsToBinaryDataByIDsRequest,
AddTagsToBinaryDataByIDsResponse,
BinaryDataByFilterRequest,
BinaryDataByFilterResponse,
BinaryDataByIDsRequest,
@@ -51,23 +48,23 @@
SensorMetadata,
UploadMetadata,
)
from viam.utils import struct_to_dict
from viam.utils import datetime_to_timestamp, struct_to_dict

LOGGER = logging.getLogger(__name__)


class DataClient:
"""gRPC client for uploading and retrieving data from app.
Constructor is used by `AppClient` to instantiate relevant service stubs. Calls to `DataClient` methods should be made through
`AppClient`.
Constructor is used by `ViamClient` to instantiate relevant service stubs. Calls to `DataClient` methods should be made through
`ViamClient`.
"""

def __init__(self, channel: Channel, metadata: Mapping[str, str]):
"""Create a `DataClient` that maintains a connection to app.
Args:
channel (Channel): Connection to app.
channel (grpclib.client.Channel): Connection to app.
metadata (Mapping[str, str]): Required authorization token to send requests to app.
"""
self._metadata = metadata
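The docstring change above captures the new wiring: `DataClient` is now constructed by `ViamClient` rather than `AppClient`, so user code reaches it through a property instead of calling the constructor. An illustrative sketch (not part of the commit), assuming an already-connected `ViamClient`; creation is shown in the viam_client.py diff further down:

from viam.app.viam_client import ViamClient


async def list_tags(viam_client: ViamClient) -> list:
    # data_client (see viam_client.py below) returns a DataClient bound to the
    # same channel and auth metadata that ViamClient holds.
    data_client = viam_client.data_client
    # Per the tags_by_filter hunk below, this now returns a plain Python list.
    return await data_client.tags_by_filter()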
@@ -232,7 +229,7 @@ async def add_tags_to_binary_data_by_ids(self, tags: List[str], binary_ids: List
GRPCError: If no `BinaryID` objects or tags are provided.
"""
request = AddTagsToBinaryDataByIDsRequest(binary_ids=binary_ids, tags=tags)
_: AddTagsToBinaryDataByIDsResponse = await self._data_client.AddTagsToBinaryDataByIDs(request, metadata=self._metadata)
await self._data_client.AddTagsToBinaryDataByIDs(request, metadata=self._metadata)

async def add_tags_to_binary_data_by_filter(self, tags: List[str], filter: Optional[Filter] = None) -> None:
"""Add tags to binary data.
@@ -247,7 +244,7 @@
"""
filter = filter if filter else Filter()
request = AddTagsToBinaryDataByFilterRequest(filter=filter, tags=tags)
_: AddTagsToBinaryDataByFilterResponse = await self._data_client.AddTagsToBinaryDataByFilter(request, metadata=self._metadata)
await self._data_client.AddTagsToBinaryDataByFilter(request, metadata=self._metadata)

async def remove_tags_from_binary_data_by_ids(self, tags: List[str], binary_ids: List[BinaryID]) -> int:
"""Remove tags from binary.
@@ -302,7 +299,7 @@ async def tags_by_filter(self, filter: Optional[Filter] = None) -> List[str]:
filter = filter if filter else Filter()
request = TagsByFilterRequest(filter=filter)
response: TagsByFilterResponse = await self._data_client.TagsByFilter(request, metadata=self._metadata)
return response.tags
return list(response.tags)

# TODO: implement
async def add_bounding_box_to_image_by_id(self):
@@ -325,7 +322,7 @@ async def bounding_box_labels_by_filter(self, filter: Optional[Filter] = None) -
filter = filter if filter else Filter()
request = BoundingBoxLabelsByFilterRequest(filter=filter)
response: BoundingBoxLabelsByFilterResponse = await self._data_client.BoundingBoxLabelsByFilter(request, metadata=self._metadata)
return response.labels
return list(response.labels)

async def binary_data_capture_upload(
self,
@@ -360,8 +357,8 @@ async def binary_data_capture_upload(
sensor_contents = SensorData(
metadata=(
SensorMetadata(
time_requested=self.datetime_to_timestamp(data_request_times[0]) if data_request_times[0] else None,
time_received=self.datetime_to_timestamp(data_request_times[1]) if data_request_times[1] else None,
time_requested=datetime_to_timestamp(data_request_times[0]) if data_request_times[0] else None,
time_received=datetime_to_timestamp(data_request_times[1]) if data_request_times[1] else None,
)
if data_request_times
else None
@@ -375,12 +372,10 @@
component_name=component_name,
method_name=method_name,
type=DataType.DATA_TYPE_BINARY_SENSOR,
file_name=None, # Not used in app.
method_parameters=method_parameters,
file_extension=None, # Will be stored as empty string "".
tags=tags,
)
_: DataCaptureUploadResponse = await self._data_capture_upload(metadata=metadata, sensor_contents=[sensor_contents])
await self._data_capture_upload(metadata=metadata, sensor_contents=[sensor_contents])
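A hedged call sketch for the binary capture path (illustrative, not from the commit), to be run inside a coroutine with `data_client` obtained as sketched earlier. Only `component_name`, `method_name`, `method_parameters`, `tags`, and the optional request/receive times are visible in this hunk; the remaining keyword names are assumptions.

from datetime import datetime, timezone

now = datetime.now(timezone.utc)
await data_client.binary_data_capture_upload(
    binary_data=b"\x00\x01\x02",    # raw payload; parameter name assumed
    part_id="<PART-ID>",            # assumed; not visible in this hunk
    component_type="camera",        # assumed; not visible in this hunk
    component_name="my-camera",
    method_name="ReadImage",
    tags=["test"],
    data_request_times=(now, now),  # (time_requested, time_received)
)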

async def tabular_data_capture_upload(
self,
@@ -418,7 +413,7 @@ async def tabular_data_capture_upload(
AssertionError: If a list of `Timestamp` objects is provided and its length does not match the length of the list of tabular
data.
"""
sensor_contents = [None] * len(tabular_data)
sensor_contents = [SensorData()] * len(tabular_data)
if data_request_times:
assert len(data_request_times) == len(tabular_data)

@@ -428,8 +423,8 @@
sensor_contents[i] = SensorData(
metadata=(
SensorMetadata(
time_requested=self.datetime_to_timestamp(data_request_times[i][0]) if data_request_times[i][0] else None,
time_received=self.datetime_to_timestamp(data_request_times[i][1]) if data_request_times[i][1] else None,
time_requested=datetime_to_timestamp(data_request_times[i][0]) if data_request_times[i][0] else None,
time_received=datetime_to_timestamp(data_request_times[i][1]) if data_request_times[i][1] else None,
)
if data_request_times[i]
else None
@@ -445,12 +440,10 @@
component_name=component_name,
method_name=method_name,
type=DataType.DATA_TYPE_TABULAR_SENSOR,
file_name=None, # Not used in app.
method_parameters=method_parameters,
file_extension=None, # Will be stored as empty string "".
tags=tags,
)
_: DataCaptureUploadResponse = await self._data_capture_upload(metadata=metadata, sensor_contents=sensor_contents)
await self._data_capture_upload(metadata=metadata, sensor_contents=sensor_contents)
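The placeholder list is now typed (`[SensorData()] * len(tabular_data)` instead of `[None] * ...`); every slot is reassigned in the loop, so the shared initial instance is never mutated. A hedged call sketch (illustrative, not from the commit; keyword names other than `tabular_data`, `data_request_times`, `method_parameters`, and `tags` are assumptions):

from datetime import datetime, timezone

now = datetime.now(timezone.utc)
await data_client.tabular_data_capture_upload(
    tabular_data=[{"temperature_c": 21.4}, {"temperature_c": 21.6}],
    data_request_times=[(now, now), (now, now)],  # one (requested, received) pair per reading
    part_id="<PART-ID>",      # assumed; not visible in this hunk
    component_type="sensor",  # assumed; not visible in this hunk
    component_name="my-sensor",
    method_name="Readings",
    tags=["test"],
)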

async def _data_capture_upload(self, metadata: UploadMetadata, sensor_contents: List[SensorData]) -> DataCaptureUploadResponse:
request = DataCaptureUploadRequest(metadata=metadata, sensor_contents=sensor_contents)
@@ -491,16 +484,16 @@ async def file_upload(
"""
metadata = UploadMetadata(
part_id=part_id,
component_type=component_type,
component_name=component_name,
method_name=method_name,
component_type=component_type if component_type else "",
component_name=component_name if component_name else "",
method_name=method_name if method_name else "",
type=DataType.DATA_TYPE_FILE,
file_name=file_name,
file_name=file_name if file_name else "",
method_parameters=method_parameters,
file_extension=file_extension,
file_extension=file_extension if file_extension else "",
tags=tags,
)
_: FileUploadResponse = await self._file_upload(metadata=metadata, file_contents=FileData(data=data))
await self._file_upload(metadata=metadata, file_contents=FileData(data=data if data else bytes()))
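A hedged sketch of the file-upload path (illustrative, not from the commit), again inside a coroutine with an existing `data_client`. The keyword names mirror the local variables used to build `UploadMetadata` in this hunk; with the new fallbacks, omitted strings are stored as "" and omitted data as empty bytes.

await data_client.file_upload(
    part_id="<PART-ID>",
    component_type="camera",     # optional; falls back to ""
    component_name="my-camera",  # optional; falls back to ""
    method_name="ReadImage",     # optional; falls back to ""
    file_name="snapshot",        # optional; falls back to ""
    file_extension=".jpg",       # optional; falls back to ""
    tags=["test"],
    data=b"raw file bytes",      # optional; falls back to empty bytes
)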

async def file_upload_from_path(
self,
@@ -539,40 +532,27 @@ async def file_upload_from_path(

metadata = UploadMetadata(
part_id=part_id,
component_type=component_type,
component_name=component_name,
method_name=method_name,
component_type=component_type if component_type else "",
component_name=component_name if component_name else "",
method_name=method_name if method_name else "",
type=DataType.DATA_TYPE_FILE,
file_name=file_name,
method_parameters=method_parameters,
file_extension=file_extension,
file_extension=file_extension if file_extension else "",
tags=tags,
)
_: FileUploadResponse = await self._file_upload(metadata=metadata, file_contents=FileData(data=data))
await self._file_upload(metadata=metadata, file_contents=FileData(data=data))

async def _file_upload(self, metadata: UploadMetadata, file_contents: FileData) -> FileUploadResponse:
request_metadata = FileUploadRequest(metadata=metadata)
request_file_contents = FileUploadRequest(file_contents=file_contents)
async with self._data_sync_client.FileUpload.open(metadata=self._metadata) as stream:
await stream.send_message(request_metadata)
await stream.send_message(request_file_contents, end=True)
response: FileUploadResponse = await stream.recv_message()
response = await stream.recv_message()
assert response is not None
return response

@staticmethod
def datetime_to_timestamp(dt: datetime) -> Timestamp:
"""Convert a Python native `datetime` into a `Timestamp`.
Args:
dt (datetime.datetime): A `datetime` object. UTC is assumed in the conversion if the object is naive to timezone information.
Returns:
google.protobuf.timestamp_pb2.Timestamp: The `Timestamp` object.
"""
timestamp = Timestamp()
timestamp.FromDatetime(dt)
return timestamp

@staticmethod
def create_filter(
component_name: Optional[str] = None,
@@ -613,20 +593,20 @@ def create_filter(
viam.proto.app.data.Filter: The `Filter` object.
"""
return Filter(
component_name=component_name,
component_type=component_type,
method=method,
robot_name=robot_name,
robot_id=robot_id,
part_name=part_name,
part_id=part_id,
component_name=component_name if component_name else "",
component_type=component_type if component_type else "",
method=method if method else "",
robot_name=robot_name if robot_name else "",
robot_id=robot_id if robot_id else "",
part_name=part_name if part_name else "",
part_id=part_id if part_id else "",
location_ids=location_ids,
organization_ids=organization_ids,
mime_type=mime_type,
interval=(
CaptureInterval(
start=DataClient.datetime_to_timestamp(start_time) if start_time else None,
end=DataClient.datetime_to_timestamp(end_time) if end_time else None,
start=datetime_to_timestamp(start_time) if start_time else None,
end=datetime_to_timestamp(end_time) if end_time else None,
)
)
if start_time and end_time
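Summing up the data_client.py changes to `create_filter`: unset string fields now default to the empty string rather than `None`, and the capture interval is built with the shared `datetime_to_timestamp` helper. An illustrative sketch (not part of the commit), run inside a coroutine with an existing `data_client`:

from datetime import datetime, timezone

from viam.app.data_client import DataClient

my_filter = DataClient.create_filter(
    component_name="my-camera",
    start_time=datetime(2023, 7, 1, tzinfo=timezone.utc),
    end_time=datetime(2023, 8, 1, tzinfo=timezone.utc),
)
tags = await data_client.tags_by_filter(filter=my_filter)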
32 changes: 23 additions & 9 deletions src/viam/app/client.py → src/viam/app/viam_client.py
@@ -4,23 +4,24 @@
from typing_extensions import Self

from viam import logging
from viam.app.data.client import DataClient
from viam.app.app_client import AppClient
from viam.app.data_client import DataClient
from viam.rpc.dial import DialOptions, _dial_app, _get_access_token

LOGGER = logging.getLogger(__name__)


class AppClient:
class ViamClient:
"""gRPC client for all communication and interaction with app.
Use create() to instantiate an AppClient::
There is currently 1 way to instantiate a `ViamClient` object::
AppClient.create(...)
ViamClient.create_from_dial_options(...)
"""

@classmethod
async def create(cls, dial_options: DialOptions) -> Self:
"""Create an AppClient that establishes a connection to app.viam.com.
async def create_from_dial_options(cls, dial_options: DialOptions) -> Self:
"""Create `ViamClient` that establishes a connection to app.viam.com.
Args:
@@ -31,10 +32,17 @@ async def create(cls, dial_options: DialOptions) -> Self:
AssertionError: If the type provided in the credentials of the `DialOptions` object is 'robot-secret'.
Returns:
Self: The `AppClient`.
Self: The `ViamClient`.
"""
assert dial_options.credentials.type != "robot-secret"
if dial_options.credentials is None:
raise ValueError("dial_options.credentials cannot be None.")
if dial_options.credentials.type == "robot-secret":
raise ValueError("dial_options.credentials.type cannot be 'robot-secret'")
if dial_options.auth_entity is None:
raise ValueError("dial_options.auth_entity cannot be None.")

self = cls()
self._location_id = dial_options.auth_entity.split(".")[1]
self._channel = await _dial_app(dial_options)
access_token = await _get_access_token(self._channel, dial_options.auth_entity, dial_options)
self._metadata = {"authorization": f"Bearer {access_token}"}
@@ -43,12 +51,18 @@ async def create(cls, dial_options: DialOptions) -> Self:
_channel: Channel
_metadata: Mapping[str, str]
_closed: bool = False
_location_id: str

@property
def data_client(self) -> DataClient:
"""Insantiate and return a DataClient used to make `data` and `data_sync` method calls."""
"""Insantiate and return a `DataClient` used to make `data` and `data_sync` method calls."""
return DataClient(self._channel, self._metadata)

@property
def app_client(self) -> AppClient:
"""Insantiate and return an `AppClient` used to make `app` method calls."""
return AppClient(self._channel, self._metadata, self._location_id)

def close(self):
"""Close opened channels used for the various service stubs initialized."""
if self._closed:
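An end-to-end sketch of the new entry point (illustrative, not from the commit). The credential type and the `auth_entity` format shown are assumptions; the diff only requires that credentials are present, are not of type 'robot-secret', and that `auth_entity` is set (its second dot-separated segment becomes the location id).

import asyncio

from viam.app.viam_client import ViamClient
from viam.rpc.dial import Credentials, DialOptions


async def main() -> None:
    dial_options = DialOptions(
        auth_entity="my-robot-main.abc123.viam.cloud",  # assumed format; "abc123" is parsed as the location id
        credentials=Credentials(type="robot-location-secret", payload="<SECRET>"),  # assumed credential type
    )
    viam_client = await ViamClient.create_from_dial_options(dial_options)
    try:
        data_client = viam_client.data_client  # DataClient for data/data_sync calls
        app_client = viam_client.app_client    # new AppClient for app calls
        print(await data_client.tags_by_filter())
    finally:
        viam_client.close()


asyncio.run(main())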
8 changes: 8 additions & 0 deletions src/viam/utils.py
@@ -3,11 +3,13 @@
import functools
import sys
import threading
from datetime import datetime
from typing import Any, Dict, List, Mapping, SupportsBytes, SupportsFloat, Type, TypeVar, Union

from google.protobuf.json_format import MessageToDict, ParseDict
from google.protobuf.message import Message
from google.protobuf.struct_pb2 import ListValue, Struct, Value
from google.protobuf.timestamp_pb2 import Timestamp

from viam.proto.common import GeoPoint, Orientation, ResourceName, Vector3
from viam.resource.base import ResourceBase
@@ -151,6 +153,12 @@ def struct_to_dict(struct: Struct) -> Dict[str, ValueTypes]:
return {key: value_to_primitive(value) for (key, value) in struct.fields.items()}


def datetime_to_timestamp(dt: datetime) -> Timestamp:
timestamp = Timestamp()
timestamp.FromDatetime(dt)
return timestamp


def sensor_readings_native_to_value(readings: Mapping[str, Any]) -> Mapping[str, Any]:
prim_readings = dict(readings)
for key, reading in readings.items():
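A quick check of the relocated helper (illustrative, not from the commit). Per the docstring removed from DataClient above, a naive `datetime` is treated as UTC by `Timestamp.FromDatetime`, so passing timezone-aware values is the safer habit.

from datetime import datetime, timezone

from viam.utils import datetime_to_timestamp

ts = datetime_to_timestamp(datetime(2023, 8, 1, 12, 0, tzinfo=timezone.utc))
print(ts.ToJsonString())  # 2023-08-01T12:00:00Z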