Source code for metatensor.io

import ctypes
import io
import pathlib
import warnings
from typing import BinaryIO, Callable, Union

import numpy as np

from ._c_api import c_uintptr_t, mts_array_t, mts_create_array_callback_t, mts_labels_t
from ._c_lib import _get_library
from .block import TensorBlock
from .data.array import ArrayWrapper, _is_numpy_array, _is_torch_array
from .labels import Labels
from .status import _save_exception
from .tensor import TensorMap
from .utils import catch_exceptions


[docs] @catch_exceptions def create_numpy_array(shape_ptr, shape_count, array): """ Callback function that can be used with :py:func:`metatensor.io.load_custom_array` to load data in numpy arrays. """ shape = [] for i in range(shape_count): shape.append(shape_ptr[i]) data = np.empty(shape, dtype=np.float64) wrapper = ArrayWrapper(data) array[0] = wrapper.into_mts_array()
[docs] @catch_exceptions def create_torch_array(shape_ptr, shape_count, array): """ Callback function that can be used with :py:func:`metatensor.io.load_custom_array` to load data in torch tensors. The resulting tensors are stored on CPU, and their dtype is ``torch.float64``. """ import torch shape = [] for i in range(shape_count): shape.append(shape_ptr[i]) data = torch.empty(shape, dtype=torch.float64, device="cpu") wrapper = ArrayWrapper(data) array[0] = wrapper.into_mts_array()
[docs] def save( file: Union[str, pathlib.Path, BinaryIO], data: Union[TensorMap, Labels], use_numpy=False, ): """ Save the given data (either :py:class:`TensorMap` or :py:class:`Labels`) to the given ``file``. :py:class:`TensorMap` are serialized using numpy's ``.npz`` format, i.e. a ZIP file without compression (storage method is ``STORED``), where each file is stored as a ``.npy`` array. See the C API documentation for more information on the format. :param file: where to save the data. This can be a string, :py:class:`pathlib.Path` containing the path to the file to load, or a file-like object that should be opened in binary mode. :param data: data to serialize and save :param use_numpy: should we use numpy or the native serializer implementation? Numpy should be able to process more dtypes than the native implementation, which is limited to float64, but the native implementation is usually faster than going through numpy. This is ignored when saving :py:class:`Labels`. """ if isinstance(data, Labels): return _save_labels(file=file, labels=data) elif isinstance(data, TensorMap): return _save_tensor(file=file, tensor=data, use_numpy=use_numpy) else: raise TypeError( f"`data` must be either 'Labels' or 'TensorMap', not {type(data)}" )
[docs] def save_buffer( data: Union[TensorMap, Labels], use_numpy=False, ) -> memoryview: """ Save the given data (either :py:class:`TensorMap` or :py:class:`Labels`) to an in-memory buffer. :param data: data to serialize and save :param use_numpy: should we use numpy or the native serializer implementation? """ if isinstance(data, Labels): return memoryview(_save_labels_buffer_raw(labels=data)) elif isinstance(data, TensorMap): if use_numpy: file = io.BytesIO() save(file, data=data, use_numpy=use_numpy) return file.getbuffer() else: return memoryview(_save_tensormap_buffer_raw(tensor=data)) else: raise TypeError( f"`data` must be either 'Labels' or 'TensorMap', not {type(data)}" )
[docs] def load(file: Union[str, pathlib.Path, BinaryIO], use_numpy=False) -> TensorMap: """ Load a previously saved :py:class:`TensorMap` from the given file. :py:class:`TensorMap` are serialized using numpy's ``.npz`` format, i.e. a ZIP file without compression (storage method is ``STORED``), where each file is stored as a ``.npy`` array. See the C API documentation for more information on the format. :param file: file to load: this can be a string, a :py:class:`pathlib.Path` containing the path to the file to load, or a file-like object that should be opened in binary mode. :param use_numpy: should we use numpy or the native implementation? Numpy should be able to process more dtypes than the native implementation, which is limited to float64, but the native implementation is usually faster than going through numpy. """ if use_numpy: return _read_npz(file) else: if isinstance(file, (str, pathlib.Path)): return load_custom_array(file, create_numpy_array) else: # assume we have a file-like object buffer = file.read() assert isinstance(buffer, bytes) return load_buffer_custom_array(buffer, create_numpy_array)
[docs] def load_buffer( buffer: Union[bytes, bytearray, memoryview], use_numpy=False ) -> TensorMap: """ Load a previously saved :py:class:`TensorMap` from an in-memory buffer. :param buffer: In-memory buffer containing a serialized ``TensorMap`` :param use_numpy: should we use numpy or the native implementation? """ if use_numpy: return _read_npz(io.BytesIO(buffer)) else: return load_buffer_custom_array(buffer, create_numpy_array)
CreateArrayCallback = Callable[ [ctypes.POINTER(c_uintptr_t), c_uintptr_t, ctypes.POINTER(mts_array_t)], None ]
[docs] def load_custom_array( path: Union[str, pathlib.Path], create_array: CreateArrayCallback, ) -> TensorMap: """ Load a previously saved :py:class:`TensorMap` from the given path using a custom array creation callback. This is an advanced functionality, which should not be needed by most users. This function allows to specify the kind of array to use when loading the data through the ``create_array`` callback. This callback should take three arguments: a pointer to the shape, the number of elements in the shape, and a pointer to the ``mts_array_t`` to be filled. :py:func:`metatensor.io.create_numpy_array` and :py:func:`metatensor.io.create_torch_array` can be used to load data into numpy and torch arrays respectively. :param path: path of the file to load :param create_array: callback used to create arrays as needed """ lib = _get_library() if isinstance(path, str): path = path.encode("utf8") elif isinstance(path, pathlib.Path): path = bytes(path) ptr = lib.mts_tensormap_load(path, mts_create_array_callback_t(create_array)) return TensorMap._from_ptr(ptr)
[docs] def load_buffer_custom_array( buffer: Union[bytes, bytearray, memoryview], create_array: CreateArrayCallback, ) -> TensorMap: """ Load a previously saved :py:class:`TensorMap` from the given buffer using a custom array creation callback. This is an advanced functionality, which should not be needed by most users. This function allows to specify the kind of array to use when loading the data through the ``create_array`` callback. This callback should take three arguments: a pointer to the shape, the number of elements in the shape, and a pointer to the ``mts_array_t`` to be filled. :py:func:`metatensor.io.create_numpy_array` and :py:func:`metatensor.io.create_torch_array` can be used to load data into numpy and torch arrays respectively. :param buffer: in-memory buffer containing a saved :py:class:`TensorMap` :param create_array: callback used to create arrays as needed """ lib = _get_library() if isinstance(buffer, bytearray): char_array = ctypes.c_char * len(buffer) buffer = char_array.from_buffer(buffer) elif isinstance(buffer, memoryview): char_array = ctypes.c_char * len(buffer) # FIXME: we would prefer not to make a copy here, but ctypes does not support # passing a memory view to C, even if it is contiguous. # https://github.com/python/cpython/issues/60190 buffer = char_array.from_buffer_copy(buffer) ptr = lib.mts_tensormap_load_buffer( buffer, len(buffer), mts_create_array_callback_t(create_array), ) return TensorMap._from_ptr(ptr)
[docs] def load_labels(file: Union[str, pathlib.Path, BinaryIO]) -> Labels: """ Load previously saved :py:class:`Labels` from the given file. :param file: file to load: this can be a string, a :py:class:`pathlib.Path` containing the path to the file to load, or a file-like object opened in binary mode. """ if isinstance(file, (str, pathlib.Path)): lib = _get_library() if isinstance(file, str): path = file.encode("utf8") elif isinstance(path, pathlib.Path): path = bytes(path) labels = mts_labels_t() lib.mts_labels_load(path, labels) return Labels._from_mts_labels_t(labels) else: # assume we have a file-like object buffer = file.read() assert isinstance(buffer, bytes) return load_labels_buffer(buffer)
[docs] def load_labels_buffer(buffer: Union[bytes, bytearray, memoryview]) -> Labels: """ Load previously saved :py:class:`Labels` from an in-memory buffer. :param buffer: in-memory buffer containing saved :py:class:`Labels` """ lib = _get_library() if isinstance(buffer, bytearray): char_array = ctypes.c_char * len(buffer) buffer = char_array.from_buffer(buffer) elif isinstance(buffer, memoryview): char_array = ctypes.c_char * len(buffer) # FIXME: we would prefer not to make a copy here, but ctypes does not support # passing a memory view to C, even if it is contiguous. # https://github.com/python/cpython/issues/60190 buffer = char_array.from_buffer_copy(buffer) labels = mts_labels_t() lib.mts_labels_load_buffer(buffer, len(buffer), labels) return Labels._from_mts_labels_t(labels)
def _save_labels( file: Union[str, pathlib.Path, BinaryIO], labels: Labels, ): """ Save :py:class:`Labels` to the given file. :param file: where to save the data. This can be a string, :py:class:`pathlib.Path` containing the path to the file to load, or a file-like object that should be opened in binary mode. :param labels: Labels to save """ assert isinstance(labels, Labels) if isinstance(file, (str, pathlib.Path)): if not file.endswith(".npy"): file += ".npy" warnings.warn( message=f"adding '.npy' extension, the file will be saved at '{file}'", stacklevel=1, ) lib = _get_library() if isinstance(file, (str, pathlib.Path)): if isinstance(file, str): path = file.encode("utf8") elif isinstance(file, pathlib.Path): path = bytes(file) lib.mts_labels_save(path, labels._labels) else: # assume we have a file-like object buffer = _save_labels_buffer_raw(labels) file.write(buffer.raw) def _save_labels_buffer_raw(labels: Labels) -> ctypes.Array: """ Save Labels to an in-memory buffer, returning the data as a ctypes array of ``ctypes.c_char``. """ lib = _get_library() return _save_buffer_raw(lib.mts_labels_save_buffer, labels._labels) def _save_tensormap_buffer_raw(tensor: TensorMap) -> ctypes.Array: """ Save a TensorMap to an in-memory buffer, returning the data as a ctypes array of ``ctypes.c_char``. """ lib = _get_library() return _save_buffer_raw(lib.mts_tensormap_save_buffer, tensor._ptr) def _save_buffer_raw(mts_function, data) -> ctypes.Array: def realloc(buffer, _ptr, new_size): try: # convert void* to PyObject* and dereference to get a PyObject buffer = ctypes.cast(buffer, ctypes.POINTER(ctypes.py_object)) buffer = buffer.contents.value # resize the buffer to grow it ctypes.resize(buffer, new_size) buffer._length_ = new_size return ctypes.addressof(buffer) except Exception as e: # we don't want to propagate exceptions through C, so we catch anything # here, save the error and return a NULL pointer error = RuntimeError("failed to allocate more memory in realloc") error.__cause__ = e _save_exception(error) return None # start with a buffer of 128 bytes in a ctypes string buffer (i.e. array of c_char) # we will be able to resize the allocation in `realloc` above, but the type will # stay `array of 128 c_char elements`. buffer = ctypes.create_string_buffer(128) # store the initial pointer and buffer_size on the stack, they will be modified by # `mts_tensormap_save_buffer` buffer_ptr = ctypes.c_char_p(ctypes.addressof(buffer)) buffer_size = c_uintptr_t(buffer._length_) realloc_type = ctypes.CFUNCTYPE( ctypes.c_char_p, ctypes.c_void_p, ctypes.c_char_p, c_uintptr_t ) mts_function( buffer_ptr, buffer_size, # convert PyObject to void* to pass it to realloc ctypes.cast(ctypes.pointer(ctypes.py_object(buffer)), ctypes.c_void_p), realloc_type(realloc), data, ) # remove extra data from the buffer, resizing it to the number of written bytes # (stored in buffer_size by the mts_function) ctypes.resize(buffer, buffer_size.value) buffer._length_ = buffer_size.value return buffer def _save_tensor( file: Union[str, pathlib.Path, BinaryIO], tensor: TensorMap, use_numpy=False, ): assert isinstance(tensor, TensorMap) if isinstance(file, (str, pathlib.Path)): if not file.endswith(".npz"): file += ".npz" warnings.warn( message=f"adding '.npz' extension, the file will be saved at '{file}'", stacklevel=1, ) if use_numpy: all_entries = _tensor_map_to_dict(tensor) np.savez(file, **all_entries) else: lib = _get_library() if isinstance(file, (str, pathlib.Path)): if isinstance(file, str): path = file.encode("utf8") elif isinstance(file, pathlib.Path): path = bytes(file) lib.mts_tensormap_save(path, tensor._ptr) else: # assume we have a file-like object buffer = _save_tensormap_buffer_raw(tensor) file.write(buffer.raw) def _array_to_numpy(array): if _is_numpy_array(array): return array elif _is_torch_array(array): return array.cpu().numpy() else: raise ValueError("unknown array type passed to `metatensor.save`") def _tensor_map_to_dict(tensor_map): result = { "keys": _labels_to_npz(tensor_map.keys), } for block_i, block in enumerate(tensor_map.blocks()): prefix = f"blocks/{block_i}" result.update(_block_to_dict(block, prefix)) result[f"{prefix}/properties"] = _labels_to_npz(block.properties) return result def _block_to_dict(block, prefix): result = {} result[f"{prefix}/values"] = _array_to_numpy(block.values) result[f"{prefix}/samples"] = _labels_to_npz(block.samples) for i, component in enumerate(block.components): result[f"{prefix}/components/{i}"] = _labels_to_npz(component) for parameter, gradient in block.gradients(): result.update(_block_to_dict(gradient, f"{prefix}/gradients/{parameter}")) return result def _labels_from_npz(data): names = data.dtype.names return Labels(names=names, values=data.view(dtype=np.int32).reshape(-1, len(names))) def _labels_to_npz(labels): dtype = [(name, np.int32) for name in labels.names] return labels.values.view(dtype=dtype).reshape((labels.values.shape[0],)) def _read_npz(file): dictionary = np.load(file) keys = _labels_from_npz(dictionary["keys"]) blocks = [] for block_i in range(len(keys)): prefix = f"blocks/{block_i}" properties = _labels_from_npz(dictionary[f"{prefix}/properties"]) block = _read_block(prefix, dictionary, properties) blocks.append(block) return TensorMap(keys, blocks) def _read_block(prefix, dictionary, properties): values = dictionary[f"{prefix}/values"] samples = _labels_from_npz(dictionary[f"{prefix}/samples"]) components = [] for i in range(len(values.shape) - 2): components.append(_labels_from_npz(dictionary[f"{prefix}/components/{i}"])) block = TensorBlock(values, samples, components, properties) parameters = set() gradient_prefix = f"{prefix}/gradients/" for name in dictionary.keys(): if name.startswith(gradient_prefix) and name.endswith("/values"): parameter = name[len(gradient_prefix) :] parameter = parameter.split("/")[0] parameters.add(parameter) for parameter in parameters: gradient = _read_block( f"{prefix}/gradients/{parameter}", dictionary, properties, ) block.add_gradient(parameter, gradient) return block