add escrow tx confirmation monitoring

This commit is contained in:
f321x
2024-07-18 12:01:13 +02:00
parent b1a4863140
commit f658037721
7 changed files with 123 additions and 91 deletions

View File

@ -138,12 +138,14 @@ async fn submit_taker_bond(
panic!("Trade contract PSBT not implemented!"); panic!("Trade contract PSBT not implemented!");
let trade_contract_psbt_taker = "".to_string(); // implement psbt let trade_contract_psbt_taker = "".to_string(); // implement psbt
let trade_contract_psbt_maker = "".to_string(); // implement psbt let trade_contract_psbt_maker = "".to_string(); // implement psbt
let escrow_tx_txid: String = "".to_string(); // implement txid of psbt
database database
.add_taker_info_and_move_table( .add_taker_info_and_move_table(
&payload, &payload,
&trade_contract_psbt_maker, &trade_contract_psbt_maker,
&trade_contract_psbt_taker, &trade_contract_psbt_taker,
escrow_tx_txid,
) )
.await?; .await?;
Ok(Json(OfferTakenResponse { Ok(Json(OfferTakenResponse {

View File

@ -1,84 +0,0 @@
use bdk::bitcoin::consensus::encode::deserialize;
use bdk::bitcoin::psbt::PartiallySignedTransaction;
use bdk::bitcoin::Transaction;
use bdk::database::MemoryDatabase;
use bdk::SignOptions;
use bdk::Wallet;
use crate::communication::api::BondSubmissionRequest;
use crate::communication::api::OrderActivatedResponse;
use anyhow::{anyhow, Result};
use hex;
pub fn verify_and_respond(
bond_submission: BondSubmissionRequest,
wallet: &Wallet<MemoryDatabase>,
) -> Result<OrderActivatedResponse> {
// Deserialize the signed bond hex
let tx: Transaction = deserialize(&hex::decode(&bond_submission.signed_bond_hex)?)?;
// Verify the transaction (this example assumes you've implemented your own verification logic)
let is_valid = verify_psbt(&tx, &wallet, &bond_submission)?;
if !is_valid {
return Err(anyhow!("Invalid PSBT"));
}
// Create the response (you may need additional logic to generate order_id_hex and timestamp)
// let response = OrderActivatedResponse {
// order_id_hex: generate_order_id(&tx)?, // Assuming you have a function to generate this
// bond_locked_until_timestamp: calculate_bond_lock_time()?, // Assuming you have a function for this
// };
Ok(response)
}
pub fn verify_psbt(
tx: &Transaction,
wallet: &Wallet<MemoryDatabase>,
bond_submission: &BondSubmissionRequest,
) -> Result<bool> {
// Example verification logic
// Check if the payout address matches
// let payout_address = bond_submission.payout_address.parse();
// let output = tx.output.iter().find(|output| outputvc
// .script_pubkey == payout_address.script_pubkey());
// if output.is_none() {
// return Ok(false);
// }
// Check if the transaction is signed correctly
let mut psbt = PartiallySignedTransaction::from_unsigned_tx(tx.clone())?;
let finalized = wallet.sign(&mut psbt, SignOptions::default())?;
if !finalized {
return Ok(false);
}
// Validate MuSig data (assuming you have methods for this)
let musig_data_valid = validate_musig_data(
&bond_submission.musig_pubkey_hex,
&bond_submission.musig_pub_nonce_hex,
)?;
if !musig_data_valid {
return Ok(false);
}
Ok(true)
}
fn generate_order_id(tx: &Transaction) -> Result<String> {
// Example logic to generate an order ID from the transaction
Ok(tx.txid().to_string())
}
fn calculate_bond_lock_time() -> Result<u128> {
// Example logic to calculate the bond lock time
// This might depend on the current block height or a specific timestamp
Ok(12345678901234567890) // Placeholder value
}
fn validate_musig_data(pubkey_hex: &str, nonce_hex: &str) -> Result<bool> {
// Example logic to validate MuSig data
// This might involve parsing the hex strings and ensuring they match expected values
Ok(true) // Placeholder validation
}

View File

@ -1,6 +1,7 @@
pub mod create_taproot; pub mod create_taproot;
// pub mod mempool_actor; // pub mod mempool_actor;
pub mod mempool_monitoring; pub mod mempool_monitoring;
pub mod monitoring; // commented out for testing pub mod monitoring;
pub mod tx_confirmation_monitoring; // commented out for testing
use super::*; use super::*;

View File

@ -0,0 +1,79 @@
use std::str::FromStr;
use bdk::{bitcoin::Txid, bitcoincore_rpc::RpcApi};
use super::*;
fn get_confirmations(
unconfirmed_txids: Vec<String>,
coordinator: Arc<Coordinator>,
) -> Result<Vec<String>> {
let mut now_confirmed_txs = Vec::new();
for txid in unconfirmed_txids {
let txid_struct = Txid::from_str(&txid)?;
let tx_info = coordinator
.coordinator_wallet
.json_rpc_client
.as_ref()
.get_raw_transaction_info(&txid_struct, None)?;
if let Some(confirmations) = tx_info.confirmations {
debug!(
"Transaction {} in now confirmed with {} confirmations",
&txid, confirmations
);
if confirmations > 3 {
now_confirmed_txs.push(txid);
}
}
}
Ok(now_confirmed_txs)
}
pub async fn update_transaction_confirmations(coordinator: Arc<Coordinator>) {
loop {
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
trace!("Checking for transaction confirmations");
let unconfirmed_transactions = match coordinator
.coordinator_db
.fetch_unconfirmed_bond_txids()
.await
{
Ok(txids) => txids,
Err(e) => {
error!("Error fetching unconfirmed bond txids from db: {:?}", e);
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
continue;
}
};
if unconfirmed_transactions.is_empty() {
continue;
}
let coordinator_clone = Arc::clone(&coordinator);
let newly_confirmed_txids = match tokio::task::spawn_blocking(move || {
get_confirmations(unconfirmed_transactions, coordinator_clone)
})
.await
{
Ok(result) => match result {
Ok(txids) => txids,
Err(e) => {
error!("Error getting confirmations: {:?}", e);
Vec::new() // or handle the error as appropriate
}
},
Err(e) => {
error!("Getting tx confirmations spawn_blocking panicked: {:?}", e);
Vec::new() // or handle the error as appropriate
}
};
if !newly_confirmed_txids.is_empty() {
if let Err(e) = coordinator
.coordinator_db
.confirm_bond_txids(newly_confirmed_txids)
.await
{
error!("Error updating bond confirmations in db: {:?}", e);
}
}
}
}

View File

@ -52,7 +52,7 @@ fn bool_to_sql_int(flag: bool) -> Option<i64> {
impl CoordinatorDB { impl CoordinatorDB {
// will either create a new db or load existing one. Will create according tables in new db // will either create a new db or load existing one. Will create according tables in new db
pub async fn init() -> Result<Self> { pub async fn init() -> Result<Self> {
dbg!(env::var("DATABASE_PATH")?); debug!("coordinator db path: {}", env::var("DATABASE_PATH")?);
let db_path = let db_path =
env::var("DATABASE_PATH").context("Parsing DATABASE_PATH from .env failed")?; env::var("DATABASE_PATH").context("Parsing DATABASE_PATH from .env failed")?;
@ -121,8 +121,10 @@ impl CoordinatorDB {
musig_pub_nonce_hex_taker TEXT NOT NULL, musig_pub_nonce_hex_taker TEXT NOT NULL,
musig_pubkey_hex_taker TEXT NOT NULL, musig_pubkey_hex_taker TEXT NOT NULL,
escrow_psbt_hex_maker TEXT, escrow_psbt_hex_maker TEXT,
escrow_psbt_hex_taker TEXT escrow_psbt_hex_taker TEXT,
)", escrow_psbt_txid TEXT,
escrow_psbt_is_confirmed INTEGER
)", // escrow_psbt_is_confirmed will be set 1 once the escrow psbt is confirmed onchain
) )
.execute(&db_pool) .execute(&db_pool)
.await?; .await?;
@ -335,6 +337,7 @@ impl CoordinatorDB {
trade_and_taker_info: &OfferPsbtRequest, trade_and_taker_info: &OfferPsbtRequest,
trade_contract_psbt_maker: &String, trade_contract_psbt_maker: &String,
trade_contract_psbt_taker: &String, trade_contract_psbt_taker: &String,
trade_tx_txid: String,
) -> Result<()> { ) -> Result<()> {
let public_offer = self let public_offer = self
.fetch_and_delete_offer_from_public_offers_table( .fetch_and_delete_offer_from_public_offers_table(
@ -345,9 +348,9 @@ impl CoordinatorDB {
sqlx::query( sqlx::query(
"INSERT OR REPLACE INTO taken_offers (offer_id, robohash_maker, robohash_taker, is_buy_order, amount_sat, "INSERT OR REPLACE INTO taken_offers (offer_id, robohash_maker, robohash_taker, is_buy_order, amount_sat,
bond_ratio, offer_duration_ts, bond_address_maker, bond_address_taker, bond_amount_sat, bond_tx_hex_maker, bond_ratio, offer_duration_ts, bond_address_maker, bond_address_taker, bond_amount_sat, bond_tx_hex_maker,
bond_tx_hex_taker, payout_address_maker, payout_address_taker, musig_pub_nonce_hex_maker, musig_pubkey_hex_maker bond_tx_hex_taker, payout_address_maker, payout_address_taker, musig_pub_nonce_hex_maker, musig_pubkey_hex_maker,
musig_pub_nonce_hex_taker, musig_pubkey_hex_taker, escrow_psbt_hex_maker, escrow_psbt_hex_taker) musig_pub_nonce_hex_taker, musig_pubkey_hex_taker, escrow_psbt_hex_maker, escrow_psbt_hex_taker, escrow_psbt_txid, escrow_psbt_is_confirmed)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
) )
.bind(public_offer.offer_id) .bind(public_offer.offer_id)
.bind(public_offer.robohash_maker) .bind(public_offer.robohash_maker)
@ -369,6 +372,8 @@ impl CoordinatorDB {
.bind(trade_and_taker_info.trade_data.musig_pubkey_hex.clone()) .bind(trade_and_taker_info.trade_data.musig_pubkey_hex.clone())
.bind(trade_contract_psbt_maker.clone()) .bind(trade_contract_psbt_maker.clone())
.bind(trade_contract_psbt_taker.clone()) .bind(trade_contract_psbt_taker.clone())
.bind(trade_tx_txid)
.bind(0)
.execute(&*self.db_pool) .execute(&*self.db_pool)
.await?; .await?;
@ -508,4 +513,30 @@ impl CoordinatorDB {
// .await?; // .await?;
Ok(()) Ok(())
} }
pub async fn fetch_unconfirmed_bond_txids(&self) -> Result<Vec<String>> {
let mut txids = Vec::new();
let mut rows = sqlx::query(
"SELECT escrow_psbt_txid FROM taken_offers WHERE escrow_psbt_is_confirmed = 0",
)
.fetch(&*self.db_pool);
while let Some(row) = rows.next().await {
let row = row?;
let txid: String = row.get("escrow_psbt_txid");
txids.push(txid);
}
Ok(txids)
}
pub async fn confirm_bond_txids(&self, confirmed_txids: Vec<String>) -> Result<()> {
for txid in confirmed_txids {
sqlx::query(
"UPDATE taken_offers SET escrow_psbt_is_confirmed = 1 WHERE escrow_psbt_txid = ?",
)
.bind(txid)
.execute(&*self.db_pool)
.await?;
}
Ok(())
}
} }

View File

@ -8,6 +8,7 @@ use bdk::sled;
use communication::{api::*, api_server}; use communication::{api::*, api_server};
use coordinator::monitoring::monitor_bonds; use coordinator::monitoring::monitor_bonds;
use coordinator::monitoring::*; use coordinator::monitoring::*;
use coordinator::tx_confirmation_monitoring::update_transaction_confirmations;
use database::CoordinatorDB; use database::CoordinatorDB;
use dotenv::dotenv; use dotenv::dotenv;
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
@ -47,6 +48,8 @@ async fn main() -> Result<()> {
} }
} }
}); });
let coordinator_ref = Arc::clone(&coordinator);
tokio::spawn(async move { update_transaction_confirmations(coordinator_ref).await });
// Start the API server // Start the API server
api_server(coordinator).await?; api_server(coordinator).await?;
Ok(()) Ok(())