refactor spawning of monitoring thread, working on spawn_blocking for sync bitcoinrpc call

This commit is contained in:
Felix
2024-07-12 10:54:43 +00:00
parent ec05e61323
commit 369961a2ea
3 changed files with 69 additions and 54 deletions

View File

@ -41,8 +41,10 @@ pub async fn monitor_bonds(coordinator: Arc<Coordinator>) -> Result<()> {
loop { loop {
// fetch all bonds // fetch all bonds
let bonds = coordinator_db.fetch_all_bonds().await?; let bonds = Arc::new(coordinator_db.fetch_all_bonds().await?);
let validation_results = coordinator_wallet.validate_bonds(&bonds).await?; let validation_results = coordinator_wallet
.validate_bonds(Arc::clone(&bonds))
.await?;
debug!("Monitoring active bonds: {}", bonds.len()); debug!("Monitoring active bonds: {}", bonds.len());
// verify all bonds and initiate punishment if necessary // verify all bonds and initiate punishment if necessary
for (bond, error) in validation_results { for (bond, error) in validation_results {

View File

@ -37,8 +37,16 @@ async fn main() -> Result<()> {
}); });
// begin monitoring bonds // begin monitoring bonds
// spawn_blocking(monitor_bonds(Arc::clone(&coordinator))); let coordinator_ref = Arc::clone(&coordinator);
tokio::spawn(async move {
loop {
if let Err(e) = monitor_bonds(coordinator_ref.clone()).await {
error!("Error in monitor_bonds: {:?}", e);
// Optionally add a delay before retrying
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
});
// Start the API server // Start the API server
api_server(coordinator).await?; api_server(coordinator).await?;
Ok(()) Ok(())

View File

@ -86,7 +86,9 @@ impl<D: bdk::database::BatchDatabase> CoordinatorWallet<D> {
requirements: requirements.clone(), requirements: requirements.clone(),
table: Table::Memory, table: Table::Memory,
}; };
let invalid_bond = self.validate_bonds(&vec![dummy_monitoring_bond]).await?; let invalid_bond = self
.validate_bonds(Arc::new(vec![dummy_monitoring_bond]))
.await?;
if !invalid_bond.is_empty() { if !invalid_bond.is_empty() {
return Err(anyhow!(invalid_bond[0].1.to_string())); return Err(anyhow!(invalid_bond[0].1.to_string()));
} }
@ -99,14 +101,14 @@ impl<D: bdk::database::BatchDatabase> CoordinatorWallet<D> {
// blockchain::get_tx to get input // blockchain::get_tx to get input
pub async fn validate_bonds( pub async fn validate_bonds(
&self, &self,
bonds: &Vec<MonitoringBond>, bonds: Arc<Vec<MonitoringBond>>,
) -> Result<Vec<(MonitoringBond, anyhow::Error)>> { ) -> Result<Vec<(MonitoringBond, anyhow::Error)>> {
let mut invalid_bonds: Vec<(MonitoringBond, anyhow::Error)> = Vec::new(); let mut invalid_bonds: Vec<(MonitoringBond, anyhow::Error)> = Vec::new();
let blockchain = &*self.backend; let blockchain = &*self.backend;
{ {
let wallet = self.wallet.lock().await; let wallet = self.wallet.lock().await;
for bond in bonds { for bond in *bonds {
let input_sum: u64; let input_sum: u64;
let tx: Transaction = deserialize(&hex::decode(&bond.bond_tx_hex)?)?; let tx: Transaction = deserialize(&hex::decode(&bond.bond_tx_hex)?)?;
@ -162,30 +164,57 @@ impl<D: bdk::database::BatchDatabase> CoordinatorWallet<D> {
} }
} }
} }
self.test_mempool_accept_bonds(bonds, &mut invalid_bonds)?; // let invalid_bonds = Arc::new(invalid_bonds);
// let json_rpc_client = self.json_rpc_client.clone();
// let mempool_accept_future = tokio::task::spawn_blocking(move || {
// test_mempool_accept_bonds(json_rpc_client, bonds, &mut invalid_bonds)
// });
// mempool_accept_future.await??;
debug!("validate_bond_tx_hex(): Bond validation done."); debug!("validate_bond_tx_hex(): Bond validation done.");
Ok(invalid_bonds) Ok(invalid_bonds)
} }
fn test_mempool_accept_bonds( pub fn publish_bond_tx_hex(&self, bond: &str) -> Result<()> {
&self, warn!("publish_bond_tx_hex(): publishing cheating bond tx!");
bonds: &Vec<MonitoringBond>, let blockchain = &*self.backend;
let tx: Transaction = deserialize(&hex::decode(bond)?)?;
blockchain.broadcast(&tx)?;
Ok(())
}
}
fn search_monitoring_bond_by_txid(
// this should not happen often, so the inefficiency is acceptable
monitoring_bonds: &Vec<MonitoringBond>,
txid: &str,
) -> Result<MonitoringBond> {
for bond in monitoring_bonds {
let bond_tx: Transaction = deserialize(&hex::decode(&bond.bond_tx_hex)?)?;
if bond_tx.txid().to_string() == txid {
return Ok(bond.clone());
}
}
Err(anyhow!("Bond not found in monitoring bonds"))
}
fn test_mempool_accept_bonds(
json_rpc_client: Arc<Client>,
bonds: Arc<Vec<MonitoringBond>>,
invalid_bonds: &mut Vec<(MonitoringBond, anyhow::Error)>, invalid_bonds: &mut Vec<(MonitoringBond, anyhow::Error)>,
) -> Result<()> { ) -> Result<()> {
let raw_bonds: Vec<String> = bonds let raw_bonds: Vec<String> = bonds
.iter() .iter()
.map(|bond| bond.bond_tx_hex.clone().raw_hex()) // Assuming `raw_hex()` returns a String or &str .map(|bond| bond.bond_tx_hex.clone().raw_hex()) // Assuming `raw_hex()` returns a String or &str
.collect(); .collect();
let test_mempool_accept_res = self let test_mempool_accept_res = json_rpc_client.deref().test_mempool_accept(&raw_bonds)?;
.json_rpc_client
.deref()
.test_mempool_accept(&raw_bonds)?;
for res in test_mempool_accept_res { for res in test_mempool_accept_res {
if !res.allowed { if !res.allowed {
let invalid_bond: MonitoringBond = let invalid_bond: MonitoringBond =
Self::search_monitoring_bond_by_txid(&bonds, &res.txid.to_string())?; search_monitoring_bond_by_txid(&bonds, &res.txid.to_string())?;
invalid_bonds.push(( invalid_bonds.push((
invalid_bond, invalid_bond,
anyhow!( anyhow!(
@ -197,30 +226,6 @@ impl<D: bdk::database::BatchDatabase> CoordinatorWallet<D> {
}; };
} }
Ok(()) Ok(())
}
fn search_monitoring_bond_by_txid(
// this should not happen often, so the inefficiency is acceptable
monitoring_bonds: &Vec<MonitoringBond>,
txid: &str,
) -> Result<MonitoringBond> {
for bond in monitoring_bonds {
let bond_tx: Transaction = deserialize(&hex::decode(&bond.bond_tx_hex)?)?;
if bond_tx.txid().to_string() == txid {
return Ok(bond.clone());
}
}
Err(anyhow!("Bond not found in monitoring bonds"))
}
pub fn publish_bond_tx_hex(&self, bond: &str) -> Result<()> {
warn!("publish_bond_tx_hex(): publishing cheating bond tx!");
let blockchain = &*self.backend;
let tx: Transaction = deserialize(&hex::decode(bond)?)?;
blockchain.broadcast(&tx)?;
Ok(())
}
} }
impl fmt::Debug for CoordinatorWallet<Tree> { impl fmt::Debug for CoordinatorWallet<Tree> {