#![type_length_limit = "1572864"]
#![allow(
    clippy::cognitive_complexity,
    clippy::type_complexity,
    clippy::suspicious_map
)]

use std::{collections::HashMap, path::PathBuf, time::SystemTime};

use amadeus::prelude::*;

#[tokio::test(threaded_scheduler)]
#[cfg_attr(miri, ignore)]
async fn parquet() {
    let start = SystemTime::now();

    let pool = &ThreadPool::new(None, None).unwrap();

    // An explicit list of local Parquet files, read as dynamically typed `Value` rows.
    let rows = Parquet::<_, Value>::new(vec![
        PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=02/part-00176-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"),
        PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=02/part-00176-ed461019-4a12-46fa-a3f3-246d58f0ee06.c000.snappy.parquet"),
        PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=03/part-00137-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"),
        PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=04/part-00173-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"),
        PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=05/part-00025-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet"),
        PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=05/part-00025-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"),
        PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=06/part-00185-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"),
        PathBuf::from("amadeus-testing/parquet/cf-accesslogs/year=2018/month=11/day=07/part-00151-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet"),
    ]).await.unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        207_535
    );

    // Reading the containing directory should discover the same files and yield the
    // same row count.
    let rows = Parquet::<_, Value>::new(ParquetDirectory::new(PathBuf::from(
        "amadeus-testing/parquet/cf-accesslogs/",
    )))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        207_535
    );

    #[cfg(feature = "aws")]
    {
        // One S3 object listed 20 times: every listing is read, so the count is 20 times
        // the single-file row count.
        let rows = Parquet::<_, Value>::new(vec![S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=03/part-00137-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous); 20]).await.unwrap();
        assert_eq!(
            rows.par_stream()
                .map(|row: Result<_, _>| row.unwrap())
                .count(pool)
                .await,
            45_167 * 20
        );

        let rows = Parquet::<_, Value>::new(ParquetDirectory::new(S3Directory::new_with(
            AwsRegion::UsEast1,
            "us-east-1.data-analytics",
            "cflogworkshop/optimized/cf-accesslogs/",
            AwsCredentials::Anonymous,
        )))
        .await
        .unwrap();
        assert_eq!(
            rows.par_stream()
                .map(|row: Result<_, _>| row.unwrap())
                .count(pool)
                .await,
            207_535
        );
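        // The same S3 data addressed as an explicit list of objects rather than by
        // directory listing; the row count should match the directory read above.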
"cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=04/part-00173-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=05/part-00025-17868f39-cd99-4b60-bb48-8daf9072122e.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=05/part-00025-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=06/part-00185-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet", AwsCredentials::Anonymous), S3File::new_with(AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/year=2018/month=11/day=07/part-00151-96c249f4-3a10-4509-b6b8-693a5d90dbf3.c000.snappy.parquet", AwsCredentials::Anonymous), ]).await.unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) .count(pool) .await, 207_535 ); let rows = Parquet::<_, Value>::new(ParquetDirectory::new(S3Directory::new_with( AwsRegion::UsEast1, "us-east-1.data-analytics", "cflogworkshop/optimized/cf-accesslogs/", AwsCredentials::Anonymous, ))) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) .count(pool) .await, 207_535 ); } #[derive(Data, Clone, PartialEq, Debug)] struct StockSimulatedDerived { bp1: Option, bp2: Option, bp3: Option, bp4: Option, bp5: Option, bs1: Option, bs2: Option, bs3: Option, bs4: Option, bs5: Option, ap1: Option, ap2: Option, ap3: Option, ap4: Option, ap5: Option, as1: Option, as2: Option, as3: Option, as4: Option, as5: Option, valid: Option, __index_level_0__: Option, } let rows = Parquet::<_, StockSimulatedDerived>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", )) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) .count(pool) .await, 42_000 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", )) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result| -> Value { let value = row.unwrap(); let _: StockSimulatedDerived = value.clone().downcast().unwrap(); value }) .count(pool) .await, 42_000 ); #[derive(Data, Clone, PartialEq, Debug)] struct StockSimulatedDerivedProjection1 { bs5: Option, __index_level_0__: Option, } let rows = Parquet::<_, StockSimulatedDerivedProjection1>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", )) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) .count(pool) .await, 42_000 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", )) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result| -> Value { let value = row.unwrap(); let _: StockSimulatedDerivedProjection1 = value.clone().downcast().unwrap(); value }) .count(pool) .await, 42_000 ); #[derive(Data, Clone, PartialEq, Debug)] struct StockSimulatedDerivedProjection2 {} let rows = Parquet::<_, StockSimulatedDerivedProjection2>::new(PathBuf::from( "amadeus-testing/parquet/stock_simulated.parquet", )) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) .count(pool) .await, 42_000 ); let rows = Parquet::<_, Value>::new(PathBuf::from( 
"amadeus-testing/parquet/stock_simulated.parquet", )) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result| -> Value { let value = row.unwrap(); let _: StockSimulatedDerivedProjection2 = value.clone().downcast().unwrap(); value }) .count(pool) .await, 42_000 ); type TenKayVeeTwo = ( List, i32, i64, bool, f32, f64, List, // [u8;1024], DateTime, ); #[derive(Data, Clone, PartialEq, Debug)] struct TenKayVeeTwoDerived { binary_field: List, int32_field: i32, int64_field: i64, boolean_field: bool, float_field: f32, double_field: f64, flba_field: List, // [u8;1024], int96_field: DateTime, } let rows = Parquet::<_, TenKayVeeTwo>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) .count(pool) .await, 10_000 ); let rows = Parquet::<_, TenKayVeeTwoDerived>::new(PathBuf::from( "amadeus-testing/parquet/10k-v2.parquet", )) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) .count(pool) .await, 10_000 ); let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/10k-v2.parquet")) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result| -> Value { let value = row.unwrap(); let _: TenKayVeeTwo = value.clone().downcast().unwrap(); let _: TenKayVeeTwoDerived = value.clone().downcast().unwrap(); value }) .count(pool) .await, 10_000 ); type AlltypesDictionary = ( Option, Option, Option, Option, Option, Option, Option, Option, Option>, Option>, Option, ); #[derive(Data, Clone, PartialEq, Debug)] struct AlltypesDictionaryDerived { id: Option, bool_col: Option, tinyint_col: Option, smallint_col: Option, int_col: Option, bigint_col: Option, float_col: Option, double_col: Option, date_string_col: Option>, string_col: Option>, timestamp_col: Option, } let rows = Parquet::<_, AlltypesDictionary>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", )) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) .count(pool) .await, 2 ); let rows = Parquet::<_, AlltypesDictionaryDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", )) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) .count(pool) .await, 2 ); let rows = Parquet::<_, Value>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_dictionary.parquet", )) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result| -> Value { let value = row.unwrap(); let _: AlltypesDictionary = value.clone().downcast().unwrap(); let _: AlltypesDictionaryDerived = value.clone().downcast().unwrap(); value }) .count(pool) .await, 2 ); type AlltypesPlain = ( Option, Option, Option, Option, Option, Option, Option, Option, Option>, Option>, Option, ); #[derive(Data, Clone, PartialEq, Debug)] struct AlltypesPlainDerived { id: Option, bool_col: Option, tinyint_col: Option, smallint_col: Option, int_col: Option, bigint_col: Option, float_col: Option, double_col: Option, date_string_col: Option>, string_col: Option>, timestamp_col: Option, } let rows = Parquet::<_, AlltypesPlain>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", )) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) .count(pool) .await, 8 ); let rows = Parquet::<_, AlltypesPlainDerived>::new(PathBuf::from( "amadeus-testing/parquet/alltypes_plain.parquet", )) .await .unwrap(); assert_eq!( rows.par_stream() .map(|row: Result<_, _>| row.unwrap()) .count(pool) .await, 8 ); let 
    let rows = Parquet::<_, Value>::new(PathBuf::from(
        "amadeus-testing/parquet/alltypes_plain.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<Value, _>| -> Value {
                let value = row.unwrap();
                let _: AlltypesPlain = value.clone().downcast().unwrap();
                let _: AlltypesPlainDerived = value.clone().downcast().unwrap();
                value
            })
            .count(pool)
            .await,
        8
    );

    // alltypes_plain.snappy.parquet: the same schema again, Snappy-compressed.
    type AlltypesPlainSnappy = (
        Option<i32>,
        Option<bool>,
        Option<i32>,
        Option<i32>,
        Option<i32>,
        Option<i64>,
        Option<f32>,
        Option<f64>,
        Option<List<u8>>,
        Option<List<u8>>,
        Option<DateTime>,
    );

    #[derive(Data, Clone, PartialEq, Debug)]
    struct AlltypesPlainSnappyDerived {
        id: Option<i32>,
        bool_col: Option<bool>,
        tinyint_col: Option<i32>,
        smallint_col: Option<i32>,
        int_col: Option<i32>,
        bigint_col: Option<i64>,
        float_col: Option<f32>,
        double_col: Option<f64>,
        date_string_col: Option<List<u8>>,
        string_col: Option<List<u8>>,
        timestamp_col: Option<DateTime>,
    }

    let rows = Parquet::<_, AlltypesPlainSnappy>::new(PathBuf::from(
        "amadeus-testing/parquet/alltypes_plain.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        2
    );

    let rows = Parquet::<_, AlltypesPlainSnappyDerived>::new(PathBuf::from(
        "amadeus-testing/parquet/alltypes_plain.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        2
    );

    let rows = Parquet::<_, Value>::new(PathBuf::from(
        "amadeus-testing/parquet/alltypes_plain.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<Value, _>| -> Value {
                let value = row.unwrap();
                let _: AlltypesPlainSnappy = value.clone().downcast().unwrap();
                let _: AlltypesPlainSnappyDerived = value.clone().downcast().unwrap();
                value
            })
            .count(pool)
            .await,
        2
    );

    // TODO

    // type NationDictMalformed = (Option<i32>, Option<List<u8>>, Option<i32>, Option<List<u8>>);

    // let rows = Parquet::<_, NationDictMalformed>::new(vec![PathBuf::from(
    //     "amadeus-testing/parquet/nation.dict-malformed.parquet",
    // )]).await.unwrap();
    // assert_eq!(
    //     rows.par_stream().collect::<Vec<_>>(pool),
    //     [Err(amadeus::source::parquet::Error::Parquet(
    //         amadeus_parquet::internal::errors::ParquetError::General(String::from(
    //             "underlying IO error: failed to fill whole buffer"
    //         ))
    //     ))]
    // );

    // let rows = Parquet::<_, Value>::new(vec![PathBuf::from(
    //     "amadeus-testing/parquet/nation.dict-malformed.parquet",
    // )]).await.unwrap();
    // assert_eq!(
    //     rows.par_stream().collect::<Vec<_>>(pool),
    //     [Err(amadeus::source::parquet::Error::Parquet(
    //         amadeus_parquet::internal::errors::ParquetError::General(String::from(
    //             "underlying IO error: failed to fill whole buffer"
    //         ))
    //     ))]
    // );

    // nested_lists.snappy.parquet: a triply nested list of optional strings.
    type NestedLists = (
        Option<List<Option<List<Option<List<Option<String>>>>>>>,
        i32,
    );

    #[derive(Data, Clone, PartialEq, Debug)]
    struct NestedListsDerived {
        a: Option<List<Option<List<Option<List<Option<String>>>>>>>,
        b: i32,
    }

    let rows = Parquet::<_, NestedLists>::new(PathBuf::from(
        "amadeus-testing/parquet/nested_lists.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        3
    );

    let rows = Parquet::<_, NestedListsDerived>::new(PathBuf::from(
        "amadeus-testing/parquet/nested_lists.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        3
    );

    let rows = Parquet::<_, Value>::new(PathBuf::from(
        "amadeus-testing/parquet/nested_lists.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<Value, _>| -> Value {
                let value = row.unwrap();
                let _: NestedLists = value.clone().downcast().unwrap();
                let _: NestedListsDerived = value.clone().downcast().unwrap();
                value
            })
            .count(pool)
            .await,
        3
    );
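    // nested_maps.snappy.parquet: an optional map of string to optional map of i32
    // to bool, alongside plain int and double columns.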
    type NestedMaps = (
        Option<HashMap<String, Option<HashMap<i32, bool>>>>,
        i32,
        f64,
    );

    #[derive(Data, Clone, PartialEq, Debug)]
    struct NestedMapsDerived {
        a: Option<HashMap<String, Option<HashMap<i32, bool>>>>,
        b: i32,
        c: f64,
    }

    let rows = Parquet::<_, NestedMaps>::new(PathBuf::from(
        "amadeus-testing/parquet/nested_maps.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        6
    );

    let rows = Parquet::<_, NestedMapsDerived>::new(PathBuf::from(
        "amadeus-testing/parquet/nested_maps.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        6
    );

    let rows = Parquet::<_, Value>::new(PathBuf::from(
        "amadeus-testing/parquet/nested_maps.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<Value, _>| -> Value {
                let value = row.unwrap();
                let _: NestedMaps = value.clone().downcast().unwrap();
                let _: NestedMapsDerived = value.clone().downcast().unwrap();
                value
            })
            .count(pool)
            .await,
        6
    );

    // nonnullable.impala.parquet: deeply nested non-null arrays, maps, and structs.
    // `#[amadeus(name = "...")]` maps Rust snake_case field names onto the file's
    // mixed-case column names.
    type Nonnullable = (
        i64,
        List<i32>,
        List<List<i32>>,
        HashMap<String, i32>,
        List<HashMap<String, i32>>,
        (
            i32,
            List<i32>,
            (List<List<(i32, String)>>,),
            HashMap<String, ((List<f64>,),)>,
        ),
    );

    #[derive(Data, Clone, PartialEq, Debug)]
    struct NonnullableDerived {
        #[amadeus(name = "ID")]
        id: i64,
        #[amadeus(name = "Int_Array")]
        int_array: List<i32>,
        int_array_array: List<List<i32>>,
        #[amadeus(name = "Int_Map")]
        int_map: HashMap<String, i32>,
        int_map_array: List<HashMap<String, i32>>,
        #[amadeus(name = "nested_Struct")]
        nested_struct: NonnullableDerivedInner,
    }

    #[derive(Data, Clone, PartialEq, Debug)]
    struct NonnullableDerivedInner {
        a: i32,
        #[amadeus(name = "B")]
        b: List<i32>,
        c: NonnullableDerivedInnerInner,
        #[amadeus(name = "G")]
        g: HashMap<String, ((List<f64>,),)>,
    }

    #[derive(Data, Clone, PartialEq, Debug)]
    struct NonnullableDerivedInnerInner {
        #[amadeus(name = "D")]
        d: List<List<NonnullableDerivedInnerInnerInner>>,
    }

    #[derive(Data, Clone, PartialEq, Debug)]
    struct NonnullableDerivedInnerInnerInner {
        e: i32,
        f: String,
    }

    let rows = Parquet::<_, Nonnullable>::new(PathBuf::from(
        "amadeus-testing/parquet/nonnullable.impala.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        1
    );

    let rows = Parquet::<_, NonnullableDerived>::new(PathBuf::from(
        "amadeus-testing/parquet/nonnullable.impala.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        1
    );

    let rows = Parquet::<_, Value>::new(PathBuf::from(
        "amadeus-testing/parquet/nonnullable.impala.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<Value, _>| -> Value {
                let value = row.unwrap();
                let _: Nonnullable = value.clone().downcast().unwrap();
                let _: NonnullableDerived = value.clone().downcast().unwrap();
                value
            })
            .count(pool)
            .await,
        1
    );

    // nullable.impala.parquet: the same nested shape with every level optional.
    type Nullable = (
        Option<i64>,
        Option<List<Option<i32>>>,
        Option<List<Option<List<Option<i32>>>>>,
        Option<HashMap<String, Option<i32>>>,
        Option<List<Option<HashMap<String, Option<i32>>>>>,
        Option<(
            Option<i32>,
            Option<List<Option<i32>>>,
            Option<(Option<List<Option<List<Option<(Option<i32>, Option<String>)>>>>>,)>,
            Option<HashMap<String, Option<(Option<(Option<List<Option<f64>>>,)>,)>>>,
        )>,
    );

    #[derive(Data, Clone, PartialEq, Debug)]
    struct NullableDerived {
        id: Option<i64>,
        int_array: Option<List<Option<i32>>>,
        #[amadeus(name = "int_array_Array")]
        int_array_array: Option<List<Option<List<Option<i32>>>>>,
        int_map: Option<HashMap<String, Option<i32>>>,
        #[amadeus(name = "int_Map_Array")]
        int_map_array: Option<List<Option<HashMap<String, Option<i32>>>>>,
        nested_struct: Option<(
            Option<i32>,
            Option<List<Option<i32>>>,
            Option<(Option<List<Option<List<Option<(Option<i32>, Option<String>)>>>>>,)>,
            Option<HashMap<String, Option<(Option<(Option<List<Option<f64>>>,)>,)>>>,
        )>,
    }

    let rows = Parquet::<_, Nullable>::new(PathBuf::from(
        "amadeus-testing/parquet/nullable.impala.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        7
    );

    let rows = Parquet::<_, NullableDerived>::new(PathBuf::from(
        "amadeus-testing/parquet/nullable.impala.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        7
    );
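    // Dynamic read of nullable.impala.parquet, downcasting each row to both static
    // representations.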
    let rows = Parquet::<_, Value>::new(PathBuf::from(
        "amadeus-testing/parquet/nullable.impala.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<Value, _>| -> Value {
                let value = row.unwrap();
                let _: Nullable = value.clone().downcast().unwrap();
                let _: NullableDerived = value.clone().downcast().unwrap();
                value
            })
            .count(pool)
            .await,
        7
    );

    // nulls.snappy.parquet: a single optional struct column holding an optional int.
    type Nulls = (Option<(Option<i32>,)>,);

    #[derive(Data, Clone, PartialEq, Debug)]
    struct NullsDerived {
        b_struct: Option<(Option<i32>,)>,
    }

    let rows = Parquet::<_, Nulls>::new(PathBuf::from(
        "amadeus-testing/parquet/nulls.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        8
    );

    let rows = Parquet::<_, NullsDerived>::new(PathBuf::from(
        "amadeus-testing/parquet/nulls.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        8
    );

    let rows = Parquet::<_, Value>::new(PathBuf::from(
        "amadeus-testing/parquet/nulls.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<Value, _>| -> Value {
                let value = row.unwrap();
                let _: Nulls = value.clone().downcast().unwrap();
                let _: NullsDerived = value.clone().downcast().unwrap();
                value
            })
            .count(pool)
            .await,
        8
    );

    // repeated_no_annotation.parquet: a repeated group without the LIST annotation.
    type Repeated = (i32, Option<(List<(i64, Option<String>)>,)>);

    #[derive(Data, Clone, PartialEq, Debug)]
    struct RepeatedDerived {
        id: i32,
        #[amadeus(name = "phoneNumbers")]
        phone_numbers: Option<(List<(i64, Option<String>)>,)>,
    }

    let rows = Parquet::<_, Repeated>::new(PathBuf::from(
        "amadeus-testing/parquet/repeated_no_annotation.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        6
    );

    let rows = Parquet::<_, RepeatedDerived>::new(PathBuf::from(
        "amadeus-testing/parquet/repeated_no_annotation.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        6
    );

    let rows = Parquet::<_, Value>::new(PathBuf::from(
        "amadeus-testing/parquet/repeated_no_annotation.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<Value, _>| -> Value {
                let value = row.unwrap();
                let _: Repeated = value.clone().downcast().unwrap();
                let _: RepeatedDerived = value.clone().downcast().unwrap();
                value
            })
            .count(pool)
            .await,
        6
    );

    // datapage_v2.snappy.parquet: a file written with data page v2.
    type TestDatapage = (Option<String>, i32, f64, bool, Option<List<i32>>);

    #[derive(Data, Clone, PartialEq, Debug)]
    struct TestDatapageDerived {
        a: Option<String>,
        b: i32,
        c: f64,
        d: bool,
        e: Option<List<i32>>,
    }

    let rows = Parquet::<_, TestDatapage>::new(PathBuf::from(
        "amadeus-testing/parquet/datapage_v2.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        5
    );

    let rows = Parquet::<_, TestDatapageDerived>::new(PathBuf::from(
        "amadeus-testing/parquet/datapage_v2.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        5
    );

    let rows = Parquet::<_, Value>::new(PathBuf::from(
        "amadeus-testing/parquet/datapage_v2.snappy.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<Value, _>| -> Value {
                let value = row.unwrap();
                let _: TestDatapage = value.clone().downcast().unwrap();
                let _: TestDatapageDerived = value.clone().downcast().unwrap();
                value
            })
            .count(pool)
            .await,
        5
    );
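    // commits.parquet: a larger real-world dataset (git commit metadata, judging by
    // the field names); only a derived struct representation is exercised here.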
    #[derive(Data, Clone, PartialEq, Debug)]
    struct CommitsDerived {
        id: Option<String>,
        delay: Option<i64>,
        age: Option<i64>,
        ismerge: Option<bool>,
        squashof: Option<i64>,
        author_name: Option<String>,
        author_email: Option<String>,
        committer_name: Option<String>,
        committer_email: Option<String>,
        author_time: Option<DateTime>,
        committer_time: Option<DateTime>,
        loc_d: Option<i64>,
        loc_i: Option<i64>,
        comp_d: Option<i64>,
        comp_i: Option<i64>,
        nfiles: Option<u32>,
        message: Option<String>,
        ndiffs: Option<u32>,
        author_email_dedup: Option<String>,
        author_name_dedup: Option<String>,
        committer_email_dedup: Option<String>,
        committer_name_dedup: Option<String>,
        __index_level_0__: Option<i64>,
    }

    let rows = Parquet::<_, CommitsDerived>::new(PathBuf::from(
        "amadeus-testing/parquet/commits.parquet",
    ))
    .await
    .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<_, _>| row.unwrap())
            .count(pool)
            .await,
        14_444
    );

    let rows = Parquet::<_, Value>::new(PathBuf::from("amadeus-testing/parquet/commits.parquet"))
        .await
        .unwrap();
    assert_eq!(
        rows.par_stream()
            .map(|row: Result<Value, _>| -> Value {
                let value = row.unwrap();
                let _: CommitsDerived = value.clone().downcast().unwrap();
                value
            })
            .count(pool)
            .await,
        14_444
    );

    println!("in {:?}", start.elapsed().unwrap());
}