implement mempool monitoring for coordinator

This commit is contained in:
f321x
2024-07-16 18:36:29 +02:00
parent c17bf0ea10
commit 78796fe851
5 changed files with 142 additions and 9 deletions

View File

@ -0,0 +1,117 @@
use super::*;
use anyhow::Ok;
use bdk::bitcoin::consensus::encode::deserialize;
use bdk::bitcoin::Transaction;
use bdk::bitcoin::{TxIn, Txid};
use bdk::bitcoincore_rpc::{Client, RpcApi};
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::sync::RwLock;
struct Mempool {
transactions: Arc<RwLock<HashMap<Txid, Vec<TxIn>>>>,
utxo_set: Arc<RwLock<HashSet<TxIn>>>,
json_rpc_client: Arc<Client>,
}
impl Mempool {
fn new(json_rpc_client: Arc<Client>) -> Self {
Self {
transactions: Arc::new(RwLock::new(HashMap::new())),
utxo_set: Arc::new(RwLock::new(HashSet::new())),
json_rpc_client,
}
}
}
fn run_mempool(mempool: Arc<Mempool>) {
loop {
// sleep for a while
std::thread::sleep(std::time::Duration::from_secs(15));
debug!("Fetching mempool");
let mempool_txs = match mempool.json_rpc_client.deref().get_raw_mempool() {
std::result::Result::Ok(mempool_txs) => mempool_txs,
Err(e) => {
error!("Error fetching mempool: {}", e);
continue;
}
};
let mut mempool_state = mempool
.transactions
.write()
.expect("Error locking mempool write mutex");
for txid in &mempool_txs {
if mempool_state.contains_key(txid) {
continue;
} else {
let tx = match mempool
.json_rpc_client
.deref()
.get_raw_transaction(&txid, None)
{
std::result::Result::Ok(tx) => tx,
Err(e) => {
error!(
"Error fetching transaction {} from mempool: {}",
txid.to_string(),
e
);
continue;
}
};
let mut inputs = Vec::new();
for input in tx.input {
inputs.push(input);
}
mempool_state.insert(*txid, inputs);
}
}
mempool_state.retain(|txid, _| mempool_txs.contains(txid));
let mut utxo_set = mempool
.utxo_set
.write()
.expect("Error locking utxo_set write mutex");
utxo_set.clear();
for (_, inputs) in mempool_state.iter() {
for input in inputs {
utxo_set.insert(input.clone());
}
}
}
}
pub struct MempoolHandler {
mempool: Arc<Mempool>,
}
impl MempoolHandler {
pub async fn new(json_rpc_client: Arc<Client>) -> Self {
let mempool = Arc::new(Mempool::new(json_rpc_client));
let mempool_clone = Arc::clone(&mempool);
tokio::task::spawn_blocking(move || run_mempool(mempool_clone));
Self { mempool }
}
pub async fn lookup_mempool_inputs(
&self,
bonds: &Vec<MonitoringBond>,
) -> Result<HashMap<Vec<u8>, (MonitoringBond, anyhow::Error)>> {
let mut bonds_to_punish: HashMap<Vec<u8>, (MonitoringBond, anyhow::Error)> = HashMap::new();
let utxo_set = self
.mempool
.utxo_set
.read()
.expect("Error locking utxo_set read mutex");
for bond in bonds {
let bond_tx: Transaction = deserialize(&hex::decode(&bond.bond_tx_hex)?)?;
for input in bond_tx.input {
if utxo_set.contains(&input) {
bonds_to_punish.insert(bond.id()?, (bond.clone(), anyhow!("Input in mempool")));
break;
}
}
}
Ok(bonds_to_punish)
}
}

View File

@ -1,4 +1,6 @@
pub mod monitoring;
pub mod create_taproot; // commented out for testing
pub mod create_taproot;
// pub mod mempool_actor;
pub mod mempool_monitoring;
pub mod monitoring; // commented out for testing
use super::*;

View File

@ -4,6 +4,7 @@
// Also needs to implement punishment logic in case a fraud is detected.
use super::*;
use anyhow::Context;
use mempool_monitoring::MempoolHandler;
use sha2::{Digest, Sha256};
#[derive(Debug, Clone, PartialEq)]

View File

@ -33,7 +33,7 @@ async fn main() -> Result<()> {
// Initialize the database pool
let coordinator = Arc::new(Coordinator {
coordinator_db: Arc::new(CoordinatorDB::init().await?),
coordinator_wallet: Arc::new(init_coordinator_wallet()?),
coordinator_wallet: Arc::new(init_coordinator_wallet().await?),
});
// begin monitoring bonds

View File

@ -10,8 +10,9 @@ use bdk::{
sled::{self, Tree},
template::Bip86,
wallet::verify::*,
KeychainKind, SyncOptions, Wallet,
KeychainKind, Wallet,
};
use coordinator::mempool_monitoring::MempoolHandler;
use std::{collections::HashMap, str::FromStr};
use std::{fmt, ops::Deref};
use utils::*;
@ -22,6 +23,7 @@ pub struct CoordinatorWallet<D: bdk::database::BatchDatabase> {
pub wallet: Arc<Mutex<Wallet<D>>>,
pub backend: Arc<RpcBlockchain>,
pub json_rpc_client: Arc<bdk::bitcoincore_rpc::Client>,
pub mempool: Arc<MempoolHandler>,
}
#[derive(PartialEq, Debug, Clone)]
@ -31,7 +33,7 @@ pub struct BondRequirements {
pub min_input_sum_sat: u64,
}
pub fn init_coordinator_wallet() -> Result<CoordinatorWallet<sled::Tree>> {
pub async fn init_coordinator_wallet() -> Result<CoordinatorWallet<sled::Tree>> {
let wallet_xprv = ExtendedPrivKey::from_str(
&env::var("WALLET_XPRV").context("loading WALLET_XPRV from .env failed")?,
)?;
@ -44,7 +46,10 @@ pub fn init_coordinator_wallet() -> Result<CoordinatorWallet<sled::Tree>> {
wallet_name: env::var("BITCOIN_RPC_WALLET_NAME")?,
sync_params: None,
};
let json_rpc_client = Client::new(&rpc_config.url, rpc_config.auth.clone().into())?;
let json_rpc_client = Arc::new(Client::new(
&rpc_config.url,
rpc_config.auth.clone().into(),
)?);
let backend = RpcBlockchain::from_config(&rpc_config)?;
// let backend = EsploraBlockchain::new(&env::var("ESPLORA_BACKEND")?, 1000);
let sled_db = sled::open(env::var("BDK_DB_PATH")?)?.open_tree("default_wallet")?;
@ -55,6 +60,9 @@ pub fn init_coordinator_wallet() -> Result<CoordinatorWallet<sled::Tree>> {
sled_db,
)?;
let json_rpc_client_clone = Arc::clone(&json_rpc_client);
let mempool = MempoolHandler::new(json_rpc_client_clone).await;
// wallet
// .sync(&backend, SyncOptions::default())
// .context("Connection to blockchain server failed.")?; // we could also use Esplora to make this async
@ -62,7 +70,8 @@ pub fn init_coordinator_wallet() -> Result<CoordinatorWallet<sled::Tree>> {
Ok(CoordinatorWallet {
wallet: Arc::new(Mutex::new(wallet)),
backend: Arc::new(backend),
json_rpc_client: Arc::new(json_rpc_client),
json_rpc_client: json_rpc_client,
mempool: Arc::new(mempool),
})
}
@ -176,11 +185,15 @@ impl<D: bdk::database::BatchDatabase> CoordinatorWallet<D> {
// now test all bonds with bitcoin core rpc testmempoolaccept
let json_rpc_client = self.json_rpc_client.clone();
let mempool_accept_future =
tokio::task::spawn_blocking(move || test_mempool_accept_bonds(json_rpc_client, bonds));
let bonds_clone = Arc::clone(&bonds);
let mempool_accept_future = tokio::task::spawn_blocking(move || {
test_mempool_accept_bonds(json_rpc_client, bonds_clone)
});
let invalid_bonds_testmempoolaccept = mempool_accept_future.await??;
invalid_bonds.extend(invalid_bonds_testmempoolaccept.into_iter());
let mempool_bonds = self.mempool.lookup_mempool_inputs(&bonds).await?;
invalid_bonds.extend(mempool_bonds.into_iter());
debug!("validate_bond_tx_hex(): Bond validation done.");
Ok(invalid_bonds)
}