use anyhow::Error; use async_stream::try_stream; use futures::{future, stream, StreamExt, TryStream, TryStreamExt}; use lembaran::LinkedPages; use reqwest::{ header::{ACCEPT, USER_AGENT}, Client, Url, }; use serde::Deserialize; #[derive(Deserialize, Debug)] struct Repository { name: String, } /// Request factory. async fn create_request(url: Option) -> Result<(Vec, Option), Error> { // Create URL or get it from the prev. request. let url = match url { Some(x) => x, None => "https://api.github.com/orgs/rust-lang/repos".parse()?, }; // Probably want to create this outside of the request factory so the client can reuse its connection. let client = Client::new(); let req = client .get(url) .header(USER_AGENT, format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))) .header(ACCEPT, "application/vnd.github.v3+json") .send(); let res = req.await?.error_for_status()?; // Get next URL. let next_url = res.get_next_url()?; let data: Vec = res.json().await?; Ok((data, next_url)) } fn repositories() -> impl TryStream { try_stream! { let mut url = None; loop { let fut = create_request(url); let (data, next): (Vec<_>, Option<_>) = fut.await?; yield data; // End stream when `next` is `None`. if let None = next { break; } url = next; } } .flat_map(|x: Result, _>| match x { Ok(x) => stream::iter(x).map(|x| Ok(x)).boxed(), Err(x) => stream::once(future::ready(Err(x))).boxed(), }) } #[tokio::main] async fn main() { repositories() .try_for_each(|Repository { name }| { println!("{}", name); future::ok(()) }) .await .unwrap(); }