Source code for metatensor.torch.atomistic.model
import datetime
import hashlib
import json
import math
import os
import platform
import shutil
import site
import warnings
from typing import Dict, List, Optional
import torch
from .. import Labels, TensorBlock, TensorMap
from .. import __version__ as metatensor_version
from .. import dtype_name
from . import (
ModelCapabilities,
ModelEvaluationOptions,
ModelMetadata,
ModelOutput,
NeighborsListOptions,
System,
unit_conversion_factor,
)
from .outputs import _check_outputs
[docs]
class ModelInterface(torch.nn.Module):
"""
Interface for models that can be used with :py:class:`MetatensorAtomisticModel`.
There are several requirements that models must satisfy to be usable with
:py:class:`MetatensorAtomisticModel`. The main one is concerns the
:py:meth:`forward` function, which must have the signature defined in this
interface.
Additionally, the model can request neighbor lists to be computed by the simulation
engine, and stored inside the input :py:class:`System`. This is done by defining the
optional :py:meth:`requested_neighbors_lists` method for the model or any of it's
sub-module.
:py:class:`MetatensorAtomisticModel` will check if ``requested_neighbors_lists`` is
defined for all the sub-modules of the model, then collect and unify identical
requests for the simulation engine.
"""
def __init__():
""""""
pass
[docs]
def forward(
self,
systems: List[System],
outputs: Dict[str, ModelOutput],
selected_atoms: Optional[Labels],
) -> Dict[str, TensorMap]:
"""
This function should run the model for the given ``systems``, returning the
requested ``outputs``. If ``selected_atoms`` is a set of :py:class:`Labels`,
only the corresponding atoms should be included as "main" atoms in the
calculation and the output.
``outputs`` will be a subset of the capabilities that where declared when
exporting the model. For example if a model can compute both an ``"energy"`` and
a ``"charge"`` output, the simulation engine might only request one them.
The returned dictionary should have the same keys as ``outputs``, and the values
should contains the corresponding properties of the ``systems``, as computed for
the subset of atoms defined in ``selected_atoms``. For some specific outputs,
there are additional constrains on how the associated metadata should look like,
documented in the :ref:`atomistic-models-outputs` section.
The main use case for ``selected_atoms`` is domain decomposition, where the
:py:class:`System` given to a model might contain both atoms in the current
domain and some atoms from other domains; and the calculation should produce
per-atom output only for the atoms in the domain (but still accounting for atoms
from the other domains as potential neighbors).
:param systems: atomistic systems on which to run the calculation
:param outputs: set of outputs requested by the simulation engine
:param selected_atoms: subset of atoms that should be included in the output,
defaults to None
:return: properties of the systems, as predicted by the machine learning model
"""
[docs]
def requested_neighbors_lists(self) -> List[NeighborsListOptions]:
"""
Optional function declaring which neighbors list this model requires.
This function can be defined on either the root model or any of it's
sub-modules. A single module can request multiple neighbors list simultaneously
if it needs them.
It is then the responsibility of the code calling the model to:
1. call this function (or more generally
:py:meth:`MetatensorAtomisticModel.requested_neighbors_lists`) to get the
list of requests;
2. compute all neighbor lists corresponding to these requests and add them to
the systems before calling the model.
"""
[docs]
class MetatensorAtomisticModel(torch.nn.Module):
"""
:py:class:`MetatensorAtomisticModel` is the main entry point for atomistic machine
learning based on metatensor. It is the interface between custom, user-defined
models and simulation engines. Users should wrap their models with this class, and
use :py:meth:`export()` to save and export the model to a file. The exported models
can then be loaded by a simulation engine to compute properties of atomistic
systems.
When wrapping a ``module``, you should declare what the model is capable of (using
:py:class:`ModelCapabilities`). This includes what units the model expects as input
and what properties the model can compute (using :py:class:`ModelOutput`). The
simulation engine will then ask the model to compute some subset of these properties
(through a :py:class:`ModelEvaluationOptions`), on all or a subset of atoms of an
atomistic system.
The wrapped module must follow the interface defined by :py:class:`ModelInterface`,
should not already be compiled by TorchScript, and should be in "eval" mode (i.e.
``module.training`` should be ``False``).
For example, a custom module predicting the energy as a constant value times the
number of atoms could look like this
>>> class ConstantEnergy(torch.nn.Module):
... def __init__(self, constant: float):
... super().__init__()
... self.constant = torch.tensor(constant).reshape(1, 1)
...
... def forward(
... self,
... systems: List[System],
... outputs: Dict[str, ModelOutput],
... selected_atoms: Optional[Labels] = None,
... ) -> Dict[str, TensorMap]:
... results: Dict[str, TensorMap] = {}
... if "energy" in outputs:
... if outputs["energy"].per_atom:
... raise NotImplementedError("per atom energy is not implemented")
...
... dtype = systems[0].positions.dtype
... energies = torch.zeros(len(systems), 1, dtype=dtype)
... for i, system in enumerate(systems):
... if selected_atoms is None:
... n_atoms = len(system)
... else:
... n_atoms = len(selected_atoms)
...
... energies[i] = self.constant * n_atoms
...
... systems_idx = torch.tensor([[i] for i in range(len(systems))])
... energy_block = TensorBlock(
... values=energies,
... samples=Labels(["system"], systems_idx.to(torch.int32)),
... components=torch.jit.annotate(List[Labels], []),
... properties=Labels(["energy"], torch.tensor([[0]])),
... )
...
... results["energy"] = TensorMap(
... keys=Labels(["_"], torch.tensor([[0]])),
... blocks=[energy_block],
... )
...
... return results
...
Wrapping and exporting this model would then look like this:
>>> import os
>>> import tempfile
>>> from metatensor.torch.atomistic import MetatensorAtomisticModel
>>> from metatensor.torch.atomistic import (
... ModelCapabilities,
... ModelOutput,
... ModelMetadata,
... )
>>> model = ConstantEnergy(constant=3.141592)
>>> # put the model in inference mode
>>> model = model.eval()
>>> # Define the model capabilities
>>> capabilities = ModelCapabilities(
... outputs={
... "energy": ModelOutput(
... quantity="energy",
... unit="eV",
... per_atom=False,
... explicit_gradients=[],
... ),
... },
... atomic_types=[1, 2, 6, 8, 12],
... interaction_range=0.0,
... length_unit="angstrom",
... supported_devices=["cpu"],
... dtype="float64",
... )
>>> # define metadata about this model
>>> metadata = ModelMetadata(
... name="model-name",
... authors=["Some Author", "Another One"],
... # references and long description can also be added
... )
>>> # wrap the model
>>> wrapped = MetatensorAtomisticModel(model, metadata, capabilities)
>>> # export the model
>>> with tempfile.TemporaryDirectory() as directory:
... wrapped.export(os.path.join(directory, "constant-energy-model.pt"))
...
"""
# Some annotation to make the TorchScript compiler happy
_requested_neighbors_lists: List[NeighborsListOptions]
def __init__(
self,
module: ModelInterface,
metadata: ModelMetadata,
capabilities: ModelCapabilities,
):
"""
:param module: The torch module to wrap and export.
:param capabilities: Description of the model capabilities.
"""
super().__init__()
if not isinstance(module, torch.nn.Module):
raise TypeError(f"`module` should be a torch.nn.Module, not {type(module)}")
if isinstance(module, torch.jit.RecursiveScriptModule):
raise TypeError("module should not already be a ScriptModule")
if module.training:
raise ValueError("module should not be in training mode")
_check_annotation(module)
self._module = module
# ============================================================================ #
# recursively explore `module` to get all the requested_neighbors_lists
self._requested_neighbors_lists = []
_get_requested_neighbors_lists(
self._module,
self._module.__class__.__name__,
self._requested_neighbors_lists,
capabilities.length_unit,
)
# ============================================================================ #
self._metadata = metadata
self._capabilities = capabilities
# check that some required capabilities are set
if capabilities.interaction_range < 0:
raise ValueError(
"`capabilities.interaction_range` was not set, "
"but it is required to run simulations"
)
if math.isnan(capabilities.interaction_range):
raise ValueError(
"`capabilities.interaction_range` should be a "
"float between 0 and infinity"
)
if len(capabilities.supported_devices) == 0:
raise ValueError(
"`capabilities.supported_devices` was not set, "
"but it is required to run simulations."
)
if capabilities.dtype == "":
raise ValueError(
"`capabilities.dtype` was not set, "
"but it is required to run simulations."
)
if capabilities.dtype == "float32":
self._model_dtype = torch.float32
elif capabilities.dtype == "float64":
self._model_dtype = torch.float64
else:
raise ValueError(f"unknown dtype in capabilities: {capabilities.dtype}")
[docs]
def wrapped_module(self) -> torch.nn.Module:
"""Get the module wrapped in this :py:class:`MetatensorAtomisticModel`"""
return self._module
[docs]
@torch.jit.export
def capabilities(self) -> ModelCapabilities:
"""Get the capabilities of the wrapped model"""
return self._capabilities
[docs]
@torch.jit.export
def metadata(self) -> ModelMetadata:
"""Get the metadata of the wrapped model"""
return self._metadata
[docs]
@torch.jit.export
def requested_neighbors_lists(self) -> List[NeighborsListOptions]:
"""
Get the neighbors lists required by the wrapped model or any of the child
module.
"""
return self._requested_neighbors_lists
[docs]
def forward(
self,
systems: List[System],
options: ModelEvaluationOptions,
check_consistency: bool,
) -> Dict[str, TensorMap]:
"""Run the wrapped model and return the corresponding outputs.
Before running the model, this will convert the ``systems`` data from the engine
unit to the model unit, including all neighbors lists distances.
After running the model, this will convert all the outputs from the model units
to the engine units.
:param systems: input systems on which we should run the model. The systems
should already contain all neighbors lists corresponding to the options in
:py:meth:`requested_neighbors_lists()`.
:param options: options for this run of the model
:param check_consistency: Should we run additional check that everything is
consistent? This should be set to ``True`` when verifying a model, and to
``False`` once you are sure everything is running fine.
:return: A dictionary containing all the model outputs
"""
if check_consistency:
_check_inputs(
capabilities=self._capabilities,
requested_neighbors_lists=self._requested_neighbors_lists,
systems=systems,
options=options,
expected_dtype=self._model_dtype,
)
# convert systems from engine to model units
if self._capabilities.length_unit != options.length_unit:
conversion = unit_conversion_factor(
quantity="length",
from_unit=options.length_unit,
to_unit=self._capabilities.length_unit,
)
systems = _convert_systems_units(
systems,
conversion,
model_length_unit=self._capabilities.length_unit,
system_length_unit=options.length_unit,
)
# run the actual calculations
outputs = self._module(
systems=systems,
outputs=options.outputs,
selected_atoms=options.selected_atoms,
)
if check_consistency:
_check_outputs(
systems=systems,
requested=options.outputs,
selected_atoms=options.selected_atoms,
outputs=outputs,
expected_dtype=self._model_dtype,
)
# convert outputs from model to engine units
for name, output in outputs.items():
declared = self._capabilities.outputs[name]
requested = options.outputs[name]
if declared.quantity == "" or requested.quantity == "":
continue
if declared.quantity != requested.quantity:
raise ValueError(
f"model produces values as '{declared.quantity}' for the '{name}' "
f"output, but the engine requested '{requested.quantity}'"
)
conversion = unit_conversion_factor(
quantity=declared.quantity,
from_unit=declared.unit,
to_unit=requested.unit,
)
if conversion != 1.0:
for block in output.blocks():
block.values[:] *= conversion
for _, gradient in block.gradients():
gradient.values[:] *= conversion
return outputs
[docs]
def export(self, file: str, collect_extensions: Optional[str] = None):
"""Export this model to a file that can then be loaded by simulation engine.
:param file: where to save the model. This can be a path or a file-like object.
:param collect_extensions: if not None, all currently loaded PyTorch extension
will be collected in this directory. If this directory already exists, it
is removed and re-created.
"""
module = self.eval()
if os.environ.get("PYTORCH_JIT") == "0":
raise RuntimeError(
"found PYTORCH_JIT=0 in the environment, "
"we can not export models without TorchScript"
)
try:
module = torch.jit.script(module)
except RuntimeError as e:
raise RuntimeError("could not convert the module to TorchScript") from e
# TODO: can we freeze these?
# module = torch.jit.freeze(module)
# record the list of loaded extensions, to check that they are also loaded when
# executing the model.
if collect_extensions is not None:
if os.path.exists(collect_extensions):
shutil.rmtree(collect_extensions)
os.makedirs(collect_extensions)
# TODO: the extensions are currently collected in a separate directory,
# should we store the files directly inside the model file? This would makes
# the model platform-specific but much more convenient (since the end user
# does not have to move a model around)
extensions = []
for library in torch.ops.loaded_libraries:
# Remove any site-package prefix
path = library
for site_packages in site.getsitepackages():
if path.startswith(site_packages):
path = os.path.relpath(path, site_packages)
break
if collect_extensions is not None:
collect_path = os.path.join(collect_extensions, path)
if os.path.exists(collect_path):
raise RuntimeError(
f"more than one extension would be collected at {collect_path}"
)
os.makedirs(os.path.dirname(collect_path), exist_ok=True)
shutil.copyfile(library, collect_path)
# get the name of the library, excluding any shared object prefix/suffix
name = os.path.basename(library)
if name.startswith("lib"):
name = name[3:]
if name.endswith(".so"):
name = name[:-3]
if name.endswith(".dll"):
name = name[:-4]
if name.endswith(".dylib"):
name = name[:-6]
# Collect the hash of the extension shared library. We don't currently use
# this, but it would allow for binary-level reproducibility later.
with open(library, "rb") as fd:
sha256 = hashlib.sha256(fd.read()).hexdigest()
extensions.append({"path": path, "name": name, "sha256": sha256})
# Metadata about where and when the model was exported
export_metadata = {
"date": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"platform": platform.machine() + "-" + platform.system(),
# TODO: user/hostname?
}
if collect_extensions is not None:
export_metadata["extensions_directory"] = str(collect_extensions)
torch.jit.save(
module,
file,
_extra_files={
"torch-version": torch.__version__,
"metatensor-version": metatensor_version,
"extensions": json.dumps(extensions),
"metadata": json.dumps(export_metadata),
},
)
def _get_requested_neighbors_lists(
module: torch.nn.Module,
module_name: str,
requested: List[NeighborsListOptions],
length_unit: str,
):
if hasattr(module, "requested_neighbors_lists"):
for new_options in module.requested_neighbors_lists():
new_options.add_requestor(module_name)
already_requested = False
for existing in requested:
if existing == new_options:
already_requested = True
for requestor in new_options.requestors():
existing.add_requestor(requestor)
if not already_requested:
if new_options.length_unit not in ["", length_unit]:
raise ValueError(
f"NeighborsListOptions from {module_name} already have a "
f"length unit ('{new_options.length_unit}') which does not "
f"match the model length units ('{length_unit}')"
)
new_options.length_unit = length_unit
requested.append(new_options)
for child_name, child in module.named_children():
_get_requested_neighbors_lists(
module=child,
module_name=module_name + "." + child_name,
requested=requested,
length_unit=length_unit,
)
def _check_annotation(module: torch.nn.Module):
# check annotations on forward
annotations = module.forward.__annotations__
expected_arguments = [
"systems",
"outputs",
"selected_atoms",
"return",
]
expected_signature = (
"`forward(self, "
"systems: List[System], "
"outputs: Dict[str, ModelOutput], "
"selected_atoms: Optional[Labels]"
") -> Dict[str, TensorMap]`"
)
if list(annotations.keys()) != expected_arguments:
raise TypeError(
"`module.forward()` takes unexpected arguments, expected signature is "
+ expected_signature
)
if annotations["systems"] != List[System]:
raise TypeError(
"`systems` argument must be a list of metatensor atomistic `System`, "
f"not {annotations['system']}"
)
if annotations["outputs"] != Dict[str, ModelOutput]:
raise TypeError(
"`outputs` argument must be `Dict[str, ModelOutput]`, "
f"not {annotations['outputs']}"
)
if annotations["selected_atoms"] != Optional[Labels]:
raise TypeError(
"`selected_atoms` argument must be `Optional[Labels]`, "
f"not {annotations['selected_atoms']}"
)
if annotations["return"] != Dict[str, TensorMap]:
raise TypeError(
"`forward()` must return a `Dict[str, TensorMap]`, "
f"not {annotations['return']}"
)
def _check_inputs(
capabilities: ModelCapabilities,
requested_neighbors_lists: List[NeighborsListOptions],
systems: List[System],
options: ModelEvaluationOptions,
expected_dtype: torch.dtype,
):
if len(systems) == 0:
return
global_device = systems[0].device
global_dtype = systems[0].positions.dtype
if global_dtype != expected_dtype:
raise ValueError(
f"wrong dtype for the data: the model wants {dtype_name(expected_dtype)}, "
f"we got {dtype_name(global_dtype)}"
)
# check that the requested outputs match what the model can do
for name, requested in options.outputs.items():
if name not in capabilities.outputs:
raise ValueError(
f"this model can not compute '{name}', the implemented "
f"outputs are {capabilities.outputs.keys()}"
)
possible = capabilities.outputs[name]
for parameter in requested.explicit_gradients:
if parameter not in possible.explicit_gradients:
raise ValueError(
f"this model can not compute explicit gradients of '{name}' "
f"with respect to '{parameter}'"
)
if requested.per_atom and not possible.per_atom:
raise ValueError(
f"this model can not compute '{name}' per atom, only globally"
)
selected_atoms = options.selected_atoms
if selected_atoms is not None:
if selected_atoms.device != global_device:
raise ValueError(
"expected all selected_atoms to be on the same device as the systems, "
f"got {selected_atoms.device} and {global_device}"
)
if selected_atoms.names != ["system", "atom"]:
raise ValueError(
"invalid names for selected_atoms: expected "
f"['system', 'atom'], got {selected_atoms.names}"
)
possible_atoms_values: List[List[int]] = []
for s, system in enumerate(systems):
for a in range(len(system)):
possible_atoms_values.append([s, a])
possible_atoms = Labels(
["system", "atom"],
torch.tensor(possible_atoms_values),
)
intersection = selected_atoms.intersection(possible_atoms)
if len(intersection) != len(selected_atoms):
raise ValueError(
"invalid selected_atoms: there are entries that are not "
"possible for the current systems"
)
for system in systems:
if system.device != global_device:
raise ValueError(
"expected all systems to be on the same device, "
f"got {global_device} and {system.device}"
)
if not system.positions.dtype == global_dtype:
raise ValueError(
"expected all systems to have the same dtype, "
f"got {global_dtype} and {system.positions.dtype}"
)
# check that the atomic types of the system match the one the model supports
all_types = torch.unique(system.types)
for atom_type in all_types:
if atom_type not in capabilities.atomic_types:
raise ValueError(
f"this model can not run for the atomic type '{atom_type}'"
)
# Check neighbors lists
known_neighbors_lists = system.known_neighbors_lists()
for request in requested_neighbors_lists:
found = False
for known in known_neighbors_lists:
if request == known:
found = True
if not found:
raise ValueError(
"missing neighbors list in the system: the model requested "
f"a list for {request}, but it was not computed and stored "
"in the system"
)
def _convert_systems_units(
systems: List[System],
conversion: float,
model_length_unit: str,
system_length_unit: str,
) -> List[System]:
if conversion == 1.0:
return systems
new_systems: List[System] = []
for system in systems:
new_system = System(
types=system.types,
positions=conversion * system.positions,
cell=conversion * system.cell,
)
# also update the neighbors list distances
for request in system.known_neighbors_lists():
neighbors = system.get_neighbors_list(request)
new_system.add_neighbors_list(
request,
TensorBlock(
values=conversion * neighbors.values,
samples=neighbors.samples,
components=neighbors.components,
properties=neighbors.properties,
),
)
known_data = system.known_data()
if len(known_data) != 0:
warnings.warn(
"the model requires a different length unit "
f"({model_length_unit}) than the system ({system_length_unit}), "
f"but we don't know how to convert custom data ({known_data}) "
"accordingly",
stacklevel=2,
)
for data in known_data:
new_system.add_data(data, system.get_data(data))
new_systems.append(new_system)
return new_systems