type: Constraint
spec:
  name: DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON
  root: PushshiftAPILocation
  requiresProgram: true
  requires:
    - HiveDirectoriesCreated
  title: Downloading data from the Pushshift API
  body: |
    Data for this asset is located in the Pushshift API. We need to
    download it to a local directory first, before we can do anything
    with it.
  attachIf: |
    |root: AoristRef, ancestry: &ConceptAncestry| match ancestry.replication_storage_setup(root.clone()) {
        Ok(x) => {
            debug!("Root has ReplicationStorageSetup.");
            matches!(
                *x.0.read().unwrap().tmp_encoding.0.read().unwrap(),
                aorist_core::Encoding::NewlineDelimitedJSONEncoding(_)
                    | aorist_core::Encoding::CSVEncoding(_)
            )
        }
        _ => false,
    }
---
type: Constraint
spec:
  name: HiveDirectoriesCreated
  root: HiveLocation
  requiresProgram: true
  title: Create Hive directories
  body: |
    We need to create directories or buckets (depending on the file
    system / storage solution) in which we will store our Hive data.
  attachIf: |
    |root: AoristRef, ancestry: &ConceptAncestry|
        ancestry.hive_table_storage(root.clone()).is_ok()
            && ancestry.replication_storage_setup(root.clone()).is_ok()
---
type: Constraint
spec:
  name: ConvertJSONToCSV
  root: RemoteStorage
  requiresProgram: true
  requires:
    - DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON
  title: Convert JSON data to CSV
  body: |
    We need to convert the JSON data to CSV format to process it further.
  attachIf: |
    |root: AoristRef, ancestry: &ConceptAncestry| {
        match *ancestry.remote_storage(root.clone()).unwrap().0.read().unwrap().encoding.0.read().unwrap() {
            aorist_core::Encoding::NewlineDelimitedJSONEncoding(_) => {
                ancestry.replication_storage_setup(root.clone()).is_ok()
            }
            _ => false,
        }
    }
---
type: Constraint
spec:
  name: ReadyForUpload
  root: DataSet
  requires:
    - DownloadDataFromRemotePushshiftAPILocationToNewlineDelimitedJSON
  title: Convert data
---
type: Constraint
spec:
  name: UploadDataToMinio
  root: MinioLocation
  requiresProgram: true
  requires:
    - ReadyForUpload
    - HiveDirectoriesCreated
  title: Upload data to MinIO
  body: |
    Now that the data has been pre-processed, we can upload it to the
    underlying MinIO storage.
  attachIf: |
    |root: AoristRef, ancestry: &ConceptAncestry| {
        ancestry.replication_storage_setup(root.clone()).is_ok()
    }
---
type: Constraint
spec:
  name: JSONTableSchemasCreated
  root: HiveTableStorage
  requiresProgram: true
  requires:
    - HiveDirectoriesCreated
  title: Create schemas for temporary JSON tables
  body: |
    We will use Hive tables with external storage as a staging location
    for our data. We need to create these schemas to be able to write
    data to them.
  attachIf: |
    |root: AoristRef, ancestry: &ConceptAncestry| {
        matches!(
            &*ancestry.hive_table_storage(root.clone()).unwrap().0.read().unwrap().encoding.0.read().unwrap(),
            aorist_core::Encoding::NewlineDelimitedJSONEncoding(_)
        )
    }
---
type: Constraint
spec:
  name: DownloadDataFromRemoteWebLocation
  root: WebLocation
  requiresProgram: true
  title: Downloading data from a remote web location
  body: |
    Data for this asset is located somewhere on the web. We need to
    download it to a local directory first, before we can do anything
    with it.
---
type: Constraint
spec:
  name: ConvertJSONTableToORCTable
  root: HiveTableStorage
  requires:
    - JSONTableSchemasCreated
    - UploadDataToLocal
    - ORCTableSchemasCreated
  requiresProgram: true
  title: Convert JSON Table to ORC Table
  body: |
    Hive tables can be stored in external JSON format, but this is
    inefficient. We can convert them to ORC (the native Hive format) to
    speed up access.
  attachIf: |
    |root: AoristRef, ancestry: &ConceptAncestry| match ancestry.static_data_table(root.clone()) {
        Ok(sdt) => matches!(
            &*sdt.0.read().unwrap().setup.0.read().unwrap(),
            aorist_core::StorageSetup::ReplicationStorageSetup(_)
        ),
        _ => false,
    }
---
type: Constraint
spec:
  name: UploadDataToLocal
  root: OnPremiseLocation
  requires:
    - UploadDataToMinio
  attachIf: |
    |root: AoristRef, ancestry: &ConceptAncestry|
        ancestry.replication_storage_setup(root.clone()).is_ok()
---
type: Constraint
spec:
  name: ORCTableSchemasCreated
  root: HiveTableStorage
  requiresProgram: true
  title: Creating Table Schemas
  body: |
    We will be uploading tabular data into our warehouse. Before we
    upload data files we need to create schemas for the tables which
    will refer to these files.
  attachIf: |
    |root: AoristRef, ancestry: &ConceptAncestry|
        ancestry.replication_storage_setup(root.clone()).is_ok()
---
type: Constraint
spec:
  name: FasttextTrainingData
  root: FasttextEmbeddingSchema
  requires:
    - ConvertJSONTableToORCTable
  requiresProgram: true
  title: Creating Fasttext Training Dataset
  body: |
    We download Fasttext training data from a Hive table.
---
type: Constraint
spec:
  name: TrainFasttextModel
  root: FasttextEmbeddingSchema
  requires:
    - FasttextTrainingData
  requiresProgram: true
  title: Training Fasttext Model
  body: |
    This operation trains the Fasttext model and saves a dataset mapping
    words to their embeddings to a local file.
---
type: Constraint
spec:
  name: UploadFasttextToMinio
  root: FasttextEmbeddingSchema
  requiresProgram: true
  requires:
    - TrainFasttextModel
  title: Upload data to MinIO
  body: |
    Now that the data has been pre-processed, we can upload it to the
    underlying MinIO storage.
  attachIf: |
    |root: AoristRef, ancestry: &ConceptAncestry| {
        match ancestry.fasttext_embedding(root.clone()) {
            Ok(x) => match *x.0.read().unwrap().setup.0.read().unwrap() {
                aorist_core::StorageSetup::LocalStorageSetup(ref s) => {
                    match *s.0.read().unwrap().local.0.read().unwrap() {
                        aorist_core::Storage::HiveTableStorage(ref h) => matches!(
                            *h.0.read().unwrap().location.0.read().unwrap(),
                            aorist_core::HiveLocation::MinioLocation(_)
                        ),
                        _ => false,
                    }
                }
                _ => false,
            },
            Err(_) => false,
        }
    }