// Distributed Parquet read check for amadeus.
//
// `main`
//   * returns immediately when compiled under miri (`cfg!(miri)`);
//   * with the `constellation` feature, calls `init(Resources::default())` first;
//   * builds a tokio runtime (`threaded_scheduler`, `enable_all`) and, inside
//     `block_on`, awaits `run` once with a `ThreadPool` and - only with the
//     `constellation` feature - once more with a `ProcessPool`;
//   * prints both results via `println!("in {:?} {:?}", ..)`; without the
//     feature the process-pool slot is the string "-".
//
// `run(pool)`
//   * records `SystemTime::now()` on entry and returns `start.elapsed().unwrap()`;
//   * for each fixture: `Parquet::<_, T>::new(..).await.unwrap()`, then
//     `dist_stream()`, unwrap every row inside a `FnMut!` closure, `count(pool)`,
//     and `assert_eq!` against the expected row count;
//   * most fixtures are read three ways: as a tuple type alias, as a
//     `#[derive(Data)]` struct, and as a dynamic `Value` that is cloned and
//     downcast to the typed forms inside the closure;
//   * with the `aws` feature, additionally reads `S3File` / `S3Directory` sources
//     ("us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/...",
//     `AwsCredentials::Anonymous`): one file repeated 20 times (45_167 * 20 rows),
//     the directory (207_535), an explicit 8-file list (207_535), and the
//     directory again (207_535).
//
// Expected row counts for the local fixtures under `amadeus-testing/parquet/`:
//   cf-accesslogs/ 207_535, stock_simulated 42_000, 10k-v2 10_000,
//   alltypes_dictionary 2, alltypes_plain 8, alltypes_plain.snappy 2,
//   nested_lists.snappy 3, nested_maps.snappy 6, nonnullable.impala 1,
//   nullable.impala 7, nulls.snappy 8, repeated_no_annotation 6,
//   datapage_v2.snappy 5, commits 14_444.
//
// Panics: every `.unwrap()` and `assert_eq!` here panics on failure (missing
// fixture, row decode error, failed downcast, or row-count mismatch).
//
// NOTE(review): `SystemTime` is not monotonic; `elapsed()` returns `Err` if the
// clock moved backwards, so the final `unwrap()` can panic. `std::time::Instant`
// is the usual choice for measuring a duration - consider switching.
// NOTE(review): in this copy several generic arguments look missing (e.g. `P` in
// `run(pool: &P)` is never declared, fields are typed bare `Option`), and the
// `//` comments inside the long lines run to end-of-line, hiding the code after
// them on the same physical line - confirm against the upstream file.
#![type_length_limit = "2097152"] #![allow(clippy::cognitive_complexity, clippy::type_complexity)] #[cfg(feature = "constellation")] use constellation::*; use std::{ collections::HashMap, path::PathBuf, time::{Duration, SystemTime} }; use amadeus::dist::prelude::*; fn main() { if cfg!(miri) { return; } #[cfg(feature = "constellation")] init(Resources::default()); tokio::runtime::Builder::new() .threaded_scheduler() .enable_all() .build() .unwrap() .block_on(async { let thread_pool_time = { let thread_pool = ThreadPool::new(None, None).unwrap(); run(&thread_pool).await }; #[cfg(feature = "constellation")] let process_pool_time = { let process_pool = ProcessPool::new(None, None, None, Resources::default()).unwrap(); run(&process_pool).await }; #[cfg(not(feature = "constellation"))] let process_pool_time = "-"; println!("in {:?} {:?}", thread_pool_time, process_pool_time); }) } async fn run(pool: &P) -> Duration { let start = SystemTime::now(); let rows = Parquet::<_, Value>::new(ParquetDirectory::new(PathBuf::from( "amadeus-testing/parquet/cf-accesslogs/", ))) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 207_535 ); #[cfg(feature = "aws")] { let rows = Parquet::<_, Value>::new(vec![S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=03/part-00137-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous);20]).await.unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 45_167 * 20 ); let rows = Parquet::<_, Value>::new(ParquetDirectory::new(S3Directory::new_with( AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/", AwsCredentials::Anonymous, ))) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 207_535 ); let rows = Parquet::<_, 
Value>::new(vec![ S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=02/part-00176-ed461019-4a12-46fa-a3f3-246d58f0ee06.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=03/part-00137-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=04/part-00173-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=05/part-00025-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=05/part-00025-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=06/part-00185-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=07/part-00151-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet", AwsCredentials::Anonymous), ]).await.unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 207_535 ); let rows = Parquet::<_, 
Value>::new(ParquetDirectory::new(S3Directory::new_with( AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/", AwsCredentials::Anonymous, ))) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 207_535 ); } #[derive(Data, Clone, PartialEq, Debug)] struct StockSimulatedDerived { bp1: Option, bp2: Option, bp3: Option, bp4: Option, bp5: Option, bs1: Option, bs2: Option, bs3: Option, bs4: Option, bs5: Option, ap1: Option, ap2: Option, ap3: Option, ap4: Option, ap5: Option, as1: Option, as2: Option, as3: Option, as4: Option, as5: Option, valid: Option, __index_level_0__: Option, } let rows = Parquet::<_, StockSimulatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 42_000 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: StockSimulatedDerived = value.clone().downcast().unwrap(); value })) .count(pool) .await, 42_000 ); #[derive(Data, Clone, PartialEq, Debug)] struct StockSimulatedDerivedProjection1 { bs5: Option, __index_level_0__: Option, } let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 42_000 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: StockSimulatedDerivedProjection1 = value.clone().downcast().unwrap(); value })) .count(pool) .await, 42_000 ); #[derive(Data, Clone, 
PartialEq, Debug)] struct StockSimulatedDerivedProjection2 {} let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 42_000 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: StockSimulatedDerivedProjection2 = value.clone().downcast().unwrap(); value })) .count(pool) .await, 42_000 ); type TenKayVeeTwo = ( List, i32, i64, bool, f32, f64, List, // [u8;1024], DateTime, ); #[derive(Data, Clone, PartialEq, Debug)] struct TenKayVeeTwoDerived { binary_field: List, int32_field: i32, int64_field: i64, boolean_field: bool, float_field: f32, double_field: f64, flba_field: List, // [u8;1024], int96_field: DateTime, } let rows = Parquet::<_, TenKayVeeTwo>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 10_000 ); let rows = Parquet::<_, TenKayVeeTwoDerived>::new(PathBuf::from( "amadeus-testing/parquet/10k-v2.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 10_000 ); let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: TenKayVeeTwo = value.clone().downcast().unwrap(); let _: TenKayVeeTwoDerived = value.clone().downcast().unwrap(); value })) .count(pool) .await, 10_000 ); type AlltypesDictionary = ( Option, Option, Option, Option, Option, Option, Option, Option, Option>, Option>, Option, ); #[derive(Data, Clone, PartialEq, Debug)] struct 
AlltypesDictionaryDerived { id: Option, bool_col: Option, tinyint_col: Option, smallint_col: Option, int_col: Option, bigint_col: Option, float_col: Option, double_col: Option, date_string_col: Option>, string_col: Option>, timestamp_col: Option, } let rows = Parquet::<_, AlltypesDictionary>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 2 ); let rows = Parquet::<_, AlltypesDictionaryDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 2 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: AlltypesDictionary = value.clone().downcast().unwrap(); let _: AlltypesDictionaryDerived = value.clone().downcast().unwrap(); value })) .count(pool) .await, 2 ); type AlltypesPlain = ( Option, Option, Option, Option, Option, Option, Option, Option, Option>, Option>, Option, ); #[derive(Data, Clone, PartialEq, Debug)] struct AlltypesPlainDerived { id: Option, bool_col: Option, tinyint_col: Option, smallint_col: Option, int_col: Option, bigint_col: Option, float_col: Option, double_col: Option, date_string_col: Option>, string_col: Option>, timestamp_col: Option, } let rows = Parquet::<_, AlltypesPlain>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 8 ); let rows = Parquet::<_, AlltypesPlainDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) 
.count(pool) .await, 8 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: AlltypesPlain = value.clone().downcast().unwrap(); let _: AlltypesPlainDerived = value.clone().downcast().unwrap(); value })) .count(pool) .await, 8 ); type AlltypesPlainSnappy = ( Option, Option, Option, Option, Option, Option, Option, Option, Option>, Option>, Option, ); #[derive(Data, Clone, PartialEq, Debug)] struct AlltypesPlainSnappyDerived { id: Option, bool_col: Option, tinyint_col: Option, smallint_col: Option, int_col: Option, bigint_col: Option, float_col: Option, double_col: Option, date_string_col: Option>, string_col: Option>, timestamp_col: Option, } let rows = Parquet::<_, AlltypesPlainSnappy>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 2 ); let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 2 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: AlltypesPlainSnappy = value.clone().downcast().unwrap(); let _: AlltypesPlainSnappyDerived = value.clone().downcast().unwrap(); value })) .count(pool) .await, 2 ); // TODO // type NationDictMalformed = (Option, Option>, Option, Option>); // let rows = Parquet::<_,NationDictMalformed>::new(vec![PathBuf::from( // "amadeus-testing/parquet/nation.dict-malformed.parquet", // )]).await.unwrap(); // assert_eq!( // rows.dist_stream().collect::>(pool), 
// [Err(amadeus::source::parquet::Error::Parquet( // amadeus_parquet::internal::errors::ParquetError::General(String::from( // "underlying IO error: failed to fill whole buffer" // )) // ))] // ); // let rows = Parquet::<_,Value>::new(vec![PathBuf::from( // "amadeus-testing/parquet/nation.dict-malformed.parquet", // )]).await.unwrap(); // assert_eq!( // rows.dist_stream().collect::>(pool), // [Err(amadeus::source::parquet::Error::Parquet( // amadeus_parquet::internal::errors::ParquetError::General(String::from( // "underlying IO error: failed to fill whole buffer" // )) // ))] // ); type NestedLists = ( Option>>>>>>, i32, ); #[derive(Data, Clone, PartialEq, Debug)] struct NestedListsDerived { a: Option>>>>>>, b: i32, } let rows = Parquet::<_, NestedLists>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 3 ); let rows = Parquet::<_, NestedListsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 3 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_lists.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: NestedLists = value.clone().downcast().unwrap(); let _: NestedListsDerived = value.clone().downcast().unwrap(); value })) .count(pool) .await, 3 ); type NestedMaps = ( Option>>>, i32, f64, ); #[derive(Data, Clone, PartialEq, Debug)] struct NestedMapsDerived { a: Option>>>, b: i32, c: f64, } let rows = Parquet::<_, NestedMaps>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 6 ); let rows = Parquet::<_, 
NestedMapsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 6 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nested_maps.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: NestedMaps = value.clone().downcast().unwrap(); let _: NestedMapsDerived = value.clone().downcast().unwrap(); value })) .count(pool) .await, 6 ); type Nonnullable = ( i64, List, List>, HashMap, List>, ( i32, List, (List>,), HashMap,),)>, ), ); #[derive(Data, Clone, PartialEq, Debug)] struct NonnullableDerived { #[amadeus(name = "ID")] id: i64, #[amadeus(name = "Int_Array")] int_array: List, int_array_array: List>, #[amadeus(name = "Int_Map")] int_map: HashMap, int_map_array: List>, #[amadeus(name = "nested_Struct")] nested_struct: NonnullableDerivedInner, } #[derive(Data, Clone, PartialEq, Debug)] struct NonnullableDerivedInner { a: i32, #[amadeus(name = "B")] b: List, c: NonnullableDerivedInnerInner, #[amadeus(name = "G")] g: HashMap,),)>, } #[derive(Data, Clone, PartialEq, Debug)] struct NonnullableDerivedInnerInner { #[amadeus(name = "D")] d: List>, } #[derive(Data, Clone, PartialEq, Debug)] struct NonnullableDerivedInnerInnerInner { e: i32, f: String, } let rows = Parquet::<_, Nonnullable>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 1 ); let rows = Parquet::<_, NonnullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nonnullable.impala.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 1 ); let rows = Parquet::<_, Value>::new(PathBuf::from( 
"amadeus-testing/parquet/nonnullable.impala.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: Nonnullable = value.clone().downcast().unwrap(); let _: NonnullableDerived = value.clone().downcast().unwrap(); value })) .count(pool) .await, 1 ); type Nullable = ( Option, Option>>, Option>>>>, Option>>, Option>>>>, Option<( Option, Option>>, Option<(Option, Option)>>>>>,)>, Option>>,)>,)>>>, )>, ); #[derive(Data, Clone, PartialEq, Debug)] struct NullableDerived { id: Option, int_array: Option>>, #[amadeus(name = "int_array_Array")] int_array_array: Option>>>>, int_map: Option>>, #[amadeus(name = "int_Map_Array")] int_map_array: Option>>>>, nested_struct: Option<( Option, Option>>, Option<(Option, Option)>>>>>,)>, Option>>,)>,)>>>, )>, } let rows = Parquet::<_, Nullable>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 7 ); let rows = Parquet::<_, NullableDerived>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 7 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nullable.impala.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: Nullable = value.clone().downcast().unwrap(); let _: NullableDerived = value.clone().downcast().unwrap(); value })) .count(pool) .await, 7 ); type Nulls = (Option<(Option,)>,); #[derive(Data, Clone, PartialEq, Debug)] struct NullsDerived { b_struct: Option<(Option,)>, } let rows = Parquet::<_, Nulls>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) 
.await, 8 ); let rows = Parquet::<_, NullsDerived>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 8 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/nulls.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: Nulls = value.clone().downcast().unwrap(); let _: NullsDerived = value.clone().downcast().unwrap(); value })) .count(pool) .await, 8 ); type Repeated = (i32, Option<(List<(i64, Option)>,)>); #[derive(Data, Clone, PartialEq, Debug)] struct RepeatedDerived { id: i32, #[amadeus(name = "phoneNumbers")] phone_numbers: Option<(List<(i64, Option)>,)>, } let rows = Parquet::<_, Repeated>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 6 ); let rows = Parquet::<_, RepeatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 6 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/repeated_no_annotation.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: Repeated = value.clone().downcast().unwrap(); let _: RepeatedDerived = value.clone().downcast().unwrap(); value })) .count(pool) .await, 6 ); type TestDatapage = (Option, i32, f64, bool, Option>); #[derive(Data, Clone, PartialEq, Debug)] struct TestDatapageDerived { a: Option, b: i32, c: f64, d: bool, e: Option>, } let rows = Parquet::<_, TestDatapage>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", )) .await .unwrap(); assert_eq!( 
rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 5 ); let rows = Parquet::<_, TestDatapageDerived>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 5 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/datapage_v2.snappy.parquet", )) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: TestDatapage = value.clone().downcast().unwrap(); let _: TestDatapageDerived = value.clone().downcast().unwrap(); value })) .count(pool) .await, 5 ); #[derive(Data, Clone, PartialEq, Debug)] struct CommitsDerived { id: Option, delay: Option, age: Option, ismerge: Option, squashof: Option, author_name: Option, author_email: Option, committer_name: Option, committer_email: Option, author_time: Option, committer_time: Option, loc_d: Option, loc_i: Option, comp_d: Option, comp_i: Option, nfiles: Option, message: Option, ndiffs: Option, author_email_dedup: Option, author_name_dedup: Option, committer_email_dedup: Option, committer_name_dedup: Option, __index_level_0__: Option, } let rows = Parquet::<_, CommitsDerived>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result<_, _>| row.unwrap())) .count(pool) .await, 14_444 ); let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet")) .await .unwrap(); assert_eq!( rows.dist_stream() .map(FnMut!(|row: Result| -> Value { let value = row.unwrap(); let _: CommitsDerived = value.clone().downcast().unwrap(); value })) .count(pool) .await, 14_444 ); start.elapsed().unwrap() }