fix wallet tests, add shutdown function to mempool handler

This commit is contained in:
f321x
2024-08-02 12:31:36 +02:00
parent f45703aa63
commit ab193da25a
9 changed files with 161 additions and 48 deletions

View File

@ -8,8 +8,10 @@ use bdk::bitcoin::{OutPoint, Transaction};
use bdk::bitcoin::{TxIn, Txid}; use bdk::bitcoin::{TxIn, Txid};
use bdk::bitcoincore_rpc::{Client, RpcApi}; use bdk::bitcoincore_rpc::{Client, RpcApi};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::net::Shutdown;
use std::ops::Deref; use std::ops::Deref;
use std::sync::RwLock; use std::sync::RwLock;
use tokio::sync::oneshot;
struct Mempool { struct Mempool {
transactions: Arc<RwLock<HashMap<Txid, Vec<TxIn>>>>, transactions: Arc<RwLock<HashMap<Txid, Vec<TxIn>>>>,
@ -27,8 +29,12 @@ impl Mempool {
} }
} }
fn run_mempool(mempool: Arc<Mempool>) { fn run_mempool(mempool: Arc<Mempool>, mut shutdown_receiver: oneshot::Receiver<()>) {
loop { loop {
if shutdown_receiver.try_recv().is_ok() {
debug!("Shutting down mempool monitoring");
break;
}
// sleep for a while // sleep for a while
std::thread::sleep(std::time::Duration::from_secs(15)); std::thread::sleep(std::time::Duration::from_secs(15));
trace!("Fetching mempool"); trace!("Fetching mempool");
@ -85,14 +91,23 @@ fn run_mempool(mempool: Arc<Mempool>) {
pub struct MempoolHandler { pub struct MempoolHandler {
mempool: Arc<Mempool>, mempool: Arc<Mempool>,
shutdown_sender: Mutex<Option<oneshot::Sender<()>>>,
handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
} }
impl MempoolHandler { impl MempoolHandler {
pub async fn new(json_rpc_client: Arc<Client>) -> Self { pub async fn new(json_rpc_client: Arc<Client>) -> Self {
let mempool = Arc::new(Mempool::new(json_rpc_client)); let mempool = Arc::new(Mempool::new(json_rpc_client));
let mempool_clone = Arc::clone(&mempool); let mempool_clone = Arc::clone(&mempool);
tokio::task::spawn_blocking(move || run_mempool(mempool_clone)); let (shutdown_sender, shutdown_receiver) = oneshot::channel();
Self { mempool }
let handle =
tokio::task::spawn_blocking(move || run_mempool(mempool_clone, shutdown_receiver));
Self {
mempool,
shutdown_sender: Mutex::new(Some(shutdown_sender)),
handle: Mutex::new(Some(handle)),
}
} }
pub async fn lookup_mempool_inputs( pub async fn lookup_mempool_inputs(
@ -118,4 +133,15 @@ impl MempoolHandler {
} }
Ok(bonds_to_punish) Ok(bonds_to_punish)
} }
pub async fn shutdown(&self) {
if let Some(sender) = self.shutdown_sender.lock().await.take() {
let _ = sender.send(()); // Ignore the result, as the receiver might have been dropped
}
if let Some(handle) = self.handle.lock().await.take() {
if let Err(e) = handle.await {
error!("Error shutting down mempool handler: {:?}", e);
}
}
}
} }

View File

@ -169,6 +169,7 @@ pub async fn get_offer_status_maker(
escrow_amount_maker_sat, escrow_amount_maker_sat,
escrow_amount_taker_sat, escrow_amount_taker_sat,
escrow_fee_sat_per_participant, escrow_fee_sat_per_participant,
escrow_psbt_hex,
.. ..
} = match database } = match database
.fetch_escrow_output_information(&payload.offer_id_hex) .fetch_escrow_output_information(&payload.offer_id_hex)
@ -183,6 +184,7 @@ pub async fn get_offer_status_maker(
} }
}; };
Ok(OfferTakenResponse { Ok(OfferTakenResponse {
escrow_psbt_hex,
escrow_output_descriptor, escrow_output_descriptor,
escrow_tx_fee_address, escrow_tx_fee_address,
escrow_amount_maker_sat, escrow_amount_maker_sat,

View File

@ -200,6 +200,9 @@ async fn test_move_offer_to_active() -> Result<()> {
// Create a sample BondSubmissionRequest // Create a sample BondSubmissionRequest
let bond_submission_request = BondSubmissionRequest { let bond_submission_request = BondSubmissionRequest {
bdk_psbt_inputs_hex_csv: "l33t,DEADBEEF".to_string(),
client_change_address: "tb1p5yh969z6fgatg0mvcyvggd08fujnat8890vcdud277q06rr9xgmqwfdkcx"
.to_string(),
robohash_hex: robohash_hex.to_string(), robohash_hex: robohash_hex.to_string(),
signed_bond_hex: "signedBondHex".to_string(), signed_bond_hex: "signedBondHex".to_string(),
payout_address: "1PayoutAddress".to_string(), payout_address: "1PayoutAddress".to_string(),

View File

@ -135,8 +135,7 @@ impl CoordinatorDB {
musig_pubkey_compressed_hex_maker TEXT NOT NULL, musig_pubkey_compressed_hex_maker TEXT NOT NULL,
musig_pub_nonce_hex_taker TEXT NOT NULL, musig_pub_nonce_hex_taker TEXT NOT NULL,
musig_pubkey_compressed_hex_taker TEXT NOT NULL, musig_pubkey_compressed_hex_taker TEXT NOT NULL,
escrow_psbt_hex_maker TEXT, escrow_psbt_hex TEXT,
escrow_psbt_hex_taker TEXT,
escrow_psbt_txid TEXT, escrow_psbt_txid TEXT,
escrow_psbt_is_confirmed INTEGER, escrow_psbt_is_confirmed INTEGER,
maker_happy INTEGER, maker_happy INTEGER,
@ -376,7 +375,7 @@ impl CoordinatorDB {
"INSERT OR REPLACE INTO taken_offers (offer_id, robohash_maker, robohash_taker, is_buy_order, amount_sat, "INSERT OR REPLACE INTO taken_offers (offer_id, robohash_maker, robohash_taker, is_buy_order, amount_sat,
bond_ratio, offer_duration_ts, bond_address_maker, bond_address_taker, bond_amount_sat, bond_tx_hex_maker, bond_ratio, offer_duration_ts, bond_address_maker, bond_address_taker, bond_amount_sat, bond_tx_hex_maker,
bond_tx_hex_taker, payout_address_maker, payout_address_taker, taproot_pubkey_hex_maker, taproot_pubkey_hex_taker, musig_pub_nonce_hex_maker, musig_pubkey_hex_maker, bond_tx_hex_taker, payout_address_maker, payout_address_taker, taproot_pubkey_hex_maker, taproot_pubkey_hex_taker, musig_pub_nonce_hex_maker, musig_pubkey_hex_maker,
musig_pub_nonce_hex_taker, musig_pubkey_hex_taker, escrow_output_descriptor, escrow_tx_fee_address, escrow_psbt_is_confirmed, escrow_ongoing, musig_pub_nonce_hex_taker, musig_pubkey_hex_taker, escrow_psbt_hex, escrow_output_descriptor, escrow_tx_fee_address, escrow_psbt_is_confirmed, escrow_ongoing,
escrow_taproot_pk_coordinator, escrow_amount_maker_sat, escrow_amount_taker_sat, escrow_fee_per_participant) escrow_taproot_pk_coordinator, escrow_amount_maker_sat, escrow_amount_taker_sat, escrow_fee_per_participant)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
) )
@ -401,6 +400,7 @@ impl CoordinatorDB {
.bind(trade_and_taker_info.trade_data.musig_pub_nonce_hex.clone()) .bind(trade_and_taker_info.trade_data.musig_pub_nonce_hex.clone())
.bind(trade_and_taker_info.trade_data.musig_pubkey_hex.clone()) .bind(trade_and_taker_info.trade_data.musig_pubkey_hex.clone())
.bind(&escrow_tx_data.escrow_output_descriptor) .bind(&escrow_tx_data.escrow_output_descriptor)
.bind(&escrow_tx_data.escrow_psbt_hex)
.bind(&escrow_tx_data.escrow_tx_fee_address) .bind(&escrow_tx_data.escrow_tx_fee_address)
.bind(0) .bind(0)
.bind(0) .bind(0)
@ -438,8 +438,10 @@ impl CoordinatorDB {
offer.try_get::<i64, _>("escrow_fee_per_participant")? as u64; offer.try_get::<i64, _>("escrow_fee_per_participant")? as u64;
let coordinator_xonly_escrow_pk = let coordinator_xonly_escrow_pk =
offer.try_get::<String, _>("escrow_taproot_pk_coordinator")?; offer.try_get::<String, _>("escrow_taproot_pk_coordinator")?;
let escrow_psbt_hex = offer.try_get::<String, _>("escrow_psbt_hex")?;
Ok(Some(EscrowPsbt { Ok(Some(EscrowPsbt {
escrow_psbt_hex,
escrow_output_descriptor, escrow_output_descriptor,
escrow_tx_fee_address, escrow_tx_fee_address,
coordinator_xonly_escrow_pk, coordinator_xonly_escrow_pk,

View File

@ -110,6 +110,11 @@ pub async fn init_coordinator_wallet() -> Result<CoordinatorWallet<sled::Tree>>
} }
impl<D: bdk::database::BatchDatabase> CoordinatorWallet<D> { impl<D: bdk::database::BatchDatabase> CoordinatorWallet<D> {
pub async fn shutdown(&self) {
debug!("Shutting down wallet");
self.mempool.shutdown().await;
}
pub async fn get_new_address(&self) -> Result<String> { pub async fn get_new_address(&self) -> Result<String> {
let wallet = self.wallet.lock().await; let wallet = self.wallet.lock().await;
let address = wallet.get_address(bdk::wallet::AddressIndex::New)?; let address = wallet.get_address(bdk::wallet::AddressIndex::New)?;
@ -313,10 +318,10 @@ impl<D: bdk::database::BatchDatabase> CoordinatorWallet<D> {
.fee_absolute(tx_fee_abs); .fee_absolute(tx_fee_abs);
for input in maker_psbt_input_data.escrow_input_utxos.iter() { for input in maker_psbt_input_data.escrow_input_utxos.iter() {
// satisfaction weight 66 bytes for schnorr sig + opcode + sighash for keyspend. This is a hack? // satisfaction weight 66 bytes for schnorr sig + opcode + sighash for keyspend. This is a hack?
builder.add_foreign_utxo(input.utxo, input.psbt_input.clone(), 264); builder.add_foreign_utxo(input.utxo, input.psbt_input.clone(), 264)?;
} }
for input in taker_psbt_input_data.escrow_input_utxos.iter() { for input in taker_psbt_input_data.escrow_input_utxos.iter() {
builder.add_foreign_utxo(input.utxo, input.psbt_input.clone(), 264); builder.add_foreign_utxo(input.utxo, input.psbt_input.clone(), 264)?;
} }
builder.finish()? builder.finish()?
}; };

View File

@ -1,11 +1,14 @@
use std::collections::btree_map::Range;
use std::time::Duration; use std::time::Duration;
use super::*; use super::*;
use bdk::bitcoin::Network; use bdk::{
use bdk::database::MemoryDatabase; bitcoin::{psbt::Input, Network},
use bdk::keys::GeneratableKey; blockchain::RpcBlockchain,
use bdk::{blockchain::RpcBlockchain, Wallet}; database::MemoryDatabase,
wallet::AddressIndex,
Wallet,
};
async fn new_test_wallet(wallet_xprv: &str) -> CoordinatorWallet<MemoryDatabase> { async fn new_test_wallet(wallet_xprv: &str) -> CoordinatorWallet<MemoryDatabase> {
dotenv().ok(); dotenv().ok();
let wallet_xprv = ExtendedPrivKey::from_str(wallet_xprv).unwrap(); let wallet_xprv = ExtendedPrivKey::from_str(wallet_xprv).unwrap();
@ -39,7 +42,7 @@ async fn new_test_wallet(wallet_xprv: &str) -> CoordinatorWallet<MemoryDatabase>
) )
.unwrap(); .unwrap();
wallet.sync(&backend, SyncOptions::default()).unwrap(); wallet.sync(&backend, SyncOptions::default()).unwrap();
tokio::time::sleep(Duration::from_secs(16)).await; // fetch the mempool tokio::time::sleep(Duration::from_secs(10)).await; // fetch the mempool
CoordinatorWallet::<MemoryDatabase> { CoordinatorWallet::<MemoryDatabase> {
wallet: Arc::new(Mutex::new(wallet)), wallet: Arc::new(Mutex::new(wallet)),
backend: Arc::new(backend), backend: Arc::new(backend),
@ -49,6 +52,65 @@ async fn new_test_wallet(wallet_xprv: &str) -> CoordinatorWallet<MemoryDatabase>
} }
} }
async fn get_escrow_psbt_inputs(
coordinator_wallet: &CoordinatorWallet<MemoryDatabase>,
mut amount_sat: i64,
) -> Result<Vec<PsbtInput>> {
let wallet = coordinator_wallet.wallet.lock().await;
let mut inputs: Vec<PsbtInput> = Vec::new();
wallet.sync(&coordinator_wallet.backend, SyncOptions::default())?;
let available_utxos = wallet.list_unspent()?;
// could use more advanced coin selection if neccessary
for utxo in available_utxos {
let psbt_input: Input = wallet.get_psbt_input(utxo.clone(), None, false)?;
let input = PsbtInput {
psbt_input,
utxo: utxo.outpoint,
};
inputs.push(input);
amount_sat -= utxo.txout.value as i64;
if amount_sat <= 0 {
break;
}
}
Ok(inputs)
}
async fn get_dummy_escrow_psbt_data(
maker_wallet: &CoordinatorWallet<MemoryDatabase>,
taker_wallet: &CoordinatorWallet<MemoryDatabase>,
) -> (EscrowPsbtConstructionData, EscrowPsbtConstructionData) {
let maker_inputs = get_escrow_psbt_inputs(maker_wallet, 50000).await.unwrap();
let taker_inputs = get_escrow_psbt_inputs(taker_wallet, 50000).await.unwrap();
let maker_escrow_data = EscrowPsbtConstructionData {
taproot_xonly_pubkey_hex:
"b709f64da734e04e35b129a65a7fae361cad8a9458d1abc4f0b45b7661a42fca".to_string(),
musig_pubkey_compressed_hex:
"02d8e204cdaebec4c5a637311072c865858dc4f142b3848b8e6dde4143476535b5".to_string(),
change_address: Address::from_str(
"bcrt1pmcgt8wjuxlkp2pqykatt4n6w0jw45vzgsa8em3rx9gacqwzyttjqmg0ufp",
)
.expect("Invalid address")
.assume_checked(),
escrow_input_utxos: maker_inputs,
};
let taker_escrow_data = EscrowPsbtConstructionData {
taproot_xonly_pubkey_hex:
"4987f3de20a9b1fa6f76c6758934953a8d615e415f1a656f0f6563694b53107d".to_string(),
musig_pubkey_compressed_hex:
"02d8e204cdaebec4c5a637311072c865858dc4f142b3848b8e6dde4143476535b5".to_string(),
change_address: Address::from_str(
"bcrt1p28lv60c0t64taw5pp6k5fwwd4z66t99lny9d8mmpsysm5xanzd3smyz320",
)
.expect("Invalid address")
.assume_checked(),
escrow_input_utxos: taker_inputs,
};
(maker_escrow_data, taker_escrow_data)
}
// the transactions are testnet4 transactions, so run a testnet4 rpc node as backend // the transactions are testnet4 transactions, so run a testnet4 rpc node as backend
#[tokio::test] #[tokio::test]
async fn test_transaction_without_signature() { async fn test_transaction_without_signature() {
@ -167,35 +229,43 @@ async fn test_invalid_bond_tx_low_fee_rate() {
.contains("Bond fee rate too low")); .contains("Bond fee rate too low"));
} }
#[test] #[tokio::test]
fn test_build_escrow_transaction_output_descriptor() { async fn test_build_escrow_transaction_output_descriptor() {
// generating pubkeys // generating pubkeys
// let seed: [u8; 32] = [ // let seed: [u8; 32] = [
// 0x1b, 0x2d, 0x3d, 0x4d, 0x5d, 0x6d, 0x7d, 0x8d, 0x9d, 0xad, 0xbd, 0xcd, 0xdd, 0xed, 0xfd, // 0x1b, 0x2d, 0x3d, 0x4d, 0x5d, 0x6d, 0x7d, 0x8d, 0x9d, 0xad, 0xbd, 0xcd, 0xdd, 0xed, 0xfd,
// 0x0d, 0x1d, 0x2d, 0x3d, 0x4d, 0x5d, 0x6d, 0x8d, 0x8d, 0x9d, 0xbd, 0xbd, 0xcd, 0xdd, 0xed, // 0x0d, 0x1d, 0x2d, 0x3d, 0x4d, 0x5d, 0x6d, 0x8d, 0x8d, 0x9d, 0xbd, 0xbd, 0xcd, 0xdd, 0xed,
// 0xfd, 0x0d, // 0xfd, 0x1d,
// ]; // ];
// let xprv = ExtendedPrivKey::new_master(Network::Testnet, &seed).unwrap(); // let xprv = ExtendedPrivKey::new_master(Network::Regtest, &seed).unwrap();
// println!("xprv: {}", xprv.to_string());
// let pubkey = xprv // let pubkey = xprv
// .to_keypair(&secp256k1::Secp256k1::new()) // .to_keypair(&secp256k1::Secp256k1::new())
// .public_key() // .public_key()
// .to_string(); // .to_string();
// dbg!(&pubkey); // dbg!(&pubkey);
let escrow_data = EscrowPsbtConstructionData { let maker_escrow_data: EscrowPsbtConstructionData;
taproot_xonly_pubkey_hex_maker: let taker_escrow_data: EscrowPsbtConstructionData;
"b709f64da734e04e35b129a65a7fae361cad8a9458d1abc4f0b45b7661a42fca".to_string(), {
taproot_xonly_pubkey_hex_taker: let maker_wallet = new_test_wallet("tprv8ZgxMBicQKsPdHuCSjhQuSZP1h6ZTeiRqREYS5guGPdtL7D1uNLpnJmb2oJep99Esq1NbNZKVJBNnD2ZhuXSK7G5eFmmcx73gsoa65e2U32").await;
"4987f3de20a9b1fa6f76c6758934953a8d615e415f1a656f0f6563694b53107d".to_string(), let taker_wallet = new_test_wallet("tprv8ZgxMBicQKsPdKxWZWv9zVc22ubUdFrgaUzA4BZQUpEyMxYX3dwFbNfAGsVJ94zEhUUS1z56YBARpvTEjrSz9NzHyySCL33oMXpbqoGunL4").await;
musig_pubkey_compressed_hex_maker:
"02d8e204cdaebec4c5a637311072c865858dc4f142b3848b8e6dde4143476535b5".to_string(), (maker_escrow_data, taker_escrow_data) =
musig_pubkey_compressed_hex_taker: get_dummy_escrow_psbt_data(&maker_wallet, &taker_wallet).await;
"02d8e204cdaebec4c5a637311072c865858dc4f142b3848b8e6dde4143476535b5".to_string(), maker_wallet.shutdown().await;
}; taker_wallet.shutdown().await;
}
println!("created dummmy psbt data");
let coordinator_pk = XOnlyPublicKey::from_str( let coordinator_pk = XOnlyPublicKey::from_str(
"d8e204cdaebec4c5a637311072c865858dc4f142b3848b8e6dde4143476535b5", "d8e204cdaebec4c5a637311072c865858dc4f142b3848b8e6dde4143476535b5",
) )
.unwrap(); .unwrap();
let result = build_escrow_transaction_output_descriptor(&escrow_data, &coordinator_pk); println!("assembling output descriptor");
let result = build_escrow_transaction_output_descriptor(
&maker_escrow_data,
&taker_escrow_data,
&coordinator_pk,
);
dbg!(&result); // cargo test -- --nocapture to see the output dbg!(&result); // cargo test -- --nocapture to see the output
assert!(result.is_ok()); assert!(result.is_ok());
} }

View File

@ -4,9 +4,9 @@ use bdk::{
blockchain::GetTx, blockchain::GetTx,
database::Database, database::Database,
}; };
use serde::Deserialize; use serde::{Deserialize, Serialize};
#[derive(Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct PsbtInput { pub struct PsbtInput {
pub psbt_input: Input, pub psbt_input: Input,
pub utxo: bdk::bitcoin::OutPoint, pub utxo: bdk::bitcoin::OutPoint,

View File

@ -22,7 +22,7 @@ use bdk::{
database::MemoryDatabase, database::MemoryDatabase,
wallet::AddressInfo, wallet::AddressInfo,
}; };
use std::{thread, time::Duration}; use std::{str::FromStr, thread, time::Duration};
pub fn run_maker(maker_config: &TraderSettings) -> Result<()> { pub fn run_maker(maker_config: &TraderSettings) -> Result<()> {
let wallet = TradingWallet::load_wallet(maker_config)?; // initialize the wallet with xprv let wallet = TradingWallet::load_wallet(maker_config)?; // initialize the wallet with xprv
@ -31,9 +31,11 @@ pub fn run_maker(maker_config: &TraderSettings) -> Result<()> {
info!("Maker offer created: {:#?}", &offer); info!("Maker offer created: {:#?}", &offer);
let escrow_psbt_requirements = offer.wait_until_taken(maker_config)?; let escrow_psbt_requirements = offer.wait_until_taken(maker_config)?;
let escrow_psbt = wallet let mut escrow_psbt =
.validate_maker_psbt(&escrow_contract_psbt)? PartiallySignedTransaction::from_str(escrow_psbt_requirements.escrow_psbt_hex.as_str())?;
.sign_escrow_psbt(&mut escrow_contract_psbt)?; let signed_escrow_psbt = wallet
.validate_maker_psbt(&escrow_psbt)?
.sign_escrow_psbt(&mut escrow_psbt)?;
// submit signed escrow psbt back to coordinator // submit signed escrow psbt back to coordinator
PsbtSubmissionRequest::submit_escrow_psbt( PsbtSubmissionRequest::submit_escrow_psbt(

View File

@ -145,7 +145,7 @@ impl TradingWallet {
// could use more advanced coin selection if neccessary // could use more advanced coin selection if neccessary
for utxo in available_utxos { for utxo in available_utxos {
let psbt_input: Input = self.wallet.get_psbt_input(utxo, None, false)?; let psbt_input: Input = self.wallet.get_psbt_input(utxo.clone(), None, false)?;
let input = PsbtInput { let input = PsbtInput {
psbt_input, psbt_input,
utxo: utxo.outpoint, utxo: utxo.outpoint,
@ -175,19 +175,22 @@ impl TradingWallet {
// Ok(self) // Ok(self)
// } // }
// pub fn sign_escrow_psbt(&self, escrow_psbt: &mut PartiallySignedTransaction) -> Result<&Self> { pub fn sign_escrow_psbt(&self, escrow_psbt: &mut PartiallySignedTransaction) -> Result<&Self> {
// let finalized = self.wallet.sign(escrow_psbt, SignOptions::default())?; // do not finalize as the psbt will be finalized by the coordinator
// if !finalized { let sign_options = SignOptions {
// return Err(anyhow!("Signing of taker escrow psbt failed!")); try_finalize: false,
// } ..SignOptions::default()
// Ok(self) };
// } let _ = self.wallet.sign(escrow_psbt, sign_options)?;
Ok(self)
}
// validate amounts, escrow output // validate amounts, escrow output
// pub fn validate_maker_psbt(&self, psbt: &PartiallySignedTransaction) -> Result<&Self> { pub fn validate_maker_psbt(&self, psbt: &PartiallySignedTransaction) -> Result<&Self> {
// error!("IMPLEMENT MAKER PSBT VALIDATION!"); warn!("IMPLEMENT MAKER PSBT VALIDATION for production use!");
// // tbd once the trade psbt is implemented on coordinator side // validate: change output address, amounts, fee
// tbd once the trade psbt is implemented on coordinator side
// Ok(self) Ok(self)
// } }
} }