From 70a80fe602fb0d221e63a88652f45e0b202eee8e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Samuel=20Moln=C3=A1r?=
Date: Sat, 28 Sep 2024 11:47:36 +0200
Subject: [PATCH] Improve import

---
 config/schedule.rb                 |  2 +-
 ml/decree-embeddings/embed.py      | 67 ++++++++++++------------------
 ml/decree-embeddings/repository.py | 24 +++++++++++
 3 files changed, 51 insertions(+), 42 deletions(-)

diff --git a/config/schedule.rb b/config/schedule.rb
index 899f5c8..c73cde6 100644
--- a/config/schedule.rb
+++ b/config/schedule.rb
@@ -22,7 +22,7 @@
 set :output, 'log/cron.log'
 
 every :day, at: '10 pm' do
-  runner 'ExceptionHandler.run { system("OPENCOURTS_DATABASE_NAME=opencourts_production INCREMENTAL=true python3 ml/decree-embeddings/embed.py") }'
+  runner 'ExceptionHandler.run { system("OPENCOURTS_DATABASE_NAME=opencourts_production python3 ml/decree-embeddings/embed.py") }'
 end
 
 every :day, at: '3:00 am' do
diff --git a/ml/decree-embeddings/embed.py b/ml/decree-embeddings/embed.py
index c4d3214..cda6b72 100644
--- a/ml/decree-embeddings/embed.py
+++ b/ml/decree-embeddings/embed.py
@@ -15,14 +15,6 @@
 
 if __name__ == "__main__":
     try:
-        model_path = os.path.join(
-            os.path.dirname(os.path.abspath(__file__)), "models", "reducer.pk"
-        )
-        incremental = os.getenv("INCREMENTAL", "false").lower() == "true"
-
-        if incremental:
-            logger.info("Incremental mode enabled")
-
         vocabulary = repository.decrees_vocabulary()
 
         logger.info(f"Loaded vocabulary with [{len(vocabulary)}] values")
@@ -32,7 +24,7 @@
         ids = []
         embeddings = []
 
-        for batch in repository.decrees(without_embedding_only=incremental):
+        for batch in repository.decrees():
             start_time = time()
 
             vectors = decrees_to_embeddings(vocabulary, batch)
@@ -56,46 +48,33 @@
 
         logger.info(f"Finished embeddings in [{embeddings_time_in_ms:.2f}ms]")
 
-        if incremental:
-            logger.info(f"Loading reducer model from [{model_path}] ...")
-
-            reducer_start_time = time()
+        memory = csr_matrix_memory_in_bytes(embeddings)
 
-            with open(model_path, "rb") as f:
-                reducer = pk.load(f)
-
-            reducer_time_in_ms = (time() - reducer_start_time) * 1000
-
-            logger.info(f"Reducer model loaded in [{reducer_time_in_ms:.2f}ms]")
-        else:
-            memory = csr_matrix_memory_in_bytes(embeddings)
-
-            logger.debug(
-                f"Memory usage for embeddings is [{memory / 1024 / 1024:.2f}MB]"
-            )
+        logger.debug(f"Memory usage for embeddings is [{memory / 1024 / 1024:.2f}MB]")
 
-            sparsity = 1.0 - (
-                embeddings.count_nonzero() / (embeddings.shape[0] * embeddings.shape[1])
-            )
-            eps = math.floor(sparsity / 0.05) * 0.05
+        sparsity = 1.0 - (
+            embeddings.count_nonzero() / (embeddings.shape[0] * embeddings.shape[1])
+        )
+        eps = math.floor(sparsity / 0.05) * 0.05
 
-            logger.info(f"Sparsity of embeddings is [{sparsity}]")
-            logger.info(f"Setting eps of GaussianRandomProjection to [{eps}]")
+        logger.info(f"Sparsity of embeddings is [{sparsity}]")
+        logger.info(f"Setting eps of GaussianRandomProjection to [{eps}]")
 
-            reducer = GaussianRandomProjection(n_components=768, eps=eps)
+        reducer = GaussianRandomProjection(n_components=768, eps=eps)
 
-            reducer_start_time = time()
-            reducer.fit(embeddings)
-            reducer_time_in_ms = (time() - reducer_start_time) * 1000
+        reducer_start_time = time()
+        reducer.fit(embeddings)
+        reducer_time_in_ms = (time() - reducer_start_time) * 1000
 
-            logger.info(
-                f"Finished fitting dimensionality reduction for embeddings in [{reducer_time_in_ms:.2f}ms]"
-            )
+        logger.info(
+            f"Finished fitting dimensionality reduction for embeddings in [{reducer_time_in_ms:.2f}ms]"
+        )
 
-            with open(model_path, "wb") as f:
-                pk.dump(reducer, f)
+        remove_index_start_time = time()
+        repository.remove_embeddings_index()
+        remove_index_time_in_ms = (time() - remove_index_start_time) * 1000
 
-            logger.info(f"Reducer model saved to [{model_path}]")
+        logger.info(f"Removed embeddings index in [{remove_index_time_in_ms:.2f}ms]")
 
         batch_size = 10_000
 
@@ -132,6 +111,12 @@
                 f"Stored [#{ i // batch_size}] batch of embeddings in [{time_in_ms:.2f}ms]"
             )
 
+        create_index_start_time = time()
+        repository.create_embeddings_index()
+        create_index_time_in_ms = (time() - create_index_start_time) * 1000
+
+        logger.info(f"Created embeddings index in [{create_index_time_in_ms:.2f}ms]")
+
         similarities = cosine_similarity(vstack(testing_embeddings))
         reduced_similarities = cosine_similarity(
             np.concatenate(testing_reduced_embeddings)
diff --git a/ml/decree-embeddings/repository.py b/ml/decree-embeddings/repository.py
index ee83f9d..b3327e3 100644
--- a/ml/decree-embeddings/repository.py
+++ b/ml/decree-embeddings/repository.py
@@ -187,5 +187,29 @@ def store_decrees_embeddings(self, decrees):
         time_in_ms = (time() - start_time) * 1000
 
         logger.info(f"Stored [{len(decrees)}] embeddings in [{time_in_ms:.2f}ms]")
 
+    @retry
+    @connection
+    def remove_embeddings_index(self):
+        cur = self._connection.cursor()
+
+        cur.execute("DROP INDEX IF EXISTS decrees_embedding_idx")
+
+        self._connection.commit()
+
+        cur.close()
+
+    @retry
+    @connection
+    def create_embeddings_index(self):
+        cur = self._connection.cursor()
+
+        cur.execute(
+            "CREATE INDEX decrees_embedding_idx ON decrees USING hnsw (embedding vector_cosine_ops) WITH (m = 20, ef_construction = 64);"
+        )
+
+        self._connection.commit()
+
+        cur.close()
+
 repository = Repository()