import constriction
import numpy as np
import scipy.stats


def test_queue_gaussian():
    """Round-trip five symbols through the range coder with Gaussian models.

    Checks the bit count, the exact compressed words, and that both a decoder
    built from the compressed words and one obtained from the encoder
    reproduce the symbols.
    """
    symbols = np.array([23, -15, 78, 43, -69], dtype=np.int32)
    means = np.array([35.2, -1.7, 30.1, 71.2, -75.1], dtype=np.float64)
    stds = np.array([10.1, 25.3, 23.8, 35.4, 3.9], dtype=np.float64)
    model = constriction.stream.model.QuantizedGaussian(-100, 100)

    encoder = constriction.stream.queue.RangeEncoder()
    encoder.encode(symbols, model, means, stds)
    assert encoder.num_bits() == 64

    compressed = encoder.get_compressed()
    print(compressed)
    expected_words = np.array([473034731, 2276733146], dtype=np.uint32)
    assert np.all(compressed == expected_words)

    # Decoder constructed from the raw compressed words.
    from_words = constriction.stream.queue.RangeDecoder(compressed)
    decoded_from_words = from_words.decode(model, means, stds)
    assert from_words.maybe_exhausted()
    assert np.all(decoded_from_words == symbols)

    # Decoder handed out by the encoder itself.
    from_encoder = encoder.get_decoder()
    decoded_from_encoder = from_encoder.decode(model, means, stds)
    assert from_encoder.maybe_exhausted()
    assert np.all(decoded_from_encoder == symbols)


def test_stack_gaussian():
    """Round-trip five symbols through the ANS coder with Gaussian models.

    Checks the bit counts, the exact compressed words, and that decoding works
    both from a fresh coder built on the compressed words and from the coder
    that was used for encoding.
    """
    symbols = np.array([23, -15, 78, 43, -69], dtype=np.int32)
    means = np.array([35.2, -1.7, 30.1, 71.2, -75.1], dtype=np.float64)
    stds = np.array([10.1, 25.3, 23.8, 35.4, 3.9], dtype=np.float64)
    model = constriction.stream.model.QuantizedGaussian(-100, 100)

    coder = constriction.stream.stack.AnsCoder()
    coder.encode_reverse(symbols, model, means, stds)
    assert coder.num_bits() == 64
    assert coder.num_valid_bits() == 51

    compressed = coder.get_compressed()
    expected_words = np.array([1109163715, 757457], dtype=np.uint32)
    assert np.all(compressed == expected_words)

    # Fresh coder constructed from the compressed words.
    fresh = constriction.stream.stack.AnsCoder(compressed)
    decoded_fresh = fresh.decode(model, means, stds)
    assert fresh.is_empty()
    assert np.all(decoded_fresh == symbols)

    # The same object that encoded the symbols is now used to decode them.
    decoded_in_place = coder.decode(model, means, stds)
    assert coder.is_empty()
    assert np.all(decoded_in_place == symbols)


def test_chain_gaussian():
    """Decode symbols from random data with a chain coder, then invert that.

    Three ways of re-encoding the decoded symbols are checked; each must
    reproduce the original random data.
    """
    rng = np.random.RandomState(123)
    original_data = rng.randint(2**32, size=100, dtype=np.uint32)
    num_words = len(original_data)

    model = constriction.stream.model.QuantizedGaussian(-100, 100)
    means = np.arange(50, dtype=np.float64)
    stds = np.full(50, 10.0, dtype=np.float64)

    chain = constriction.stream.chain.ChainCoder(original_data, seal=True)
    symbols = chain.decode(model, means, stds)
    remainders_prefix, remainders_suffix = chain.get_remainders()
    print(len(remainders_prefix), len(remainders_suffix), num_words)
    assert len(remainders_prefix) + len(remainders_suffix) < num_words

    # Variant 1: treat `remainders_prefix` and `remainders_suffix` separately
    suffix_coder = constriction.stream.chain.ChainCoder(
        remainders_suffix, is_remainders=True)
    suffix_coder.encode_reverse(symbols, model, means, stds)
    prefix1, suffix1 = suffix_coder.get_data(unseal=True)
    print(len(prefix1), len(suffix1), num_words)
    assert len(prefix1) == 0
    joined1 = np.concatenate((remainders_prefix, suffix1))
    assert np.all(joined1 == original_data)

    # Variant 2: concatenate `remainders_prefix` and `remainders_suffix`
    all_remainders = np.concatenate((remainders_prefix, remainders_suffix))
    joint_coder = constriction.stream.chain.ChainCoder(
        all_remainders, is_remainders=True)
    joint_coder.encode_reverse(symbols, model, means, stds)
    prefix2, suffix2 = joint_coder.get_data(unseal=True)
    print(len(prefix2), len(suffix2), num_words)
    joined2 = np.concatenate((prefix2, suffix2))
    assert np.all(joined2 == original_data)

    # Variant 3: directly re-encode onto original coder
    chain.encode_reverse(symbols, model, means, stds)
    prefix3, suffix3 = chain.get_data(unseal=True)
    print(len(prefix3), len(suffix3), num_words)
    assert len(prefix3) == 0
    assert np.all(suffix3 == original_data)


def test_chain_independence():
    """Compare how a changed first model affects ANS and chain decoding.

    The same data is decoded with a baseline and a slightly perturbed first
    row of probabilities. The expected symbols show the later ANS symbols
    changing with the perturbation while the later chain-coder symbols do not.
    """
    data = np.array(
        [0x80d1_4131, 0xdda9_7c6c, 0x5017_a640, 0x0117_0a3e], np.uint32)
    baseline_row = np.array([0.1, 0.7, 0.1, 0.1])
    perturbed_row = np.array([0.09, 0.71, 0.1, 0.1])
    probabilities = np.array([
        [0.1, 0.7, 0.1, 0.1],
        [0.2, 0.2, 0.1, 0.5],
        [0.2, 0.1, 0.4, 0.3],
    ])
    model = constriction.stream.model.Categorical(perfect=False)

    # ANS coder, baseline probabilities.
    ans_coder = constriction.stream.stack.AnsCoder(data, True)
    ans_baseline = ans_coder.decode(model, probabilities)
    assert np.all(ans_baseline == [0, 0, 2])

    # ANS coder, perturbed first row.
    probabilities[0, :] = perturbed_row
    ans_coder = constriction.stream.stack.AnsCoder(data, True)
    ans_perturbed = ans_coder.decode(model, probabilities)
    assert np.all(ans_perturbed == [1, 0, 0])

    # Chain coder, baseline probabilities restored.
    probabilities[0, :] = baseline_row
    chain_coder = constriction.stream.chain.ChainCoder(data, False, True)
    chain_baseline = chain_coder.decode(model, probabilities)
    assert np.all(chain_baseline == [0, 3, 3])

    # Chain coder, perturbed first row.
    probabilities[0, :] = perturbed_row
    chain_coder = constriction.stream.chain.ChainCoder(data, False, True)
    chain_perturbed = chain_coder.decode(model, probabilities)
    assert np.all(chain_perturbed == [1, 3, 3])


def test_custom_model():
    """Exercise scipy-backed, native, and callback-based entropy models.

    The first part round-trips short messages through the range coder with
    several model types. The second part checks exact compressed words for
    `CustomModel` with both the ANS coder and the range coder.
    """
    # Begin sketch new test --------------------------------------

    # Function-scope imports retained from the original sketch.
    import constriction
    import numpy as np
    import scipy.stats

    def roundtrip(message, entropy_model, *params):
        """Range-encode `message`, decode it again, and compare.

        With `params`, they are passed to both `encode` and `decode`;
        without, the model is applied i.i.d. and `decode` receives the
        number of symbols instead.
        """
        enc = constriction.stream.queue.RangeEncoder()
        enc.encode(message, entropy_model, *params)
        words = enc.get_compressed()
        dec = constriction.stream.queue.RangeDecoder(words)
        if params:
            decoded = dec.decode(entropy_model, *params)
        else:
            decoded = dec.decode(entropy_model, len(message))
        print(decoded)
        assert np.all(decoded == message)

    means = np.array([-5.2, 5.4, 10], dtype=np.float64)
    stds = np.array([3.2, 5.3, 9.4], dtype=np.float64)

    # Encode non-iid symbols:
    model_py = scipy.stats.norm
    model = constriction.stream.model.ScipyModel(model_py, -100, 100)
    symbols = np.array([-10, 3, 12], dtype=np.int32)
    roundtrip(symbols, model, means, stds)

    # Encode iid symbols:
    model_py = scipy.stats.norm(10.3, 30.5)
    model = constriction.stream.model.ScipyModel(model_py, -100, 100)
    symbols = np.array([-15, 33, 22], dtype=np.int32)
    roundtrip(symbols, model)

    # Encode non-iid symbols with native model:
    model = constriction.stream.model.QuantizedGaussian(-100, 100)
    symbols = np.array([-15, 33, 22], dtype=np.int32)
    roundtrip(symbols, model, means, stds)

    # Encode iid symbols with native model:
    model = constriction.stream.model.QuantizedGaussian(-100, 100, 2.1, 3.5)
    symbols = np.array([-15, 33, 22], dtype=np.int32)
    roundtrip(symbols, model)

    # Encode non-iid symbols with native model:
    symbols = np.array([15, 33, 22], dtype=np.int32)
    ns = np.array([20, 53, 42], dtype=np.int32)
    ps = np.array([0.6, 0.7, 0.5], dtype=np.float64)
    model = constriction.stream.model.Binomial()
    roundtrip(symbols, model, ns, ps)

    # Encode non-iid symbols with native model:
    model = constriction.stream.model.Binomial(100)
    roundtrip(symbols, model, ps)

    # Encode iid symbols with native model:
    model = constriction.stream.model.Binomial(40, 0.5)
    roundtrip(symbols, model)

    # End sketch new test ----------------------------------------

    symbols = np.array([3, 2, 6, -51, -19, 5, 87], dtype=np.int32)
    num_symbols = len(symbols)

    model_py = scipy.stats.norm(1.2, 4.9)
    model_iid = constriction.stream.model.CustomModel(
        model_py.cdf, model_py.ppf, -100, 100)

    # Constant parameters: equivalent to the i.i.d. model above.
    model_parameters_iid1 = np.full(num_symbols, 1.2, dtype=np.float64)
    model_parameters_iid2 = np.full(num_symbols, 4.9, dtype=np.float64)
    # Symbol-dependent location: each symbol is its own `loc`.
    model_parameters1 = symbols.astype(np.float64)
    model_parameters2 = np.full(num_symbols, 4.9, dtype=np.float64)

    model = constriction.stream.model.CustomModel(
        lambda x, loc, scale: scipy.stats.norm.cdf(x, loc, scale),
        scipy.stats.norm.ppf,  # (try providing member function as callback.)
        -100, 100)

    def check_coder(Encoder, Decoder, encode_iid, encode,
                    expected_compressed_iid, expected_compressed):
        """Run the three encode/decode scenarios for one coder pair.

        `encode_iid(encoder, symbols, model)` and
        `encode(encoder, symbols, model, params1, params2)` adapt the
        coder-specific encoding method. The first two scenarios must yield
        `expected_compressed_iid`, the third `expected_compressed`.
        """
        expected_iid = np.array(expected_compressed_iid, dtype=np.uint32)
        expected_noniid = np.array(expected_compressed, dtype=np.uint32)

        scenarios = (
            # Encode and decode i.i.d. symbols
            (
                lambda enc: encode_iid(enc, symbols, model_iid),
                (model_iid, len(symbols)),
                expected_iid,
            ),
            # Encode and decode i.i.d. symbols, but with parameterized
            # custom model.
            (
                lambda enc: encode(
                    enc, symbols, model,
                    model_parameters_iid1, model_parameters_iid2),
                (model, model_parameters_iid1, model_parameters_iid2),
                expected_iid,
            ),
            # Encode and decode non-i.i.d. symbols.
            (
                lambda enc: encode(
                    enc, symbols, model,
                    model_parameters1, model_parameters2),
                (model, model_parameters1, model_parameters2),
                expected_noniid,
            ),
        )

        for run_encode, decode_args, expected in scenarios:
            encoder = Encoder()
            run_encode(encoder)
            compressed = encoder.get_compressed()
            print(compressed)
            assert np.all(compressed == expected)

            decoder = Decoder(compressed)
            reconstructed = decoder.decode(*decode_args)
            assert np.all(reconstructed == symbols)

    check_coder(
        constriction.stream.stack.AnsCoder,
        constriction.stream.stack.AnsCoder,
        lambda encoder, symbols, model: encoder.encode_reverse(
            symbols, model),
        lambda encoder, symbols, model, params1, params2:
            encoder.encode_reverse(symbols, model, params1, params2),
        [3187671595, 2410106987, 48580],
        [3397926478, 6042])

    check_coder(
        constriction.stream.queue.RangeEncoder,
        constriction.stream.queue.RangeDecoder,
        lambda encoder, symbols, model: encoder.encode(
            symbols, model),
        lambda encoder, symbols, model, params1, params2:
            encoder.encode(symbols, model, params1, params2),
        [2789142295, 3128556965, 414280666],
        [2147484271])


def test_custom_model_probing_range():
    """Round-trip a message through a `CustomModel` with support [0, 10].

    The `cdf` callback asserts that it is never evaluated at a negative
    argument.
    """
    # See issue 27.

    def cdf(x, mu, sigma):
        assert x >= 0
        return scipy.stats.lognorm.cdf(x, mu, sigma)

    def inverse_cdf(q, mu, sigma):
        return scipy.stats.lognorm.ppf(q, mu, sigma)

    # The three draws below must stay in this order to reproduce the same
    # pseudo-random parameters and message.
    rng = np.random.RandomState(20230716)
    mus = rng.randn(100)
    sigmas = rng.randn(100)**2 + 1
    dummy_entropy_model = constriction.stream.model.CustomModel(
        cdf, inverse_cdf, 0, 10)
    message = (rng.randn(100)**2).round().astype(np.int32)

    ans = constriction.stream.stack.AnsCoder()
    ans.encode_reverse(message, dummy_entropy_model, mus, sigmas)
    roundtripped = ans.decode(dummy_entropy_model, mus, sigmas)
    assert np.all(roundtripped == message)


def test_huffman_queue():
    """Huffman-encode ten symbols into a queue and decode them again.

    Checks the compressed length, the exact compressed word, and the
    reconstructed symbol sequence.
    """
    probabilities = np.array([0.3, 0.28, 0.12, 0.1, 0.2], dtype=np.float64)
    symbols = [1, 3, 2, 4, 0, 1, 4, 0, 2, 1]

    encoder = constriction.symbol.QueueEncoder()
    encoder_codebook = constriction.symbol.huffman.EncoderHuffmanTree(
        probabilities)
    for sym in symbols:
        encoder.encode_symbol(sym, encoder_codebook)

    compressed, compressed_len = encoder.get_compressed()
    print(compressed, compressed_len)
    assert compressed_len == 23
    assert np.all(compressed == np.array([3873993], dtype=np.uint32))

    decoder = encoder.get_decoder()
    decoder_codebook = constriction.symbol.huffman.DecoderHuffmanTree(
        probabilities)
    reconstructed = [
        decoder.decode_symbol(decoder_codebook) for _ in symbols
    ]
    assert reconstructed == symbols