Source code for metatensor.torch.atomistic.ase_calculator
import logging
import os
import pathlib
import warnings
from typing import Dict, List, Optional, Union
import numpy as np
import torch
import vesin
from torch.profiler import record_function
from .. import Labels, TensorBlock, TensorMap
from . import (
MetatensorAtomisticModel,
ModelEvaluationOptions,
ModelMetadata,
ModelOutput,
System,
load_atomistic_model,
register_autograd_neighbors,
)
import ase # isort: skip
import ase.neighborlist # isort: skip
import ase.calculators.calculator # isort: skip
from ase.calculators.calculator import ( # isort: skip
InputError,
PropertyNotImplementedError,
all_properties as ALL_ASE_PROPERTIES,
)
if os.environ.get("METATENSOR_IMPORT_FOR_SPHINX", "0") == "0":
# this can not be imported when building the documentation
from .. import sum_over_samples # isort: skip
FilePath = Union[str, bytes, pathlib.PurePath]
LOGGER = logging.getLogger(__name__)
STR_TO_DTYPE = {
"float32": torch.float32,
"float64": torch.float64,
}
[docs]
class MetatensorCalculator(ase.calculators.calculator.Calculator):
"""
The :py:class:`MetatensorCalculator` class implements ASE's
:py:class:`ase.calculators.calculator.Calculator` API using metatensor atomistic
models to compute energy, forces and any other supported property.
This class can be initialized with any :py:class:`MetatensorAtomisticModel`, and
used to run simulations using ASE's MD facilities.
Neighbor lists are computed using the fast
`vesin <https://luthaf.fr/vesin/latest/index.html>`_ neighbor list library,
unless the system has mixed periodic and non-periodic boundary conditions (which
are not yet supported by ``vesin``), in which case the slower ASE neighbor list
is used.
"""
additional_outputs: Dict[str, TensorMap]
"""
Additional outputs computed by :py:meth:`calculate` are stored in this dictionary.
The keys will match the keys of the ``additional_outputs`` parameters to the
constructor; and the values will be the corresponding raw
:py:class:`metatensor.torch.TensorMap` produced by the model.
"""
def __init__(
self,
model: Union[FilePath, MetatensorAtomisticModel],
*,
additional_outputs: Optional[Dict[str, ModelOutput]] = None,
extensions_directory=None,
check_consistency=False,
device=None,
):
"""
:param model: model to use for the calculation. This can be a file path, a
Python instance of :py:class:`MetatensorAtomisticModel`, or the output of
:py:func:`torch.jit.script` on :py:class:`MetatensorAtomisticModel`.
:param additional_outputs: Dictionary of additional outputs to be computed by
the model. These outputs will always be computed whenever the
:py:meth:`calculate` function is called (e.g. by
:py:meth:`ase.Atoms.get_potential_energy`,
:py:meth:`ase.optimize.optimize.Dynamics.run`, *etc.*) and stored in the
:py:attr:`additional_outputs` attribute. If you want more control over when
and how to compute specific outputs, you should use :py:meth:`run_model`
instead.
:param extensions_directory: if the model uses extensions, we will try to load
them from this directory
:param check_consistency: should we check the model for consistency when
running, defaults to False.
:param device: torch device to use for the calculation. If ``None``, we will try
the options in the model's ``supported_device`` in order.
"""
super().__init__()
self.parameters = {
"check_consistency": check_consistency,
}
# Load the model
if isinstance(model, (str, bytes, pathlib.PurePath)):
if not os.path.exists(model):
raise InputError(f"given model path '{model}' does not exist")
self.parameters["model_path"] = str(model)
model = load_atomistic_model(
model, extensions_directory=extensions_directory
)
elif isinstance(model, torch.jit.RecursiveScriptModule):
if model.original_name != "MetatensorAtomisticModel":
raise InputError(
"torch model must be 'MetatensorAtomisticModel', "
f"got '{model.original_name}' instead"
)
elif isinstance(model, MetatensorAtomisticModel):
# nothing to do
pass
else:
raise TypeError(f"unknown type for model: {type(model)}")
self.parameters["device"] = str(device) if device is not None else None
# check if the model supports the requested device
capabilities = model.capabilities()
if device is None:
device = _find_best_device(capabilities.supported_devices)
else:
device = torch.device(device)
device_is_supported = False
for supported in capabilities.supported_devices:
try:
supported = torch.device(supported)
except RuntimeError as e:
warnings.warn(
"the model contains an invalid device in `supported_devices`: "
f"{e}",
stacklevel=2,
)
continue
if supported.type == device.type:
device_is_supported = True
break
if not device_is_supported:
raise ValueError(
f"This model does not support the requested device ({device}), "
"the following devices are supported: "
f"{capabilities.supported_devices}"
)
if capabilities.dtype in STR_TO_DTYPE:
self._dtype = STR_TO_DTYPE[capabilities.dtype]
else:
raise ValueError(
f"found unexpected dtype in model capabilities: {capabilities.dtype}"
)
if additional_outputs is None:
self._additional_output_requests = {}
else:
assert isinstance(additional_outputs, dict)
for name, output in additional_outputs.items():
assert isinstance(name, str)
assert isinstance(output, torch.ScriptObject)
assert "explicit_gradients_setter" in output._method_names(), (
"outputs must be ModelOutput instances"
)
self._additional_output_requests = additional_outputs
self._device = device
self._model = model.to(device=self._device)
# We do our own check to verify if a property is implemented in `calculate()`,
# so we pretend to be able to compute all properties ASE knows about.
self.implemented_properties = ALL_ASE_PROPERTIES
def todict(self):
if "model_path" not in self.parameters:
raise RuntimeError(
"can not save metatensor model in ASE `todict`, please initialize "
"`MetatensorCalculator` with a path to a saved model file if you need "
"to use `todict`"
)
return self.parameters
@classmethod
def fromdict(cls, data):
return MetatensorCalculator(
model=data["model_path"],
check_consistency=data["check_consistency"],
device=data["device"],
)
[docs]
def metadata(self) -> ModelMetadata:
"""Get the metadata of the underlying model"""
return self._model.metadata()
[docs]
def run_model(
self,
atoms: ase.Atoms,
outputs: Dict[str, ModelOutput],
selected_atoms: Optional[Labels] = None,
) -> Dict[str, TensorMap]:
"""
Run the model on the given ``atoms``, computing the requested ``outputs`` and
only these.
The output of the model is returned directly, and as such the blocks' ``values``
will be :py:class:`torch.Tensor`.
This is intended as an easy way to run metatensor models on
:py:class:`ase.Atoms` when the model can compute outputs not supported by the
standard ASE's calculator interface.
All the parameters have the same meaning as the corresponding ones in
:py:meth:`metatensor.torch.atomistic.ModelInterface.forward`.
:param atoms: system on which to run the model
:param outputs: outputs of the model that should be predicted
:param selected_atoms: subset of atoms on which to run the calculation
"""
types, positions, cell, pbc = _ase_to_torch_data(
atoms=atoms, dtype=self._dtype, device=self._device
)
system = System(types, positions, cell, pbc)
# Compute the neighbors lists requested by the model using ASE NL
for options in self._model.requested_neighbor_lists():
neighbors = _compute_ase_neighbors(
atoms, options, dtype=self._dtype, device=self._device
)
register_autograd_neighbors(
system,
neighbors,
check_consistency=self.parameters["check_consistency"],
)
system.add_neighbor_list(options, neighbors)
options = ModelEvaluationOptions(
length_unit="angstrom",
outputs=outputs,
selected_atoms=selected_atoms,
)
return self._model(
systems=[system],
options=options,
check_consistency=self.parameters["check_consistency"],
)
[docs]
def calculate(
self,
atoms: ase.Atoms,
properties: List[str],
system_changes: List[str],
) -> None:
"""
Compute some ``properties`` with this calculator, and return them in the format
expected by ASE.
This is not intended to be called directly by users, but to be an implementation
detail of ``atoms.get_energy()`` and related functions. See
:py:meth:`ase.calculators.calculator.Calculator.calculate` for more information.
"""
super().calculate(
atoms=atoms,
properties=properties,
system_changes=system_changes,
)
# In the next few lines, we decide which properties to calculate among energy,
# forces and stress. In addition to the requested properties, we calculate the
# energy if any of the three is requested, as it is an intermediate step in the
# calculation of the other two. We also calculate the forces if the stress is
# requested, and vice-versa. The overhead for the latter operation is also
# small, assuming that the majority of the model computes forces and stresses
# by backward propagation as opposed to forward-mode differentiation.
calculate_energy = (
"energy" in properties
or "energies" in properties
or "forces" in properties
or "stress" in properties
)
calculate_energies = "energies" in properties
calculate_forces = "forces" in properties or "stress" in properties
calculate_stress = "stress" in properties or "forces" in properties
if "stresses" in properties:
raise NotImplementedError("'stresses' are not implemented yet")
with record_function("ASECalculator::prepare_inputs"):
outputs = _ase_properties_to_metatensor_outputs(properties)
outputs.update(self._additional_output_requests)
capabilities = self._model.capabilities()
for name in outputs.keys():
if name not in capabilities.outputs:
raise ValueError(
f"you asked for the calculation of {name}, but this model "
"does not support it"
)
types, positions, cell, pbc = _ase_to_torch_data(
atoms=atoms, dtype=self._dtype, device=self._device
)
do_backward = False
if calculate_forces:
do_backward = True
positions.requires_grad_(True)
if calculate_stress:
do_backward = True
strain = torch.eye(
3, requires_grad=True, device=self._device, dtype=self._dtype
)
positions = positions @ strain
positions.retain_grad()
cell = cell @ strain
run_options = ModelEvaluationOptions(
length_unit="angstrom",
outputs=outputs,
selected_atoms=None,
)
with record_function("ASECalculator::compute_neighbors"):
# convert from ase.Atoms to metatensor.torch.atomistic.System
system = System(types, positions, cell, pbc)
for options in self._model.requested_neighbor_lists():
neighbors = _compute_ase_neighbors(
atoms, options, dtype=self._dtype, device=self._device
)
register_autograd_neighbors(
system,
neighbors,
check_consistency=self.parameters["check_consistency"],
)
system.add_neighbor_list(options, neighbors)
# no `record_function` here, this will be handled by MetatensorAtomisticModel
outputs = self._model(
[system],
run_options,
check_consistency=self.parameters["check_consistency"],
)
energy = outputs["energy"]
with record_function("ASECalculator::sum_energies"):
if run_options.outputs["energy"].per_atom:
assert len(energy) == 1
assert energy.sample_names == ["system", "atom"]
assert torch.all(energy.block().samples["system"] == 0)
assert torch.all(
energy.block().samples["atom"] == torch.arange(positions.shape[0])
)
energies = energy.block().values
assert energies.shape == (len(atoms), 1)
energy = sum_over_samples(energy, sample_names=["atom"])
assert len(energy.block().gradients_list()) == 0
energy = energy.block().values
assert energy.shape == (1, 1)
with record_function("ASECalculator::run_backward"):
if do_backward:
energy.backward()
with record_function("ASECalculator::convert_outputs"):
self.results = {}
if calculate_energies:
energies_values = energies.detach().reshape(-1)
energies_values = energies_values.to(device="cpu").to(
dtype=torch.float64
)
self.results["energies"] = energies_values.numpy()
if calculate_energy:
energy_values = energy.detach()
energy_values = energy_values.to(device="cpu").to(dtype=torch.float64)
self.results["energy"] = energy_values.numpy()[0, 0]
if calculate_forces:
forces_values = -system.positions.grad.reshape(-1, 3)
forces_values = forces_values.to(device="cpu").to(dtype=torch.float64)
self.results["forces"] = forces_values.numpy()
if calculate_stress:
stress_values = strain.grad.reshape(3, 3) / atoms.cell.volume
stress_values = stress_values.to(device="cpu").to(dtype=torch.float64)
self.results["stress"] = _full_3x3_to_voigt_6_stress(
stress_values.numpy()
)
self.additional_outputs = {}
for name in self._additional_output_requests:
self.additional_outputs[name] = outputs[name]
def _find_best_device(devices: List[str]) -> torch.device:
"""
Find the best device from the list of ``devices`` that is available to the current
PyTorch installation.
"""
for device in devices:
if device == "cpu":
return torch.device("cpu")
elif device == "cuda":
if torch.cuda.is_available():
return torch.device("cuda")
else:
LOGGER.warning(
"the model suggested to use CUDA devices before CPU, "
"but we are unable to find it"
)
elif device == "mps":
if (
hasattr(torch.backends, "mps")
and torch.backends.mps.is_built()
and torch.backends.mps.is_available()
):
return torch.device("mps")
else:
LOGGER.warning(
"the model suggested to use MPS devices before CPU, "
"but we are unable to find it"
)
else:
warnings.warn(
f"unknown device in the model's `supported_devices`: '{device}'",
stacklevel=2,
)
warnings.warn(
"could not find a valid device in the model's `supported_devices`, "
"falling back to CPU",
stacklevel=2,
)
return torch.device("cpu")
def _ase_properties_to_metatensor_outputs(properties):
energy_properties = []
for p in properties:
if p in ["energy", "energies", "forces", "stress", "stresses"]:
energy_properties.append(p)
else:
raise PropertyNotImplementedError(
f"property '{p}' it not yet supported by this calculator, "
"even if it might be supported by the model"
)
output = ModelOutput()
output.quantity = "energy"
output.unit = "ev"
output.explicit_gradients = []
if "energies" in properties or "stresses" in properties:
output.per_atom = True
else:
output.per_atom = False
if "stresses" in properties:
output.explicit_gradients = ["cell"]
return {"energy": output}
def _compute_ase_neighbors(atoms, options, dtype, device):
# options.strict is ignored by this function, since `ase.neighborlist.neighbor_list`
# only computes strict NL, and these are valid even with `strict=False`
if np.all(atoms.pbc) or np.all(~atoms.pbc):
nl_i, nl_j, nl_S, nl_D = vesin.ase_neighbor_list(
"ijSD",
atoms,
cutoff=options.engine_cutoff(engine_length_unit="angstrom"),
)
else:
nl_i, nl_j, nl_S, nl_D = ase.neighborlist.neighbor_list(
"ijSD",
atoms,
cutoff=options.engine_cutoff(engine_length_unit="angstrom"),
)
# The pair selection code here below avoids a relatively slow loop over
# all pairs to improve performance
reject_condition = (
# we want a half neighbor list, so drop all duplicated neighbors
(nl_j < nl_i)
| (
(nl_i == nl_j)
& (
# only create pairs with the same atom twice if the pair spans more
# than one unit cell
((nl_S[:, 0] == 0) & (nl_S[:, 1] == 0) & (nl_S[:, 2] == 0))
|
# When creating pairs between an atom and one of its periodic images,
# the code generates multiple redundant pairs
# (e.g. with shifts 0 1 1 and 0 -1 -1); and we want to only keep one of
# these. We keep the pair in the positive half plane of shifts.
(
(nl_S.sum(axis=1) < 0)
| (
(nl_S.sum(axis=1) == 0)
& ((nl_S[:, 2] < 0) | ((nl_S[:, 2] == 0) & (nl_S[:, 1] < 0)))
)
)
)
)
)
selected = np.logical_not(reject_condition)
n_pairs = np.sum(selected)
if options.full_list:
distances = np.empty((2 * n_pairs, 3), dtype=np.float64)
samples = np.empty((2 * n_pairs, 5), dtype=np.int32)
else:
distances = np.empty((n_pairs, 3), dtype=np.float64)
samples = np.empty((n_pairs, 5), dtype=np.int32)
samples[:n_pairs, 0] = nl_i[selected]
samples[:n_pairs, 1] = nl_j[selected]
samples[:n_pairs, 2:] = nl_S[selected]
distances[:n_pairs] = nl_D[selected]
if options.full_list:
samples[n_pairs:, 0] = nl_j[selected]
samples[n_pairs:, 1] = nl_i[selected]
samples[n_pairs:, 2:] = -nl_S[selected]
distances[n_pairs:] = -nl_D[selected]
distances = torch.from_numpy(distances).to(dtype=dtype).to(device=device)
return TensorBlock(
values=distances.reshape(-1, 3, 1),
samples=Labels(
names=[
"first_atom",
"second_atom",
"cell_shift_a",
"cell_shift_b",
"cell_shift_c",
],
values=torch.from_numpy(samples),
).to(device=device),
components=[Labels.range("xyz", 3).to(device)],
properties=Labels.range("distance", 1).to(device),
)
def _ase_to_torch_data(atoms, dtype, device):
"""Get the positions, cell and pbc from ASE atoms as torch tensors"""
types = torch.from_numpy(atoms.numbers).to(dtype=torch.int32, device=device)
positions = torch.from_numpy(atoms.positions).to(dtype=dtype, device=device)
cell = torch.zeros((3, 3), dtype=dtype, device=device)
pbc = torch.tensor(atoms.pbc, dtype=torch.bool, device=device)
cell[pbc] = torch.tensor(atoms.cell[atoms.pbc], dtype=dtype, device=device)
return types, positions, cell, pbc
def _full_3x3_to_voigt_6_stress(stress):
"""
Re-implementation of ``ase.stress.full_3x3_to_voigt_6_stress`` which does not do the
stress symmetrization correctly (they do ``(stress[1, 2] + stress[1, 2]) / 2.0``)
"""
return np.array(
[
stress[0, 0],
stress[1, 1],
stress[2, 2],
(stress[1, 2] + stress[2, 1]) / 2.0,
(stress[0, 2] + stress[2, 0]) / 2.0,
(stress[0, 1] + stress[1, 0]) / 2.0,
]
)