import copy
import ctypes
import pathlib
from pickle import PickleBuffer
from typing import Any, BinaryIO, Generator, List, Sequence, Tuple, Union
from . import _data
from ._c_api import c_uintptr_t, mts_array_t, mts_block_t, mts_labels_t
from ._c_lib import _get_library
from ._data import (
Array,
Device,
DType,
)
from ._labels import Labels
from ._status import check_pointer
class TensorBlock:
    """
    Basic building block for a :py:class:`TensorMap`.

    A single block contains a n-dimensional :py:class:`metatensor.data.Array`,
    and n sets of :py:class:`Labels` (one for each dimension). The first
    dimension is the *samples* dimension, the last dimension is the *properties*
    dimension. Any intermediate dimension is called a *component* dimension.

    Samples should be used to describe *what* we are representing, while
    properties should contain information about *how* we are representing it.
    Finally, components should be used to describe vectorial or tensorial
    components of the data.

    A block can also contain gradients of the values with respect to a variety
    of parameters. In this case, each gradient is a :py:class:`TensorBlock` with
    a separate set of samples and possibly components, but which shares the same
    property labels as the original :py:class:`TensorBlock`.

    >>> import numpy as np
    >>> block = TensorBlock(
    ...     values=np.array(
    ...         [
    ...             [1, 2, 4],
    ...             [3, 5, 6],
    ...         ]
    ...     ),
    ...     samples=Labels("samples", np.array([[4], [2]])),
    ...     components=[],
    ...     properties=Labels("properties", np.array([[0], [1], [2]])),
    ... )
    >>> block
    TensorBlock
     samples (2): ['samples']
     components (): []
     properties (3): ['properties']
     gradients: None
    >>> block.samples
    Labels(
    samples
    4
    2
    )
    >>> block.values[block.samples.position([2])]
    array([3, 5, 6])
    """

    def __init__(
        self,
        values: Array,
        samples: Labels,
        components: Sequence[Labels],
        properties: Labels,
    ):
        """
        :param values: array containing the values for this block
        :param samples: labels describing the samples (first dimension of the array)
        :param components: list of labels describing the components (intermediate
            dimensions of the array). This should be an empty list for scalar/invariant
            data.
        :param properties: labels describing the properties (last dimension of the
            array)

        :raises TypeError: if ``samples``, ``properties`` or any entry of
            ``components`` is not a :py:class:`Labels` instance
        """
        self._lib = _get_library()
        self._parent = None
        self._gradient_parameters = []
        # `_ptr` is only assigned once the C block has been requested below;
        # `__del__` checks for it with `hasattr`, so a validation failure here
        # does not attempt to free anything.

        if not isinstance(samples, Labels):
            raise TypeError(f"`samples` must be metatensor Labels, not {type(samples)}")

        # materialize the sequence: it is iterated twice and its length is needed
        components = list(components)
        for component in components:
            if not isinstance(component, Labels):
                raise TypeError(
                    "`components` elements must be metatensor Labels, "
                    f"not {type(component)}"
                )

        if not isinstance(properties, Labels):
            raise TypeError(
                f"`properties` must be metatensor Labels, not {type(properties)}"
            )

        # C array of `mts_labels_t*`, one entry per component
        components_array = ctypes.ARRAY(ctypes.POINTER(mts_labels_t), len(components))()
        for i, component in enumerate(components):
            components_array[i] = component.as_mts_labels_t()

        mts_array = _data.create_mts_array(values)
        self._ptr = self._lib.mts_block(
            mts_array,
            samples.as_mts_labels_t(),
            components_array,
            len(components_array),
            properties.as_mts_labels_t(),
        )
        check_pointer(self._ptr)

        # the array is at hand here, so fill the caches used by `dtype`/`device`
        self._cached_dtype = _data.array_dtype(values)
        self._cached_device = _data.array_device(values)

    @staticmethod
    def unsafe_from_ptr(block: ctypes.POINTER(mts_block_t)) -> "TensorBlock":
        """
        Create a :py:class:`TensorBlock` from a raw ``mts_block_t`` pointer.

        The :py:class:`TensorBlock` takes ownership of the pointer, and will
        release the corresponding memory when garbage-collected.
        """
        assert block, "mts_block_t pointer is null"

        # bypass `__init__`: the C block already exists
        obj = TensorBlock.__new__(TensorBlock)
        obj._lib = _get_library()
        obj._gradient_parameters = []
        obj._ptr = block
        # dtype and device are looked up lazily by the corresponding properties
        obj._cached_dtype = None
        obj._cached_device = None
        obj._parent = None
        return obj

    @staticmethod
    def unsafe_view_from_ptr(
        ptr: ctypes.POINTER(mts_block_t), parent: Any
    ) -> "TensorBlock":
        """
        Create a :py:class:`TensorBlock` from a raw ``mts_block_t`` pointer, keeping a
        reference to the ``parent`` to prevent garbage collection.

        The :py:class:`TensorBlock` does **not** take ownership of the pointer, and will
        not release the corresponding memory.
        """
        assert parent is not None, (
            "please use TensorBlock.unsafe_from_ptr to take ownership of a pointer"
        )

        obj = TensorBlock.unsafe_from_ptr(ptr)
        # keep a reference to the parent object (usually a TensorMap) to
        # prevent it from being garbage-collected & removing this block
        obj._parent = parent
        return obj

    def as_mts_block_t(self) -> ctypes.POINTER(mts_block_t):
        """
        Get the underlying C pointer for this :py:class:`TensorBlock`.

        This class still manages the block memory after the call. Use
        :py:meth:`TensorBlock.release` to take ownership of the pointer.

        :raises ValueError: if the pointer of this block is no longer set (for
            example after a call to :py:meth:`TensorBlock.release`)
        """
        if not self._ptr:
            raise ValueError(
                "this block has been released or moved inside a TensorBlock "
                "or TensorMap and can no longer be used"
            )

        return self._ptr

    def release(self) -> ctypes.POINTER(mts_block_t):
        """
        Release the underlying C pointer of this :py:class:`TensorBlock`.

        This class is no longer managing the block memory after the call, the
        user is expected to re-create a :py:class:`TensorBlock` with
        :py:meth:`TensorBlock.unsafe_from_ptr`, or pass the pointer to a C
        function that will call ``mts_block_free``.

        :raises RuntimeError: if this block is a view with a parent
        :raises ValueError: if this block was already released
        """
        if self._parent is not None:
            raise RuntimeError(
                "can not release this TensorBlock, it is a view inside another "
                "TensorBlock or a TensorMap"
            )

        ptr = self.as_mts_block_t()
        self._ptr = None
        return ptr

    def __del__(self):
        # `__del__` can run on a partially initialized object (e.g. when
        # `__init__` raised), so every attribute is checked before use.
        if (
            hasattr(self, "_lib")
            and self._lib is not None
            and hasattr(self, "_ptr")
            and hasattr(self, "_parent")
        ):
            # views do not own their memory, only free blocks without a parent
            if self._parent is None:
                self._lib.mts_block_free(self._ptr)

    def __copy__(self):
        return self.copy(deep=False)

    def __deepcopy__(self, _memodict):
        return self.copy(deep=True)

    def __reduce__(self):
        # NOTE(review): `__reduce_ex__` is also defined on this class, and pickle
        # uses it in preference to `__reduce__`; this method is presumably only
        # reached when called directly. Confirm whether it is still needed.
        raise NotImplementedError(
            "Pickling for is not implemented for TensorBlocks, wrap the block in a "
            "TensorMap first"
        )

    def __len__(self) -> int:
        """
        Get the length of the values stored in this block
        (i.e. the number of samples in the block)
        """
        return len(self.values)

    @property
    def shape(self):
        """
        Get the shape of the values array in this block.
        """
        return self.values.shape

    def copy(self, deep: bool = True) -> "TensorBlock":
        """
        Get a copy of this block, with the same values and labels. If ``deep`` is
        ``True``, also make a full copy of the values; otherwise, the values in the new
        block will share the same memory as those in this block.

        :param deep: if ``True``, create a deep copy of the block
        """
        if deep:
            new_ptr = self._lib.mts_block_copy(self.as_mts_block_t())
            check_pointer(new_ptr)
            return TensorBlock.unsafe_from_ptr(new_ptr)

        # shallow copy: build a new block around the same values array, and do
        # the same for every gradient
        new_block = TensorBlock(
            values=self.values,
            samples=self.samples,
            components=self.components,
            properties=self.properties,
        )
        for parameter, gradient in self.gradients():
            new_block.add_gradient(parameter, gradient.copy(deep=False))

        return new_block

    def __repr__(self) -> str:
        if not self._ptr:
            # The block has been released
            return "TensorBlock(<empty>)"

        if self._gradient_parameters:
            path = "/".join(self._gradient_parameters)
            header = f"Gradient TensorBlock ('{path}')"
        else:
            header = "TensorBlock"

        # each of these accessors goes through the C library, query them once
        samples = self.samples
        components = self.components
        properties = self.properties
        gradients = self.gradients_list()

        component_sizes = ", ".join(str(len(c)) for c in components)
        # joining the quoted names avoids trimming a trailing separator
        component_names = ", ".join(
            f"'{name}'" for c in components for name in c.names
        )

        lines = [
            header,
            f" samples ({len(samples)}): {list(samples.names)}",
            f" components ({component_sizes}): [{component_names}]",
            f" properties ({len(properties)}): {list(properties.names)}",
            f" gradients: {gradients if gradients else 'None'}",
        ]
        return "\n".join(lines)

    def __eq__(self, other):
        # imported here rather than at module level
        from metatensor.operations import equal_block

        return equal_block(self, other)

    def __ne__(self, other):
        from metatensor.operations import equal_block

        return not equal_block(self, other)

    @property
    def _raw_values(self) -> mts_array_t:
        """Get the raw ``mts_array_t`` corresponding to this block's values"""
        data = mts_array_t()
        # `data` is filled in by the C function
        self._lib.mts_block_data(self.as_mts_block_t(), data)
        return data

    @property
    def values(self) -> Array:
        """
        Get the values for this block.

        The array type depends on how the block was created. Currently, numpy
        ``ndarray`` and torch ``Tensor`` are supported.
        """
        return _data.mts_array_to_python_array(self._raw_values, parent=self)

    @values.setter
    def values(self, new_values):
        raise AttributeError(
            "Direct assignment to `values` is not possible. "
            "Please use block.values[:] = new_values instead."
        )

    @property
    def samples(self) -> Labels:
        """
        Get the sample :py:class:`Labels` for this block.

        The entries in these labels describe the first dimension of the
        ``values`` array.
        """
        return self._labels(0)

    @property
    def components(self) -> List[Labels]:
        """
        Get the component :py:class:`Labels` for this block.

        The entries in these labels describe intermediate dimensions of the
        ``values`` array.
        """
        # every axis except the first (samples) and the last (properties)
        n_components = len(self.values.shape) - 2
        return [self._labels(axis + 1) for axis in range(n_components)]

    @property
    def properties(self) -> Labels:
        """
        Get the property :py:class:`Labels` for this block.

        The entries in these labels describe the last dimension of the
        ``values`` array. The properties are guaranteed to be the same for
        values and gradients in the same block.
        """
        property_axis = len(self.values.shape) - 1
        return self._labels(property_axis)

    def _labels(self, axis) -> Labels:
        """Get the :py:class:`Labels` describing the given ``axis`` of the values"""
        result = self._lib.mts_block_labels(self.as_mts_block_t(), axis)
        check_pointer(result)
        return Labels.unsafe_from_ptr(result)

    def gradient(self, parameter: str) -> "TensorBlock":
        """
        Get the gradient of the block ``values`` with respect to the given
        ``parameter``.

        The returned block is a view: it keeps this block as its parent and does
        not own the underlying memory.

        :param parameter: check for gradients with respect to this ``parameter``
            (e.g. ``positions``, ``cell``, ...)

        >>> import numpy as np
        >>> from metatensor import Labels, TensorBlock
        >>> block = TensorBlock(
        ...     values=np.full((3, 1, 5), 1.0),
        ...     samples=Labels(["system"], np.array([[0], [2], [4]])),
        ...     components=[Labels.range("component", 1)],
        ...     properties=Labels.range("property", 5),
        ... )
        >>> positions_gradient = TensorBlock(
        ...     values=np.full((2, 3, 1, 5), 11.0),
        ...     samples=Labels(["sample", "atom"], np.array([[0, 2], [2, 3]])),
        ...     components=[
        ...         Labels.range("direction", 3),
        ...         Labels.range("component", 1),
        ...     ],
        ...     properties=Labels.range("property", 5),
        ... )
        >>> block.add_gradient("positions", positions_gradient)
        >>> cell_gradient = TensorBlock(
        ...     values=np.full((2, 3, 3, 1, 5), 15.0),
        ...     samples=Labels.range("sample", 2),
        ...     components=[
        ...         Labels.range("direction_1", 3),
        ...         Labels.range("direction_2", 3),
        ...         Labels.range("component", 1),
        ...     ],
        ...     properties=Labels.range("property", 5),
        ... )
        >>> block.add_gradient("cell", cell_gradient)
        >>> positions_gradient = block.gradient("positions")
        >>> print(positions_gradient)
        Gradient TensorBlock ('positions')
         samples (2): ['sample', 'atom']
         components (3, 1): ['direction', 'component']
         properties (5): ['property']
         gradients: None
        >>> cell_gradient = block.gradient("cell")
        >>> print(cell_gradient)
        Gradient TensorBlock ('cell')
         samples (2): ['sample']
         components (3, 3, 1): ['direction_1', 'direction_2', 'component']
         properties (5): ['property']
         gradients: None
        """
        # output pointer, filled in by the C function
        gradient_block = ctypes.POINTER(mts_block_t)()
        self._lib.mts_block_gradient(
            self.as_mts_block_t(), parameter.encode("utf8"), gradient_block
        )
        check_pointer(gradient_block)

        gradient = TensorBlock.unsafe_view_from_ptr(gradient_block, parent=self)
        # record the chain of parameters leading to this gradient, `__repr__`
        # uses it to print the "Gradient TensorBlock" header
        gradient._gradient_parameters = copy.deepcopy(self._gradient_parameters)
        gradient._gradient_parameters.append(parameter)
        return gradient

    def add_gradient(self, parameter: str, gradient: "TensorBlock") -> None:
        """
        Add gradient with respect to ``parameter`` in this block.

        The ``gradient`` block is released in the process (see
        :py:meth:`TensorBlock.release`) and can no longer be used afterwards.

        :param parameter:
            add gradients with respect to this ``parameter`` (e.g. ``positions``,
            ``cell``, ...)

        :param gradient:
            a :py:class:`TensorBlock` whose values contain the gradients of this
            :py:class:`TensorBlock` values with respect to ``parameter``. The labels
            of the gradient :py:class:`TensorBlock` should be organized as follows:

            - its samples must contain ``"sample"`` as the first dimension, with values
              containing the index of the corresponding samples in this
              :py:class:`TensorBlock`, and arbitrary supplementary samples dimension;
            - its components must contain at least the same components as this
              :py:class:`TensorBlock`, with any additional components coming before
              those;
            - its properties must match exactly those of this :py:class:`TensorBlock`.

        :raises ValueError: if this block is a view inside a TensorMap or another
            TensorBlock
        :raises RuntimeError: if ``gradient`` is itself a view and can not be
            released

        >>> import numpy as np
        >>> from metatensor import Labels, TensorBlock
        >>> block = TensorBlock(
        ...     values=np.full((3, 1, 1), 1.0),
        ...     samples=Labels(["system"], np.array([[0], [2], [4]])),
        ...     components=[Labels.range("component", 1)],
        ...     properties=Labels.range("property", 1),
        ... )
        >>> gradient = TensorBlock(
        ...     values=np.full((2, 1, 1), 11.0),
        ...     samples=Labels(["sample", "parameter"], np.array([[0, -2], [2, 3]])),
        ...     components=[Labels.range("component", 1)],
        ...     properties=Labels.range("property", 1),
        ... )
        >>> block.add_gradient("parameter", gradient)
        >>> print(block)
        TensorBlock
         samples (3): ['system']
         components (1): ['component']
         properties (1): ['property']
         gradients: ['parameter']
        """
        if self._parent is not None:
            raise ValueError(
                "cannot add gradient on this block since it is a view inside "
                "a TensorMap or another TensorBlock"
            )

        # `release()` hands the gradient pointer over to the C call and resets
        # `gradient._ptr`, so the Python object no longer frees it
        self._lib.mts_block_add_gradient(
            self.as_mts_block_t(), parameter.encode("utf8"), gradient.release()
        )

    def gradients_list(self) -> List[str]:
        """get a list of all gradients defined in this block"""
        # both output arguments are filled in by the C function
        parameters = ctypes.POINTER(ctypes.c_char_p)()
        count = c_uintptr_t()
        self._lib.mts_block_gradients_list(self.as_mts_block_t(), parameters, count)

        return [parameters[i].decode("utf8") for i in range(count.value)]

    def has_gradient(self, parameter: str) -> bool:
        """
        Check if this block contains gradient information with respect to the
        given ``parameter``.

        :param parameter: check for gradients with respect to this ``parameter``
            (e.g. ``positions``, ``cell``, ...)
        """
        return parameter in self.gradients_list()

    def gradients(self) -> Generator[Tuple[str, "TensorBlock"], None, None]:
        """Get an iterator over all gradients defined in this block."""
        for parameter in self.gradients_list():
            yield (parameter, self.gradient(parameter))

    @property
    def dtype(self) -> DType:
        """
        Get the dtype of all the values and gradient arrays stored inside this
        :py:class:`TensorBlock`.
        """
        # blocks created from a raw pointer start without a cached value
        if self._cached_dtype is None:
            self._cached_dtype = _data.array_dtype(self.values)

        return self._cached_dtype

    @property
    def device(self) -> Device:
        """
        Get the device of all the values and gradient arrays stored inside this
        :py:class:`TensorBlock`.
        """
        # blocks created from a raw pointer start without a cached value
        if self._cached_device is None:
            self._cached_device = _data.array_device(self.values)

        return self._cached_device

    @property
    def is_view(self) -> bool:
        """
        Check if this block is a view (i.e. does not own the underlying data).
        """
        return self._parent is not None

    def to(self, *args, **kwargs) -> "TensorBlock":
        """
        Move all the data in this block (labels, values, and gradients) to the given
        ``dtype``, ``device`` and ``arrays`` backend.

        :param dtype: new dtype to use for all arrays. The dtype stays the same if this
            is set to ``None``.
        :param device: new device to use for all arrays. The device stays the same if
            this is set to ``None``.
        :param Optional[str] arrays: new backend to use for the arrays. This can be
            either ``"numpy"``, ``"torch"`` or ``None`` (keeps the existing backend);
            and must be given as a keyword argument (``arrays="numpy"``).
        :param bool non_blocking: If this is ``True`` and the :py:class:`TensorBlock`
            contains ``"torch"`` arrays, the function tries to move the data
            asynchronously. See :py:meth:`torch.Tensor.to` for more information.
        """
        # `arrays` and `non_blocking` are keyword-only, everything else is
        # handed to the shared dtype/device argument parser
        arrays = kwargs.pop("arrays", None)
        non_blocking = kwargs.pop("non_blocking", False)
        dtype, device = _data.to_arguments_parse("`TensorBlock.to`", *args, **kwargs)

        values = self.values
        if arrays is not None:
            values = _data.array_change_backend(values, arrays)

        if dtype is not None:
            values = _data.array_change_dtype(values, dtype, non_blocking=non_blocking)

        if device is not None:
            values = _data.array_change_device(
                values, device, non_blocking=non_blocking
            )

        # the labels are converted with the same device/backend, but not dtype
        block = TensorBlock(
            values,
            self.samples.to(device=device, arrays=arrays, non_blocking=non_blocking),
            [
                c.to(device=device, arrays=arrays, non_blocking=non_blocking)
                for c in self.components
            ],
            self.properties.to(device=device, arrays=arrays, non_blocking=non_blocking),
        )

        for parameter, gradient in self.gradients():
            block.add_gradient(
                parameter,
                gradient.to(
                    dtype=dtype,
                    device=device,
                    arrays=arrays,
                    non_blocking=non_blocking,
                ),
            )

        return block

    # ===== Serialization support ===== #

    @classmethod
    def _from_pickle(cls, buffer: Union[bytes, bytearray]):
        """
        Passed to pickler to reconstruct TensorBlock from bytes object
        """
        from .io import create_numpy_array, load_block_buffer_custom_array

        # TODO: make it so when saving data in torch tensors, we load back data in torch
        # tensors.
        return load_block_buffer_custom_array(buffer, create_numpy_array)

    def __reduce_ex__(self, protocol: int):
        """
        Used by the Pickler to dump TensorBlock object to bytes object. When protocol >=
        5 it supports PickleBuffer which reduces number of copies needed
        """
        from .io import _save_block_buffer_raw

        buffer = _save_block_buffer_raw(self)
        if protocol >= 5:
            return self._from_pickle, (PickleBuffer(buffer),)
        else:
            return self._from_pickle, (buffer.raw,)

    @staticmethod
    def load(
        file: Union[str, pathlib.Path, BinaryIO], use_numpy=False
    ) -> "TensorBlock":
        """
        Load a serialized :py:class:`TensorBlock` from a file or a buffer, calling
        :py:func:`metatensor.load_block`.

        :param file: file path or file object to load from
        :param use_numpy: should we use the numpy loader or metatensor's. See
            :py:func:`metatensor.load` for more information.
        """
        from .io import load_block

        return load_block(file=file, use_numpy=use_numpy)

    @staticmethod
    def load_buffer(
        buffer: Union[bytes, bytearray, memoryview],
        use_numpy=False,
    ) -> "TensorBlock":
        """
        Load a serialized :py:class:`TensorBlock` from a buffer, calling
        :py:func:`metatensor.io.load_block_buffer`.

        :param buffer: in-memory buffer containing the data
        :param use_numpy: should we use the numpy loader or metatensor's. See
            :py:func:`metatensor.load` for more information.
        """
        from .io import load_block_buffer

        # forward `use_numpy`, as `load`, `save` and `save_buffer` do
        return load_block_buffer(buffer=buffer, use_numpy=use_numpy)

    def save(self, file: Union[str, pathlib.Path, BinaryIO], use_numpy=False):
        """
        Save this :py:class:`TensorBlock` to a file or a buffer, calling
        :py:func:`metatensor.save`.

        :param file: file path or file object to save to
        :param use_numpy: should we use the numpy serializer or metatensor's. See
            :py:func:`metatensor.save` for more information.
        """
        from .io import save

        return save(file=file, data=self, use_numpy=use_numpy)

    def save_buffer(self, use_numpy=False) -> memoryview:
        """
        Save this :py:class:`TensorBlock` to an in-memory buffer, calling
        :py:func:`metatensor.io.save_buffer`.

        :param use_numpy: should we use numpy serialization or metatensor's. See
            :py:func:`metatensor.save` for more information.
        """
        from .io import save_buffer

        return save_buffer(data=self, use_numpy=use_numpy)