From 58fd7fb54d494926a042075d2d3546574a2f5b96 Mon Sep 17 00:00:00 2001 From: mjwen Date: Tue, 7 Nov 2023 09:59:07 -0600 Subject: [PATCH 01/16] Move ssh tunnel to a new file --- src/maggma/stores/gridfs.py | 3 +- src/maggma/stores/mongolike.py | 84 ++------------------------------- src/maggma/stores/ssh_tunnel.py | 84 +++++++++++++++++++++++++++++++++ tests/stores/test_ssh_tunnel.py | 3 +- 4 files changed, 91 insertions(+), 83 deletions(-) create mode 100644 src/maggma/stores/ssh_tunnel.py diff --git a/src/maggma/stores/gridfs.py b/src/maggma/stores/gridfs.py index 1d56ec9d8..db948d46e 100644 --- a/src/maggma/stores/gridfs.py +++ b/src/maggma/stores/gridfs.py @@ -18,7 +18,8 @@ from ruamel import yaml from maggma.core import Sort, Store, StoreError -from maggma.stores.mongolike import MongoStore, SSHTunnel +from maggma.stores.mongolike import MongoStore +from maggma.stores.ssh_tunnel import SSHTunnel # https://github.com/mongodb/specifications/ # blob/master/source/gridfs/gridfs-spec.rst#terms diff --git a/src/maggma/stores/mongolike.py b/src/maggma/stores/mongolike.py index 28ecaf096..5c2d21b11 100644 --- a/src/maggma/stores/mongolike.py +++ b/src/maggma/stores/mongolike.py @@ -7,10 +7,11 @@ import warnings from itertools import chain, groupby from pathlib import Path -from socket import socket from ruamel import yaml +from maggma.stores.ssh_tunnel import SSHTunnel + try: from typing import Any, Callable, Dict, Iterator, List, Literal, Optional, Tuple, Union except ImportError: @@ -23,12 +24,11 @@ import orjson from monty.dev import requires from monty.io import zopen -from monty.json import MSONable, jsanitize +from monty.json import jsanitize from monty.serialization import loadfn from pydash import get, has, set_ from pymongo import MongoClient, ReplaceOne, uri_parser from pymongo.errors import ConfigurationError, DocumentTooLarge, OperationFailure -from sshtunnel import SSHTunnelForwarder from maggma.core import Sort, Store, StoreError from maggma.utils import 
confirm_field_index, to_dt @@ -39,79 +39,6 @@ MontyClient = None -class SSHTunnel(MSONable): - __TUNNELS: Dict[str, SSHTunnelForwarder] = {} - - def __init__( - self, - tunnel_server_address: str, - remote_server_address: str, - username: Optional[str] = None, - password: Optional[str] = None, - private_key: Optional[str] = None, - **kwargs, - ): - """ - Args: - tunnel_server_address: string address with port for the SSH tunnel server - remote_server_address: string address with port for the server to connect to - username: optional username for the ssh tunnel server - password: optional password for the ssh tunnel server; If a private_key is - supplied this password is assumed to be the private key password - private_key: ssh private key to authenticate to the tunnel server - kwargs: any extra args passed to the SSHTunnelForwarder - """ - - self.tunnel_server_address = tunnel_server_address - self.remote_server_address = remote_server_address - self.username = username - self.password = password - self.private_key = private_key - self.kwargs = kwargs - - if remote_server_address in SSHTunnel.__TUNNELS: - self.tunnel = SSHTunnel.__TUNNELS[remote_server_address] - else: - open_port = _find_free_port("127.0.0.1") - local_bind_address = ("127.0.0.1", open_port) - - ssh_address, ssh_port = tunnel_server_address.split(":") - ssh_port = int(ssh_port) # type: ignore - - remote_bind_address, remote_bind_port = remote_server_address.split(":") - remote_bind_port = int(remote_bind_port) # type: ignore - - if private_key is not None: - ssh_password = None - ssh_private_key_password = password - else: - ssh_password = password - ssh_private_key_password = None - - self.tunnel = SSHTunnelForwarder( - ssh_address_or_host=(ssh_address, ssh_port), - local_bind_address=local_bind_address, - remote_bind_address=(remote_bind_address, remote_bind_port), - ssh_username=username, - ssh_password=ssh_password, - ssh_private_key_password=ssh_private_key_password, - ssh_pkey=private_key, - 
**kwargs, - ) - - def start(self): - if not self.tunnel.is_active: - self.tunnel.start() - - def stop(self): - if self.tunnel.tunnel_is_up: - self.tunnel.stop() - - @property - def local_address(self) -> Tuple[str, int]: - return self.tunnel.local_bind_address - - class MongoStore(Store): """ A Store that connects to a Mongo collection @@ -1006,8 +933,3 @@ def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = No self._collection.replace_one(search_doc, d, upsert=True) - -def _find_free_port(address="0.0.0.0"): - s = socket() - s.bind((address, 0)) # Bind to a free port provided by the host. - return s.getsockname()[1] # Return the port number assigned. diff --git a/src/maggma/stores/ssh_tunnel.py b/src/maggma/stores/ssh_tunnel.py new file mode 100644 index 000000000..adb8c5e78 --- /dev/null +++ b/src/maggma/stores/ssh_tunnel.py @@ -0,0 +1,84 @@ +from socket import socket +from typing import Dict, Optional, Tuple + +from monty.json import MSONable +from sshtunnel import SSHTunnelForwarder + + +class SSHTunnel(MSONable): + __TUNNELS: Dict[str, SSHTunnelForwarder] = {} + + def __init__( + self, + tunnel_server_address: str, + remote_server_address: str, + username: Optional[str] = None, + password: Optional[str] = None, + private_key: Optional[str] = None, + **kwargs, + ): + """ + Args: + tunnel_server_address: string address with port for the SSH tunnel server + remote_server_address: string address with port for the server to connect to + username: optional username for the ssh tunnel server + password: optional password for the ssh tunnel server; If a private_key is + supplied this password is assumed to be the private key password + private_key: ssh private key to authenticate to the tunnel server + kwargs: any extra args passed to the SSHTunnelForwarder + """ + + self.tunnel_server_address = tunnel_server_address + self.remote_server_address = remote_server_address + self.username = username + self.password = password + self.private_key = 
private_key + self.kwargs = kwargs + + if remote_server_address in SSHTunnel.__TUNNELS: + self.tunnel = SSHTunnel.__TUNNELS[remote_server_address] + else: + open_port = _find_free_port("127.0.0.1") + local_bind_address = ("127.0.0.1", open_port) + + ssh_address, ssh_port = tunnel_server_address.split(":") + ssh_port = int(ssh_port) # type: ignore + + remote_bind_address, remote_bind_port = remote_server_address.split(":") + remote_bind_port = int(remote_bind_port) # type: ignore + + if private_key is not None: + ssh_password = None + ssh_private_key_password = password + else: + ssh_password = password + ssh_private_key_password = None + + self.tunnel = SSHTunnelForwarder( + ssh_address_or_host=(ssh_address, ssh_port), + local_bind_address=local_bind_address, + remote_bind_address=(remote_bind_address, remote_bind_port), + ssh_username=username, + ssh_password=ssh_password, + ssh_private_key_password=ssh_private_key_password, + ssh_pkey=private_key, + **kwargs, + ) + + def start(self): + if not self.tunnel.is_active: + self.tunnel.start() + + def stop(self): + if self.tunnel.tunnel_is_up: + self.tunnel.stop() + + @property + def local_address(self) -> Tuple[str, int]: + return self.tunnel.local_bind_address + + +def _find_free_port(address="0.0.0.0"): + s = socket() + s.bind((address, 0)) # Bind to a free port provided by the host. + return s.getsockname()[1] # Return the port number assigned. 
diff --git a/tests/stores/test_ssh_tunnel.py b/tests/stores/test_ssh_tunnel.py index 041d6906f..6198603f0 100644 --- a/tests/stores/test_ssh_tunnel.py +++ b/tests/stores/test_ssh_tunnel.py @@ -1,7 +1,8 @@ import paramiko import pymongo import pytest -from maggma.stores.mongolike import MongoStore, SSHTunnel +from maggma.stores.mongolike import MongoStore +from maggma.stores.ssh_tunnel import SSHTunnel from monty.serialization import dumpfn, loadfn from paramiko.ssh_exception import AuthenticationException, NoValidConnectionsError, SSHException From c6c98ab13f156ed33bfdc9775e80d1dfae5e8281 Mon Sep 17 00:00:00 2001 From: mjwen Date: Tue, 7 Nov 2023 10:31:28 -0600 Subject: [PATCH 02/16] Add new `local_port` arg, to allow S3 9000 port --- src/maggma/stores/ssh_tunnel.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/maggma/stores/ssh_tunnel.py b/src/maggma/stores/ssh_tunnel.py index adb8c5e78..7ddaa8a2d 100644 --- a/src/maggma/stores/ssh_tunnel.py +++ b/src/maggma/stores/ssh_tunnel.py @@ -12,6 +12,7 @@ def __init__( self, tunnel_server_address: str, remote_server_address: str, + local_port: int = None, username: Optional[str] = None, password: Optional[str] = None, private_key: Optional[str] = None, @@ -21,6 +22,8 @@ def __init__( Args: tunnel_server_address: string address with port for the SSH tunnel server remote_server_address: string address with port for the server to connect to + local_port: optional port to use for the local address (127.0.0.1); + if `None`, a random open port will be automatically selected username: optional username for the ssh tunnel server password: optional password for the ssh tunnel server; If a private_key is supplied this password is assumed to be the private key password @@ -30,6 +33,7 @@ def __init__( self.tunnel_server_address = tunnel_server_address self.remote_server_address = remote_server_address + self.local_port = local_port self.username = username self.password = password self.private_key = 
private_key @@ -38,8 +42,9 @@ def __init__( if remote_server_address in SSHTunnel.__TUNNELS: self.tunnel = SSHTunnel.__TUNNELS[remote_server_address] else: - open_port = _find_free_port("127.0.0.1") - local_bind_address = ("127.0.0.1", open_port) + if local_port is None: + local_port = _find_free_port("127.0.0.1") + local_bind_address = ("127.0.0.1", local_port) ssh_address, ssh_port = tunnel_server_address.split(":") ssh_port = int(ssh_port) # type: ignore From ea77d0f967db398716b9c97254aba3b6792bd499 Mon Sep 17 00:00:00 2001 From: mjwen Date: Tue, 7 Nov 2023 10:31:54 -0600 Subject: [PATCH 03/16] Add sshtunnel support for S3Store --- src/maggma/stores/aws.py | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/src/maggma/stores/aws.py b/src/maggma/stores/aws.py index 385683883..a9931c2f0 100644 --- a/src/maggma/stores/aws.py +++ b/src/maggma/stores/aws.py @@ -14,6 +14,7 @@ from maggma.core import Sort, Store from maggma.utils import grouper, to_isoformat_ceil_ms +from maggma.stores.ssh_tunnel import SSHTunnel try: import boto3 @@ -40,6 +41,7 @@ def __init__( sub_dir: Optional[str] = None, s3_workers: int = 1, s3_resource_kwargs: Optional[dict] = None, + ssh_tunnel: Optional[SSHTunnel] = None, key: str = "fs_id", store_hash: bool = True, unpack_data: bool = True, @@ -59,11 +61,16 @@ def __init__( aws_session_token (string) -- AWS temporary session token region_name (string) -- Default region when creating new connections compress: compress files inserted into the store - endpoint_url: endpoint_url to allow interface to minio service - sub_dir: (optional) subdirectory of the s3 bucket to store the data + endpoint_url: endpoint_url to allow interface to minio service; ignored if + `ssh_tunnel` is provided, in which case the endpoint_url is inferred + sub_dir: (optional) subdirectory of the s3 bucket to store the data s3_workers: number of concurrent S3 puts to run + s3_resource_kwargs: additional kwargs to pass to the 
boto3 session resource + ssh_tunnel: optional SSH tunnel to use for the S3 connection + key: main key to index on store_hash: store the sha1 hash right before insertion to the database. - unpack_data: whether to decompress and unpack byte data when querying from the bucket. + unpack_data: whether to decompress and unpack byte data when querying from + the bucket searchable_fields: fields to keep in the index store """ if boto3 is None: @@ -79,6 +86,7 @@ def __init__( self.s3_bucket: Any = None self.s3_workers = s3_workers self.s3_resource_kwargs = s3_resource_kwargs if s3_resource_kwargs is not None else {} + self.ssh_tunnel = ssh_tunnel self.unpack_data = unpack_data self.searchable_fields = searchable_fields if searchable_fields is not None else [] self.store_hash = store_hash @@ -107,7 +115,8 @@ def name(self) -> str: def connect(self, *args, **kwargs): # lgtm[py/conflicting-attributes] """Connect to the source data.""" session = self._get_session() - resource = session.resource("s3", endpoint_url=self.endpoint_url, **self.s3_resource_kwargs) + endpoint_url = self._get_endpoint_url() + resource = session.resource("s3", endpoint_url=endpoint_url, **self.s3_resource_kwargs) if not self.s3: self.s3 = resource @@ -127,6 +136,9 @@ def close(self): self.s3 = None self.s3_bucket = None + if self.ssh_tunnel is not None: + self.ssh_tunnel.stop() + @property def _collection(self): """ @@ -266,7 +278,7 @@ def ensure_index(self, key: str, unique: bool = False) -> bool: Args: key: single key to index - unique: Whether or not this index contains only unique keys + unique: whether or not this index contains only unique keys Returns: bool indicating if the index exists/was created @@ -322,19 +334,30 @@ def update( self.index.update(search_docs, key=self.key) def _get_session(self): + if self.ssh_tunnel is not None: + self.ssh_tunnel.start() + if not hasattr(self._thread_local, "s3_bucket"): if isinstance(self.s3_profile, dict): return Session(**self.s3_profile) return 
Session(profile_name=self.s3_profile) return None + def _get_endpoint_url(self): + if self.ssh_tunnel is None: + return self.endpoint_url + else: + host, port = self.ssh_tunnel.local_address + return f"http://{host}:{port}" + def _get_bucket(self): """If on the main thread return the bucket created above, else create a new bucket on each thread.""" if threading.current_thread().name == "MainThread": return self.s3_bucket if not hasattr(self._thread_local, "s3_bucket"): session = self._get_session() - resource = session.resource("s3", endpoint_url=self.endpoint_url) + endpoint_url = self._get_endpoint_url() + resource = session.resource("s3", endpoint_url=endpoint_url) self._thread_local.s3_bucket = resource.Bucket(self.bucket) return self._thread_local.s3_bucket From e8fcac6d972e295bbf3c16714cd4d07e43c22d49 Mon Sep 17 00:00:00 2001 From: mjwen Date: Tue, 7 Nov 2023 11:34:07 -0600 Subject: [PATCH 04/16] Run pre-commit --- src/maggma/stores/aws.py | 2 +- src/maggma/stores/mongolike.py | 3 +-- src/maggma/stores/ssh_tunnel.py | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/maggma/stores/aws.py b/src/maggma/stores/aws.py index a9931c2f0..805bf7f5a 100644 --- a/src/maggma/stores/aws.py +++ b/src/maggma/stores/aws.py @@ -13,8 +13,8 @@ from monty.msgpack import default as monty_default from maggma.core import Sort, Store -from maggma.utils import grouper, to_isoformat_ceil_ms from maggma.stores.ssh_tunnel import SSHTunnel +from maggma.utils import grouper, to_isoformat_ceil_ms try: import boto3 diff --git a/src/maggma/stores/mongolike.py b/src/maggma/stores/mongolike.py index 5c2d21b11..ad08ee31d 100644 --- a/src/maggma/stores/mongolike.py +++ b/src/maggma/stores/mongolike.py @@ -669,7 +669,7 @@ def __init__( super().__init__(**kwargs) - def connect(self, force_reset: bool =False): + def connect(self, force_reset: bool = False): """ Loads the files into the collection in memory @@ -932,4 +932,3 @@ def update(self, docs: Union[List[Dict], Dict], 
key: Union[List, str, None] = No search_doc = {k: d[k] for k in key} if isinstance(key, list) else {key: d[key]} self._collection.replace_one(search_doc, d, upsert=True) - diff --git a/src/maggma/stores/ssh_tunnel.py b/src/maggma/stores/ssh_tunnel.py index 7ddaa8a2d..ac00e0f5a 100644 --- a/src/maggma/stores/ssh_tunnel.py +++ b/src/maggma/stores/ssh_tunnel.py @@ -12,7 +12,7 @@ def __init__( self, tunnel_server_address: str, remote_server_address: str, - local_port: int = None, + local_port: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None, private_key: Optional[str] = None, From e51d006f6c6c759b3b4fb9f823b0ce2b4b61629d Mon Sep 17 00:00:00 2001 From: mjwen Date: Fri, 10 Nov 2023 17:11:53 -0600 Subject: [PATCH 05/16] Enable force_reset for connect --- src/maggma/stores/aws.py | 49 +++++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/src/maggma/stores/aws.py b/src/maggma/stores/aws.py index 805bf7f5a..a5bc9fe5b 100644 --- a/src/maggma/stores/aws.py +++ b/src/maggma/stores/aws.py @@ -112,21 +112,15 @@ def name(self) -> str: """ return f"s3://{self.bucket}" - def connect(self, *args, **kwargs): # lgtm[py/conflicting-attributes] - """Connect to the source data.""" - session = self._get_session() - endpoint_url = self._get_endpoint_url() - resource = session.resource("s3", endpoint_url=endpoint_url, **self.s3_resource_kwargs) - - if not self.s3: - self.s3 = resource - try: - self.s3.meta.client.head_bucket(Bucket=self.bucket) - except ClientError: - raise RuntimeError(f"Bucket not present on AWS: {self.bucket}") + def connect(self, force_reset: bool = False): # lgtm[py/conflicting-attributes] + """Connect to the source data. 
- self.s3_bucket = resource.Bucket(self.bucket) - self.index.connect(*args, **kwargs) + Args: + force_reset: whether to force a reset of the connection + """ + if self.s3 is None or force_reset: + self.s3, self.s3_bucket = self._get_resource_and_bucket() + self.index.connect(force_reset=force_reset) def close(self): """Closes any connections.""" @@ -136,8 +130,7 @@ def close(self): self.s3 = None self.s3_bucket = None - if self.ssh_tunnel is not None: - self.ssh_tunnel.stop() + self.ssh_tunnel.stop() @property def _collection(self): @@ -351,16 +344,30 @@ def _get_endpoint_url(self): return f"http://{host}:{port}" def _get_bucket(self): - """If on the main thread return the bucket created above, else create a new bucket on each thread.""" + """If on the main thread return the bucket created above, else create a new + bucket on each thread.""" if threading.current_thread().name == "MainThread": return self.s3_bucket + if not hasattr(self._thread_local, "s3_bucket"): - session = self._get_session() - endpoint_url = self._get_endpoint_url() - resource = session.resource("s3", endpoint_url=endpoint_url) - self._thread_local.s3_bucket = resource.Bucket(self.bucket) + _, bucket = self._get_resource_and_bucket() + self._thread_local.s3_bucket = bucket + return self._thread_local.s3_bucket + def _get_resource_and_bucket(self): + """Helper function to create the resource and bucket objects.""" + session = self._get_session() + endpoint_url = self._get_endpoint_url() + resource = session.resource("s3", endpoint_url=endpoint_url, **self.s3_resource_kwargs) + try: + self.s3.meta.client.head_bucket(Bucket=self.bucket) + except ClientError: + raise RuntimeError(f"Bucket `{self.bucket}` not present on AWS") + bucket = resource.Bucket(self.bucket) + + return resource, bucket + def write_doc_to_s3(self, doc: Dict, search_keys: List[str]): """ Write the data to s3 and return the metadata to be inserted into the index db. 
From 6db3d02684ab44ce222d997276450b82795b4c47 Mon Sep 17 00:00:00 2001 From: mjwen Date: Fri, 10 Nov 2023 17:21:52 -0600 Subject: [PATCH 06/16] Fix some doc string --- src/maggma/stores/aws.py | 133 +++++++++++++++++++-------------------- 1 file changed, 65 insertions(+), 68 deletions(-) diff --git a/src/maggma/stores/aws.py b/src/maggma/stores/aws.py index a5bc9fe5b..a71dc369b 100644 --- a/src/maggma/stores/aws.py +++ b/src/maggma/stores/aws.py @@ -1,4 +1,5 @@ -"""Advanced Stores for connecting to AWS data.""" +"""Stores for connecting to AWS data.""" + import threading import warnings import zlib @@ -27,7 +28,8 @@ class S3Store(Store): """ - GridFS like storage using Amazon S3 and a regular store for indexing + GridFS like storage using Amazon S3 and a regular store for indexing. + Assumes Amazon AWS key and secret key are set in environment or default config file. """ @@ -52,26 +54,26 @@ def __init__( Initializes an S3 Store. Args: - index: a store to use to index the S3 Bucket - bucket: name of the bucket - s3_profile: name of aws profile containing credentials for role. - Alternatively you can pass in a dictionary with the full credentials: + index: a store to use to index the S3 bucket. + bucket: name of the bucket. + s3_profile: name of AWS profile containing the credentials. 
Alternatively + you can pass in a dictionary with the full credentials: aws_access_key_id (string) -- AWS access key ID aws_secret_access_key (string) -- AWS secret access key aws_session_token (string) -- AWS temporary session token region_name (string) -- Default region when creating new connections - compress: compress files inserted into the store - endpoint_url: endpoint_url to allow interface to minio service; ignored if - `ssh_tunnel` is provided, in which case the endpoint_url is inferred - sub_dir: (optional) subdirectory of the s3 bucket to store the data - s3_workers: number of concurrent S3 puts to run - s3_resource_kwargs: additional kwargs to pass to the boto3 session resource - ssh_tunnel: optional SSH tunnel to use for the S3 connection - key: main key to index on - store_hash: store the sha1 hash right before insertion to the database. + compress: compress files inserted into the store. + endpoint_url: this allows the interface with minio service; ignored if + `ssh_tunnel` is provided, in which case it is inferred. + sub_dir: subdirectory of the S3 bucket to store the data. + s3_workers: number of concurrent S3 puts to run. + s3_resource_kwargs: additional kwargs to pass to the boto3 session resource. + ssh_tunnel: optional SSH tunnel to use for the S3 connection. + key: main key to index on. + store_hash: store the SHA1 hash right before insertion to the database. unpack_data: whether to decompress and unpack byte data when querying from - the bucket - searchable_fields: fields to keep in the index store + the bucket. + searchable_fields: fields to keep in the index store. """ if boto3 is None: raise RuntimeError("boto3 and botocore are required for S3Store") @@ -106,10 +108,7 @@ def __init__( @property def name(self) -> str: - """ - Returns: - a string representing this data source. 
- """ + """String representing this data source.""" return f"s3://{self.bucket}" def connect(self, force_reset: bool = False): # lgtm[py/conflicting-attributes] @@ -135,11 +134,10 @@ def close(self): @property def _collection(self): """ - Returns: - a handle to the pymongo collection object. + A handle to the pymongo collection object. Important: - Not guaranteed to exist in the future + Not guaranteed to exist in the future. """ # For now returns the index collection since that is what we would "search" on return self.index._collection @@ -149,7 +147,7 @@ def count(self, criteria: Optional[Dict] = None) -> int: Counts the number of documents matching the query criteria. Args: - criteria: PyMongo filter for documents to count in + criteria: PyMongo filter for documents to count in. """ return self.index.count(criteria) @@ -165,12 +163,12 @@ def query( Queries the Store for a set of documents. Args: - criteria: PyMongo filter for documents to search in - properties: properties to return in grouped documents - sort: Dictionary of sort order for fields. Keys are field names and - values are 1 for ascending or -1 for descending. - skip: number documents to skip - limit: limit on total number of documents returned + criteria: PyMongo filter for documents to search in. + properties: properties to return in grouped documents. + sort: Dictionary of sort order for fields. Keys are field names and values + are 1 for ascending or -1 for descending. + skip: number documents to skip. + limit: limit on total number of documents returned. """ prop_keys = set() @@ -225,8 +223,8 @@ def distinct(self, field: str, criteria: Optional[Dict] = None, all_exist: bool Get all distinct values for a field. Args: - field: the field(s) to get distinct values for - criteria: PyMongo filter for documents to search in + field: the field(s) to get distinct values for. + criteria: PyMongo filter for documents to search in. 
""" # Index is a store so it should have its own distinct function return self.index.distinct(field, criteria=criteria) @@ -241,17 +239,16 @@ def groupby( limit: int = 0, ) -> Iterator[Tuple[Dict, List[Dict]]]: """ - Simple grouping function that will group documents - by keys. + Simple grouping function that will group documents by keys. Args: - keys: fields to group documents - criteria: PyMongo filter for documents to search in - properties: properties to return in grouped documents - sort: Dictionary of sort order for fields. Keys are field names and - values are 1 for ascending or -1 for descending. - skip: number documents to skip - limit: limit on total number of documents returned + keys: fields to group documents. + criteria: PyMongo filter for documents to search in. + properties: properties to return in grouped documents. + sort: Dictionary of sort order for fields. Keys are field names and values + are 1 for ascending or -1 for descending. + skip: number documents to skip. + limit: limit on total number of documents returned. Returns: generator returning tuples of (dict, list of docs) @@ -270,11 +267,11 @@ def ensure_index(self, key: str, unique: bool = False) -> bool: Tries to create an index and return true if it succeeded. Args: - key: single key to index - unique: whether or not this index contains only unique keys + key: single key to index. + unique: whether this index contains only unique keys. Returns: - bool indicating if the index exists/was created + bool indicating if the index exists/was created. """ return self.index.ensure_index(key, unique=unique) @@ -288,12 +285,11 @@ def update( Update documents into the Store. 
Args: - docs: the document or list of documents to update - key: field name(s) to determine uniqueness for a - document, can be a list of multiple fields, - a single field, or None if the Store's key - field is to be used - additional_metadata: field(s) to include in the s3 store's metadata + docs: the document or list of documents to update. + key: field name(s) to determine uniqueness for a document, can be a list of + multiple fields, a single field, or None if the Store's key field is to + be used. + additional_metadata: field(s) to include in the S3 store's metadata. """ if not isinstance(docs, list): docs = [docs] @@ -373,9 +369,9 @@ def write_doc_to_s3(self, doc: Dict, search_keys: List[str]): Write the data to s3 and return the metadata to be inserted into the index db. Args: - doc: the document + doc: the document. search_keys: list of keys to pull from the docs and be inserted into the - index db + index db. """ s3_bucket = self._get_bucket() @@ -442,8 +438,8 @@ def remove_docs(self, criteria: Dict, remove_s3_object: bool = False): Remove docs matching the query dictionary. Args: - criteria: query dictionary to match - remove_s3_object: whether to remove the actual S3 Object or not + criteria: query dictionary to match. + remove_s3_object: whether to remove the actual S3 object or not. """ if not remove_s3_object: self.index.remove_docs(criteria=criteria) @@ -463,15 +459,13 @@ def last_updated(self): def newer_in(self, target: Store, criteria: Optional[Dict] = None, exhaustive: bool = False) -> List[str]: """ - Returns the keys of documents that are newer in the target - Store than this Store. + Returns the keys of documents that are newer in the target Store than this Store. Args: - target: target Store - criteria: PyMongo filter for documents to search in - exhaustive: triggers an item-by-item check vs. checking - the last_updated of the target Store and using - that to filter out new items in + target: target Store. 
+ criteria: PyMongo filter for documents to search in. + exhaustive: triggers an item-by-item check vs. checking the last_updated of + the target Store and using that to filter out new items in. """ if hasattr(target, "index"): return self.index.newer_in(target=target.index, criteria=criteria, exhaustive=exhaustive) @@ -482,9 +476,10 @@ def __hash__(self): def rebuild_index_from_s3_data(self, **kwargs): """ - Rebuilds the index Store from the data in S3 - Relies on the index document being stores as the metadata for the file - This can help recover lost databases. + Rebuilds the index Store from the data in S3. + + Relies on the index document being stores as the metadata for the file. This can + help recover lost databases. """ bucket = self.s3_bucket objects = bucket.objects.filter(Prefix=self.sub_dir) @@ -499,8 +494,9 @@ def rebuild_index_from_s3_data(self, **kwargs): def rebuild_metadata_from_index(self, index_query: Optional[dict] = None): """ - Read data from the index store and populate the metadata of the S3 bucket - Force all of the keys to be lower case to be Minio compatible + Read data from the index store and populate the metadata of the S3 bucket. + Force all the keys to be lower case to be Minio compatible. + Args: index_query: query on the index store. """ @@ -523,7 +519,8 @@ def rebuild_metadata_from_index(self, index_query: Optional[dict] = None): def __eq__(self, other: object) -> bool: """ - Check equality for S3Store + Check equality for S3Store. + other: other S3Store to compare with. 
""" if not isinstance(other, S3Store): From 92f807bb06ffeeb7e4e4ac3ce27f29fba122f112 Mon Sep 17 00:00:00 2001 From: mjwen Date: Sun, 19 Nov 2023 11:33:57 -0600 Subject: [PATCH 07/16] Fix to pass original S3 tests --- src/maggma/stores/aws.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/maggma/stores/aws.py b/src/maggma/stores/aws.py index a71dc369b..e0ed8fbcd 100644 --- a/src/maggma/stores/aws.py +++ b/src/maggma/stores/aws.py @@ -129,7 +129,8 @@ def close(self): self.s3 = None self.s3_bucket = None - self.ssh_tunnel.stop() + if self.ssh_tunnel is not None: + self.ssh_tunnel.stop() @property def _collection(self): @@ -357,9 +358,9 @@ def _get_resource_and_bucket(self): endpoint_url = self._get_endpoint_url() resource = session.resource("s3", endpoint_url=endpoint_url, **self.s3_resource_kwargs) try: - self.s3.meta.client.head_bucket(Bucket=self.bucket) + resource.meta.client.head_bucket(Bucket=self.bucket) except ClientError: - raise RuntimeError(f"Bucket `{self.bucket}` not present on AWS") + raise RuntimeError(f"Bucket not present on AWS") bucket = resource.Bucket(self.bucket) return resource, bucket From b7301309bb40063cf83f75c62b2866b238897ef5 Mon Sep 17 00:00:00 2001 From: mjwen Date: Sun, 19 Nov 2023 11:50:18 -0600 Subject: [PATCH 08/16] Add test force_reset --- tests/stores/test_aws.py | 25 +++++++++++++++++++++++-- tests/stores/test_ssh_tunnel.py | 5 +---- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/tests/stores/test_aws.py b/tests/stores/test_aws.py index 5f54379bb..17671e438 100644 --- a/tests/stores/test_aws.py +++ b/tests/stores/test_aws.py @@ -80,7 +80,7 @@ def test_keys(): conn.create_bucket(Bucket="bucket1") index = MemoryStore("index", key=1) with pytest.raises(AssertionError, match=r"Since we are.*"): - store = S3Store(index, "bucket1", s3_workers=4, key=1) + store = S3Store(index, "bucket1", s3_workers=4, key="1") index = MemoryStore("index", key="key1") with pytest.warns(UserWarning, match=r"The 
desired S3Store.*$"): store = S3Store(index, "bucket1", s3_workers=4, key="key2") @@ -125,7 +125,7 @@ def test_count(s3store): assert s3store.count({"task_id": "mp-3"}) == 1 -def test_qeuery(s3store): +def test_qeury(s3store): assert s3store.query_one(criteria={"task_id": "mp-2"}) is None assert s3store.query_one(criteria={"task_id": "mp-1"})["data"] == "asd" assert s3store.query_one(criteria={"task_id": "mp-3"})["data"] == "sdf" @@ -339,3 +339,24 @@ def test_no_bucket(): store = S3Store(index, "bucket2") with pytest.raises(RuntimeError, match=r".*Bucket not present.*"): store.connect() + + +def test_force_reset(s3store): + content = [ + { + "task_id": "mp-4", + "data": "abc", + s3store.last_updated_field: datetime.utcnow(), + } + ] + + s3store.connect(force_reset=True) + s3store.update(content) + assert s3store.count({"task_id": "mp-4"}) == 1 + + s3store.s3 = None + s3store.connect() + s3store.update(content) + assert s3store.count({"task_id": "mp-4"}) == 1 + + s3store.close() diff --git a/tests/stores/test_ssh_tunnel.py b/tests/stores/test_ssh_tunnel.py index 6198603f0..55ffe4cd9 100644 --- a/tests/stores/test_ssh_tunnel.py +++ b/tests/stores/test_ssh_tunnel.py @@ -9,10 +9,7 @@ @pytest.fixture() def ssh_server_available(): # noqa: PT004 - """ - Fixture to determine if an SSH server is available - to test the SSH tunnel - """ + """Fixture to determine if an SSH server is available to test the SSH tunnel.""" client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: From 814ad4034215adca8102e17165101bf63753b7dc Mon Sep 17 00:00:00 2001 From: mjwen Date: Sun, 19 Nov 2023 12:16:35 -0600 Subject: [PATCH 09/16] Add test for local_port for ssh tunnel --- tests/stores/test_ssh_tunnel.py | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/tests/stores/test_ssh_tunnel.py b/tests/stores/test_ssh_tunnel.py index 55ffe4cd9..58d5ba6aa 100644 --- a/tests/stores/test_ssh_tunnel.py +++ 
b/tests/stores/test_ssh_tunnel.py @@ -19,10 +19,24 @@ def ssh_server_available(): # noqa: PT004 pytest.skip("No SSH server to test tunnel against") -def test_mongostore_connect_via_ssh(ssh_server_available): - server = SSHTunnel("127.0.0.1:22", "127.0.0.1:27017") +def local_port_available(local_port): # noqa: PT004 + """Fixture to determine if a local port is available to test the SSH tunnel.""" + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + try: + client.connect("127.0.0.1", local_port) + client.close() + except (AuthenticationException, NoValidConnectionsError, SSHException): + pytest.skip("Local port unavailable to test tunnel against") + - mongostore = MongoStore("maggma_test", "test", ssh_tunnel=server) +@pytest.mark.parametrize("local_port", [None, 9000]) +def test_mongostore_connect_via_ssh(ssh_server_available, local_port): + if local_port is not None: + local_port_available(local_port) + + tunnel = SSHTunnel("127.0.0.1:22", "127.0.0.1:27017", local_port=local_port) + mongostore = MongoStore("maggma_test", "test", ssh_tunnel=tunnel) mongostore.connect() assert isinstance(mongostore._collection, pymongo.collection.Collection) mongostore.remove_docs({}) @@ -34,8 +48,12 @@ def test_mongostore_connect_via_ssh(ssh_server_available): mongostore.close() -def test_serialization(tmpdir, ssh_server_available): - tunnel = SSHTunnel("127.0.0.1:22", "127.0.0.1:27017") +@pytest.mark.parametrize("local_port", [None, 9000]) +def test_serialization(tmpdir, ssh_server_available, local_port): + if local_port is not None: + local_port_available(local_port) + + tunnel = SSHTunnel("127.0.0.1:22", "127.0.0.1:27017", local_port=local_port) dumpfn(tunnel, tmpdir / "tunnel.json") new_tunnel = loadfn(tmpdir / "tunnel.json") From 0b0d2da2b84eb604e4fa078365e845fc54bd8912 Mon Sep 17 00:00:00 2001 From: mjwen Date: Sun, 19 Nov 2023 12:36:59 -0600 Subject: [PATCH 10/16] Add test S3 store with ssh tunnel --- tests/stores/test_aws.py | 40 
++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/stores/test_aws.py b/tests/stores/test_aws.py index 17671e438..72d932fe9 100644 --- a/tests/stores/test_aws.py +++ b/tests/stores/test_aws.py @@ -5,6 +5,7 @@ import pytest from botocore.exceptions import ClientError from maggma.stores import MemoryStore, MongoStore, S3Store +from maggma.stores.ssh_tunnel import SSHTunnel from moto import mock_s3 @@ -16,6 +17,17 @@ def mongostore(): store._collection.drop() +@pytest.fixture() +def ssh_tunnel(): + try: + tunnel = SSHTunnel("127.0.0.1:22", "127.0.0.1:27017", local_port=9000) + except: + # fallback to not use a tunnel if there is error in creating the tunnel + tunnel = None + + return tunnel + + @pytest.fixture() def s3store(): with mock_s3(): @@ -74,6 +86,19 @@ def s3store_multi(): yield store +@pytest.fixture() +def s3store_with_tunnel(ssh_tunnel): + with mock_s3(): + conn = boto3.resource("s3", region_name="us-east-1") + conn.create_bucket(Bucket="bucket1") + + index = MemoryStore("index", key="task_id") + store = S3Store(index, "bucket1", key="task_id", ssh_tunnel=ssh_tunnel) + store.connect() + + yield store + + def test_keys(): with mock_s3(): conn = boto3.resource("s3", region_name="us-east-1") @@ -360,3 +385,18 @@ def test_force_reset(s3store): assert s3store.count({"task_id": "mp-4"}) == 1 s3store.close() + + +def test_ssh_tunnel(s3store_with_tunnel): + content = [ + { + "task_id": "mp-4", + "data": "abc", + s3store_with_tunnel.last_updated_field: datetime.utcnow(), + } + ] + + s3store_with_tunnel.update(content) + assert s3store_with_tunnel.count({"task_id": "mp-4"}) == 1 + + s3store_with_tunnel.close() From bf56d2a04998e3de93c68c12d810e6e1317e7c6c Mon Sep 17 00:00:00 2001 From: mjwen Date: Sun, 19 Nov 2023 15:47:40 -0600 Subject: [PATCH 11/16] Add brief doc on using SSHTunnel --- docs/getting_started/using_ssh_tunnel.md | 56 ++++++++++++++++++++++++ docs/reference/stores.md | 2 + src/maggma/stores/ssh_tunnel.py | 2 + 
3 files changed, 60 insertions(+) create mode 100644 docs/getting_started/using_ssh_tunnel.md diff --git a/docs/getting_started/using_ssh_tunnel.md b/docs/getting_started/using_ssh_tunnel.md new file mode 100644 index 000000000..c1d9076ce --- /dev/null +++ b/docs/getting_started/using_ssh_tunnel.md @@ -0,0 +1,56 @@ +# Using `SSHTunnel` to connect to remote database + +One of the typical scenarios to use `maggma` is to connect to a remote database that is behind a firewall and thus cannot be accessed directly from your local computer (as shown below, [image credits](https://github.com/pahaz/sshtunnel/)). + +In this case, you can use `SSHTunnel` to first connect to the remote server, and then connect to the database from the server. + +``` +---------------------------------------------------------------------- + + | +-------------+ | +----------+ +--------- + LOCAL | | | REMOTE | | PRIVATE + COMPUTER | <== SSH ========> | SERVER | <== local ==> | SERVER +-------------+ | +----------+ +--------- + | + FIREWALL (only port 22 is open) + +---------------------------------------------------------------------- + +Note, the `local` indicates that the connction to the PRIVATE SERVER can only be made from the REMOTE SERVER. +``` + + +Below is an example of how to use `SSHTunnel` to connect to an AWS `S3Store` hosted on a private server. + +Let's assume that, from your local computer, you can ssh to the remote server using the following command with your credentials (e.g. 
): + +```bash +ssh @ +``` + +and then from the remote server, you can access your database using, e.g., the following information: +``` +private_server_address: COMPUTE_NODE_1 +private_server_port: 9000 +``` + +You can create an `SSHTunnel` object as follows: + +```python +from maggma.stores.ssh_tunnel import SSHTunnel + +tunnel = SSHTunnel( + tunnel_server_address = ":22", + username = "", + password= "", + remote_server_address = "COMPUTE_NODE_1:9000", + local_port = 9000, +) +``` +and then pass it to the `S3Store` to connect to the database. +By doing so, you can access the database at the localhost address `http://127.0.0.1:9000` from your local computer as if it is hosted on your local computer. + +The arguments of the `SSHTunnel` are self-explanatory, but `local_port` needs more explanation. We assume that on the local computer, we want t o connect to the localhost address `http://127.0.0.1`, so we do not need to provide the address, but only the port number (`9000` in this case.) + +In essence, `SSHTunnel` allows you to connect to `COMPUTE_NODE_1:9000` on the private server from `http://127.0.0.1:9000` on your local computer. 
\ No newline at end of file diff --git a/docs/reference/stores.md b/docs/reference/stores.md index 78209268f..2f94909e5 100644 --- a/docs/reference/stores.md +++ b/docs/reference/stores.md @@ -9,3 +9,5 @@ ::: maggma.stores.advanced_stores ::: maggma.stores.compound_stores + +::: maggma.stores.ssh_tunnel diff --git a/src/maggma/stores/ssh_tunnel.py b/src/maggma/stores/ssh_tunnel.py index ac00e0f5a..0e496803b 100644 --- a/src/maggma/stores/ssh_tunnel.py +++ b/src/maggma/stores/ssh_tunnel.py @@ -6,6 +6,8 @@ class SSHTunnel(MSONable): + """SSH tunnel to remote server.""" + __TUNNELS: Dict[str, SSHTunnelForwarder] = {} def __init__( From afa105054a8145e3c2b484336f7a73453479accb Mon Sep 17 00:00:00 2001 From: mjwen Date: Sun, 19 Nov 2023 15:49:49 -0600 Subject: [PATCH 12/16] Remove unnecessary assignment --- tests/stores/test_aws.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/stores/test_aws.py b/tests/stores/test_aws.py index 72d932fe9..05befa937 100644 --- a/tests/stores/test_aws.py +++ b/tests/stores/test_aws.py @@ -105,7 +105,7 @@ def test_keys(): conn.create_bucket(Bucket="bucket1") index = MemoryStore("index", key=1) with pytest.raises(AssertionError, match=r"Since we are.*"): - store = S3Store(index, "bucket1", s3_workers=4, key="1") + S3Store(index, "bucket1", s3_workers=4, key="1") index = MemoryStore("index", key="key1") with pytest.warns(UserWarning, match=r"The desired S3Store.*$"): store = S3Store(index, "bucket1", s3_workers=4, key="key2") From aa387757b1895e2193d64cddd43c6e2eba959ae7 Mon Sep 17 00:00:00 2001 From: mjwen Date: Sun, 19 Nov 2023 15:55:10 -0600 Subject: [PATCH 13/16] explicit catch error --- tests/stores/test_aws.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/stores/test_aws.py b/tests/stores/test_aws.py index 05befa937..505f4144b 100644 --- a/tests/stores/test_aws.py +++ b/tests/stores/test_aws.py @@ -7,6 +7,7 @@ from maggma.stores import MemoryStore, MongoStore, S3Store from 
maggma.stores.ssh_tunnel import SSHTunnel from moto import mock_s3 +from sshtunnel import BaseSSHTunnelForwarderError @pytest.fixture() @@ -21,7 +22,7 @@ def mongostore(): def ssh_tunnel(): try: tunnel = SSHTunnel("127.0.0.1:22", "127.0.0.1:27017", local_port=9000) - except: + except (ValueError, BaseSSHTunnelForwarderError): # fallback to not use a tunnel if there is error in creating the tunnel tunnel = None From 07d3df93d444b251958dff1f0654ee2fd3098351 Mon Sep 17 00:00:00 2001 From: mjwen Date: Mon, 20 Nov 2023 19:38:12 -0600 Subject: [PATCH 14/16] Add sshtunnel to mkdocs.yml --- mkdocs.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/mkdocs.yml b/mkdocs.yml index 93cd48b4d..f1821ebad 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -17,6 +17,7 @@ nav: - Working with MapBuilder: getting_started/map_builder.md - Working with GroupBuilder: getting_started/group_builder.md - Setting up MongoDB: getting_started/mongodb.md + - Using SSHTunnel: getting_started/using_ssh_tunnel.md - Reference: Core: Store: reference/core_store.md From cc048d151f44d7178e893477be33f33a1f5fd594 Mon Sep 17 00:00:00 2001 From: mjwen Date: Mon, 20 Nov 2023 20:14:33 -0600 Subject: [PATCH 15/16] Add a fake ssh tunnel to test S3Store --- src/maggma/stores/aws.py | 1 + tests/stores/test_aws.py | 43 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/src/maggma/stores/aws.py b/src/maggma/stores/aws.py index e0ed8fbcd..74fedf364 100644 --- a/src/maggma/stores/aws.py +++ b/src/maggma/stores/aws.py @@ -331,6 +331,7 @@ def _get_session(self): if isinstance(self.s3_profile, dict): return Session(**self.s3_profile) return Session(profile_name=self.s3_profile) + return None def _get_endpoint_url(self): diff --git a/tests/stores/test_aws.py b/tests/stores/test_aws.py index 505f4144b..9366fb6f4 100644 --- a/tests/stores/test_aws.py +++ b/tests/stores/test_aws.py @@ -389,6 +389,11 @@ def test_force_reset(s3store): def test_ssh_tunnel(s3store_with_tunnel): + """This test 
will actually create a real tunnel to test the functionality. + + The tunnel will be set to `None` if the tunnel cannot be created. As a result, + it becomes a test not testing the functionality of S3Store with the tunnel. + """ content = [ { "task_id": "mp-4", @@ -401,3 +406,41 @@ def test_ssh_tunnel(s3store_with_tunnel): assert s3store_with_tunnel.count({"task_id": "mp-4"}) == 1 s3store_with_tunnel.close() + + +def test_ssh_tunnel_2(): + """ + This test mocks the SSHTunnel behavior by creating a fake tunnel. + + The purpose is to check the behavior of the S3Store when the tunnel is not `None`. + This complements the `test_ssh_tunnel` test above. + """ + + class FakeTunnel: + def __init__(self, *args, **kwargs): + pass + + def start(self): + pass + + def stop(self): + pass + + def local_address(self): + return "ADDRESS", "PORT" + + def get_store(): + with mock_s3(): + conn = boto3.resource("s3", region_name="us-east-1") + conn.create_bucket(Bucket="bucket1") + + index = MemoryStore("index", key="task_id") + store = S3Store(index, "bucket1", key="task_id", ssh_tunnel=FakeTunnel()) + store.connect() + store._get_session() + assert store._get_endpoint_url() == "http://ADDRESS:PORT" + store.close() + + yield store + + get_store() From 31c6ce711928b9fbb50d1a0e593585ef0bfee312 Mon Sep 17 00:00:00 2001 From: mjwen Date: Tue, 21 Nov 2023 14:18:46 -0600 Subject: [PATCH 16/16] Fix typo --- docs/getting_started/using_ssh_tunnel.md | 13 +++++++++---- tests/stores/test_aws.py | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/docs/getting_started/using_ssh_tunnel.md b/docs/getting_started/using_ssh_tunnel.md index c1d9076ce..29259725f 100644 --- a/docs/getting_started/using_ssh_tunnel.md +++ b/docs/getting_started/using_ssh_tunnel.md @@ -20,6 +20,7 @@ In this case, you can use `SSHTunnel` to first connect to the remote server, and Note, the `local` indicates that the connction to the PRIVATE SERVER can only be made from the REMOTE SERVER. 
``` +## Example usage with `S3Store` Below is an example of how to use `SSHTunnel` to connect to an AWS `S3Store` hosted on a private server. @@ -48,9 +49,13 @@ tunnel = SSHTunnel( local_port = 9000, ) ``` -and then pass it to the `S3Store` to connect to the database. -By doing so, you can access the database at the localhost address `http://127.0.0.1:9000` from your local computer as if it is hosted on your local computer. +and then pass it to the `S3Store` to connect to the database. The arguments of the `SSHTunnel` are self-explanatory, but `local_port` needs more explanation. We assume that on the local computer, we want to connect to the localhost address `http://127.0.0.1`, so we do not need to provide the address, but only the port number (`9000` in this case.) -The arguments of the `SSHTunnel` are self-explanatory, but `local_port` needs more explanation. We assume that on the local computer, we want t o connect to the localhost address `http://127.0.0.1`, so we do not need to provide the address, but only the port number (`9000` in this case.) + In essence, `SSHTunnel` allows the connection to the database at `COMPUTE_NODE_1:9000` on the private server from the localhost address `http://127.0.0.1:9000` on the local computer as if the database is hosted on the local computer. -In essence, `SSHTunnel` allows you to connect to `COMPUTE_NODE_1:9000` on the private server from `http://127.0.0.1:9000` on your local computer. \ No newline at end of file +## Other use cases + +Alternative to using `username` and `password` for authentication with the remote server, `SSHTunnel` also supports authentication using SSH keys. In this case, you will need to provide your SSH credentials using the `private_key` argument. Read the docs of the `SSHTunnel` for more information. + + +`SSHTunnel` can also be used with other stores such as `MongoStore`, `MongoURIStore`, and `GridFSStore`. 
The usage is similar to the example above, but you might need to adjust the arguments to the `SSHTunnel` to match the use case. diff --git a/tests/stores/test_aws.py b/tests/stores/test_aws.py index 9366fb6f4..f5d02ad6e 100644 --- a/tests/stores/test_aws.py +++ b/tests/stores/test_aws.py @@ -151,7 +151,7 @@ def test_count(s3store): assert s3store.count({"task_id": "mp-3"}) == 1 -def test_qeury(s3store): +def test_query(s3store): assert s3store.query_one(criteria={"task_id": "mp-2"}) is None assert s3store.query_one(criteria={"task_id": "mp-1"})["data"] == "asd" assert s3store.query_one(criteria={"task_id": "mp-3"})["data"] == "sdf"