From 369961a2ea27df30e10de9292e5a90ef52f64a63 Mon Sep 17 00:00:00 2001 From: Felix <51097237+f321x@users.noreply.github.com> Date: Fri, 12 Jul 2024 10:54:43 +0000 Subject: [PATCH] refactor spawning of monitoring thread, working on spawn_blocking for sync bitcoinrpc call --- .../coordinator/src/coordinator/monitoring.rs | 6 +- taptrade-cli-demo/coordinator/src/main.rs | 12 +- .../coordinator/src/wallet/mod.rs | 105 +++++++++--------- 3 files changed, 69 insertions(+), 54 deletions(-) diff --git a/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs b/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs index a9d18a7..0c92385 100644 --- a/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs +++ b/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs @@ -41,8 +41,10 @@ pub async fn monitor_bonds(coordinator: Arc) -> Result<()> { loop { // fetch all bonds - let bonds = coordinator_db.fetch_all_bonds().await?; - let validation_results = coordinator_wallet.validate_bonds(&bonds).await?; + let bonds = Arc::new(coordinator_db.fetch_all_bonds().await?); + let validation_results = coordinator_wallet + .validate_bonds(Arc::clone(&bonds)) + .await?; debug!("Monitoring active bonds: {}", bonds.len()); // verify all bonds and initiate punishment if necessary for (bond, error) in validation_results { diff --git a/taptrade-cli-demo/coordinator/src/main.rs b/taptrade-cli-demo/coordinator/src/main.rs index 10d09b0..b83972f 100755 --- a/taptrade-cli-demo/coordinator/src/main.rs +++ b/taptrade-cli-demo/coordinator/src/main.rs @@ -37,8 +37,16 @@ async fn main() -> Result<()> { }); // begin monitoring bonds - // spawn_blocking(monitor_bonds(Arc::clone(&coordinator))); - + let coordinator_ref = Arc::clone(&coordinator); + tokio::spawn(async move { + loop { + if let Err(e) = monitor_bonds(coordinator_ref.clone()).await { + error!("Error in monitor_bonds: {:?}", e); + // Optionally add a delay before retrying + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + } + }); // Start the API server api_server(coordinator).await?; Ok(()) diff --git a/taptrade-cli-demo/coordinator/src/wallet/mod.rs b/taptrade-cli-demo/coordinator/src/wallet/mod.rs index f9a9381..24cd8f6 100644 --- a/taptrade-cli-demo/coordinator/src/wallet/mod.rs +++ b/taptrade-cli-demo/coordinator/src/wallet/mod.rs @@ -86,7 +86,9 @@ impl CoordinatorWallet { requirements: requirements.clone(), table: Table::Memory, }; - let invalid_bond = self.validate_bonds(&vec![dummy_monitoring_bond]).await?; + let invalid_bond = self + .validate_bonds(Arc::new(vec![dummy_monitoring_bond])) + .await?; if !invalid_bond.is_empty() { return Err(anyhow!(invalid_bond[0].1.to_string())); } @@ -99,14 +101,14 @@ impl CoordinatorWallet { // blockchain::get_tx to get input pub async fn validate_bonds( &self, - bonds: &Vec, + bonds: Arc>, ) -> Result> { let mut invalid_bonds: Vec<(MonitoringBond, anyhow::Error)> = Vec::new(); let blockchain = &*self.backend; { let wallet = self.wallet.lock().await; - for bond in bonds { + for bond in *bonds { let input_sum: u64; let tx: Transaction = deserialize(&hex::decode(&bond.bond_tx_hex)?)?; @@ -162,57 +164,17 @@ impl CoordinatorWallet { } } } - self.test_mempool_accept_bonds(bonds, &mut invalid_bonds)?; + // let invalid_bonds = Arc::new(invalid_bonds); + // let json_rpc_client = self.json_rpc_client.clone(); + // let mempool_accept_future = tokio::task::spawn_blocking(move || { + // test_mempool_accept_bonds(json_rpc_client, bonds, &mut invalid_bonds) + // }); + // mempool_accept_future.await??; + debug!("validate_bond_tx_hex(): Bond validation done."); Ok(invalid_bonds) } - fn test_mempool_accept_bonds( - &self, - bonds: &Vec, - invalid_bonds: &mut Vec<(MonitoringBond, anyhow::Error)>, - ) -> Result<()> { - let raw_bonds: Vec = bonds - .iter() - .map(|bond| bond.bond_tx_hex.clone().raw_hex()) // Assuming `raw_hex()` returns a String or &str - .collect(); - - let test_mempool_accept_res = self - .json_rpc_client - .deref() - .test_mempool_accept(&raw_bonds)?; - - for res in test_mempool_accept_res { - if !res.allowed { - let invalid_bond: MonitoringBond = - Self::search_monitoring_bond_by_txid(&bonds, &res.txid.to_string())?; - invalid_bonds.push(( - invalid_bond, - anyhow!( - "Bond not accepted by testmempoolaccept: {:?}", - res.reject_reason - .unwrap_or("rejected by testmempoolaccept".to_string()) - ), - )); - }; - } - Ok(()) - } - - fn search_monitoring_bond_by_txid( - // this should not happen often, so the inefficiency is acceptable - monitoring_bonds: &Vec, - txid: &str, - ) -> Result { - for bond in monitoring_bonds { - let bond_tx: Transaction = deserialize(&hex::decode(&bond.bond_tx_hex)?)?; - if bond_tx.txid().to_string() == txid { - return Ok(bond.clone()); - } - } - Err(anyhow!("Bond not found in monitoring bonds")) - } - pub fn publish_bond_tx_hex(&self, bond: &str) -> Result<()> { warn!("publish_bond_tx_hex(): publishing cheating bond tx!"); let blockchain = &*self.backend; @@ -223,6 +185,49 @@ impl CoordinatorWallet { } } +fn search_monitoring_bond_by_txid( + // this should not happen often, so the inefficiency is acceptable + monitoring_bonds: &Vec, + txid: &str, +) -> Result { + for bond in monitoring_bonds { + let bond_tx: Transaction = deserialize(&hex::decode(&bond.bond_tx_hex)?)?; + if bond_tx.txid().to_string() == txid { + return Ok(bond.clone()); + } + } + Err(anyhow!("Bond not found in monitoring bonds")) +} + +fn test_mempool_accept_bonds( + json_rpc_client: Arc, + bonds: Arc>, + invalid_bonds: &mut Vec<(MonitoringBond, anyhow::Error)>, +) -> Result<()> { + let raw_bonds: Vec = bonds + .iter() + .map(|bond| bond.bond_tx_hex.clone().raw_hex()) // Assuming `raw_hex()` returns a String or &str + .collect(); + + let test_mempool_accept_res = json_rpc_client.deref().test_mempool_accept(&raw_bonds)?; + + for res in test_mempool_accept_res { + if !res.allowed { + let invalid_bond: MonitoringBond = + search_monitoring_bond_by_txid(&bonds, &res.txid.to_string())?; + invalid_bonds.push(( + invalid_bond, + anyhow!( + "Bond not accepted by testmempoolaccept: {:?}", + res.reject_reason + .unwrap_or("rejected by testmempoolaccept".to_string()) + ), + )); + }; + } + Ok(()) +} + impl fmt::Debug for CoordinatorWallet { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("CoordinatorWallet")