Skip to content

Commit

Permalink
update huntsman-pocs installation process and remote archive script (#…
Browse files Browse the repository at this point in the history
…598)

- change huntsman-pocs to require python>=3.9
- update git urls in docker files/requirement files
- update remote data archive script to use context manager for ssh/sftp clients
- general refactor of data archiver script
  • Loading branch information
fergusL authored Aug 14, 2023
1 parent bca333c commit 68a6b1d
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 113 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pythontest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ 3.8 ]
python-version: [3.9]
steps:
- name: Checkout code
uses: actions/checkout@v2
Expand All @@ -28,7 +28,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ 3.8 ]
python-version: [3.9]
steps:
- name: Checkout huntsman-pocs
uses: actions/checkout@v2
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ htmlcov/*
junit.xml
coverage.xml
.coverage.*
.hypothesis/
.pytest_cache/

# Build and docs folder/files
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ ARG image_tag=v0.7.8
FROM ${image_url}:${image_tag} AS pocs-base

ARG pocs_tag="v0.7.8"
ARG pocs_url="https://github.com/panoptes/POCS.git@${pocs_tag}#egg=panoptes-pocs"
ARG pocs_url="https://github.com/panoptes/POCS.git#${pocs_tag}#egg=panoptes-pocs"

LABEL description="Huntsman POCS Service"
LABEL maintainers="[email protected]"
Expand Down
2 changes: 1 addition & 1 deletion docker/POCS/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ RUN echo "Updating micromamba environment" && \

FROM panoptes-pocs-dependencies AS panoptes-pocs

ARG pip_install_name="git+https://github.com/panoptes/POCS.git@v0.7.8#egg=panoptes-pocs"
ARG pip_install_name="git+https://github.com/panoptes/POCS.git#v0.7.8#egg=panoptes-pocs"
ARG pip_install_extras="[google]"

USER "${USERID}"
Expand Down
2 changes: 1 addition & 1 deletion docker/POCS/environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ dependencies:
- streamz
- uvicorn[standard]
- pip:
- "git+https://github.com/panoptes/panoptes-utils.git@v0.2.35#egg=panoptes-utils[config,images,social]"
- "git+https://github.com/panoptes/panoptes-utils.git#v0.2.35#egg=panoptes-utils[config,images,social]"
229 changes: 126 additions & 103 deletions scripts/sync_data_with_remote_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,32 @@

from panoptes.utils import error
from huntsman.pocs.utils.logger import get_logger
from huntsman.pocs.utils.config import get_config
from astropy import units as u

VALID_EXTENSIONS = (".fits", ".fits.fz")


class RemoteArchiver(Archiver):
""" Class to watch the archive directory on Huntsman control computer for new files and move
them to the Data Central cloud after enough time has passed.
""" Class to watch the archive directory on Huntsman control computer for new files and transfer
them to Data Central after enough time (delay_interval) has passed.
"""
_valid_extensions = VALID_EXTENSIONS

def __init__(self, images_directory=None, archive_directory=None, delay_interval=None,
sleep_interval=None, status_interval=60, logger=None, remote_host=None,
username=None, password=None, port=None, *args, **kwargs):
def __init__(
self, images_directory, archive_directory, username, remote_host, port, pkey_path,
delay_interval=None, sleep_interval=None, status_interval=60, logger=None, *args, **
kwargs):
"""
Args:
images_directory (str): The images directory to archive. If None (default), uses
the directories.images config entry.
archive_directory (str): The archive directory. If None (default), uses
the directories.archive config entry.
username (str): The username for ssh access to remote host
remote_host (str): The name or IP address of the remote host to connect to.
port (int): The port number used to connect to the remote host.
pkey_path (str): Filepath to ed25519 key for authenticating with the remote host.
delay_interval (u.Quantity): The minimum amount of time a file must spend in the
archive queue before it is archived. If None (default), uses the
archiver.delay_time config entry.
Expand All @@ -37,130 +43,147 @@ def __init__(self, images_directory=None, archive_directory=None, delay_interval
60s.
logger (logger, optional): The logger instance. If not provided, use default Huntsman
logger.
remote_host (str): The name or IP address of the remote host to connect to.
username (str): The username used to connect to the remote host.
password (str): The password required to log in to the host.
port (int): The port number used to connect to the remote host.
*args, **kwargs: Parsed to PanBase initialiser.
"""
if not logger:
logger = get_logger()

super().__init__(delay_interval=delay_interval, sleep_interval=sleep_interval,
images_directory=images_directory, archive_directory=archive_directory,
super().__init__(images_directory=images_directory, archive_directory=archive_directory,
delay_interval=delay_interval, sleep_interval=sleep_interval,
status_interval=status_interval, logger=logger, *args, **kwargs)
if remote_host is None:
remote_host = self.get_config("remote_host")
self.remote_host = str(remote_host)
self.logger.debug(f"Remote Host: {self.remote_host}")

if username is None:
username = self.get_config("username")
self.username = str(username)
self.logger.debug(f"Username: {self.username}")
self.username = username
self.remote_host = remote_host
self.port = port
self.private_key = pm.Ed25519Key(filename=pkey_path)

if password is None:
password = self.get_config("password")
self.password = str(password)

if port is None:
port = self.get_config("port")
self.port = str(port)
self.logger.debug(
f"Remote Host: {self.remote_host}, port: {self.port}, pkey_path: {pkey_path}")

def _archive_file(self, filename):
"""Archive the file.
Args:
filename (str): The filename string.
filename (str): The local filename string.
"""
if not os.path.exists(filename): # May have already been archived or deleted
if not os.path.exists(filename):
self.logger.debug(f"Tried to archive {filename} but it does not exist.")
raise FileNotFoundError

# Get the archived filename
upload_filename = self._get_archive_filename(filename)

# # Make sure the upload directory exists
# self._check_destination_directory()
# os.makedirs(os.path.dirname(upload_filename), exist_ok=True)

# Move the file to the DC directory
self.logger.debug(f"Moving {filename} to {upload_filename}.")
self.transfer_data(filename, upload_filename)

# Finally, delete the original
os.remove(filename)

def _setup_sftp(self):
self.ssh = pm.SSHClient()
self.ssh.set_missing_host_key_policy(pm.AutoAddPolicy())
self.ssh.connect(
self.remote_host,
username=self.username,
password=self.password,
port=self.port,
)
self.sftp = self.ssh.open_sftp()

def transfer_data(self, filename, destination):
self._setup_sftp()
try:
self.logger.info(
"Checking whether the provided destination directory exists"
)
self.sftp.stat(os.path.dirname(destination))
self.logger.info("The destination directory exisets in the cloud!")
except FileNotFoundError:
self.logger.info("The destination directory was not found!")
self.logger.info(f"Creating {os.path.dirname(destination)} in the cloud.")
folders = os.path.dirname(destination).split("/")
for folder in folders:
# Get the filename for the remote archive file
remote_filename = self._get_archive_filename(filename)

self.logger.debug(f"Moving {filename} to {remote_filename}.")
success = self.transfer_data(filename, remote_filename)

if success:
self.logger.debug(f"Transfer successful, deleting {filename} on local machine.")
os.remove(filename)
else:
self.logger.debug(
"Transfer unsuccessful, archiver will reattempt upload at next iteration.")

def transfer_data(self, local_filename, remote_filename):
"""Create an SFTP session and copy the local file to the remote host.
Args:
local_filename (str): local file to be copied
remote_filename (str): the filename/path to copy the local file to on the remote host
"""
success = False
with pm.SSHClient() as ssh:
ssh.set_missing_host_key_policy(pm.AutoAddPolicy())
ssh.connect(self.remote_host, port=self.port,
username=self.username, pkey=self.private_key)
with ssh.open_sftp() as sftp:
# FIRST verify directory structure exists on remote host and create it if it doesn't
try:
self.sftp.chdir(folder)
self.logger.debug("Verify that the remote_filename directory exists")
sftp.stat(os.path.dirname(remote_filename))
self.logger.debug("Destination directory exists")
except FileNotFoundError:
# Create the folder if it does not exist
self.sftp.mkdir(folder)
self.sftp.chdir(folder)
# self.sftp.mkdir(os.path.dirname(destination))
self.logger.info(
f"Copying {filename} into the destination directory: {destination}"
)
self.sftp.put(filename, destination)
self.logger.info("Copying completed!")
self.sftp.close()
self.ssh.close()
self.logger.info(
f"The {os.path.dirname(remote_filename)} directory was not found! \
Creating it now.")
# get the relative path of the remote_filename when compared with
# the archive directory on the remote machine
relpath = os.path.relpath(remote_filename, self.archive_directory)
folders = os.path.dirname(relpath).split("/")
folder_to_create = self.archive_directory
for folder in folders:
folder_to_create = os.path.join(folder_to_create, folder)
try:
sftp.chdir(folder_to_create)
except FileNotFoundError:
# Create the folder if it does not exist
sftp.mkdir(folder_to_create)

# SECOND: copy file to desired location on remote host
try:
self.logger.info(
f"Copying {local_filename} to destination directory: {remote_filename}"
)
# Note: when using confirm=True, sftp.put will check the file size after
# the transfer to confirm success
sftp.put(local_filename, remote_filename, confirm=True)
except PermissionError as pe:
self.logger.warning(
f"Copying {local_filename} to {remote_filename} failed due to a \
permission error: {pe!r}")
return success
except Exception as e:
self.logger.warning(
f"Copying {local_filename} to {remote_filename} failed due to an \
Exception: {e!r}")
return success

# double check that filesize of the local and remote file match after transfer
local_file_size = os.path.getsize(local_filename)
remote_file_size = sftp.stat(remote_filename).st_size

if local_file_size == remote_file_size:
success = True
self.logger.info("File transfer was successful: {success}")
return success
else:
self.logger.info("File transfer was successful: {success}")
return success


if __name__ == "__main__":
remote_host = "remote_host"
username = "username"
password = "password"
port = int("port")
images_directory = "images_directory"
archive_directory = "path/to/huntsman/on/dc-cloud"

delay_interval = 2 * u.second
sleep_interval = 3 * u.second
archiver = RemoteArchiver(
delay_interval=delay_interval,
sleep_interval=sleep_interval,
images_directory=images_directory,
archive_directory=archive_directory,
remote_host=remote_host,
username=username,
password=password,
port=port,
)
# required position args in order :
# [images_directory, archive_directory, username, remote_host, port, pkey_path]
args = list()
args.append(get_config("remote_archiver.images_directory", None))
args.append(get_config("remote_archiver.remote_archive_directory", None))
args.append(get_config("remote_archiver.username", 'huntsman'))
args.append(get_config("remote_archiver.remote_host", None))
args.append(get_config("remote_archiver.port", None))
args.append(get_config("remote_archiver.private_key_path", None))

arg_names = ["images_directory", "archive_directory",
"username", "remote_host", "port", "pkey_path"]
for arg_value, arg_name in zip(args, arg_names):
if arg_value is None:
raise ValueError(f"{arg_name} must be specified, cannot be set to {arg_value}.")

kwargs = dict()
# The minimum amount of time a file must spend in the archive queue before it is uploaded
kwargs['delay_interval'] = get_config("remote_archiver.delay_interval", 300 * u.second)
# how often the remote archiver checks for new local files to upload
kwargs['sleep_interval'] = get_config("remote_archiver.sleep_interval", 900 * u.second)

archiver = RemoteArchiver(*args, **kwargs)
archiver.start()

try:
while True:
if not archiver.is_running:
raise error.PanError("Archiver is no longer running.")
time.sleep(10)
raise error.PanError("RemoteArchiver is no longer running.")
time.sleep(60)
except KeyboardInterrupt:
# Gracefully handle KeyboardInterrupt (Ctrl+C) and SystemExit during file transfer
archiver.logger.info("KeyboardInterrupt received. Stopping the RemoteArchiver gracefully...")
archiver.logger.info(
"KeyboardInterrupt received. Stopping the RemoteArchiver.")
archiver.stop()

archiver.logger.info("Archiver stopped.")
# Make sure to exit the script properly after handling the KeyboardInterrupt or SystemExit
archiver.logger.info("RemoteArchiver stopped.")
sys.exit(0)
9 changes: 5 additions & 4 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ classifiers =
License :: OSI Approved :: MIT License
Operating System :: POSIX
Programming Language :: Python :: 3
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3 :: Only
Topic :: Scientific/Engineering :: Astronomy
Topic :: Scientific/Engineering :: Physics
Expand All @@ -49,18 +49,19 @@ install_requires =
netifaces
numpy
msgpack
panoptes-pocs[focuser]>=0.7.7
panoptes-utils[config,images,social]>=0.2.31
panoptes-utils[config,images,social]==0.2.35
panoptes-pocs[focuser]==0.7.8
photutils
requests
pyusb>=1.1.1
paramiko

# The usage of test_requires is discouraged, see `Dependency Management` docs
# tests_require = pytest; pytest-cov
# Require a specific Python version, e.g. Python 2.7 or >= 3.4
# The usage of test_requires is discouraged, see `Dependency Management` docs
# tests_require = pytest; pytest-cov
python_requires = >=3.8
python_requires = >=3.9

[options.packages.find]
where = src
Expand Down
2 changes: 1 addition & 1 deletion tests/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ RUN echo "Building from ${image_name}:${image_tag}" && \
COPY docker/environment.yaml .
RUN micromamba update -n base -f environment.yaml && \
# POCS from github.
pip install "git+https://github.com/panoptes/POCS.git@v0.7.8#egg=panoptes-pocs[focuser]"
pip install "git+https://github.com/panoptes/POCS.git#v0.7.8#egg=panoptes-pocs[focuser]"

ARG pip_install_extras="[testing]"

Expand Down

0 comments on commit 68a6b1d

Please sign in to comment.