From 533a81e5ab41e345cac0d6b322299df5ba8417f0 Mon Sep 17 00:00:00 2001
From: kyle
Date: Thu, 10 Oct 2024 14:39:04 -0400
Subject: [PATCH] SFR-2245: Refactor dev setup process and fix infinite cluster loop

---
 docker-compose.yml            |  15 ---
 processes/developmentSetup.py | 225 +++++++++++++++++++---------------
 processes/sfrCluster.py       |   4 +-
 3 files changed, 131 insertions(+), 113 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index a6b0d35e48..aa04819770 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -75,21 +75,6 @@ services:
       - /var/run/docker.sock:/var/run/docker.sock
       - ./localstack/init-localstack-resources.sh:/etc/localstack/init/ready.d/init-localstack-resources.sh
 
-  devsetup:
-    container_name: drb_local_devSetUp
-    depends_on:
-      - database
-      - elasticsearch
-      - s3
-    build:
-      context: .
-    command: -e local-compose -p DevelopmentSetupProcess
-    volumes:
-      - type: bind
-        source: .
-        target: /src
-        read_only: true
-
 volumes:
   drb_local_dbdata:
   drb_local_esdata:
diff --git a/processes/developmentSetup.py b/processes/developmentSetup.py
index 6a7c7d7ff7..38e806cce7 100644
--- a/processes/developmentSetup.py
+++ b/processes/developmentSetup.py
@@ -11,136 +11,169 @@ from managers.db import DBManager
 from .core import CoreProcess
+from logger import createLog
 from mappings.hathitrust import HathiMapping
 from .oclcClassify import ClassifyProcess
 from .oclcCatalog import CatalogProcess
 from .sfrCluster import ClusterProcess
 
+logger = createLog(__name__)
+
+
 class DevelopmentSetupProcess(CoreProcess):
     def __init__(self, *args):
-        self.adminDBConnection = DBManager(
+        self.admin_db_manager = DBManager(
             user=os.environ['ADMIN_USER'],
             pswd=os.environ['ADMIN_PSWD'],
             host=os.environ['POSTGRES_HOST'],
             port=os.environ['POSTGRES_PORT'],
             db='postgres'
         )
-        self.initializeDB()
+
+        self.initialize_db()
 
         super(DevelopmentSetupProcess, self).__init__(*args[:4])
 
     def runProcess(self):
-        self.generateEngine()
-        self.createSession()
-
-        self.initializeDatabase()
-
-        self.createElasticConnection()
-        self.waitForElasticSearch()
-        self.createElasticSearchIndex()
-
-        self.createRabbitConnection()
-        self.createOrConnectQueue(os.environ['OCLC_QUEUE'], os.environ['OCLC_ROUTING_KEY'])
-        self.createOrConnectQueue(os.environ['FILE_QUEUE'], os.environ['FILE_ROUTING_KEY'])
-
-        self.fetchHathiSampleData()
-
-        procArgs = ['complete'] + ([None] * 4)
-
-        self.createRedisClient()
-        self.clear_cache()
-
-        classifyProc = ClassifyProcess(*procArgs)
-        classifyProc.runProcess()
-
-        catalogProc = CatalogProcess(*procArgs)
-        catalogProc.runProcess(max_attempts=1)
+        try:
+            self.generateEngine()
+            self.createSession()
+
+            self.initializeDatabase()
+
+            self.createElasticConnection()
+            self.wait_for_elastic_search()
+            self.createElasticSearchIndex()
+
+            self.createRabbitConnection()
+            self.createOrConnectQueue(os.environ['OCLC_QUEUE'], os.environ['OCLC_ROUTING_KEY'])
+            self.createOrConnectQueue(os.environ['FILE_QUEUE'], os.environ['FILE_ROUTING_KEY'])
+
+            self.fetch_hathi_sample_data()
+
+            process_args = ['complete'] + ([None] * 4)
+
+            self.createRedisClient()
+            self.clear_cache()
+
+            classify_process = ClassifyProcess(*process_args)
+            classify_process.runProcess()
+
+            catalog_process = CatalogProcess(*process_args)
+            catalog_process.runProcess(max_attempts=1)
+
+            cluster_process = ClusterProcess(*process_args)
+            cluster_process.runProcess()
+        except Exception:
+            logger.exception(f'Failed to run development setup process')
+
+    def initialize_db(self):
+        self.admin_db_manager.generateEngine()
+
+        with self.admin_db_manager.engine.connect() as admin_db_connection:
+            admin_db_connection.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+
+            self.create_database(admin_db_connection)
+            self.create_database_user(admin_db_connection)
+
+        self.admin_db_manager.engine.dispose()
+
+    def create_database(self, db_connection):
+        try:
+            db_connection.execute(
+                sa.text(f"CREATE DATABASE {os.environ['POSTGRES_NAME']}"),
+            )
+        except ProgrammingError:
+            pass
+        except Exception as e:
+            logger.exception('Failed to create database')
+            raise e
+
+    def create_database_user(self, db_connection):
+        try:
+            db_connection.execute(
+                sa.text(
+                    f"CREATE USER {os.environ['POSTGRES_USER']} "
+                    f"WITH PASSWORD '{os.environ['POSTGRES_PSWD']}'",
+                ),
+            )
+            db_connection.execute(
+                sa.text(
+                    f"GRANT ALL PRIVILEGES ON DATABASE {os.environ['POSTGRES_NAME']} "
+                    f"TO {os.environ['POSTGRES_USER']}",
+                ),
+            )
+        except ProgrammingError:
+            pass
+        except Exception as e:
+            logger.exception('Failed to create database user')
+            raise e
 
-        clusterProc = ClusterProcess(*procArgs)
-        clusterProc.runProcess()
-
-    def initializeDB(self):
-        self.adminDBConnection.generateEngine()
-        with self.adminDBConnection.engine.connect() as conn:
-            conn.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
-            try:
-                conn.execute(
-                    sa.text(f"CREATE DATABASE {os.environ['POSTGRES_NAME']}"),
-                )
-            except ProgrammingError:
-                pass
-
-            try:
-                conn.execute(
-                    sa.text(
-                        f"CREATE USER {os.environ['POSTGRES_USER']} "
-                        f"WITH PASSWORD '{os.environ['POSTGRES_PSWD']}'",
-                    ),
-                )
-                conn.execute(
-                    sa.text(
-                        f"GRANT ALL PRIVILEGES ON DATABASE {os.environ['POSTGRES_NAME']} "
-                        f"TO {os.environ['POSTGRES_USER']}",
-                    ),
-                )
-            except ProgrammingError:
-                pass
-
-        self.adminDBConnection.engine.dispose()
-
-    def fetchHathiSampleData(self):
-        self.importFromHathiTrustDataFile()
-        self.saveRecords()
-        self.commitChanges()
-
-    def waitForElasticSearch(self):
+    def wait_for_elastic_search(self):
         increment = 5
-        totalTime = 0
-        while True and totalTime < 60:
+        max_time = 60
+
+        for _ in range(0, max_time, increment):
             try:
                 self.es.info()
                 break
             except ConnectionError:
                 pass
+            except Exception as e:
+                logger.exception('Failed to wait for elastic search')
+                raise e
 
-            totalTime += increment
             sleep(increment)
 
-    @staticmethod
-    def returnHathiDateFormat(strDate):
-        if 'T' in strDate and '-' in strDate:
-            return '%Y-%m-%dT%H:%M:%S%z'
-        elif 'T' in strDate:
-            return '%Y-%m-%dT%H:%M:%S'
-        else:
-            return '%Y-%m-%d %H:%M:%S %z'
+    def fetch_hathi_sample_data(self):
+        self.import_from_hathi_trust_data_file()
+
+        self.saveRecords()
+        self.commitChanges()
 
-    def importFromHathiTrustDataFile(self):
-        fileList = requests.get(os.environ['HATHI_DATAFILES'])
-        if fileList.status_code != 200:
-            raise IOError('Unable to load data files')
+    def import_from_hathi_trust_data_file(self):
+        hathi_files_response = requests.get(os.environ['HATHI_DATAFILES'])
 
-        fileJSON = fileList.json()
+        if hathi_files_response.status_code != 200:
+            raise Exception('Unable to load Hathi Trust data files')
 
-        fileJSON.sort(
-            key=lambda x: datetime.strptime(
-                x['created'],
-                self.returnHathiDateFormat(x['created'])
+        hathi_files_json = hathi_files_response.json()
+
+        hathi_files_json.sort(
+            key=lambda file: datetime.strptime(
+                file['created'],
+                self.map_to_hathi_date_format(file['created'])
             ).timestamp(),
             reverse=True
         )
 
-        with open('/tmp/tmp_hathi.txt.gz', 'wb') as hathiTSV:
-            hathiReq = requests.get(fileJSON[0]['url'])
-            hathiTSV.write(hathiReq.content)
+        temp_hathi_file = '/tmp/tmp_hathi.txt.gz'
+        in_copyright_statuses = { 'ic', 'icus', 'ic-world', 'und' }
+
+        with open(temp_hathi_file, 'wb') as hathi_tsv_file:
+            hathi_data_response = requests.get(hathi_files_json[0]['url'])
+
+            hathi_tsv_file.write(hathi_data_response.content)
 
-        with gzip.open('/tmp/tmp_hathi.txt.gz', 'rt') as unzipTSV:
-            hathiTSV = csv.reader(unzipTSV, delimiter='\t')
-            for i, row in enumerate(hathiTSV):
-                if row[2] not in ['ic', 'icus', 'ic-world', 'und']:
-                    hathiRec = HathiMapping(row, self.statics)
-                    hathiRec.applyMapping()
-                    self.addDCDWToUpdateList(hathiRec)
+        with gzip.open(temp_hathi_file, 'rt') as unzipped_tsv_file:
+            hathi_tsv_file = csv.reader(unzipped_tsv_file, delimiter='\t')
 
-                if i >= 500:
+            for number_of_books_ingested, book in enumerate(hathi_tsv_file):
+                if number_of_books_ingested > 500:
                     break
+
+                book_right = book[2]
+
+                if book_right not in in_copyright_statuses:
+                    hathi_record = HathiMapping(book, self.statics)
+                    hathi_record.applyMapping()
+
+                    self.addDCDWToUpdateList(hathi_record)
+
+    def map_to_hathi_date_format(self, date: str):
+        if 'T' in date and '-' in date:
+            return '%Y-%m-%dT%H:%M:%S%z'
+        elif 'T' in date:
+            return '%Y-%m-%dT%H:%M:%S'
+        else:
+            return '%Y-%m-%d %H:%M:%S %z'
diff --git a/processes/sfrCluster.py b/processes/sfrCluster.py
index a25a99f0b1..a1a5833be3 100644
--- a/processes/sfrCluster.py
+++ b/processes/sfrCluster.py
@@ -67,9 +67,9 @@ def clusterRecords(self, full=False, startDateTime=None):
                 logger.warning('Skipping record {}'.format(rec))
                 self.updateMatchedRecordsStatus([rec.id])
                 self.session.commit()
-            except Exception:
+            except Exception as e:
                 logger.exception(f'Failed to cluster record {rec}')
-                continue
+                raise e
 
             if len(indexingWorks) >= 50:
                 self.updateElasticSearch(indexingWorks, deletingWorks)