// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! Counterpart to the Python example `run_scanner.py`. //! //! Device deduplication is done here rather than relying on the controller's filtering to provide //! for additional features, like the ability to make deduplication time-bounded. use bumble::{ adv::CommonDataType, wrapper::{ core::AdvertisementDataUnit, device::Device, hci::{packets::AddressType, Address}, transport::Transport, }, }; use clap::Parser as _; use itertools::Itertools; use owo_colors::{OwoColorize, Style}; use pyo3::PyResult; use std::{ collections, sync::{Arc, Mutex}, time, }; #[pyo3_asyncio::tokio::main] async fn main() -> PyResult<()> { env_logger::builder() .filter_level(log::LevelFilter::Info) .init(); let cli = Cli::parse(); let transport = Transport::open(cli.transport).await?; let address = Address::new("F0:F1:F2:F3:F4:F5", AddressType::RandomDeviceAddress)?; let mut device = Device::with_hci("Bumble", address, transport.source()?, transport.sink()?)?; // in practice, devices can send multiple advertisements from the same address, so we keep // track of a timestamp for each set of data let seen_advertisements = Arc::new(Mutex::new(collections::HashMap::< Vec, collections::HashMap, time::Instant>, >::new())); let seen_adv_clone = seen_advertisements.clone(); device.on_advertisement(move |_py, adv| { let rssi = adv.rssi()?; let data_units = adv.data()?.data_units()?; let addr = adv.address()?; let show_adv = if cli.filter_duplicates { let addr_bytes = addr.as_le_bytes()?; let mut seen_adv_cache = seen_adv_clone.lock().unwrap(); let expiry_duration = time::Duration::from_secs(cli.dedup_expiry_secs); let advs_from_addr = seen_adv_cache.entry(addr_bytes).or_default(); // we expect cache hits to be the norm, so we do a separate lookup to avoid cloning // on every lookup with entry() let show = if let Some(prev) = advs_from_addr.get_mut(&data_units) { let expired = prev.elapsed() > expiry_duration; *prev = time::Instant::now(); expired } else { advs_from_addr.insert(data_units.clone(), time::Instant::now()); true }; // clean out anything we haven't seen in a while advs_from_addr.retain(|_, instant| instant.elapsed() <= expiry_duration); show } else { true }; if !show_adv { return Ok(()); } let addr_style = if adv.is_connectable()? { Style::new().yellow() } else { Style::new().red() }; let (type_style, qualifier) = match adv.address()?.address_type()? { AddressType::PublicIdentityAddress | AddressType::PublicDeviceAddress => { (Style::new().cyan(), "") } _ => { if addr.is_static()? { (Style::new().green(), "(static)") } else if addr.is_resolvable()? { (Style::new().magenta(), "(resolvable)") } else { (Style::new().default_color(), "") } } }; println!( ">>> {} [{:?}] {qualifier}:\n RSSI: {}", addr.as_hex()?.style(addr_style), addr.address_type()?.style(type_style), rssi, ); data_units.into_iter().for_each(|(code, data)| { let matching = CommonDataType::for_type_code(code).collect::>(); let code_str = if matching.is_empty() { format!("0x{}", hex::encode_upper([code.into()])) } else { matching .iter() .map(|t| format!("{}", t)) .join(" / ") .blue() .to_string() }; // use the first matching type's formatted data, if any let data_str = matching .iter() .filter_map(|t| { t.format_data(&data).map(|formatted| { format!( "{} {}", formatted, format!("(raw: 0x{})", hex::encode_upper(&data)).dimmed() ) }) }) .next() .unwrap_or_else(|| format!("0x{}", hex::encode_upper(&data))); println!(" [{}]: {}", code_str, data_str) }); Ok(()) })?; device.power_on().await?; // do our own dedup device.start_scanning(false).await?; // wait until user kills the process tokio::signal::ctrl_c().await?; Ok(()) } #[derive(clap::Parser)] #[command(author, version, about, long_about = None)] struct Cli { /// Bumble transport spec. /// /// #[arg(long)] transport: String, /// Filter duplicate advertisements #[arg(long, default_value_t = false)] filter_duplicates: bool, /// How long before a deduplicated advertisement that hasn't been seen in a while is considered /// fresh again, in seconds #[arg(long, default_value_t = 10, requires = "filter_duplicates")] dedup_expiry_secs: u64, }