---
type: Constraint
spec:
  name: AllAssetsComputed
  root: Universe
  requires:
    - HistogramsComputed
    - Replicated
    - ComputeFilter
    - SVMRegressionModelsTrained
    - ConvertPredictionsCSVTableToORCTable
---
type: Constraint
spec:
  name: CSVTableSchemasCreated
  root: HiveTableStorage
  requiresProgram: true
  requires:
    - HiveDirectoriesCreated
  title: Create schemas for temporary CSV tables.
  body: |
    We will use Hive tables with external storage as a staging location
    for our data. We need to create these schemas to be able to write
    data to them.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.static_data_table(root.clone()) {
        Ok(sdt) => match &sdt.setup {
            aorist_core::StorageSetup::ReplicationStorageSetup(_) => true,
            aorist_core::StorageSetup::ComputedFromLocalData(cfld) => match &cfld.target {
                aorist_core::Storage::HiveTableStorage(hts) => match hts.encoding {
                    aorist_core::Encoding::ORCEncoding(_) => {
                        let data_set = ancestry.data_set(root.clone()).unwrap();
                        let template = data_set.get_template_for_asset(sdt);
                        match template {
                            Ok(aorist_core::DatumTemplate::PredictionsFromTrainedFloatMeasure(_)) => true,
                            Ok(_) => false,
                            Err(err) => panic!("{}", err),
                        }
                    },
                    _ => false,
                },
                _ => false,
            },
            _ => false,
        },
        _ => false,
    }
---
type: Constraint
spec:
  name: HistogramsComputed
  root: StaticDataTable
  requiresProgram: true
  requires:
    - Replicated
  title: Computing aggregations
  body: |
    We compute a few aggregations on the source data.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| {
        use aorist_core::TDataSet;
        use aorist_core::DatumTemplate;
        use aorist_core::DataSchema;

        let dataset = ancestry.data_set(root.clone()).unwrap();
        let root_tbl = ancestry.static_data_table(root.clone()).unwrap();
        let schema = &root_tbl.schema;
        if let DataSchema::TabularSchema(ref tbl_schema) = schema {
            let template_name = tbl_schema.datumTemplateName.clone();
            match dataset.get_mapped_datum_templates().get(&template_name).unwrap() {
                DatumTemplate::IntegerMeasure(_) => true,
                _ => false,
            }
        } else {
            false
        }
    }
---
type: Constraint
spec:
  name: IsAuditedTable
  root: StaticDataTable
  requires:
    - Replicated
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.static_data_table(root.clone()).unwrap().setup {
        aorist_core::StorageSetup::ReplicationStorageSetup(_) => true,
        _ => false,
    }
---
type: Constraint
spec:
  name: IsAudited
  root: Universe
  requires:
    - IsAuditedTable
---
type: Constraint
spec:
  name: ReplicatedData
  root: ReplicationStorageSetup
  requires:
    - UploadDataToLocal
    - FileReadyForUpload
---
type: Constraint
spec:
  name: FileReadyForUpload
  root: ReplicationStorageSetup
  requires:
    - RemoveFileHeader
    - DownloadDataFromRemote
---
type: Constraint
spec:
  name: DownloadDataFromRemote
  root: RemoteStorage
  requires:
    - DownloadDataFromRemoteGCSLocation
    - DownloadDataFromRemoteWebLocation
    - DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON
    - DownloadDataFromGithubLocation
---
type: Constraint
spec:
  name: DownloadDataFromRemoteGCSLocation
  root: GCSLocation
  requiresProgram: true
  title: Downloading data from GCS
  body: |
    Data for this asset is located in Google Cloud Storage. We need to
    download it to a local directory first, before we can do anything
    with it.
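# A minimal sketch of the kind of download recipe the constraint above calls
# for (hypothetical; the actual program is attached separately, and the bucket
# and blob names here are placeholders):
#
#   from google.cloud import storage
#
#   client = storage.Client()
#   bucket = client.bucket("example-bucket")        # assumed bucket name
#   blob = bucket.blob("path/to/data.csv")          # assumed blob path
#   blob.download_to_filename("/tmp/data.csv")      # local staging location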
---
type: Constraint
spec:
  name: DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON
  root: PushshiftAPILocation
  requiresProgram: true
  title: Downloading data from the Pushshift API
  body: |
    Data for this asset is located behind the Pushshift API. We need to
    download it to a local directory first, before we can do anything
    with it.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.replication_storage_setup(root.clone()) {
        Ok(x) => match x.tmp_encoding {
            aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => true,
            aorist_core::Encoding::CSVEncoding(_) => true,
            _ => false,
        },
        _ => false,
    }
---
type: Constraint
spec:
  name: DownloadDataFromRemoteWebLocation
  root: WebLocation
  requiresProgram: true
  title: Downloading data from remote web location
  body: |
    Data for this asset is located somewhere on the web. We need to
    download it to a local directory first, before we can do anything
    with it.
---
type: Constraint
spec:
  name: RemoveFileHeader
  root: FileHeader
  requiresProgram: true
  requires:
    - FileIsDecompressed
    - DownloadDataFromRemote
  title: Removing file header
  body: |
    We are dealing with a tabular file with a header. Before we can
    process it, we need to remove the header.
---
type: Constraint
spec:
  name: FileIsDecompressed
  root: RemoteStorage
  requires:
    - Decompress
---
type: Constraint
spec:
  name: Decompress
  root: DataCompression
  requires:
    - DecompressGzip
    - DecompressZip
---
type: Constraint
spec:
  name: DecompressGzip
  root: GzipCompression
  requiresProgram: true
  requires:
    - DownloadDataFromRemote
  title: Decompressing Gzip file
  body: |
    Data for this asset is compressed with the gzip algorithm. Before we
    can process it further we need to decompress it.
---
type: Constraint
spec:
  name: DecompressZip
  root: ZipCompression
  requiresProgram: true
  requires:
    - DownloadDataFromRemote
  title: Decompressing Zipped file
  body: |
    Data for this asset is compressed with the Zip algorithm. Before we
    can process it further we need to decompress it.
---
type: Constraint
spec:
  name: UploadDataToLocal
  root: OnPremiseLocation
  requires:
    - UploadDataToAlluxio
    - UploadDataToMinio
    - UploadDataToSQLite
    - UploadDataToPostgres
    - UploadDataToBigQuery
    - UploadDataToS3
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| ancestry.replication_storage_setup(root.clone()).is_ok()
---
type: Constraint
spec:
  name: UploadDataToAlluxio
  root: AlluxioLocation
  requiresProgram: true
  requires:
    - CSVIsConverted
    - ReplicatedSchema
  title: Upload data to Alluxio
  body: |
    Now that data has been pre-processed, we can upload it to the
    underlying Alluxio storage.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| ancestry.replication_storage_setup(root.clone()).is_ok()
---
type: Constraint
spec:
  name: UploadDataToMinio
  root: MinioLocation
  requiresProgram: true
  requires:
    - CSVIsConverted
    - ReplicatedSchema
  title: Upload data to MinIO
  body: |
    Now that data has been pre-processed, we can upload it to the
    underlying MinIO storage.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| ancestry.replication_storage_setup(root.clone()).is_ok()
---
type: Constraint
spec:
  name: UploadDataToSQLite
  root: SQLiteLocation
  requiresProgram: true
  requires:
    - CSVIsConverted
  title: Upload data to SQLite
  body: |
    Now that data has been converted to a CSV, we can upload it to SQLite.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| ancestry.replication_storage_setup(root.clone()).is_ok()
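# For the SQLite upload above, a minimal sketch using only the Python standard
# library (hypothetical; file, database, and table names are placeholders, and
# the target table is assumed to exist already):
#
#   import csv
#   import sqlite3
#
#   conn = sqlite3.connect("/tmp/staging.db")
#   with open("/tmp/data.csv", newline="") as f:
#       rows = list(csv.reader(f))
#   placeholders = ",".join("?" * len(rows[0]))
#   conn.executemany(f"INSERT INTO staging VALUES ({placeholders})", rows)
#   conn.commit()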
---
type: Constraint
spec:
  name: UploadDataToPostgres
  root: PostgresLocation
  requiresProgram: true
  requires:
    - CSVIsConverted
  title: Upload data to Postgres
  body: |
    Now that data has been converted to a CSV, we can upload it to Postgres.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| ancestry.replication_storage_setup(root.clone()).is_ok()
---
type: Constraint
spec:
  name: UploadDataToBigQuery
  root: BigQueryLocation
  requires:
    - UploadStaticDataToBigQuery
    - UploadEventDataToBigQuery
  title: Upload data to BigQuery
  body: |
    Now that data has been converted to a CSV, we can upload it to BigQuery.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| ancestry.replication_storage_setup(root.clone()).is_ok()
---
type: Constraint
spec:
  name: UploadStaticDataToBigQuery
  root: BigQueryLocation
  requiresProgram: true
  requires:
    - CSVIsConverted
    - DownloadDataFromPostgres
  title: Upload static data to BigQuery
  body: |
    Now that data has been converted to a CSV, we can upload it to BigQuery.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| {
        if ancestry.replication_storage_setup(root.clone()).is_err() {
            return false;
        }
        match ancestry.static_data_table(root.clone()) {
            Ok(ref sdt) => match sdt.schema {
                aorist_core::DataSchema::TimeOrderedTabularSchema(_) => false,
                _ => true,
            },
            _ => false,
        }
    }
---
type: Constraint
spec:
  name: CSVIsConverted
  root: StaticDataTable
  requires:
    - ConvertTSVToCSV
    - ConvertGDBToCSV
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.static_data_table(root.clone()).unwrap().setup {
        aorist_core::StorageSetup::ReplicationStorageSetup(_) => true,
        _ => false,
    }
---
type: Constraint
spec:
  name: RenameCSV
  root: RemoteStorage
  requiresProgram: true
  requires:
    - DownloadDataFromRemote
    - RemoveFileHeader
  title: Move CSV to tmp
  body: |
    We need to move the CSV data to the tmp directory expected by later
    phases.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.remote_storage(root.clone()).unwrap().encoding {
        aorist_core::Encoding::CSVEncoding(_) => ancestry.replication_storage_setup(root.clone()).is_ok(),
        _ => false,
    }
---
type: Constraint
spec:
  name: ConvertJSONToCSV
  root: RemoteStorage
  requiresProgram: true
  requires:
    - DownloadDataFromRemote
    - RemoveFileHeader
    - DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON
  title: Convert JSON data to CSV
  body: |
    We need to convert the JSON data to CSV format to process it further.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| {
        match ancestry.remote_storage(root.clone()).unwrap().encoding {
            aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => {
                ancestry.replication_storage_setup(root.clone()).is_ok()
            },
            _ => false,
        }
    }
---
type: Constraint
spec:
  name: ConvertTSVToCSV
  root: RemoteStorage
  requiresProgram: true
  requires:
    - DownloadDataFromRemote
    - RemoveFileHeader
  title: Convert TSV data to CSV
  body: |
    We need to convert the TSV data to CSV format to process it further.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.remote_storage(root.clone()).unwrap().encoding {
        aorist_core::Encoding::TSVEncoding(_) => ancestry.replication_storage_setup(root.clone()).is_ok(),
        _ => false,
    }
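# For the TSV-to-CSV conversion above, a minimal sketch using the Python
# standard library (hypothetical; the input and output paths are placeholders):
#
#   import csv
#
#   with open("/tmp/data.tsv", newline="") as src, \
#        open("/tmp/data.csv", "w", newline="") as dst:
#       writer = csv.writer(dst)
#       for row in csv.reader(src, delimiter="\t"):
#           writer.writerow(row)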
---
type: Constraint
spec:
  name: ConvertPredictionsCSVTableToORCTable
  root: HiveTableStorage
  requiresProgram: true
  requires:
    - ComputePredictions
  title: Convert CSV Table to ORC Table
  body: |
    Hive tables can be stored in external CSV format, but this is
    inefficient. We can convert them to ORC (the native Hive format) to
    speed up access.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.static_data_table(root.clone()) {
        Ok(sdt) => match &sdt.setup {
            aorist_core::StorageSetup::ComputedFromLocalData(cfld) => match &cfld.target {
                aorist_core::Storage::HiveTableStorage(hts) => match hts.encoding {
                    aorist_core::Encoding::ORCEncoding(_) => {
                        let data_set = ancestry.data_set(root.clone()).unwrap();
                        let template = data_set.get_template_for_asset(sdt);
                        match template {
                            Ok(aorist_core::DatumTemplate::PredictionsFromTrainedFloatMeasure(_)) => true,
                            Ok(_) => false,
                            Err(err) => panic!("{}", err),
                        }
                    },
                    _ => false,
                },
                _ => false,
            },
            _ => false,
        },
        _ => false,
    }
---
type: Constraint
spec:
  name: SVMRegressionModelsTrained
  root: SupervisedModel
  requiresProgram: true
  requires:
    - SourceAssetHasBeenComputed
  title: Training SVM Regression Models
  body: |
    Support Vector Machines are some of the simplest ML models possible.
    These are even supported by Trino!
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.supervised_model(root.clone()).unwrap().algorithm {
        aorist_core::RegressionAlgorithm::SVMRegressionAlgorithm(_) => true,
        _ => false,
    } && match ancestry.supervised_model(root.clone()).unwrap().setup {
        aorist_core::StorageSetup::ComputedFromLocalData(ref setup) => match &setup.target {
            aorist_core::Storage::LocalFileStorage(storage) => match storage.location {
                aorist_core::OnPremiseLocation::MinioLocation(_) => true,
                _ => false,
            },
            _ => false,
        },
        _ => false,
    }
---
type: Constraint
spec:
  name: SourceAssetHasBeenComputed
  root: ComputedFromLocalData
  requires:
    - ReplicatedAssets
    - TableSchemasCreated
  requiredConstraintsClosure: |
    |root: Concept, ancestry: &ConceptAncestry| {
        let setup = ancestry.computed_from_local_data(root.clone()).unwrap();
        let dataset = ancestry.data_set(root.clone()).unwrap();
        let uuids: Vec<_> = dataset
            .get_source_assets(&setup)
            .unwrap()
            .values()
            .map(|x| x.get_uuid().clone())
            .collect();
        if uuids.is_empty() {
            panic!(
                "Cannot find source asset with names {}",
                setup.source_asset_names.values().map(|x| x.clone()).collect::<Vec<String>>().join(",")
            );
        }
        uuids
    }
---
type: Constraint
spec:
  name: ComputeFilter
  root: ComputedFromLocalData
  requiresProgram: true
  requires:
    - SourceAssetHasBeenComputed
    - ReplicatedAssets
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| {
        use aorist_core::DatumTemplate;
        use aorist_core::DataSchema;
        use aorist_core::TDataSet;

        let maybe_derived_asset = ancestry.derived_asset(root.clone());
        if let Ok(derived_asset) = maybe_derived_asset {
            let data_set = ancestry.data_set(root.clone()).unwrap();
            if let DataSchema::TabularSchema(ref t) = derived_asset.schema {
                let mapped_templates = data_set.get_mapped_datum_templates();
                let template = mapped_templates.get(&t.datumTemplateName);
                return match template {
                    Some(DatumTemplate::Filter(_)) => true,
                    _ => false,
                };
            }
        }
        false
    }
  requiredConstraintsClosure: |
    |root: Concept, ancestry: &ConceptAncestry| {
        let setup = ancestry.computed_from_local_data(root.clone()).unwrap();
        let dataset = ancestry.data_set(root.clone()).unwrap();
        let uuids: Vec<_> = dataset
            .get_source_assets(&setup)
            .unwrap()
            .values()
            .map(|x| x.get_uuid().clone())
            .collect();
        if uuids.is_empty() {
            panic!(
                "Cannot find source asset with names {}",
                setup.source_asset_names.values().map(|x| x.clone()).collect::<Vec<String>>().join(",")
            );
        }
        uuids
    }
---
type: Constraint
spec:
  name: ComputePredictions
  root: ComputedFromLocalData
  requiresProgram: true
  requires:
    - SVMRegressionModelsTrained
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| {
        use aorist_core::DatumTemplate;
        use aorist_core::DataSchema;
        use aorist_core::TDataSet;

        let maybe_static_data_table = ancestry.static_data_table(root.clone());
        if let Ok(static_data_table) = maybe_static_data_table {
            let data_set = ancestry.data_set(root.clone()).unwrap();
            if let DataSchema::TabularSchema(ref t) = static_data_table.schema {
                let mapped_templates = data_set.get_mapped_datum_templates();
                let template = mapped_templates.get(&t.datumTemplateName);
                return match template {
                    Some(DatumTemplate::PredictionsFromTrainedFloatMeasure(_)) => true,
                    None => panic!("Could not find datum template: {}", t.datumTemplateName),
                    _ => false,
                };
            }
        }
        false
    }
  requiredConstraintsClosure: |
    |root: Concept, ancestry: &ConceptAncestry| {
        let setup = ancestry.computed_from_local_data(root.clone()).unwrap();
        let dataset = ancestry.data_set(root.clone()).unwrap();
        let uuids: Vec<_> = dataset
            .get_source_assets(&setup)
            .unwrap()
            .values()
            .map(|x| x.get_uuid().clone())
            .collect();
        if uuids.is_empty() {
            panic!(
                "Cannot find source asset with names {}",
                setup.source_asset_names.values().map(|x| x.clone()).collect::<Vec<String>>().join(",")
            );
        }
        uuids
    }
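# For the ConvertPredictionsCSVTableToORCTable constraint above, a minimal
# sketch of the conversion step (hypothetical; assumes the PyHive client and
# placeholder host and table names, with the CSV-backed table already present):
#
#   from pyhive import hive
#
#   conn = hive.connect(host="hive.example.com")
#   cursor = conn.cursor()
#   # CTAS into ORC, Hive's native columnar format.
#   cursor.execute(
#       "CREATE TABLE predictions_orc STORED AS ORC "
#       "AS SELECT * FROM predictions_csv"
#   )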
---
type: Constraint
spec:
  name: InferSchema
  root: DataSet
  requiresProgram: true
---
type: Constraint
spec:
  name: InferSchemasFromPostgresAndThenReplicateToBigQuery
  root: Universe
  requiresProgram: true
  requires:
    - InferSchema
---
type: Constraint
spec:
  name: DownloadDataFromPostgres
  root: PostgresLocation
  requiresProgram: true
  requires:
    - DetermineMaxBigQueryTimeStamp
---
type: Constraint
spec:
  name: GenerateHub
  root: DataSet
  requiresProgram: true
  requires:
    - GenerateHubForAssets
---
type: Constraint
spec:
  name: GenerateHubForAssets
  root: Asset
  requires:
    - GenerateHubForStaticDataTables
---
type: Constraint
spec:
  name: GenerateHubForStaticDataTables
  root: StaticDataTable
  requiresProgram: true
---
type: Constraint
spec:
  name: PandasFromCSVData
  root: ReplicationStorageSetup
  requiresProgram: true
  requires:
    - DownloadDataFromRemote
    - CSVIsConverted
    - RemoveFileHeader
  title: Create Pandas Dataframe from CSV data
  body: |
    We convert an intermediary CSV file to a Pandas dataframe.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.replication_storage_setup(root.clone()) {
        Ok(x) => match x.tmp_encoding {
            aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => false,
            _ => true,
        },
        _ => true,
    }
---
type: Constraint
spec:
  name: NumpyFromCSVData
  root: ReplicationStorageSetup
  requiresProgram: true
  requires:
    - DownloadDataFromRemote
    - CSVIsConverted
    - RemoveFileHeader
  title: Create Numpy ndarray from CSV data
  body: |
    We convert an intermediary CSV file to a Numpy ndarray.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.replication_storage_setup(root.clone()) {
        Ok(x) => match x.tmp_encoding {
            aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => false,
            _ => true,
        },
        _ => true,
    }
---
type: Constraint
spec:
  name: PandasFromNewlineDelimitedJSONData
  root: ReplicationStorageSetup
  requiresProgram: true
  requires:
    - DownloadDataFromRemote
  title: Pandas from newline-delimited JSON file
  body: |
    We convert an intermediary newline-delimited JSON file to a Pandas
    dataframe.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.replication_storage_setup(root.clone()) {
        Ok(x) => match x.tmp_encoding {
            aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => true,
            _ => false,
        },
        _ => false,
    }
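# For the Pandas constraint above, the conversion is a one-liner (hypothetical
# path; `lines=True` tells pandas to parse newline-delimited JSON):
#
#   import pandas as pd
#
#   df = pd.read_json("/tmp/data.json", lines=True)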
---
type: Constraint
spec:
  name: NumpyFromNewlineDelimitedJSONData
  root: ReplicationStorageSetup
  requiresProgram: true
  requires:
    - DownloadDataFromRemote
  title: Numpy from newline-delimited JSON file
  body: |
    We convert an intermediary newline-delimited JSON file to a Numpy
    ndarray.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.replication_storage_setup(root.clone()) {
        Ok(x) => match x.tmp_encoding {
            aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => true,
            _ => false,
        },
        _ => false,
    }
---
type: Constraint
spec:
  name: NumpyData
  root: ReplicationStorageSetup
  requires:
    - NumpyFromCSVData
    - NumpyFromNewlineDelimitedJSONData
---
type: Constraint
spec:
  name: PandasData
  root: ReplicationStorageSetup
  requires:
    - PandasFromCSVData
    - PandasFromNewlineDelimitedJSONData
---
type: Constraint
spec:
  name: UploadEventDataToBigQuery
  root: BigQueryLocation
  requiresProgram: true
  requires:
    - CSVIsConverted
    - DownloadDataFromPostgres
  title: Upload event data to BigQuery
  body: |
    Now that data has been converted to a CSV, we can upload it to BigQuery.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| {
        if ancestry.replication_storage_setup(root.clone()).is_err() {
            return false;
        }
        match ancestry.static_data_table(root.clone()) {
            Ok(ref sdt) => match sdt.schema {
                aorist_core::DataSchema::TimeOrderedTabularSchema(_) => true,
                _ => false,
            },
            _ => false,
        }
    }
---
type: Constraint
spec:
  name: DetermineMaxBigQueryTimeStamp
  root: BigQueryLocation
  requiresProgram: true
  title: Find maximum time stamp of data already stored in BigQuery
  body: |
    We will only download data created after a certain timestamp.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| {
        if ancestry.replication_storage_setup(root.clone()).is_err() {
            return false;
        }
        // TODO: we should also check that this is a target of replication.
        match ancestry.static_data_table(root.clone()) {
            Ok(ref sdt) => match sdt.schema {
                aorist_core::DataSchema::TimeOrderedTabularSchema(_) => true,
                _ => false,
            },
            _ => false,
        }
    }
---
type: Constraint
spec:
  name: DownloadDataFromGithubLocation
  root: RemoteStorage
  requiresProgram: true
  title: Downloading data from GitHub
  body: |
    Data is stored in GitHub. We will only do a shallow clone of the file
    from the repo, without downloading the entire dataset.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| {
        if ancestry.replication_storage_setup(root.clone()).is_err() {
            return false;
        }
        match ancestry.remote_storage(root.clone()).unwrap().location {
            aorist_core::RemoteLocation::GithubLocation(_) => true,
            _ => false,
        }
    }
---
type: Constraint
spec:
  name: RDataFrame
  root: ReplicationStorageSetup
  requires:
    - RDataFrameFromCSVData
    - RDataFrameFromNewlineDelimitedJSONData
---
type: Constraint
spec:
  name: RDataFrameFromCSVData
  root: ReplicationStorageSetup
  requiresProgram: true
  requires:
    - DownloadDataFromRemote
    - CSVIsConverted
  title: Create an R data frame from CSV data
  body: |
    We convert an intermediary CSV file to an R data frame.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.replication_storage_setup(root.clone()) {
        Ok(x) => match x.tmp_encoding {
            aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => false,
            _ => true,
        },
        _ => true,
    }
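# For the DetermineMaxBigQueryTimeStamp constraint above, a minimal sketch
# using the google-cloud-bigquery client (hypothetical; dataset, table, and
# column names are placeholders):
#
#   from google.cloud import bigquery
#
#   client = bigquery.Client()
#   query = "SELECT MAX(event_ts) AS max_ts FROM `my_dataset.my_table`"
#   row = next(iter(client.query(query).result()))
#   max_ts = row.max_ts  # only rows newer than this get downloaded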
---
type: Constraint
spec:
  name: RDataFrameFromNewlineDelimitedJSONData
  root: ReplicationStorageSetup
  requiresProgram: true
  requires:
    - DownloadDataFromRemote
  title: R data frame from newline-delimited JSON file
  body: |
    We convert an intermediary newline-delimited JSON file to an R data
    frame.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.replication_storage_setup(root.clone()) {
        Ok(x) => match x.tmp_encoding {
            aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => true,
            _ => false,
        },
        _ => false,
    }
---
type: Constraint
spec:
  name: UploadDataToS3
  root: S3Location
  requiresProgram: true
  requires:
    - CSVIsConverted
    - ReplicatedSchema
  title: Upload data to S3
  body: |
    Now that data has been pre-processed, we can upload it to the
    underlying S3 storage.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| ancestry.replication_storage_setup(root.clone()).is_ok()
---
type: Constraint
spec:
  name: ConvertGDBToCSV
  root: RemoteStorage
  requiresProgram: true
  requires:
    - DownloadDataFromRemote
    - RemoveFileHeader
  title: Convert GDB data to CSV
  body: |
    We need to convert the GDB data to CSV format to process it further.
  attachIf: |
    |root: Concept, ancestry: &ConceptAncestry| match ancestry.remote_storage(root.clone()).unwrap().encoding {
        aorist_core::Encoding::GDBEncoding(_) => ancestry.replication_storage_setup(root.clone()).is_ok(),
        _ => false,
    }
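# For the GDB-to-CSV conversion above, a minimal sketch using GeoPandas
# (hypothetical; assumes a GDAL build with the OpenFileGDB driver, and the
# paths and layer name are placeholders):
#
#   import geopandas as gpd
#
#   gdf = gpd.read_file("/tmp/data.gdb", layer="parcels")
#   gdf.to_csv("/tmp/data.csv", index=False)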