Skip to content

Commit

Permalink
🦍 refactor evo search alg (#188)
Browse files Browse the repository at this point in the history
* begin evo search refactor

* split deploy-id sync and async methods

* attempt hanging bug fix

* add prefect_db test marker
  • Loading branch information
jacksund authored Jul 13, 2022
1 parent 36b55eb commit 66de06e
Show file tree
Hide file tree
Showing 17 changed files with 252 additions and 219 deletions.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ markers = [
"blender: requires blender installed",
"pymatgen: runs a pymatgen-compatibility test",
"vasp: requires vasp installed",
"prefect_bug: hangs in github CI but not local tests" # see https://github.com/jacksund/simmate/issues/187
"prefect_db: requires access to the prefect database"
]

# By default, we only want to run unmarked tests. The simplest way to do this
Expand All @@ -27,7 +27,7 @@ markers = [
# migration folders.
# I manually remove -m when testing coverage, but am unsure if there's a better
# way to do this.
addopts = "--no-migrations -m 'not blender and not pymatgen and not vasp and not prefect_bug'"
addopts = "--no-migrations -m 'not blender and not pymatgen and not vasp'"

# There are a number of warnings that are expected when running our tests.
# We remove these from our output for clarity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
)


@pytest.mark.prefect_db
@pytest.mark.django_db
@pytest.mark.prefect_bug
def test_neb(sample_structures, tmpdir, mocker):

copy_test_files(
Expand Down Expand Up @@ -43,6 +43,3 @@ def test_neb(sample_structures, tmpdir, mocker):
directory=str(tmpdir),
)
assert state.is_completed()

# BUG: see https://github.com/jacksund/simmate/issues/187
Diffusion__Vasp__NebAllPaths.nflows_submitted
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def run_config(

static_result = static_workflow.run(
structure=parameters_cleaned["structure"],
command=parameters_cleaned["command"],
command=parameters_cleaned.get("command"),
directory=parameters_cleaned["directory"]
+ os.path.sep
+ static_workflow.name_full,
Expand All @@ -72,7 +72,7 @@ def run_config(
"database_table": static_workflow.database_table.__name__,
"directory": static_result["directory"],
},
command=parameters_cleaned["command"],
command=parameters_cleaned.get("command"),
directory=parameters_cleaned["directory"]
+ os.path.sep
+ dos_workflow.name_full,
Expand All @@ -84,7 +84,7 @@ def run_config(
"database_table": static_workflow.database_table.__name__,
"directory": static_result["directory"],
},
command=parameters_cleaned["command"],
command=parameters_cleaned.get("command"),
directory=parameters_cleaned["directory"]
+ os.path.sep
+ bandstruct_workflow.name_full,
Expand Down
4 changes: 2 additions & 2 deletions src/simmate/calculators/vasp/workflows/relaxation/staged.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def run_config(
current_task = tasks_to_run[0]
state = current_task.run(
structure=parameters_cleaned["structure"],
command=parameters_cleaned["command"],
command=parameters_cleaned.get("command"),
directory=parameters_cleaned["directory"]
+ os.path.sep
+ current_task.name_full,
Expand All @@ -94,7 +94,7 @@ def run_config(
"directory": result["directory"], # uses preceding result
"structure_field": "structure_final",
},
command=parameters_cleaned["command"],
command=parameters_cleaned.get("command"),
directory=parameters_cleaned["directory"]
+ os.path.sep
+ current_task.name_full,
Expand Down
14 changes: 14 additions & 0 deletions src/simmate/database/base_data_types/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,20 @@ def from_toolkit(cls, as_dict: bool = False, **kwargs):
# return the dictionary
return all_data if as_dict else cls(**all_data)

def update_from_toolkit(self, **kwargs):
    """
    Update an existing database entry from fundamental "base_info" and
    toolkit objects.

    All keyword arguments are passed through `from_toolkit(as_dict=True)`
    to build the full set of column values, each of which is then written
    onto this instance before saving.

    Note: this method is for updating entries that already exist in the
    database. If you're creating a brand-new entry, use the `from_toolkit`
    classmethod instead.
    """
    # Convert the toolkit inputs into a flat {column: value} mapping and
    # copy every value onto this instance.
    for field_name, field_value in self.from_toolkit(as_dict=True, **kwargs).items():
        setattr(self, field_name, field_value)
    # Persist the updated columns to the database.
    self.save()

@classmethod
def _confirm_override(
cls,
Expand Down
95 changes: 31 additions & 64 deletions src/simmate/database/base_data_types/relaxation.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class ExampleRelaxation(Relaxation):
from pymatgen.io.vasp.outputs import Vasprun


class Relaxation(Structure, Calculation):
class Relaxation(Structure, Thermodynamics, Calculation):
"""
This table holds all data from a structure relaxation and also links to
IonicStep table which holds all of the structure/energy/forces for each
Expand Down Expand Up @@ -237,59 +237,25 @@ def from_directory(cls, directory: str):

@classmethod
def from_vasp_run(cls, vasprun: Vasprun):
# The data is actually easier to access as a dictionary and everything
# we need is stored under the "output" key
data = vasprun.as_dict()["output"]

# The only other data we need to grab is the list of structures. We can
# pull the structure for each ionic step from the vasprun class directly.
structures = vasprun.structures

# Make the relaxation entry. Note we need to save this to the database
# in order to link/save the ionic steps below. We save the structure
# as the final one in the calculation.
relaxation = cls.from_toolkit(
structure=structures[-1],
volume_change=(structures[-1].volume - structures[0].volume)
/ structures[0].volume,
band_gap=data.get("bandgap"),
is_gap_direct=data.get("is_gap_direct"),
energy_fermi=data.get("efermi"),
conduction_band_minimum=data.get("cbm"),
valence_band_maximum=data.get("vbm"),
)

# lastly, we also want to save the corrections made and directory it ran in
# relaxation.corrections = corrections
# relaxation.directory = directory
# Note, the information does not matter at this point because it will be
# populated below
relaxation = cls.from_toolkit(structure=vasprun.structures[-1])
# TODO: need to pull prefect_flow_run_id from metadata file.

# Now we have the relaxation data all loaded and can save it to the database
relaxation.save()

# Now let's iterate through the ionic steps and save these to the database.
for number, (structure, ionic_step) in enumerate(
zip(structures, data["ionic_steps"])
):
# first pull all the data together and save it to the database. We
# are saving this to an IonicStepStructure datatable.
structure_db = cls.structures.field.model.from_toolkit(
number=number,
structure=structure,
energy=ionic_step["e_wo_entrp"],
site_forces=ionic_step["forces"],
lattice_stress=ionic_step["stress"],
relaxation=relaxation, # this links the structure to this relaxation
)
structure_db.save()

# If this is the first structure, we want to link it as such
if number == 0:
relaxation.structure_start_id = structure_db.id
# and same for the final structure. Note, we can't use elif becuase
# there's a chance the start/end structure are the same, which occurs
# when the starting structure is found to be relaxed already.
if number == len(structures) - 1:
relaxation.structure_final_id = structure_db.id
# Save the rest of the results using the update method from this class
relaxation.update_from_vasp_run(
vasprun=vasprun,
corrections=[],
directory=None,
)
# TODO: load corrections/directory from the metadata and corrections files.

return relaxation

Expand Down Expand Up @@ -347,25 +313,26 @@ def update_from_vasp_run(
# when the starting structure is found to be relaxed already.
if number == len(structures) - 1:
self.structure_final_id = structure.id
# calculate extra data for storing
self.volume_change = (
structures[-1].volume - structures[0].volume
) / structures[0].volume

# There is also extra data for the final structure that we save directly
# in the relaxation table. We use .get() in case the key isn't provided.
self.band_gap = data.get("bandgap")
self.is_gap_direct = data.get("is_gap_direct")
self.energy_fermi = data.get("efermi")
self.conduction_band_minimum = data.get("cbm")
self.valence_band_maximum = data.get("vbm")

# lastly, we also want to save the corrections made and directory it ran in
self.corrections = corrections
self.directory = directory

# Now we have the relaxation data all loaded and can save it to the database
self.save()
# update our relaxation entry with new data
self.update_from_toolkit(
# use the final ionic setup for the structure and energy
structure=structures[-1],
energy=data["ionic_steps"][-1]["e_wo_entrp"],
# calculate extra data for storing
volume_change=structures[-1].volume
- structures[0].volume / structures[0].volume,
# There is also extra data for the final structure that we save directly
# in the relaxation table. We use .get() in case the key isn't provided.
band_gap=data.get("bandgap"),
is_gap_direct=data.get("is_gap_direct"),
energy_fermi=data.get("efermi"),
conduction_band_minimum=data.get("cbm"),
valence_band_maximum=data.get("vbm"),
# lastly, we also want to save the corrections made and directory it ran in
corrections=corrections,
directory=directory,
)

def get_convergence_plot(self):

Expand Down
2 changes: 1 addition & 1 deletion src/simmate/database/third_parties/materials_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Meta:
base_info = ["id", "structure_string", "energy"]
source = "Materials Project"
source_doi = "https://doi.org/10.1063/1.4812323"
remote_archive_link = "https://archives.simmate.org/MatprojStructure-2022-01-26.zip"
remote_archive_link = "https://archives.simmate.org/MatProjStructure-2022-01-26.zip"

id = table_column.CharField(max_length=25, primary_key=True)
"""
Expand Down
79 changes: 49 additions & 30 deletions src/simmate/toolkit/structure_prediction/evolution/database.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
# -*- coding: utf-8 -*-

from simmate.database.base_data_types import table_column, DatabaseTable

import plotly.graph_objects as plotly_go
from django.apps import apps as django_apps
from prefect.client import get_client
from prefect.orion.schemas.filters import FlowRunFilter

# from prefect.client import Client
# from prefect.utilities.graphql import with_args

import plotly.graph_objects as plotly_go
from simmate.database.base_data_types import table_column, DatabaseTable
from simmate.utilities import async_to_sync


class EvolutionarySearch(DatabaseTable):
Expand Down Expand Up @@ -131,6 +130,9 @@ def individuals(self):
def individuals_completed(self):
# If there is an energy_per_atom, we can treat the calculation as completed
return self.individuals.filter(energy_per_atom__isnull=False)
# OPTIMIZE: would it be better to check energy_per_atom or structure_final?
# Ideally, I could make a relation to the prefect flow run table but this
# would require a large amount of work to implement.

@property
def best_individual(self):
Expand Down Expand Up @@ -172,7 +174,9 @@ def get_correctness_plot(self, structure_known):
import numpy
from simmate.toolkit import Composition
from matminer.featurizers.site import CrystalNNFingerprint
from matminer.featurizers.structure.sites import PartialsSiteStatsFingerprint
from matminer.featurizers.structure.sites import (
PartialsSiteStatsFingerprint,
)

sitefingerprint_method = CrystalNNFingerprint.from_preset(
"ops", distance_cutoffs=None, x_diff_weight=3
Expand Down Expand Up @@ -265,35 +269,50 @@ class Meta:
related_name="sources",
)

@staticmethod
@async_to_sync
async def _check_still_running_ids(prefect_flow_run_ids):
    """
    Given a list of Prefect flow run ids, return the subset of ids whose
    runs have not finished yet -- i.e. those still in a SCHEDULED,
    PENDING, or RUNNING state.

    This is kept as a separate method so that Prefect's async client
    calls stay isolated from Django's sync-restricted ORM code (Django
    raises errors when used within an async context). It is normally
    called via `update_flow_run_ids` and shouldn't be called directly.
    """
    # Query Prefect for only those runs (out of the ids given) that are
    # still in an unfinished state.
    async with get_client() as client:
        unfinished_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                id={"any_": prefect_flow_run_ids},
                state={"type": {"any_": ["SCHEDULED", "PENDING", "RUNNING"]}},
            ),
        )

    # Prefect returns flow-run objects; callers only need the ids as strings.
    return [str(run.id) for run in unfinished_runs]

def update_flow_run_ids(self):
"""
Queries Prefect to see how many workflows are in a scheduled, running,
or pending state from the list of run ids that are associate with this
structure source.
"""

# Using our list of current run ids, we query prefect to see which of
# these still are running or in the queue.
# OPTIMIZE: This may be a really bad way to query Prefect...
query = {
"query": {
with_args(
"flow_run",
{
"where": {
"state": {"_in": ["Running", "Scheduled"]},
"id": {"_in": self.prefect_flow_run_ids},
},
},
): ["id"]
}
}
client = Client()
result = client.graphql(query)
# graphql gives a weird format, so I reparse it into just a list of ids
result = [run["id"] for run in result["data"]["flow_run"]]
# make the async call to Prefect client
still_running_ids = self._check_still_running_ids(self.prefect_flow_run_ids)

# we now have our new list of IDs! Let's update it to the database
self.prefect_flow_run_ids = result
self.prefect_flow_run_ids = still_running_ids
self.save()

# in case we need the list of ids, we return it too
return result
return still_running_ids

@property
def nprefect_flow_runs(self):
Expand Down
Loading

0 comments on commit 66de06e

Please sign in to comment.