diff --git a/taptrade-cli-demo/coordinator/src/coordinator/mempool_monitoring.rs b/taptrade-cli-demo/coordinator/src/coordinator/mempool_monitoring.rs new file mode 100644 index 0000000..d8ca9e4 --- /dev/null +++ b/taptrade-cli-demo/coordinator/src/coordinator/mempool_monitoring.rs @@ -0,0 +1,117 @@ +use super::*; +use anyhow::Ok; +use bdk::bitcoin::consensus::encode::deserialize; +use bdk::bitcoin::Transaction; +use bdk::bitcoin::{TxIn, Txid}; +use bdk::bitcoincore_rpc::{Client, RpcApi}; +use serde::Deserialize; +use std::collections::{HashMap, HashSet}; +use std::ops::Deref; +use std::sync::RwLock; + +struct Mempool { + transactions: Arc>>>, + utxo_set: Arc>>, + json_rpc_client: Arc, +} + +impl Mempool { + fn new(json_rpc_client: Arc) -> Self { + Self { + transactions: Arc::new(RwLock::new(HashMap::new())), + utxo_set: Arc::new(RwLock::new(HashSet::new())), + json_rpc_client, + } + } +} + +fn run_mempool(mempool: Arc) { + loop { + // sleep for a while + std::thread::sleep(std::time::Duration::from_secs(15)); + debug!("Fetching mempool"); + let mempool_txs = match mempool.json_rpc_client.deref().get_raw_mempool() { + std::result::Result::Ok(mempool_txs) => mempool_txs, + Err(e) => { + error!("Error fetching mempool: {}", e); + continue; + } + }; + let mut mempool_state = mempool + .transactions + .write() + .expect("Error locking mempool write mutex"); + for txid in &mempool_txs { + if mempool_state.contains_key(txid) { + continue; + } else { + let tx = match mempool + .json_rpc_client + .deref() + .get_raw_transaction(&txid, None) + { + std::result::Result::Ok(tx) => tx, + Err(e) => { + error!( + "Error fetching transaction {} from mempool: {}", + txid.to_string(), + e + ); + continue; + } + }; + let mut inputs = Vec::new(); + for input in tx.input { + inputs.push(input); + } + mempool_state.insert(*txid, inputs); + } + } + mempool_state.retain(|txid, _| mempool_txs.contains(txid)); + let mut utxo_set = mempool + .utxo_set + .write() + .expect("Error locking utxo_set write mutex"); + utxo_set.clear(); + for (_, inputs) in mempool_state.iter() { + for input in inputs { + utxo_set.insert(input.clone()); + } + } + } +} + +pub struct MempoolHandler { + mempool: Arc, +} + +impl MempoolHandler { + pub async fn new(json_rpc_client: Arc) -> Self { + let mempool = Arc::new(Mempool::new(json_rpc_client)); + let mempool_clone = Arc::clone(&mempool); + tokio::task::spawn_blocking(move || run_mempool(mempool_clone)); + Self { mempool } + } + + pub async fn lookup_mempool_inputs( + &self, + bonds: &Vec, + ) -> Result, (MonitoringBond, anyhow::Error)>> { + let mut bonds_to_punish: HashMap, (MonitoringBond, anyhow::Error)> = HashMap::new(); + let utxo_set = self + .mempool + .utxo_set + .read() + .expect("Error locking utxo_set read mutex"); + for bond in bonds { + let bond_tx: Transaction = deserialize(&hex::decode(&bond.bond_tx_hex)?)?; + for input in bond_tx.input { + if utxo_set.contains(&input) { + bonds_to_punish.insert(bond.id()?, (bond.clone(), anyhow!("Input in mempool"))); + break; + } + } + } + Ok(bonds_to_punish) + } +} diff --git a/taptrade-cli-demo/coordinator/src/coordinator/mod.rs b/taptrade-cli-demo/coordinator/src/coordinator/mod.rs index b049b1a..43bdf0b 100755 --- a/taptrade-cli-demo/coordinator/src/coordinator/mod.rs +++ b/taptrade-cli-demo/coordinator/src/coordinator/mod.rs @@ -1,4 +1,6 @@ -pub mod monitoring; -pub mod create_taproot; // commented out for testing +pub mod create_taproot; +// pub mod mempool_actor; +pub mod mempool_monitoring; +pub mod monitoring; // commented out for testing use super::*; diff --git a/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs b/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs index 8918ff0..3120bc7 100644 --- a/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs +++ b/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs @@ -4,6 +4,7 @@ // Also needs to implement punishment logic in case a fraud is detected. use super::*; use anyhow::Context; +use mempool_monitoring::MempoolHandler; use sha2::{Digest, Sha256}; #[derive(Debug, Clone, PartialEq)] diff --git a/taptrade-cli-demo/coordinator/src/main.rs b/taptrade-cli-demo/coordinator/src/main.rs index cdc738b..b6bed8e 100755 --- a/taptrade-cli-demo/coordinator/src/main.rs +++ b/taptrade-cli-demo/coordinator/src/main.rs @@ -33,7 +33,7 @@ async fn main() -> Result<()> { // Initialize the database pool let coordinator = Arc::new(Coordinator { coordinator_db: Arc::new(CoordinatorDB::init().await?), - coordinator_wallet: Arc::new(init_coordinator_wallet()?), + coordinator_wallet: Arc::new(init_coordinator_wallet().await?), }); // begin monitoring bonds diff --git a/taptrade-cli-demo/coordinator/src/wallet/mod.rs b/taptrade-cli-demo/coordinator/src/wallet/mod.rs index 8e8b7cb..e72e2f0 100644 --- a/taptrade-cli-demo/coordinator/src/wallet/mod.rs +++ b/taptrade-cli-demo/coordinator/src/wallet/mod.rs @@ -10,8 +10,9 @@ use bdk::{ sled::{self, Tree}, template::Bip86, wallet::verify::*, - KeychainKind, SyncOptions, Wallet, + KeychainKind, Wallet, }; +use coordinator::mempool_monitoring::MempoolHandler; use std::{collections::HashMap, str::FromStr}; use std::{fmt, ops::Deref}; use utils::*; @@ -22,6 +23,7 @@ pub struct CoordinatorWallet { pub wallet: Arc>>, pub backend: Arc, pub json_rpc_client: Arc, + pub mempool: Arc, } #[derive(PartialEq, Debug, Clone)] @@ -31,7 +33,7 @@ pub struct BondRequirements { pub min_input_sum_sat: u64, } -pub fn init_coordinator_wallet() -> Result> { +pub async fn init_coordinator_wallet() -> Result> { let wallet_xprv = ExtendedPrivKey::from_str( &env::var("WALLET_XPRV").context("loading WALLET_XPRV from .env failed")?, )?; @@ -44,7 +46,10 @@ pub fn init_coordinator_wallet() -> Result> { wallet_name: env::var("BITCOIN_RPC_WALLET_NAME")?, sync_params: None, }; - let json_rpc_client = Client::new(&rpc_config.url, rpc_config.auth.clone().into())?; + let json_rpc_client = Arc::new(Client::new( + &rpc_config.url, + rpc_config.auth.clone().into(), + )?); let backend = RpcBlockchain::from_config(&rpc_config)?; // let backend = EsploraBlockchain::new(&env::var("ESPLORA_BACKEND")?, 1000); let sled_db = sled::open(env::var("BDK_DB_PATH")?)?.open_tree("default_wallet")?; @@ -55,6 +60,9 @@ pub fn init_coordinator_wallet() -> Result> { sled_db, )?; + let json_rpc_client_clone = Arc::clone(&json_rpc_client); + let mempool = MempoolHandler::new(json_rpc_client_clone).await; + // wallet // .sync(&backend, SyncOptions::default()) // .context("Connection to blockchain server failed.")?; // we could also use Esplora to make this async @@ -62,7 +70,8 @@ pub fn init_coordinator_wallet() -> Result> { Ok(CoordinatorWallet { wallet: Arc::new(Mutex::new(wallet)), backend: Arc::new(backend), - json_rpc_client: Arc::new(json_rpc_client), + json_rpc_client: json_rpc_client, + mempool: Arc::new(mempool), }) } @@ -176,11 +185,15 @@ impl CoordinatorWallet { // now test all bonds with bitcoin core rpc testmempoolaccept let json_rpc_client = self.json_rpc_client.clone(); - let mempool_accept_future = - tokio::task::spawn_blocking(move || test_mempool_accept_bonds(json_rpc_client, bonds)); + let bonds_clone = Arc::clone(&bonds); + let mempool_accept_future = tokio::task::spawn_blocking(move || { + test_mempool_accept_bonds(json_rpc_client, bonds_clone) + }); let invalid_bonds_testmempoolaccept = mempool_accept_future.await??; invalid_bonds.extend(invalid_bonds_testmempoolaccept.into_iter()); + let mempool_bonds = self.mempool.lookup_mempool_inputs(&bonds).await?; + invalid_bonds.extend(mempool_bonds.into_iter()); debug!("validate_bond_tx_hex(): Bond validation done."); Ok(invalid_bonds) }