Source code for metatensor.operations._reduce_over_properties

from typing import List, Optional, Union

import numpy as np

from . import _dispatch
from ._backend import (
    Labels,
    TensorBlock,
    TensorMap,
    torch_jit_is_scripting,
    torch_jit_script,
)


def _reduce_over_properties_block(
    block: TensorBlock,
    property_names: Union[List[str], str],
    reduction: str,
    remaining_properties: Optional[List[str]] = None,
) -> TensorBlock:
    """
    Create a new :py:class:`TensorBlock` reducing the ``properties`` among the
    selected ``properties``.

    The output :py:class:`TensorBlocks` have the same components as the input.
    "sum", "mean", "std" or "var" reductions can be performed.

    :param block:
        input block
    :param property_names:
        names of properties to reduce over. it is ignored if remaining_properties is
        given
    :param remaining_properties:
        names of properties that should remain after reducing reduce over.
        it is computed automatically from property_names if missing or set to None
    :param reduction:
        how to reduce, only available values are "mean", "sum", "std" or "var"
    """
    if isinstance(property_names, str):
        property_names_list = [property_names]
    else:
        property_names_list = property_names

    block_properties = block.properties

    remaining_properties = None
    if remaining_properties is None:
        remaining_property_names: List[str] = []
        for p_name in block_properties.names:
            if p_name in property_names_list:
                continue
            remaining_property_names.append(p_name)
    else:
        remaining_property_names = remaining_properties

    for sample in remaining_property_names:
        assert sample in block_properties.names

    assert reduction in ["sum", "mean", "var", "std"]
    # get the indices of the selected property
    remaining_property_columns = [
        block_properties.names.index(prop) for prop in remaining_property_names
    ]

    if len(block.properties) == 0:
        properties = Labels(
            remaining_property_names,
            _dispatch.zeros_like(block.values, [0, len(remaining_property_names)]),
        )

        result_block = TensorBlock(
            values=block.values,
            samples=block.samples,
            components=block.components,
            properties=properties,
        )

        # The gradient does not change because the only thing that matters for
        # the gradients are the samples to which they are connected, but in this
        # case there are no samples in the TensorBlock
        for parameter, gradient in block.gradients():
            if len(gradient.gradients_list()) != 0:
                raise NotImplementedError("gradients of gradients are not supported")

            result_block.add_gradient(
                parameter=parameter,
                gradient=TensorBlock(
                    values=gradient.values,
                    samples=gradient.samples,
                    components=gradient.components,
                    properties=gradient.properties,
                ),
            )

        return result_block

    # get which properties will still be there after reduction
    if len(remaining_property_names) == 0:
        new_properties = _dispatch.zeros_like(block_properties.values, shape=(1, 0))
        index = _dispatch.zeros_like(
            block_properties.values, shape=(block_properties.values.shape[0],)
        )
    else:
        new_properties, index = _dispatch.unique_with_inverse(
            block_properties.values[:, remaining_property_columns], axis=0
        )
        index = index.reshape(-1)

    block_values = block.values
    shape = (block_values.shape)[:-1] + (new_properties.shape[0],)
    values_sum = _dispatch.zeros_like(block_values, shape=shape)

    _dispatch.columns_add(values_sum, block_values, index)

    values_mean = _dispatch.empty_like(values_sum, [0])
    values_result = _dispatch.empty_like(values_sum, [0])

    if reduction == "mean" or reduction == "std" or reduction == "var":
        bincount = _dispatch.make_like(_dispatch.bincount(index), values_sum)
        bin_shape = list(block_values.shape)
        bin_shape = [(-1 if i == 1 else 1) for i in range(len(shape))]
        bincount = bincount.reshape(bin_shape)
        values_mean = values_sum / bincount

        if reduction == "std" or reduction == "var":
            values_var = _dispatch.zeros_like(block_values, shape=shape)
            _dispatch.columns_add(
                values_var,
                (block_values - _dispatch.slice_last_dim(values_mean, index)) ** 2,
                index,
            )
            values_var = values_var / bincount

            if reduction == "std":
                values_result = _dispatch.sqrt(values_var)
            elif reduction == "var":
                values_result = values_var

        elif reduction == "mean":
            values_result = values_mean
    elif reduction == "sum":
        values_result = values_sum

    if len(remaining_property_names) == 0:
        properties_label = Labels(
            names="_",
            values=_dispatch.zeros_like(block_properties.values, shape=(1, 1)),
        )
    else:
        properties_label = Labels(
            remaining_property_names,
            new_properties,
        )

    result_block = TensorBlock(
        values=values_result,
        samples=block.samples,
        components=block.components,
        properties=properties_label,
    )

    for parameter, gradient in block.gradients():
        if len(gradient.gradients_list()) != 0:
            raise NotImplementedError("gradients of gradients are not supported")

        # check if all gradients are zeros
        if len(gradient.properties) == 0:
            # The gradients does not change because, if they are all zeros, the
            # gradients after reducing operation is still zero.

            # For any function of the TensorBlock values x(t):
            # f(x(t))-> df(x(t))/dx * dx/dt
            # and dx/dt == 0.
            result_block.add_gradient(
                parameter=parameter,
                gradient=TensorBlock(
                    values=gradient.values,
                    samples=gradient.samples,
                    components=gradient.components,
                    properties=gradient.properties,
                ),
            )
            continue

        new_gradient_properties, index_gradient = result_block.properties.values, index
        index_gradient = index_gradient.reshape(-1)

        gradient_values = gradient.values
        other_shape = gradient_values.shape[:-1]
        gradient_values_result = _dispatch.zeros_like(
            gradient_values,
            shape=other_shape + (new_gradient_properties.shape[0],),
        )
        _dispatch.columns_add(gradient_values_result, gradient_values, index_gradient)

        if reduction == "mean" or reduction == "var" or reduction == "std":
            bincount = _dispatch.bincount(index_gradient)
            bincount = bincount.reshape((1,) * len(other_shape) + (-1,))
            gradient_values_result = gradient_values_result / bincount
            if reduction == "std" or reduction == "var":
                values_times_gradient_values = _dispatch.zeros_like(gradient_values)

                for i in range(gradient.samples.values.shape[0]):
                    s = gradient.samples.entry(i)
                    values_times_gradient_values[i] = (
                        gradient_values[i] * block_values[int(s[0])]
                    )

                values_grad_result = _dispatch.zeros_like(
                    gradient_values,
                    shape=other_shape + (new_gradient_properties.shape[0],),
                )
                _dispatch.columns_add(
                    values_grad_result,
                    values_times_gradient_values,
                    index_gradient,
                )

                values_grad_result = values_grad_result / bincount
                sample_indices = gradient.samples.values[:, 0]
                if reduction == "var":
                    for i, _ in enumerate(new_gradient_properties):
                        shape = (-1,) + (1,) * (gradient_values_result.ndim - 2)
                        gradient_values_result = _dispatch.scatter_last_dim(
                            gradient_values_result,
                            i,
                            _dispatch.slice_last_dim(gradient_values_result, i)
                            * _dispatch.slice_last_dim(values_mean, i)[
                                sample_indices
                            ].reshape(shape),
                        )
                    gradient_values_result = 2 * (
                        values_grad_result - gradient_values_result
                    )
                else:  # std
                    for i, _ in enumerate(new_gradient_properties):
                        shape = (-1,) + (1,) * (gradient_values_result.ndim - 2)
                        if torch_jit_is_scripting():
                            gradient_values_result = _dispatch.scatter_last_dim(
                                gradient_values_result,
                                i,
                                _dispatch.slice_last_dim(gradient_values_result, i)
                                * _dispatch.slice_last_dim(values_mean, i)[
                                    sample_indices
                                ].reshape(shape)
                                / _dispatch.slice_last_dim(values_result, i)[
                                    sample_indices
                                ].reshape(shape),
                            )
                        else:
                            # no need to be torchscript compatible, let's keep it simple
                            # only numpy raise a warning for division by zero
                            with np.errstate(divide="ignore", invalid="ignore"):
                                gradient_values_result[..., i] = (
                                    values_grad_result[..., i]
                                    - (
                                        gradient_values_result[..., i]
                                        * values_mean[..., i][sample_indices].reshape(
                                            shape
                                        )
                                    )
                                ) / values_result[..., i][sample_indices].reshape(shape)
                        gradient_values_result = _dispatch.scatter_last_dim(
                            gradient_values_result,
                            i,
                            _dispatch.nan_to_num(
                                _dispatch.slice_last_dim(gradient_values_result, i),
                                nan=0.0,
                                posinf=0.0,
                                neginf=0.0,
                            ),
                        )
                        gradient_values_result[..., i] = _dispatch.nan_to_num(
                            gradient_values_result[..., i],
                            nan=0.0,
                            posinf=0.0,
                            neginf=0.0,
                        )

        # no check for the len of the gradient sample is needed because there
        # always will be at least one sample in the gradient
        result_block.add_gradient(
            parameter=parameter,
            gradient=TensorBlock(
                values=gradient_values_result,
                samples=gradient.samples,
                components=gradient.components,
                properties=result_block.properties,
            ),
        )

    return result_block


def _reduce_over_properties(
    tensor: TensorMap, property_names: Union[List[str], str], reduction: str
) -> TensorMap:
    """
    Create a new :py:class:`TensorMap` with the same keys as as the input
    ``tensor``, and each :py:class:`TensorBlock` is obtained summing the
    corresponding input :py:class:`TensorBlock` over the ``property_names``
    indices.

    "sum", "mean", "std" or "var" reductions can be performed.

    :param tensor: input :py:class:`TensorMap`
    :param property_names: names of properties to reduce over
    :param reduction: how to reduce, only available values are "mean", "sum",
    "std" or "var"
    """
    if isinstance(property_names, str):
        property_names_list = [property_names]
    else:
        property_names_list = property_names

    for property in property_names_list:
        if property not in tensor.property_names:
            raise ValueError(
                f"one of the requested property name ({property}) is not part of "
                "this TensorMap"
            )

    remaining_properties: List[str] = []
    for p_name in tensor.property_names:
        if p_name in property_names_list:
            continue
        remaining_properties.append(p_name)

    blocks: List[TensorBlock] = []
    for block in tensor.blocks():
        blocks.append(
            _reduce_over_properties_block(
                block=block,
                property_names=property_names_list,
                reduction=reduction,
                remaining_properties=remaining_properties,
            )
        )
    return TensorMap(tensor.keys, blocks)


[docs] @torch_jit_script def sum_over_properties_block( block: TensorBlock, property_names: Union[List[str], str] ) -> TensorBlock: """ Sum a :py:class:`TensorBlock`, combining the properties according to ``property_names``. This function creates a new :py:class:`TensorBlock` in which each property is obtained summing over the ``property_names`` indices, so that the resulting :py:class:`TensorBlock` does not have those indices. ``property_names`` indicates over which dimensions in the properties the sum is performed. It accept either a single string or a list of the string with the property names corresponding to the directions along which the sum is performed. A single string is equivalent to a list with a single element: ``property_names = "i"`` is the same as ``property_names = ["i"]``. :param block: input :py:class:`TensorBlock` :param property_names: names of properties to sum over :returns: a :py:class:`TensorBlock` containing the reduced values and property labels >>> from metatensor import Labels, TensorBlock, TensorMap >>> block = TensorBlock( ... values=np.array( ... [ ... [1, 3, 7, 10], ... [2, 5, 8, 11], ... [4, 6, 9, 12], ... ] ... ), ... samples=Labels( ... ["system", "atom"], ... np.array( ... [ ... [0, 0], ... [0, 1], ... [1, 0], ... ] ... ), ... ), ... components=[], ... properties=Labels( ... ["i", "j"], ... np.array( ... [ ... [0, 0], ... [0, 1], ... [1, 0], ... [1, 1], ... ] ... ), ... ), ... ) >>> block_sum = sum_over_properties_block(block, property_names="j") >>> print(block_sum.properties) Labels( i 0 1 ) >>> print(block_sum.values) [[ 4 17] [ 7 19] [10 21]] """ return _reduce_over_properties_block( block=block, property_names=property_names, reduction="sum" )
[docs] @torch_jit_script def sum_over_properties( tensor: TensorMap, property_names: Union[List[str], str] ) -> TensorMap: """ Sum a :py:class:`TensorMap`, combining the properties according to ``property_names``. This function creates a new :py:class:`TensorMap` with the same keys as as the input ``tensor``. Each :py:class:`TensorBlock` is obtained summing the corresponding input :py:class:`TensorBlock` over the ``property_names`` indices, essentially calling :py:func:`sum_over_properties_block` over each block in ``tensor``. ``property_names`` indicates over which dimensions in the properties the sum is performed. It accept either a single string or a list of the string with the property names corresponding to the directions along which the sum is performed. A single string is equivalent to a list with a single element: ``property_names = "atom"`` is the same as ``property_names = ["atom"]``. :param tensor: input :py:class:`TensorMap` :param property_names: names of properties to sum over :returns: a :py:class:`TensorMap` containing the reduced values and property labels >>> from metatensor import Labels, TensorBlock, TensorMap >>> block = TensorBlock( ... values=np.array( ... [ ... [1, 3, 7, 10], ... [2, 5, 8, 11], ... [4, 6, 9, 12], ... ] ... ), ... samples=Labels( ... ["system", "atom"], ... np.array( ... [ ... [0, 0], ... [0, 1], ... [1, 0], ... ] ... ), ... ), ... components=[], ... properties=Labels( ... ["i", "j"], ... np.array( ... [ ... [0, 0], ... [0, 1], ... [1, 0], ... [1, 1], ... ] ... ), ... ), ... ) >>> keys = Labels(names=["key"], values=np.array([[0]])) >>> tensor = TensorMap(keys, [block]) >>> tensor_sum = sum_over_properties(tensor, property_names="j") >>> # only 'i' is left as a property >>> print(tensor_sum.block(0)) TensorBlock samples (3): ['system', 'atom'] components (): [] properties (2): ['i'] gradients: None >>> print(tensor_sum.block(0).properties) Labels( i 0 1 ) >>> print(tensor_sum.block(0).values) [[ 4 17] [ 7 19] [10 21]] """ return _reduce_over_properties( tensor=tensor, property_names=property_names, reduction="sum" )
[docs] @torch_jit_script def mean_over_properties_block( block: TensorBlock, property_names: Union[List[str], str] ) -> TensorBlock: """Averages a :py:class:`TensorBlock`, combining the properties according to ``property_names``. See also :py:func:`sum_over_properties_block` and :py:func:`mean_over_properties` :param block: input :py:class:`TensorBlock` :param property_names: names of properties to average over :returns: a :py:class:`TensorBlock` containing the reduced values and property labels """ return _reduce_over_properties_block( block=block, property_names=property_names, reduction="mean" )
[docs] @torch_jit_script def mean_over_properties( tensor: TensorMap, property_names: Union[str, List[str]] ) -> TensorMap: """Compute the mean of a :py:class:`TensorMap`, combining the properties according to ``property_names``. This function creates a new :py:class:`TensorMap` with the same keys as the input ``tensor``, and each :py:class:`TensorBlock` is obtained averaging the corresponding input :py:class:`TensorBlock` over the ``property_names`` indices. ``property_names`` indicates over which dimensions in the properties the mean is performed. It accept either a single string or a list of the string with the property names corresponding to the directions along which the mean is performed. A single string is equivalent to a list with a single element: ``property_names = "atom"`` is the same as ``property_names = ["atom"]``. For a general discussion of reduction operations and a usage example see the doc for :py:func:`sum_over_properties`. :param tensor: input :py:class:`TensorMap` :param property_names: names of properties to average over """ return _reduce_over_properties( tensor=tensor, property_names=property_names, reduction="mean" )
[docs] @torch_jit_script def std_over_properties_block( block: TensorBlock, property_names: Union[List[str], str] ) -> TensorBlock: """Computes the standard deviation for a :py:class:`TensorBlock`, combining the properties according to ``property_names``. See also :py:func:`sum_over_properties_block` and :py:func:`std_over_properties` :param block: input :py:class:`TensorBlock` :param property_names: names of properties to compute the standard deviation for :returns: a :py:class:`TensorBlock` containing the reduced values and property labels """ return _reduce_over_properties_block( block=block, property_names=property_names, reduction="std" )
[docs] @torch_jit_script def std_over_properties( tensor: TensorMap, property_names: Union[str, List[str]] ) -> TensorMap: r"""Compute the standard deviation of a :py:class:`TensorMap`, combining the properties according to ``property_names``. This function creates a new :py:class:`TensorMap` with the same keys as as the input ``tensor``, and each :py:class:`TensorBlock` is obtained performing the std deviation of the corresponding input :py:class:`TensorBlock` over the ``property_names`` indices. ``property_names`` indicates over which dimensions in the properties the mean is performed. It accept either a single string or a list of the string with the property names corresponding to the directions along which the mean is performed. A single string is equivalent to a list with a single element: ``property_names = "i"`` is the same as ``property_names = ["i"]``. For a general discussion of reduction operations and a usage example see the doc for :py:func:`sum_over_properties()`. The gradient is implemented as follows: .. math:: \nabla[Std(X)] = 0.5(\nabla[Var(X)])/Std(X) = (E[X \nabla X] - E[X]E[\nabla X])/Std(X) :param tensor: input :py:class:`TensorMap` :param property_names: names of properties to perform the standart deviation over """ return _reduce_over_properties( tensor=tensor, property_names=property_names, reduction="std" )
[docs] @torch_jit_script def var_over_properties_block( block: TensorBlock, property_names: Union[List[str], str] ) -> TensorBlock: """Computes the variance for a :py:class:`TensorBlock`, combining the properties according to ``property_names``. See also :py:func:`sum_over_properties_block` and :py:func:`std_over_properties` :param block: input :py:class:`TensorBlock` :param property_names: names of properties to compute the variance for :returns: a :py:class:`TensorBlock` containing the reduced values and property labels """ return _reduce_over_properties_block( block=block, property_names=property_names, reduction="var" )
[docs] @torch_jit_script def var_over_properties( tensor: TensorMap, property_names: Union[str, List[str]] ) -> TensorMap: r"""Compute the variance of a :py:class:`TensorMap`, combining the properties according to ``property_names``. This function creates a new :py:class:`TensorMap` with the same keys as as the input ``tensor``, and each :py:class:`TensorBlock` is obtained performing the variance of the corresponding input :py:class:`TensorBlock` over the ``property_names`` indices. ``property_names`` indicates over which dimensions in the properties the mean is performed. It accept either a single string or a list of the string with the property names corresponding to the directions along which the mean is performed. A single string is equivalent to a list with a single element: ``property_names = "i"`` is the same as ``property_names = ["i"]``. For a general discussion of reduction operations and a usage example see the doc for :py:func:`sum_over_properties`. The gradient is implemented as follow: .. math:: \nabla[Var(X)] = 2(E[X \nabla X] - E[X]E[\nabla X]) :param tensor: input :py:class:`TensorMap` :param property_names: names of properties to perform the variance over """ return _reduce_over_properties( tensor=tensor, property_names=property_names, reduction="var" )