SFR-2245: Refactor dev setup process and fix infinite cluster loop #396

Merged
merged 1 commit into from Oct 10, 2024
15 changes: 0 additions & 15 deletions docker-compose.yml
@@ -75,21 +75,6 @@ services:
       - /var/run/docker.sock:/var/run/docker.sock
       - ./localstack/init-localstack-resources.sh:/etc/localstack/init/ready.d/init-localstack-resources.sh

-  devsetup:
-    container_name: drb_local_devSetUp
-    depends_on:
-      - database
-      - elasticsearch
-      - s3
-    build:
-      context: .
-    command: -e local-compose -p DevelopmentSetupProcess
-    volumes:
-      - type: bind
-        source: .
-        target: /src
-        read_only: true
-
 volumes:
   drb_local_dbdata:
   drb_local_esdata:
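With the devsetup service gone, the setup step no longer runs automatically on every `docker compose up`; presumably it is now invoked directly when needed. A hedged guess at the invocation, reusing the flags from the removed `command` and assuming `main.py` is the entrypoint that maps `-p` to a process class: `python main.py -e local-compose -p DevelopmentSetupProcess`.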
225 changes: 129 additions & 96 deletions processes/developmentSetup.py
@@ -11,136 +11,169 @@

 from managers.db import DBManager
 from .core import CoreProcess
 from logger import createLog
 from mappings.hathitrust import HathiMapping
 from .oclcClassify import ClassifyProcess
 from .oclcCatalog import CatalogProcess
 from .sfrCluster import ClusterProcess

 logger = createLog(__name__)


 class DevelopmentSetupProcess(CoreProcess):
     def __init__(self, *args):
-        self.adminDBConnection = DBManager(
+        self.admin_db_manager = DBManager(
             user=os.environ['ADMIN_USER'],
             pswd=os.environ['ADMIN_PSWD'],
             host=os.environ['POSTGRES_HOST'],
             port=os.environ['POSTGRES_PORT'],
             db='postgres'
         )
-        self.initializeDB()
+
+        self.initialize_db()

         super(DevelopmentSetupProcess, self).__init__(*args[:4])

     def runProcess(self):
-        self.generateEngine()
-        self.createSession()
-
-        self.initializeDatabase()
-
-        self.createElasticConnection()
-        self.waitForElasticSearch()
-        self.createElasticSearchIndex()
-
-        self.createRabbitConnection()
-        self.createOrConnectQueue(os.environ['OCLC_QUEUE'], os.environ['OCLC_ROUTING_KEY'])
-        self.createOrConnectQueue(os.environ['FILE_QUEUE'], os.environ['FILE_ROUTING_KEY'])
-
-        self.fetchHathiSampleData()
-
-        procArgs = ['complete'] + ([None] * 4)
-
-        self.createRedisClient()
-        self.clear_cache()
-
-        classifyProc = ClassifyProcess(*procArgs)
-        classifyProc.runProcess()
-
-        catalogProc = CatalogProcess(*procArgs)
-        catalogProc.runProcess(max_attempts=1)
+        try:
+            self.generateEngine()
+            self.createSession()
+
+            self.initializeDatabase()
+
+            self.createElasticConnection()
+            self.wait_for_elastic_search()
+            self.createElasticSearchIndex()
+
+            self.createRabbitConnection()
+            self.createOrConnectQueue(os.environ['OCLC_QUEUE'], os.environ['OCLC_ROUTING_KEY'])
+            self.createOrConnectQueue(os.environ['FILE_QUEUE'], os.environ['FILE_ROUTING_KEY'])
+
+            self.fetch_hathi_sample_data()
+
+            process_args = ['complete'] + ([None] * 4)
+
+            self.createRedisClient()
+            self.clear_cache()
+
+            classify_process = ClassifyProcess(*process_args)
+            classify_process.runProcess()
+
+            catalog_process = CatalogProcess(*process_args)
+            catalog_process.runProcess(max_attempts=1)
+
+            cluster_process = ClusterProcess(*process_args)
+            cluster_process.runProcess()
+        except Exception:
+            logger.exception(f'Failed to run development setup process')
+
+    def initialize_db(self):
+        self.admin_db_manager.generateEngine()
+
+        with self.admin_db_manager.engine.connect() as admin_db_connection:
+            admin_db_connection.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+
+            self.create_database(admin_db_connection)
+            self.create_database_user(admin_db_connection)
+
+        self.admin_db_manager.engine.dispose()
+
+    def create_database(self, db_connection):
+        try:
+            db_connection.execute(
+                sa.text(f"CREATE DATABASE {os.environ['POSTGRES_NAME']}"),
+            )
+        except ProgrammingError:
+            pass
+        except Exception as e:
+            logger.exception('Failed to create database')
+            raise e
+
+    def create_database_user(self, db_connection):
+        try:
+            db_connection.execute(
+                sa.text(
+                    f"CREATE USER {os.environ['POSTGRES_USER']} "
+                    f"WITH PASSWORD '{os.environ['POSTGRES_PSWD']}'",
+                ),
+            )
+            db_connection.execute(
+                sa.text(
+                    f"GRANT ALL PRIVILEGES ON DATABASE {os.environ['POSTGRES_NAME']} "
+                    f"TO {os.environ['POSTGRES_USER']}",
+                ),
+            )
+        except ProgrammingError:
+            pass
+        except Exception as e:
+            logger.exception('Failed to create database user')
+            raise e
Comment on lines +81 to +110
Contributor
Good idea to separate those lines of code into two separate methods for clarity.
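One subtlety worth noting in the new initialize_db: PostgreSQL refuses to run CREATE DATABASE inside a transaction block, and SQLAlchemy opens one implicitly on connect, so the raw psycopg2 connection is switched to autocommit before the DDL runs. A minimal standalone sketch of the same pattern (the DSN and database name are placeholders, not values from this repo, which builds them from the ADMIN_* / POSTGRES_* environment variables):

    import sqlalchemy as sa
    from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

    # Placeholder DSN for illustration only.
    engine = sa.create_engine('postgresql+psycopg2://admin:admin@localhost:5432/postgres')

    with engine.connect() as connection:
        # CREATE DATABASE cannot run inside a transaction block, so switch the
        # underlying psycopg2 connection to autocommit before issuing DDL.
        connection.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        connection.execute(sa.text('CREATE DATABASE example_db'))

    engine.dispose()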


-        clusterProc = ClusterProcess(*procArgs)
-        clusterProc.runProcess()
-
-    def initializeDB(self):
-        self.adminDBConnection.generateEngine()
-        with self.adminDBConnection.engine.connect() as conn:
-            conn.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
-            try:
-                conn.execute(
-                    sa.text(f"CREATE DATABASE {os.environ['POSTGRES_NAME']}"),
-                )
-            except ProgrammingError:
-                pass
-
-            try:
-                conn.execute(
-                    sa.text(
-                        f"CREATE USER {os.environ['POSTGRES_USER']} "
-                        f"WITH PASSWORD '{os.environ['POSTGRES_PSWD']}'",
-                    ),
-                )
-                conn.execute(
-                    sa.text(
-                        f"GRANT ALL PRIVILEGES ON DATABASE {os.environ['POSTGRES_NAME']} "
-                        f"TO {os.environ['POSTGRES_USER']}",
-                    ),
-                )
-            except ProgrammingError:
-                pass
-
-        self.adminDBConnection.engine.dispose()
-
-    def fetchHathiSampleData(self):
-        self.importFromHathiTrustDataFile()
-        self.saveRecords()
-        self.commitChanges()

-    def waitForElasticSearch(self):
+    def wait_for_elastic_search(self):
         increment = 5
-        totalTime = 0
-        while True and totalTime < 60:
+        max_time = 60
+
+        for _ in range(0, max_time, increment):
             try:
                 self.es.info()
                 break
             except ConnectionError:
                 pass
+            except Exception as e:
+                logger.exception('Failed to wait for elastic search')
+                raise e

-            totalTime += increment
             sleep(increment)

-    @staticmethod
-    def returnHathiDateFormat(strDate):
-        if 'T' in strDate and '-' in strDate:
-            return '%Y-%m-%dT%H:%M:%S%z'
-        elif 'T' in strDate:
-            return '%Y-%m-%dT%H:%M:%S'
-        else:
-            return '%Y-%m-%d %H:%M:%S %z'
+    def fetch_hathi_sample_data(self):
+        self.import_from_hathi_trust_data_file()
+
+        self.saveRecords()
+        self.commitChanges()

-    def importFromHathiTrustDataFile(self):
-        fileList = requests.get(os.environ['HATHI_DATAFILES'])
-        if fileList.status_code != 200:
-            raise IOError('Unable to load data files')
+    def import_from_hathi_trust_data_file(self):
+        hathi_files_response = requests.get(os.environ['HATHI_DATAFILES'])

-        fileJSON = fileList.json()
+        if hathi_files_response.status_code != 200:
+            raise Exception('Unable to load Hathi Trust data files')
+
+        hathi_files_json = hathi_files_response.json()

-        fileJSON.sort(
-            key=lambda x: datetime.strptime(
-                x['created'],
-                self.returnHathiDateFormat(x['created'])
+        hathi_files_json.sort(
+            key=lambda file: datetime.strptime(
+                file['created'],
+                self.map_to_hathi_date_format(file['created'])
             ).timestamp(),
             reverse=True
         )

-        with open('/tmp/tmp_hathi.txt.gz', 'wb') as hathiTSV:
-            hathiReq = requests.get(fileJSON[0]['url'])
-            hathiTSV.write(hathiReq.content)
+        temp_hathi_file = '/tmp/tmp_hathi.txt.gz'
+        in_copyright_statuses = { 'ic', 'icus', 'ic-world', 'und' }
+
+        with open(temp_hathi_file, 'wb') as hathi_tsv_file:
+            hathi_data_response = requests.get(hathi_files_json[0]['url'])
+
+            hathi_tsv_file.write(hathi_data_response.content)

-        with gzip.open('/tmp/tmp_hathi.txt.gz', 'rt') as unzipTSV:
-            hathiTSV = csv.reader(unzipTSV, delimiter='\t')
-            for i, row in enumerate(hathiTSV):
-                if row[2] not in ['ic', 'icus', 'ic-world', 'und']:
-                    hathiRec = HathiMapping(row, self.statics)
-                    hathiRec.applyMapping()
-                    self.addDCDWToUpdateList(hathiRec)
+        with gzip.open(temp_hathi_file, 'rt') as unzipped_tsv_file:
+            hathi_tsv_file = csv.reader(unzipped_tsv_file, delimiter='\t')

-                if i >= 500:
+            for number_of_books_ingested, book in enumerate(hathi_tsv_file):
+                if number_of_books_ingested > 500:
                     break

+                book_right = book[2]
+
+                if book_right not in in_copyright_statuses:
+                    hathi_record = HathiMapping(book, self.statics)
+                    hathi_record.applyMapping()
+
+                    self.addDCDWToUpdateList(hathi_record)
+
+    def map_to_hathi_date_format(self, date: str):
+        if 'T' in date and '-' in date:
+            return '%Y-%m-%dT%H:%M:%S%z'
+        elif 'T' in date:
+            return '%Y-%m-%dT%H:%M:%S'
+        else:
+            return '%Y-%m-%d %H:%M:%S %z'
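For reference, the three format strings returned by map_to_hathi_date_format correspond to the timestamp layouts that appear in the HathiTrust file list, and the parsed value is reduced to a float only so the list can be sorted newest-first. A quick strptime illustration with an invented sample value:

    from datetime import datetime

    # Invented sample in the ISO-with-offset layout ('%Y-%m-%dT%H:%M:%S%z').
    created = '2024-10-01T12:30:00-0400'
    parsed = datetime.strptime(created, '%Y-%m-%dT%H:%M:%S%z')
    print(parsed.timestamp())  # sortable float, as used by hathi_files_json.sort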
4 changes: 2 additions & 2 deletions processes/sfrCluster.py
@@ -67,9 +67,9 @@ def clusterRecords(self, full=False, startDateTime=None):
                     logger.warning('Skipping record {}'.format(rec))
                     self.updateMatchedRecordsStatus([rec.id])
                     self.session.commit()
-            except Exception:
+            except Exception as e:
                 logger.exception(f'Failed to cluster record {rec}')
-                continue
+                raise e

             if len(indexingWorks) >= 50:
                 self.updateElasticSearch(indexingWorks, deletingWorks)
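This two-line change is the "infinite cluster loop" fix from the PR title. Reading the diff, the old handler logged the failure and continued, so a record that always raised was presumably never marked as processed and would be fetched again on every pass; re-raising makes the process fail fast instead. A schematic sketch of that failure mode (fetch_unprocessed, cluster_one, and mark_processed are hypothetical stand-ins, not functions from this repo):

    def cluster_all(fetch_unprocessed, cluster_one, mark_processed):
        while True:
            batch = fetch_unprocessed()  # records not yet marked processed
            if not batch:
                break
            for record in batch:
                try:
                    cluster_one(record)
                    mark_processed(record)
                except Exception:
                    # Old behavior: log and `continue`. The record is never
                    # marked processed, so fetch_unprocessed() returns it again
                    # on the next pass and the while loop never terminates.
                    # New behavior: re-raise so the failure surfaces at once.
                    raise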