refactor bond validation, use testmempoolaccept, add Bitcoinrpc backend

This commit is contained in:
Felix
2024-07-11 14:44:14 +00:00
parent 7f8866d414
commit 257b0d7201
9 changed files with 204 additions and 94 deletions

View File

@ -10,3 +10,5 @@ services:
- "18332:18332"
- "8333:8333"
- "48332:48332"
# https://bitcointalk.org/index.php?topic=5496494

View File

@ -1,5 +1,6 @@
ELECTRUM_BACKEND="ssl://mempool.space:40002" # we need a node with large mempool size limit for monitoring to miss no bond transactions
ESPLORA_BACKEND="https://blockstream.info/testnet/api" # blockstream.info testnet backend
BITCOIN_RPC_ADDRESS_PORT="127.0.0.1:18332"
BITCOIN_RPC_COOKIE_FILE_PATH="~/.bitcoin/.cookie" # path to the cookie file for the bitcoind RPC
BITCOIN_RPC_WALLET_NAME="coordinator_wallet" # not used yet
DATABASE_PATH="./dbs/trades.db" # path to the coordinator sqlite database storing the trades
BDK_DB_PATH="./dbs/bdk-wallet" # Path to the BDK Sled database (no .db postfix)
WALLET_XPRV="tprv8ZgxMBicQKsPdHuCSjhQuSZP1h6ZTeiRqREYS5guGPdtL7D1uNLpnJmb2oJep99Esq1NbNZKVJBNnD2ZhuXSK7G5eFmmcx73gsoa65e2U32"

View File

@ -252,6 +252,7 @@ dependencies = [
"bdk-macros",
"bitcoin 0.30.2",
"bitcoinconsensus",
"core-rpc",
"electrum-client",
"esplora-client",
"getrandom",
@ -487,6 +488,32 @@ version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
[[package]]
name = "core-rpc"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d77079e1b71c2778d6e1daf191adadcd4ff5ec3ccad8298a79061d865b235b"
dependencies = [
"bitcoin-private",
"core-rpc-json",
"jsonrpc",
"log",
"serde",
"serde_json",
]
[[package]]
name = "core-rpc-json"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "581898ed9a83f31c64731b1d8ca2dfffcfec14edf1635afacd5234cddbde3a41"
dependencies = [
"bitcoin 0.30.2",
"bitcoin-private",
"serde",
"serde_json",
]
[[package]]
name = "cpufeatures"
version = "0.2.12"
@ -1165,6 +1192,17 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "jsonrpc"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd8d6b3f301ba426b30feca834a2a18d48d5b54e5065496b5c1b05537bee3639"
dependencies = [
"base64 0.13.1",
"serde",
"serde_json",
]
[[package]]
name = "lazy_static"
version = "1.5.0"

View File

@ -9,7 +9,7 @@ bitcoin = "0.32.2"
miniscript = "12.0.0"
axum = { version = "0.7.5", features = ["tokio", "json"] }
# "use-esplora-async", "async-interface", for async esplora
bdk = { version = "0.29.0", default-features = false, features = ["key-value-db", "bitcoinconsensus", "std", "electrum", "use-esplora-ureq","compiler", "verify"] }
bdk = { version = "0.29.0", default-features = false, features = ["key-value-db", "bitcoinconsensus", "std", "electrum", "use-esplora-ureq","compiler", "verify", "rpc"] }
# bitcoinconsensus = "0.106.0"
dotenv = "0.15.0"
@ -19,7 +19,7 @@ rand = "0.8.5"
reqwest = { version = "0.12.4", features = ["blocking", "json"] }
serde = "1.0.203"
sqlx = { version = "0.7.4", features = ["runtime-tokio", "sqlite"] }
tokio = { version = "1.38.0", features = ["full", "test-util"] }
tokio = { version = "1.38.0", features = ["full", "test-util", "rt"] }
tower = "0.4.13"
log = "0.4.22"
env_logger = "0.11.3"

View File

@ -4,16 +4,17 @@
// Also needs to implement punishment logic in case a fraud is detected.
use super::*;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum Table {
Orderbook,
ActiveTrades,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MonitoringBond {
pub bond_tx_hex: String,
pub trade_id_hex: String,
pub robot: Vec<u8>,
pub requirements: BondRequirements,
pub table: Table,
}
@ -23,11 +24,7 @@ pub struct MonitoringBond {
// continue monitoring the bond transaction until a confirmation happens for maximum pain
// in case the trader is actively malicious and did not just accidentally invalidate the bond
// we could directly forward bond sats to the other parties payout address in case it is a taken trade
async fn punish_trader(
coordinator: &Coordinator,
robohash: Vec<u8>,
bond: MonitoringBond,
) -> Result<()> {
async fn punish_trader(coordinator: &Coordinator, bond: &MonitoringBond) -> Result<()> {
// publish bond
coordinator
.coordinator_wallet
@ -44,31 +41,26 @@ pub async fn monitor_bonds(coordinator: Arc<Coordinator>) -> Result<()> {
loop {
// fetch all bonds
let bonds = coordinator_db.fetch_all_bonds().await?;
let validation_results = coordinator_wallet.validate_bonds(&bonds).await?;
debug!("Monitoring active bonds: {}", bonds.len());
// verify all bonds and initiate punishment if necessary
for bond in bonds {
if let Err(e) = coordinator_wallet
.validate_bond_tx_hex(&bond.1.bond_tx_hex, &bond.1.requirements)
.await
for (bond, error) in validation_results {
warn!("Bond validation failed: {:?}", error);
match env::var("PUNISHMENT_ENABLED")
.unwrap_or_else(|_| "0".to_string())
.as_str()
{
warn!("Bond validation failed: {:?}", e);
match env::var("PUNISHMENT_ENABLED")
.unwrap_or_else(|_| "0".to_string())
.as_str()
{
"1" => {
dbg!("Punishing trader for bond violation: {:?}", e);
punish_trader(&coordinator, bond.0, bond.1).await?;
}
"0" => {
dbg!("Punishment disabled, ignoring bond violation: {:?}", e);
continue;
}
_ => Err(anyhow!("Invalid PUNISHMENT_ENABLED env var"))?,
"1" => {
dbg!("Punishing trader for bond violation: {:?}", error);
punish_trader(&coordinator, &bond).await?;
}
"0" => {
dbg!("Punishment disabled, ignoring bond violation: {:?}", e);
continue;
}
_ => Err(anyhow!("Invalid PUNISHMENT_ENABLED env var"))?,
}
}
// sleep for a while
tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;
}

View File

@ -404,8 +404,8 @@ impl CoordinatorDB {
// returns a hashmap of RoboHash, MonitoringBond for the monitoring loop
// in case this gets a bottleneck (db too large for heap) we can implement in place checking
pub async fn fetch_all_bonds(&self) -> Result<HashMap<Vec<u8>, MonitoringBond>> {
let mut bonds = HashMap::new();
pub async fn fetch_all_bonds(&self) -> Result<Vec<MonitoringBond>> {
let mut bonds = Vec::new();
let mut rows_orderbook = sqlx::query(
"SELECT offer_id, robohash, bond_address, bond_amount_sat, amount_sat, bond_tx_hex FROM active_maker_offers",
)
@ -422,11 +422,12 @@ impl CoordinatorDB {
let bond = MonitoringBond {
bond_tx_hex: row.get("bond_tx_hex"),
robot: robohash,
trade_id_hex: row.get("offer_id"),
requirements,
table: Table::Orderbook,
};
bonds.insert(robohash, bond);
bonds.push(bond);
}
let mut rows_taken = sqlx::query(
@ -453,11 +454,12 @@ impl CoordinatorDB {
let bond_maker = MonitoringBond {
bond_tx_hex: row.get("bond_tx_hex_maker"),
robot: robohash_maker,
trade_id_hex: trade_id_hex.clone(),
requirements: requirements_maker,
table: Table::ActiveTrades,
};
bonds.insert(robohash_maker, bond_maker);
bonds.push(bond_maker);
let requirements_maker = BondRequirements {
bond_address: row.get("bond_address_taker"),
@ -465,13 +467,14 @@ impl CoordinatorDB {
min_input_sum_sat,
};
let bond_maker = MonitoringBond {
let bond_taker = MonitoringBond {
bond_tx_hex: row.get("bond_tx_hex_taker"),
trade_id_hex,
robot: robohash_taker,
requirements: requirements_maker,
table: Table::ActiveTrades,
};
bonds.insert(robohash_taker, bond_maker);
bonds.push(bond_taker);
}
Ok(bonds)
}

View File

@ -10,10 +10,10 @@ use coordinator::monitoring::monitor_bonds;
use coordinator::monitoring::*;
use database::CoordinatorDB;
use dotenv::dotenv;
use log::{debug, error, info, trace, warn};
use log::{debug, error, info, warn};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{env, sync::Arc};
use tokio::sync::Mutex;
use tokio::{sync::Mutex, task::spawn_blocking};
use wallet::*;
pub struct Coordinator {
@ -37,7 +37,7 @@ async fn main() -> Result<()> {
});
// begin monitoring bonds
tokio::spawn(monitor_bonds(Arc::clone(&coordinator)));
// spawn_blocking(monitor_bonds(Arc::clone(&coordinator)));
// Start the API server
api_server(coordinator).await?;

View File

@ -5,25 +5,26 @@ use super::*;
use anyhow::Context;
use bdk::{
bitcoin::{self, bip32::ExtendedPrivKey, consensus::encode::deserialize, Transaction},
blockchain::{Blockchain, ElectrumBlockchain},
electrum_client::client::Client,
bitcoincore_rpc::{Client, RawTx, RpcApi},
blockchain::{rpc::Auth, Blockchain, ConfigurableBlockchain, RpcBlockchain, RpcConfig},
sled::{self, Tree},
template::Bip86,
wallet::verify::*,
KeychainKind, SyncOptions, Wallet,
};
use std::fmt;
use std::str::FromStr;
use std::{fmt, ops::Deref};
use utils::*;
// use verify_tx::*;
#[derive(Clone)]
pub struct CoordinatorWallet<D: bdk::database::BatchDatabase> {
pub wallet: Arc<Mutex<Wallet<D>>>,
pub backend: Arc<ElectrumBlockchain>,
pub backend: Arc<RpcBlockchain>,
pub json_rpc_client: Arc<bdk::bitcoincore_rpc::Client>,
}
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
pub struct BondRequirements {
pub bond_address: String,
pub locking_amount_sat: u64,
@ -34,10 +35,17 @@ pub fn init_coordinator_wallet() -> Result<CoordinatorWallet<sled::Tree>> {
let wallet_xprv = ExtendedPrivKey::from_str(
&env::var("WALLET_XPRV").context("loading WALLET_XPRV from .env failed")?,
)?;
let backend = ElectrumBlockchain::from(Client::new(
&env::var("ELECTRUM_BACKEND")
.context("Parsing ELECTRUM_BACKEND from .env failed, is it set?")?,
)?);
let rpc_config = RpcConfig {
url: env::var("BITCOIN_RPC_ADDRESS_PORT")?.to_string(),
auth: Auth::Cookie {
file: env::var("BITCOIN_RPC_COOKIE_FILE_PATH")?.into(),
},
network: bdk::bitcoin::Network::Testnet,
wallet_name: env::var("BITCOIN_RPC_WALLET_NAME")?,
sync_params: None,
};
let json_rpc_client = Client::new(&rpc_config.url, rpc_config.auth.clone().into())?;
let backend = RpcBlockchain::from_config(&rpc_config)?;
// let backend = EsploraBlockchain::new(&env::var("ESPLORA_BACKEND")?, 1000);
let sled_db = sled::open(env::var("BDK_DB_PATH")?)?.open_tree("default_wallet")?;
let wallet = Wallet::new(
@ -54,6 +62,7 @@ pub fn init_coordinator_wallet() -> Result<CoordinatorWallet<sled::Tree>> {
Ok(CoordinatorWallet {
wallet: Arc::new(Mutex::new(wallet)),
backend: Arc::new(backend),
json_rpc_client: Arc::new(json_rpc_client),
})
}
@ -68,57 +77,98 @@ impl<D: bdk::database::BatchDatabase> CoordinatorWallet<D> {
// also check if inputs are confirmed already
// bdk::blockchain::compact_filters::Mempool::iter_txs() -> Vec(Tx) to check if contained in mempool
// blockchain::get_tx to get input
pub async fn validate_bond_tx_hex(
pub async fn validate_bonds(
&self,
bond: &str,
requirements: &BondRequirements,
) -> Result<()> {
let input_sum: u64;
bonds: &Vec<MonitoringBond>,
) -> Result<Vec<(MonitoringBond, anyhow::Error)>> {
let mut invalid_bonds: Vec<(MonitoringBond, anyhow::Error)> = Vec::new();
let blockchain = &*self.backend;
let tx: Transaction = deserialize(&hex::decode(bond)?)?;
{
debug!("Called validate_bond_tx_hex()");
let wallet = self.wallet.lock().await;
if let Err(e) = wallet.sync(blockchain, SyncOptions::default()) {
error!("Error syncing wallet: {:?}", e);
return Ok(()); // if the electrum server goes down all bonds will be considered valid. Maybe redundancy should be added.
};
// we need to test this with signed and invalid/unsigned transactions
// checks signatures and inputs
if let Err(e) = verify_tx(&tx, &*wallet.database(), blockchain) {
return Err(anyhow!(e));
}
// check if the tx has the correct input amounts (have to be >= trading amount)
input_sum = match tx.input_sum(blockchain, &*wallet.database()) {
Ok(amount) => {
if amount < requirements.min_input_sum_sat {
return Err(anyhow!("Bond input sum too small"));
{
let wallet = self.wallet.lock().await;
for bond in bonds {
let input_sum: u64;
let tx: Transaction = deserialize(&hex::decode(&bond.bond_tx_hex)?)?;
debug!("Validating bond in validate_bonds()");
// we need to test this with signed and invalid/unsigned transactions
// checks signatures and inputs
if let Err(e) = verify_tx(&tx, &*wallet.database(), blockchain) {
invalid_bonds.push((bond.clone(), anyhow!(e)));
continue;
}
// check if the tx has the correct input amounts (have to be >= trading amount)
input_sum = match tx.input_sum(blockchain, &*wallet.database()) {
Ok(amount) => {
if amount < bond.requirements.min_input_sum_sat {
invalid_bonds.push((
bond.clone(),
anyhow!("Bond input sum too small: {}", amount),
));
continue;
}
amount
}
amount
Err(e) => {
return Err(anyhow!(e));
}
};
// check if bond output to us is big enough
match tx.bond_output_sum(&bond.requirements.bond_address) {
Ok(amount) => {
if amount < bond.requirements.locking_amount_sat {
invalid_bonds.push((
bond.clone(),
anyhow!("Bond output sum too small: {}", amount),
));
continue;
}
amount
}
Err(e) => {
return Err(anyhow!(e));
}
};
if ((input_sum - tx.all_output_sum()) / tx.vsize() as u64) < 200 {
invalid_bonds.push((
bond.clone(),
anyhow!(
"Bond fee rate too low: {}",
(input_sum - tx.all_output_sum()) / tx.vsize() as u64
),
));
continue;
}
Err(e) => {
return Err(anyhow!(e));
}
};
}
// check if bond output to us is big enough
match tx.bond_output_sum(&requirements.bond_address) {
Ok(amount) => {
if amount < requirements.locking_amount_sat {
return Err(anyhow!("Bond output sum too small"));
}
amount
}
Err(e) => {
return Err(anyhow!(e));
}
};
if ((input_sum - tx.all_output_sum()) / tx.vsize() as u64) < 200 {
return Err(anyhow!("Bond fee rate too low"));
}
debug!("validate_bond_tx_hex(): Bond validation successful.");
Ok(())
let raw_bonds: Vec<String> = bonds
.iter()
.map(|bond| bond.bond_tx_hex.clone().raw_hex()) // Assuming `raw_hex()` returns a String or &str
.collect();
let test_mempool_accept_res = self
.json_rpc_client
.deref()
.test_mempool_accept(&raw_bonds)?;
for res in test_mempool_accept_res {
if !res.allowed {
let invalid_bond =
Self::search_monitoring_bond_by_txid(&bonds, &res.txid.to_string())?;
invalid_bonds.push((
invalid_bond,
anyhow!(
"Bond not accepted by testmempoolaccept: {:?}",
res.reject_reason
.unwrap_or("rejected by testmempoolaccept".to_string())
),
));
}
}
debug!("validate_bond_tx_hex(): Bond validation done.");
Ok(invalid_bonds)
}
pub fn publish_bond_tx_hex(&self, bond: &str) -> Result<()> {
@ -129,6 +179,19 @@ impl<D: bdk::database::BatchDatabase> CoordinatorWallet<D> {
blockchain.broadcast(&tx)?;
Ok(())
}
fn search_monitoring_bond_by_txid(
monitoring_bonds: &Vec<MonitoringBond>,
txid: &str,
) -> Result<MonitoringBond> {
for bond in monitoring_bonds {
let bond_tx: Transaction = deserialize(&hex::decode(&bond.bond_tx_hex)?)?;
if bond_tx.txid().to_string() == txid {
return Ok(bond.clone());
}
}
Err(anyhow!("Bond not found in monitoring bonds"))
}
}
impl fmt::Debug for CoordinatorWallet<Tree> {
@ -147,10 +210,20 @@ mod tests {
use super::*;
use bdk::bitcoin::Network;
use bdk::database::MemoryDatabase;
use bdk::{blockchain::ElectrumBlockchain, Wallet};
use bdk::{blockchain::RpcBlockchain, Wallet};
async fn new_test_wallet(wallet_xprv: &str) -> CoordinatorWallet<MemoryDatabase> {
let backend = ElectrumBlockchain::from(Client::new("ssl://mempool.space:40002").unwrap());
let rpc_config = RpcConfig {
url: env::var("BITCOIN_RPC_ADDRESS_PORT")?.to_string(),
auth: Auth::Cookie {
file: env::var("BITCOIN_RPC_COOKIE_FILE_PATH")?.into(),
},
network: bdk::bitcoin::Network::Testnet,
wallet_name: env::var("BITCOIN_RPC_WALLET_NAME")?,
sync_params: None,
};
let json_rpc_client = Client::new(&rpc_config.url, rpc_config.auth.clone().into())?;
let backend = RpcBlockchain::from_config(&rpc_config)?;
let wallet_xprv = ExtendedPrivKey::from_str(wallet_xprv).unwrap();
let wallet = Wallet::new(
@ -165,6 +238,7 @@ mod tests {
CoordinatorWallet::<MemoryDatabase> {
wallet: Arc::new(Mutex::new(wallet)),
backend: Arc::new(backend),
json_rpc_client: Arc::new(json_rpc_client),
}
}