# Integration of Reductionist and PyActiveStorage

Reductionist has been integrated with the [PyActiveStorage](https://github.com/valeriupredoi/PyActiveStorage) library, with PyActiveStorage acting as a client of the Reductionist server. PyActiveStorage currently works with data in netCDF4 format, and is able to perform reductions on a variable within such a dataset. Numerical operations are performed on individual storage chunks, with the results later aggregated.

The original POSIX/NumPy storage chunk reduction in PyActiveStorage is implemented in a `reduce_chunk` Python function in `activestorage/storage.py`, and this interface was used as the basis for the integration of Reductionist. The following code snippet shows the `reduce_chunk` function signature.

```python
def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype,
                 shape, order, chunk_selection, method=None):
    """We do our own read of chunks and decoding etc.

    rfile - the actual file with the data
    offset, size - where and what we want ...
    compression - optional `numcodecs.abc.Codec` compression codec
    filters - optional list of `numcodecs.abc.Codec` filter codecs
    dtype - likely float32 in most cases.
    shape - will be a tuple, something like (3,3,1), this is the
            dimensionality of the chunk itself
    order - typically 'C' for c-type ordering
    chunk_selection - python slice tuples for each dimension, e.g.
                      (slice(0, 2, 1), slice(1, 3, 1), slice(0, 1, 1))
                      this defines the part of the chunk which is to be
                      obtained or operated upon.
    method - computation desired (in this Python version it's an actual
             method, in storage implementations we'll change to
             controlled vocabulary)
    """
```

For Reductionist, the `reduce_chunk` function signature in `activestorage/reductionist.py` is similar, but replaces the local file path with a `requests.Session` object, the Reductionist server URL, the S3-compatible object store URL, and the bucket and object containing the data.

```python
def reduce_chunk(session, server, source, bucket, object, offset, size,
                 compression, filters, missing, dtype, shape, order,
                 chunk_selection, operation):
    """Perform a reduction on a chunk using Reductionist.

    :param session: requests.Session object
    :param server: Reductionist server URL
    :param source: S3 URL
    :param bucket: S3 bucket
    :param object: S3 object
    :param offset: offset of data in object
    :param size: size of data in object
    :param compression: optional `numcodecs.abc.Codec` compression codec
    :param filters: optional list of `numcodecs.abc.Codec` filter codecs
    :param missing: optional 4-tuple describing missing data
    :param dtype: numpy data type
    :param shape: will be a tuple, something like (3,3,1), this is the
                  dimensionality of the chunk itself
    :param order: typically 'C' for c-type ordering
    :param chunk_selection: N-tuple where N is the length of `shape`, and
                            each item is an integer or slice, e.g.
                            (slice(0, 2, 1), slice(1, 3, 1), slice(0, 1, 1))
                            this defines the part of the chunk which is to
                            be obtained or operated upon.
    :param operation: name of operation to perform
    :returns: the reduced data as a numpy array or scalar
    :raises ReductionistError: if the request to Reductionist fails
    """
```
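A minimal usage sketch of this function is shown below. The server and object store URLs, bucket and object names, operation name, and argument values are illustrative assumptions rather than values taken from PyActiveStorage.

```python
import numpy as np
import requests

from activestorage import reductionist

session = requests.Session()

result = reductionist.reduce_chunk(
    session,
    "https://reductionist.example.org:8080",  # Reductionist server URL (assumed)
    "https://s3.example.org",                 # S3-compatible object store URL (assumed)
    "my-bucket",                              # S3 bucket (assumed)
    "data.nc",                                # S3 object (assumed)
    offset=0,                                 # location of the chunk in the object
    size=4096,
    compression=None,                         # chunk is not compressed
    filters=None,                             # no filter codecs applied
    missing=None,                             # no missing data description
    dtype=np.dtype("float32"),
    shape=(3, 3, 1),
    order="C",
    chunk_selection=(slice(0, 2, 1), slice(1, 3, 1), slice(0, 1, 1)),
    operation="sum",                          # assumed operation name
)
```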
Within the `reduce_chunk` implementation for Reductionist, the following steps are taken (a sketch of these steps appears at the end of this section):

* build Reductionist API request data
* build Reductionist API URL
* perform an HTTP(S) POST request to Reductionist
* on success, return a NumPy array containing the data in the response payload, with data type, shape and count determined by response headers
* on failure, raise a `ReductionistError` with the response status code and JSON encoded error response

The use of a `requests.Session` object allows for TCP connection pooling, reducing connection overhead when multiple requests are made within a short timeframe. It should be possible to provide a unified interface to storage systems by abstracting away the details of the storage system and data source, but this has not yet been done.

Other changes to the main `activestorage.Active` class were necessary for integration of Reductionist. These include:

* Support for reading netCDF metadata from files stored in S3 using the [s3fs](https://s3fs.readthedocs.io/) and [h5netcdf](https://pypi.org/project/h5netcdf/) libraries
* Configuration options in `activestorage/config.py` to specify the Reductionist API URL, S3-compatible object store URL, S3 access key, secret key and bucket
* Constructor `storage_type` argument for `activestorage.Active` to specify the storage backend
* Use of a thread pool to execute storage chunk reductions in parallel (also sketched at the end of this section)
* Unit tests to cover new and modified code
* Integration test changes to allow running against a POSIX or S3 storage backend
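To make the request/response steps listed above concrete, the following sketch outlines one plausible shape for the implementation. The request body fields, URL path and response header names are assumptions for illustration; the actual Reductionist API may differ.

```python
import json

import numpy as np
import requests


class ReductionistError(Exception):
    """Raised when a request to Reductionist fails."""


def reduce_chunk_sketch(session, server, source, bucket, object, offset,
                        size, dtype, shape, order, chunk_selection,
                        operation):
    # Build the Reductionist API request data.  The field names are
    # illustrative assumptions, not the exact Reductionist request schema.
    request_data = {
        "source": source,
        "bucket": bucket,
        "object": object,
        "offset": offset,
        "size": size,
        "dtype": dtype.name,
        "shape": list(shape),
        "order": order,
        # Assumes each selection item is a slice object.
        "selection": [[s.start, s.stop, s.step] for s in chunk_selection],
    }

    # Build the Reductionist API URL; the path format is hypothetical.
    url = f"{server}/v1/{operation}/"

    # Perform an HTTP(S) POST request to Reductionist.
    response = session.post(url, json=request_data)

    # On failure, raise a ReductionistError with the response status code
    # and JSON encoded error response.
    if not response.ok:
        raise ReductionistError(response.status_code, response.json())

    # On success, return a NumPy array built from the response payload,
    # with data type and shape taken from (assumed) response headers.
    # A count header could be read similarly, e.g. for mean operations.
    result_dtype = np.dtype(response.headers["x-activestorage-dtype"])
    result_shape = json.loads(response.headers["x-activestorage-shape"])
    return np.frombuffer(response.content, dtype=result_dtype).reshape(result_shape)
```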
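The thread pool mentioned above might be used along the following lines. This is a sketch assuming a prepared list of per-chunk arguments; the connection details, pool size and aggregation step are illustrative assumptions.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import requests

from activestorage import reductionist

# Assumed connection details, as in the earlier usage sketch.
server = "https://reductionist.example.org:8080"
source = "https://s3.example.org"
bucket = "my-bucket"
obj = "data.nc"

# One argument tuple per storage chunk: (offset, size, compression,
# filters, missing, dtype, shape, order, chunk_selection).  A single
# illustrative entry is shown; in practice these would be derived from
# the netCDF metadata.
chunk_args = [
    (0, 4096, None, None, None, np.dtype("float32"), (3, 3, 1), "C",
     (slice(0, 2, 1), slice(1, 3, 1), slice(0, 1, 1))),
]

session = requests.Session()  # shared session enables connection pooling


def reduce_one(args):
    return reductionist.reduce_chunk(session, server, source, bucket, obj,
                                     *args, operation="sum")


with ThreadPoolExecutor(max_workers=8) as pool:
    partial_sums = list(pool.map(reduce_one, chunk_args))

total = np.sum(partial_sums)  # aggregate the per-chunk results
```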