diff --git a/Cargo.lock b/Cargo.lock index 8a082ef..a5ebd05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,6 +120,12 @@ version = "3.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.2.1" @@ -142,6 +148,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" name = "chimemon" version = "0.1.0" dependencies = [ + "async-trait", "chrony-candm", "clap", "env_logger", @@ -152,7 +159,9 @@ dependencies = [ "log", "serde", "serde_derive", + "sysctl", "tokio", + "tokio-stream", ] [[package]] @@ -1331,6 +1340,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.20" @@ -1514,6 +1532,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sysctl" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f99d037b2bef227ab8963f4b0acc33ecbb1f9a2e7365add7789372b387ec19e1" +dependencies = [ + "bitflags", + "byteorder", + "libc", + "thiserror", + "walkdir", +] + [[package]] name = "tap" version = "1.0.1" @@ -1645,6 +1676,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -1812,6 +1844,17 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "walkdir" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +dependencies = [ + "same-file", + "winapi", + "winapi-util", +] + [[package]] name = "want" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index c0116ae..be1ef5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] serde = "1.0" serde_derive = "1.0" -influxdb2 = "0.3" +influxdb2 = "0.3.3" tokio = { version = "1", features = ["rt"] } clap = { version = "4.0", features = ["derive"] } log = "0.4" @@ -14,6 +14,9 @@ figment = { version = "0.10", features = ["toml"] } gethostname = "0.3" env_logger = "0.9.1" futures = "0.3.24" +sysctl = "0.5.2" +async-trait = "0.1.58" +tokio-stream = { version = "0.1.11", features = ["sync"] } [dependencies.chrony-candm] git = "https://github.com/aws/chrony-candm" diff --git a/config.toml.dist b/config.toml.dist index 4f2cb54..56730ac 100644 --- a/config.toml.dist +++ b/config.toml.dist @@ -1,16 +1,29 @@ -[sources.chrony] -enabled = true -poll_interval = 10 -timeout = 1 -measurement_prefix = "chrony." -tracking_measurement = "tracking" -sources_measurement = "sources" +[sources] + [sources.chrony] + enabled = true + # host = "127.0.0.1:323" # default; port is required + tracking_interval = 10 + sources_interval = 60 + timeout = 1 + measurement_prefix = "chrony." + tracking_measurement = "tracking" + sources_measurement = "sources" + + [sources.hwmon] + enabled = true + interval = 60 + measurement_prefix = "hwmon." + + [sources.hwmon.sensors.temp1] + name = "hwmon0" + sensor = "temp1_input" [influxdb] -# url = "http://localhost:8086" # defaults to localhost +url = "http://localhost:8086" org = "default" bucket = "default" token = "" [influxdb.tags] -# host = "localhost" # defaults to gethostname() +# host = "qubit" # default is the local hostname +# arbitrary = "tags" # are allowed \ No newline at end of file diff --git a/src/chrony.rs b/src/chrony.rs index 9c9a40a..9948cc3 100644 --- a/src/chrony.rs +++ b/src/chrony.rs @@ -1,18 +1,114 @@ -use chrony_candm::reply::{self, ReplyBody}; +use async_trait::async_trait; +use chimemon::{ChimemonSource, ChimemonSourceChannel, Config}; +use chrony_candm::reply::{self, ReplyBody, SourceMode}; use chrony_candm::request::{self, RequestBody}; -use std::net::SocketAddr; -use tokio::time::timeout; +use influxdb2::models::DataPoint; +use log::{info, warn}; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tokio::runtime::Handle; +use tokio::{join, time::timeout}; pub struct ChronyClient { pub server: SocketAddr, + config: Config, client: chrony_candm::Client, timeout: std::time::Duration, } +fn datapoint_from_tracking( + t: &reply::Tracking, + config: &Config, +) -> Result> { + let now = SystemTime::now().duration_since(UNIX_EPOCH)?; + let measurement = config.sources.chrony.measurement_prefix.to_owned() + + &config.sources.chrony.tracking_measurement; + let mut builder = + DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap()); + for (key, value) in &config.influxdb.tags { + builder = builder.tag(key, value); + } + + let point = builder + .field("ref_id", t.ref_id as i64) + .field("ref_ip_addr", t.ip_addr.to_string()) + .field("stratum", t.stratum as i64) + .field("leap_status", t.leap_status as i64) + .field("current_correction", f64::from(t.current_correction)) + .field("last_offset", f64::from(t.last_offset)) + .field("rms_offset", f64::from(t.rms_offset)) + .field("freq_ppm", f64::from(t.freq_ppm)) + .field("resid_freq_ppm", f64::from(t.resid_freq_ppm)) + .field("skew_ppm", f64::from(t.skew_ppm)) + .field("root_delay", f64::from(t.root_delay)) + .field("root_dispersion", f64::from(t.root_dispersion)) + .field("last_update_interval", f64::from(t.last_update_interval)) + .build()?; + + Ok(point) +} + +pub fn datapoint_from_sourcedata( + d: &reply::SourceData, + config: &Config, +) -> Result> { + let now = SystemTime::now().duration_since(UNIX_EPOCH)?; + let measurement = config.sources.chrony.measurement_prefix.to_owned() + + &config.sources.chrony.sources_measurement; + let mut builder = + DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap()); + for (key, value) in &config.influxdb.tags { + builder = builder.tag(key, value) + } + builder = builder + .tag("ref_id", d.ip_addr.to_string()) + .tag( + "mode", + match d.mode { + SourceMode::Client => String::from("server"), + SourceMode::Peer => String::from("peer"), + SourceMode::Ref => String::from("refclock"), + }, + ) + .tag( + "state", + match d.state { + reply::SourceState::Selected => String::from("best"), + reply::SourceState::NonSelectable => String::from("unusable"), + reply::SourceState::Falseticker => String::from("falseticker"), + reply::SourceState::Jittery => String::from("jittery"), + reply::SourceState::Unselected => String::from("unused"), + reply::SourceState::Selectable => String::from("combined"), + }, + ) + .field("poll", d.poll as i64) + .field("stratum", d.stratum as i64) + .field("flags", d.flags.bits() as i64) + .field("reachability", d.reachability.count_ones() as i64) + .field("since_sample", d.since_sample as i64) + .field("orig_latest_meas", f64::from(d.orig_latest_meas)) + .field("latest_meas", f64::from(d.latest_meas)) + .field("latest_meas_err", f64::from(d.latest_meas_err)); + let point = builder.build()?; + + Ok(point) +} + impl ChronyClient { - pub fn new(client: chrony_candm::Client, server: SocketAddr, timeout: std::time::Duration) -> Self { - Self { + pub fn new(config: Config) -> Self { + let server = config + .sources + .chrony + .host + .to_socket_addrs() + .unwrap() + .next() + .expect("Unable to parse host:port:"); + let client = chrony_candm::Client::spawn(&Handle::current(), Default::default()); + let timeout = Duration::from_secs(config.sources.chrony.timeout); + ChronyClient { server, + config, client, timeout, } @@ -79,6 +175,100 @@ impl ChronyClient { )), }?; + // if sourcedata.mode == SourceMode::Ref { + // // Get the name if it's a refclock + // let reply = timeout( + // self.timeout, + // self.client.query( + // RequestBody::NtpSourceName(request::NtpSourceName { ip_addr: sourcedata.ip_addr }), + // self.server, + // ), + // ) + // .await??; + + // let sourcename = match reply.body { + // ReplyBody::NtpSourceName(sourcename) => Ok(sourcename), + // _ => Err(std::io::Error::new( + // std::io::ErrorKind::InvalidData, + // "Invalid response", + // )), + // }?; + + // sourcedata.ip_addr = sourcename; + // } + Ok(sourcedata) } + + async fn tracking_poll( + &self, + chan: &ChimemonSourceChannel, + ) -> Result<(), Box> { + let tracking = self.get_tracking().await?; + + let tracking_data = datapoint_from_tracking(&tracking, &self.config)?; + + info!("Writing tracking data: {:?}", tracking_data); + + chan.send(tracking_data) + .expect("Unable to send tracking data to targets"); + + Ok(()) + } + + async fn sources_poll( + &self, + chan: &ChimemonSourceChannel, + ) -> Result<(), Box> { + let sources = self.get_sources().await?; + for ds in sources { + let source_data = datapoint_from_sourcedata(&ds, &self.config)?; + info!("Writing source data: {:?}", source_data); + chan.send(source_data) + .expect("Unable to send source data to targets"); + } + Ok(()) + } +} + +#[async_trait] +impl ChimemonSource for ChronyClient { + async fn run(self, chan: tokio::sync::broadcast::Sender) { + info!("Chrony task started"); + + let mut t_interval = tokio::time::interval(Duration::from_secs( + self.config.sources.chrony.tracking_interval, + )); + let mut s_interval = tokio::time::interval(Duration::from_secs( + self.config.sources.chrony.sources_interval, + )); + + let t_future = async { + let lchan = chan.clone(); + loop { + t_interval.tick().await; + + match self.tracking_poll(&lchan).await { + Ok(_) => (), + Err(e) => { + warn!("Error in chrony task: {}", e.to_string()); + } + } + } + }; + let s_future = async { + let lchan = chan.clone(); + loop { + s_interval.tick().await; + + match self.sources_poll(&lchan).await { + Ok(_) => (), + Err(e) => { + warn!("Error in chrony task: {}", e.to_string()); + } + } + } + }; + join!(t_future, s_future); + } } diff --git a/src/hwmon.rs b/src/hwmon.rs new file mode 100644 index 0000000..221104f --- /dev/null +++ b/src/hwmon.rs @@ -0,0 +1,75 @@ +use async_trait::async_trait; +use chimemon::{ChimemonSource, ChimemonSourceChannel, Config}; +use futures::{stream, StreamExt}; +use influxdb2::models::DataPoint; +use log::{info, debug}; +use std::{ + path::{Path, PathBuf}, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +pub struct HwmonSource { + config: Config, + sensors: Vec, +} + +struct HwmonSensor { + path: PathBuf, + device: String, + sensor: String, + name: String, +} + +const HWMON_ROOT: &str = "/sys/class/hwmon"; + +impl HwmonSource { + pub fn new(config: Config) -> Self { + let sensors = config + .sources + .hwmon + .sensors + .iter() + .map(|(k, v)| HwmonSensor { + name: k.into(), + device: v.name.clone(), + sensor: v.sensor.clone(), + path: PathBuf::from(HWMON_ROOT).join(&v.name).join(&v.sensor), + }) + .collect(); + + HwmonSource { config, sensors } + } + + async fn get_raw_value(sensor: &HwmonSensor) -> Result { + tokio::fs::read_to_string(&sensor.path).await + } +} + +#[async_trait] +impl ChimemonSource for HwmonSource { + async fn run(self, chan: tokio::sync::broadcast::Sender) { + info!("hwmon task started"); + let mut interval = + tokio::time::interval(Duration::from_secs(self.config.sources.hwmon.interval)); + loop { + interval.tick().await; + stream::iter(&self.sensors).for_each_concurrent(None, |s| async { + let sensor_val = HwmonSource::get_raw_value(s) + .await + .expect("Unable to read sensor"); + debug!("hwmon {} raw value {}", s.path.to_string_lossy(), sensor_val); + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + let mut builder = + DataPoint::builder(&s.device).timestamp(now.as_nanos().try_into().unwrap()); + for (key, value) in &self.config.influxdb.tags { + builder = builder.tag(key, value) + } + builder = builder.field(&s.name, sensor_val.trim().parse::().unwrap()); + let dp = builder.build().unwrap(); + + info!("Writing hwmon data: {:?}", dp); + chan.send(dp).unwrap(); + }).await; + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 8cd9dba..e748eb9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -use chrony_candm::reply; +use async_trait::async_trait; use figment::{ providers::{Format, Serialized, Toml}, util::map, @@ -8,11 +8,8 @@ use figment::{ use gethostname::gethostname; use influxdb2::models::DataPoint; use serde_derive::{Deserialize, Serialize}; -use std::time::{SystemTime, UNIX_EPOCH}; -use std::{ - net::{IpAddr, Ipv4Addr}, - path::Path, -}; +use std::path::Path; +use tokio::sync::broadcast::*; #[derive(Serialize, Deserialize, Clone)] pub struct InfluxConfig { @@ -40,12 +37,12 @@ impl Default for InfluxConfig { pub struct ChronyConfig { pub enabled: bool, pub timeout: u64, - pub poll_interval: u64, + pub tracking_interval: u64, + pub sources_interval: u64, pub measurement_prefix: String, pub tracking_measurement: String, pub sources_measurement: String, - pub host: IpAddr, - pub port: u16, + pub host: String, } impl Default for ChronyConfig { @@ -53,12 +50,37 @@ impl Default for ChronyConfig { ChronyConfig { enabled: false, timeout: 5, - poll_interval: 60, + tracking_interval: 60, + sources_interval: 300, measurement_prefix: "chrony.".into(), tracking_measurement: "tracking".into(), sources_measurement: "sources".into(), - host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - port: 323, + host: "127.0.0.1:323".into(), + } + } +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct HwmonSensorConfig { + pub name: String, + pub sensor: String, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct HwmonConfig { + pub enabled: bool, + pub interval: u64, + pub measurement_prefix: String, + pub sensors: Map, +} + +impl Default for HwmonConfig { + fn default() -> Self { + HwmonConfig { + enabled: false, + interval: 60, + measurement_prefix: "hwmon.".into(), + sensors: map! {}, } } } @@ -66,6 +88,7 @@ impl Default for ChronyConfig { #[derive(Serialize, Deserialize, Clone, Default)] pub struct SourcesConfig { pub chrony: ChronyConfig, + pub hwmon: HwmonConfig, } #[derive(Serialize, Deserialize, Clone, Default)] @@ -74,70 +97,19 @@ pub struct Config { pub sources: SourcesConfig, } -pub fn load_config(filename: &Path) -> Result> { - let config = Figment::from(Serialized::defaults(Config::default())) - .merge(Toml::file(filename)) - .extract()?; - Ok(config) +pub fn load_config(filename: &Path) -> Figment { + Figment::from(Serialized::defaults(Config::default())).merge(Toml::file(filename)) } -pub fn datapoint_from_tracking( - t: &reply::Tracking, - config: &Config, -) -> Result> { - let now = SystemTime::now().duration_since(UNIX_EPOCH)?; - let measurement = config.sources.chrony.measurement_prefix.to_owned() - + &config.sources.chrony.tracking_measurement; - let mut builder = - DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap()); - for (key, value) in &config.influxdb.tags { - builder = builder.tag(key, value); - } +pub type ChimemonSourceChannel = Sender; +pub type ChimemonTargetChannel = Receiver; - let point = builder - .field("ref_id", t.ref_id as i64) - .field("ref_ip_addr", t.ip_addr.to_string()) - .field("stratum", t.stratum as i64) - .field("leap_status", t.leap_status as i64) - .field("current_correction", f64::from(t.current_correction)) - .field("last_offset", f64::from(t.last_offset)) - .field("rms_offset", f64::from(t.rms_offset)) - .field("freq_ppm", f64::from(t.freq_ppm)) - .field("resid_freq_ppm", f64::from(t.resid_freq_ppm)) - .field("skew_ppm", f64::from(t.skew_ppm)) - .field("root_delay", f64::from(t.root_delay)) - .field("root_dispersion", f64::from(t.root_dispersion)) - .field("last_update_interval", f64::from(t.last_update_interval)) - .build()?; - - Ok(point) +#[async_trait] +pub trait ChimemonSource { + async fn run(self, chan: ChimemonSourceChannel); } -pub fn datapoint_from_sourcedata( - d: &reply::SourceData, - config: &Config, -) -> Result> { - let now = SystemTime::now().duration_since(UNIX_EPOCH)?; - let measurement = config.sources.chrony.measurement_prefix.to_owned() - + &config.sources.chrony.sources_measurement; - let mut builder = - DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap()); - for (key, value) in &config.influxdb.tags { - builder = builder.tag(key, value) - } - builder = builder - .tag("ref_id", d.ip_addr.to_string()) - .tag("mode", (d.mode as i64).to_string()) - .tag("state", (d.state as i64).to_string()) - .field("poll", d.poll as i64) - .field("stratum", d.stratum as i64) - .field("flags", d.flags.bits() as i64) - .field("reachability", d.reachability as i64) - .field("since_sample", d.since_sample as i64) - .field("orig_latest_meas", f64::from(d.orig_latest_meas)) - .field("latest_meas", f64::from(d.latest_meas)) - .field("latest_meas_err", f64::from(d.latest_meas_err)); - let point = builder.build()?; - - Ok(point) +#[async_trait] +pub trait ChimemonTarget { + async fn run(self, chan: ChimemonTargetChannel); } diff --git a/src/main.rs b/src/main.rs index 2f0e485..78dc8f6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,63 +1,55 @@ mod chrony; +mod hwmon; -use clap::Parser; -use env_logger; +use clap::{Parser, ValueEnum}; +use env_logger::{self, Env}; use futures::{future::join_all, prelude::*}; use log::{error, info, warn}; -use std::{path::Path, time::Duration}; -use std::net::SocketAddr; -use tokio::runtime::Handle; +use std::path::Path; +use tokio::sync::broadcast; -use crate::chrony::*; +use crate::{chrony::*, hwmon::HwmonSource}; use chimemon::*; const PROGRAM_NAME: &str = "chimemon"; const VERSION: &str = "0.0.1"; -const DEFAULT_RETRY_DELAY: Duration = Duration::new(60, 0); -const MAX_RETRY_DELAY: Duration = Duration::new(1800, 0); +#[derive(ValueEnum, Clone)] +enum Level { + Debug, + Info, + Warn, + Error, +} #[derive(Parser)] struct Args { /// TOML configuration to load #[arg(short, long, default_value_t = String::from("config.toml"))] config_file: String, -} - -async fn do_chrony_poll( - config: &Config, - chrony: &ChronyClient, - influx: &influxdb2::Client, -) -> Result<(), Box> { - let tracking = chrony.get_tracking().await?; - let sources = chrony.get_sources().await?; - - let mut frame = Vec::new(); - - let tracking_data = datapoint_from_tracking(&tracking, &config)?; - - info!(target: "chrony", "Writing tracking data: {:?}", tracking_data); - - frame.push(tracking_data); - for ds in sources { - let source_data = datapoint_from_sourcedata(&ds, &config)?; - info!(target: "chrony", "Writing source data: {:?}", source_data); - frame.push(source_data); - } - - influx - .write(&config.influxdb.bucket, stream::iter(frame)) - .await?; - Ok(()) + #[arg(value_enum, default_value_t = Level::Warn)] + log_level: Level, } #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { - env_logger::init(); let args = Args::parse(); + let loglevel = args + .log_level + .to_possible_value() + .unwrap() + .get_name() + .to_owned(); + env_logger::Builder::from_env(Env::default().default_filter_or(loglevel)).init(); + info!("{} v{} starting...", PROGRAM_NAME, VERSION); - let config = load_config(Path::new(&args.config_file))?; + let fig = load_config(Path::new(&args.config_file)); + warn!("{:?}", fig); + let config: Config = fig.extract()?; + let mut tasks = Vec::new(); + let (tx, _) = broadcast::channel(16); + let sourcechan: ChimemonSourceChannel = tx; info!( "Connecting to influxdb {} org: {} using token", @@ -69,48 +61,48 @@ async fn main() -> Result<(), Box> { &config.influxdb.token, ); - if config.sources.chrony.enabled { - let chrony = ChronyClient::new( - chrony_candm::Client::spawn(&Handle::current(), Default::default()), - SocketAddr::new( - config.sources.chrony.host, - config.sources.chrony.port, - ), - Duration::from_secs(config.sources.chrony.timeout), - ); + let chrony = if config.sources.chrony.enabled { + Some(ChronyClient::new(config.to_owned())) + } else { + None + }; + match chrony { + Some(c) => { + tasks.push(tokio::spawn(c.run(sourcechan.clone()))); + } + None => (), + }; - let mut retry_delay = DEFAULT_RETRY_DELAY; - let mut fail_count = 0; + let hwmon = if config.sources.hwmon.enabled { + Some(HwmonSource::new(config.to_owned())) + } else { + None + }; + match hwmon { + Some(hwmon) => { + tasks.push(tokio::spawn(hwmon.run(sourcechan.clone()))); + } + None => (), + }; - let interval = Duration::new(config.sources.chrony.poll_interval, 0); - let mut interval = tokio::time::interval(interval); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut influxrx = sourcechan.subscribe(); + tasks.push(tokio::spawn(async move { + loop { + let dp = influxrx.recv().await.unwrap(); + influx + .write(&config.influxdb.bucket, stream::iter([dp])) + .await + .expect("Error writing to influxdb"); + } + })); - tasks.push(tokio::spawn(async move { - info!(target: "chrony", "Chrony task started"); - loop { - interval.tick().await; - match do_chrony_poll(&config, &chrony, &influx.to_owned()).await { - Ok(_) => { - fail_count = 0; - retry_delay = DEFAULT_RETRY_DELAY; - } - Err(e) => { - warn!(target: "chrony", "Error in chrony task: {}", e.to_string()); - if fail_count % 4 == 0 { - // exponential backoff, every 4 failures double the retry timer, cap at max - retry_delay = std::cmp::min(MAX_RETRY_DELAY, retry_delay * 2); - } - fail_count += 1; - } - } - if fail_count > 0 { - tokio::time::sleep(retry_delay).await; - interval.reset(); - }; - } - })); - } + // let mut debugrx = sourcechan.subscribe(); + // tasks.push(tokio::spawn(async move { + // loop { + // let v = debugrx.recv().await; + // warn!("streamed: {:?}", v.unwrap()); + // } + // })); join_all(tasks).await;