From 6552ce559daf3ad6787d8f89a06abcaa63eae178 Mon Sep 17 00:00:00 2001 From: f321x Date: Fri, 12 Jul 2024 16:37:22 +0200 Subject: [PATCH] finish monitoring bonds function refactor --- taptrade-cli-demo/coordinator/Cargo.lock | 1 + taptrade-cli-demo/coordinator/Cargo.toml | 1 + .../coordinator/src/coordinator/monitoring.rs | 44 +++++++--- .../coordinator/src/wallet/mod.rs | 86 +++++++++++-------- 4 files changed, 82 insertions(+), 50 deletions(-) diff --git a/taptrade-cli-demo/coordinator/Cargo.lock b/taptrade-cli-demo/coordinator/Cargo.lock index d97f0ec..22fca69 100644 --- a/taptrade-cli-demo/coordinator/Cargo.lock +++ b/taptrade-cli-demo/coordinator/Cargo.lock @@ -484,6 +484,7 @@ dependencies = [ "rand", "reqwest", "serde", + "sha2", "sqlx", "tokio", "tower", diff --git a/taptrade-cli-demo/coordinator/Cargo.toml b/taptrade-cli-demo/coordinator/Cargo.toml index d7921e5..a76e1d8 100644 --- a/taptrade-cli-demo/coordinator/Cargo.toml +++ b/taptrade-cli-demo/coordinator/Cargo.toml @@ -23,6 +23,7 @@ tokio = { version = "1.38.0", features = ["full", "test-util", "rt"] } tower = "0.4.13" log = "0.4.22" env_logger = "0.11.3" +sha2 = "0.10.8" [profile.release] lto = true diff --git a/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs b/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs index 0c92385..188007b 100644 --- a/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs +++ b/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs @@ -3,6 +3,7 @@ // prevent querying the db all the time. // Also needs to implement punishment logic in case a fraud is detected. use super::*; +use sha2::{Digest, Sha256}; #[derive(Debug, Clone)] pub enum Table { @@ -20,19 +21,27 @@ pub struct MonitoringBond { pub table: Table, } -// the current implementation only publishes the bond and removes the offer from the db -// in a more advanced implementation we could increase the transaction fee (cpfp) and -// continue monitoring the bond transaction until a confirmation happens for maximum pain -// in case the trader is actively malicious and did not just accidentally invalidate the bond -// we could directly forward bond sats to the other parties payout address in case it is a taken trade -async fn punish_trader(coordinator: &Coordinator, bond: &MonitoringBond) -> Result<()> { - // publish bond - coordinator - .coordinator_wallet - .publish_bond_tx_hex(&bond.bond_tx_hex)?; // can be made async with esplora backend if we figure out the compilation error of bdk +impl MonitoringBond { + // used a hash of bond instead of txid to prevent issues when a valid txid can't be generated + // due to missing fields etc. (crate error) + pub fn id(&self) -> Result> { + Ok(sha256(&hex::decode(&self.bond_tx_hex)?)) + } - // remove offer from db/orderbook - Ok(()) + // the current implementation only publishes the bond and removes the offer from the db + // in a more advanced implementation we could increase the transaction fee (cpfp) and + // continue monitoring the bond transaction until a confirmation happens for maximum pain + // in case the trader is actively malicious and did not just accidentally invalidate the bond + // we could directly forward bond sats to the other parties payout address in case it is a taken trade + async fn punish(&self, coordinator: &Coordinator) -> Result<()> { + // publish bond + coordinator + .coordinator_wallet + .publish_bond_tx_hex(&self.bond_tx_hex)?; // can be made async with esplora backend if we figure out the compilation error of bdk + + // remove offer from db/orderbook + Ok(()) + } } pub async fn monitor_bonds(coordinator: Arc) -> Result<()> { @@ -47,7 +56,7 @@ pub async fn monitor_bonds(coordinator: Arc) -> Result<()> { .await?; debug!("Monitoring active bonds: {}", bonds.len()); // verify all bonds and initiate punishment if necessary - for (bond, error) in validation_results { + for (_, (bond, error)) in validation_results { warn!("Bond validation failed: {:?}", error); match env::var("PUNISHMENT_ENABLED") .unwrap_or_else(|_| "0".to_string()) @@ -55,7 +64,7 @@ pub async fn monitor_bonds(coordinator: Arc) -> Result<()> { { "1" => { dbg!("Punishing trader for bond violation: {:?}", error); - punish_trader(&coordinator, &bond).await?; + bond.punish(&coordinator).await?; } "0" => { dbg!("Punishment disabled, ignoring bond violation: {:?}", error); @@ -68,3 +77,10 @@ pub async fn monitor_bonds(coordinator: Arc) -> Result<()> { tokio::time::sleep(tokio::time::Duration::from_secs(15)).await; } } + +fn sha256(data: &[u8]) -> Vec { + let mut hasher = Sha256::new(); + hasher.update(data); + let result = hasher.finalize(); + result.to_vec() +} diff --git a/taptrade-cli-demo/coordinator/src/wallet/mod.rs b/taptrade-cli-demo/coordinator/src/wallet/mod.rs index 1c6bee3..b9680f8 100644 --- a/taptrade-cli-demo/coordinator/src/wallet/mod.rs +++ b/taptrade-cli-demo/coordinator/src/wallet/mod.rs @@ -12,7 +12,7 @@ use bdk::{ wallet::verify::*, KeychainKind, SyncOptions, Wallet, }; -use std::str::FromStr; +use std::{collections::HashMap, str::FromStr}; use std::{fmt, ops::Deref}; use utils::*; // use verify_tx::*; @@ -90,7 +90,8 @@ impl CoordinatorWallet { .validate_bonds(Arc::new(vec![dummy_monitoring_bond])) .await?; if !invalid_bond.is_empty() { - return Err(anyhow!(invalid_bond[0].1.to_string())); + let (_, error) = invalid_bond.values().next().unwrap(); + return Err(anyhow!(error.to_string())); } Ok(()) } @@ -102,10 +103,9 @@ impl CoordinatorWallet { pub async fn validate_bonds( &self, bonds: Arc>, - ) -> Result> { - let mut invalid_bonds: Vec<(MonitoringBond, anyhow::Error)> = Vec::new(); + ) -> Result, (MonitoringBond, anyhow::Error)>> { + let mut invalid_bonds: HashMap, (MonitoringBond, anyhow::Error)> = HashMap::new(); let blockchain = &*self.backend; - { let wallet = self.wallet.lock().await; for bond in bonds.as_ref().iter() { @@ -116,7 +116,7 @@ impl CoordinatorWallet { // we need to test this with signed and invalid/unsigned transactions // checks signatures and inputs if let Err(e) = verify_tx(&tx, &*wallet.database(), blockchain) { - invalid_bonds.push((bond.clone(), anyhow!(e))); + invalid_bonds.insert(bond.id()?, (bond.clone(), anyhow!(e))); continue; } @@ -124,10 +124,13 @@ impl CoordinatorWallet { input_sum = match tx.input_sum(blockchain, &*wallet.database()) { Ok(amount) => { if amount < bond.requirements.min_input_sum_sat { - invalid_bonds.push(( - bond.clone(), - anyhow!("Bond input sum too small: {}", amount), - )); + invalid_bonds.insert( + bond.id()?, + ( + bond.clone(), + anyhow!("Bond input sum too small: {}", amount), + ), + ); continue; } amount @@ -140,10 +143,13 @@ impl CoordinatorWallet { match tx.bond_output_sum(&bond.requirements.bond_address) { Ok(amount) => { if amount < bond.requirements.locking_amount_sat { - invalid_bonds.push(( - bond.clone(), - anyhow!("Bond output sum too small: {}", amount), - )); + invalid_bonds.insert( + bond.id()?, + ( + bond.clone(), + anyhow!("Bond output sum too small: {}", amount), + ), + ); continue; } amount @@ -153,23 +159,27 @@ impl CoordinatorWallet { } }; if ((input_sum - tx.all_output_sum()) / tx.vsize() as u64) < 200 { - invalid_bonds.push(( - bond.clone(), - anyhow!( - "Bond fee rate too low: {}", - (input_sum - tx.all_output_sum()) / tx.vsize() as u64 + invalid_bonds.insert( + bond.id()?, + ( + bond.clone(), + anyhow!( + "Bond fee rate too low: {}", + (input_sum - tx.all_output_sum()) / tx.vsize() as u64 + ), ), - )); + ); continue; } } } - // let invalid_bonds = Arc::new(invalid_bonds); - // let json_rpc_client = self.json_rpc_client.clone(); - // let mempool_accept_future = tokio::task::spawn_blocking(move || { - // test_mempool_accept_bonds(json_rpc_client, bonds, &mut invalid_bonds) - // }); - // mempool_accept_future.await??; + + // now test all bonds with bitcoin core rpc testmempoolaccept + let json_rpc_client = self.json_rpc_client.clone(); + let mempool_accept_future = + tokio::task::spawn_blocking(move || test_mempool_accept_bonds(json_rpc_client, bonds)); + let invalid_bonds_testmempoolaccept = mempool_accept_future.await??; + invalid_bonds.extend(invalid_bonds_testmempoolaccept.into_iter()); debug!("validate_bond_tx_hex(): Bond validation done."); Ok(invalid_bonds) @@ -202,8 +212,9 @@ fn search_monitoring_bond_by_txid( fn test_mempool_accept_bonds( json_rpc_client: Arc, bonds: Arc>, - invalid_bonds: &mut Vec<(MonitoringBond, anyhow::Error)>, -) -> Result<()> { +) -> Result, (MonitoringBond, anyhow::Error)>> { + let mut invalid_bonds: HashMap, (MonitoringBond, anyhow::Error)> = HashMap::new(); + let raw_bonds: Vec = bonds .iter() .map(|bond| bond.bond_tx_hex.clone().raw_hex()) // Assuming `raw_hex()` returns a String or &str @@ -215,17 +226,20 @@ fn test_mempool_accept_bonds( if !res.allowed { let invalid_bond: MonitoringBond = search_monitoring_bond_by_txid(&bonds, &res.txid.to_string())?; - invalid_bonds.push(( - invalid_bond, - anyhow!( - "Bond not accepted by testmempoolaccept: {:?}", - res.reject_reason - .unwrap_or("rejected by testmempoolaccept".to_string()) + invalid_bonds.insert( + invalid_bond.id()?, + ( + invalid_bond, + anyhow!( + "Bond not accepted by testmempoolaccept: {:?}", + res.reject_reason + .unwrap_or("rejected by testmempoolaccept".to_string()) + ), ), - )); + ); }; } - Ok(()) + Ok(invalid_bonds) } impl fmt::Debug for CoordinatorWallet {