diff --git a/taptrade-cli-demo/coordinator/Cargo.lock b/taptrade-cli-demo/coordinator/Cargo.lock index 92d0281..78454db 100644 --- a/taptrade-cli-demo/coordinator/Cargo.lock +++ b/taptrade-cli-demo/coordinator/Cargo.lock @@ -316,6 +316,7 @@ dependencies = [ "axum", "bdk", "dotenv", + "futures-util", "hex", "rand", "reqwest", @@ -614,6 +615,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.68", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -634,6 +646,7 @@ checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", diff --git a/taptrade-cli-demo/coordinator/Cargo.toml b/taptrade-cli-demo/coordinator/Cargo.toml index c5ffe8b..5a31ea4 100644 --- a/taptrade-cli-demo/coordinator/Cargo.toml +++ b/taptrade-cli-demo/coordinator/Cargo.toml @@ -8,6 +8,7 @@ anyhow = "1.0.86" axum = { version = "0.7.5", features = ["tokio", "json"] } bdk = { version = "0.29.0", features = ["key-value-db", "bitcoinconsensus", "verify"] } dotenv = "0.15.0" +futures-util = "0.3.30" hex = "0.4.3" rand = "0.8.5" reqwest = { version = "0.12.4", features = ["blocking", "json"] } diff --git a/taptrade-cli-demo/coordinator/src/communication/mod.rs b/taptrade-cli-demo/coordinator/src/communication/mod.rs index 3cddd29..4fc9705 100755 --- a/taptrade-cli-demo/coordinator/src/communication/mod.rs +++ b/taptrade-cli-demo/coordinator/src/communication/mod.rs @@ -56,7 +56,7 @@ async fn submit_maker_bond( }; match wallet - .validate_bond_tx_hex(&payload.signed_bond_hex, bond_requirements) + .validate_bond_tx_hex(&payload.signed_bond_hex, &bond_requirements) .await { Ok(()) => (), diff --git a/taptrade-cli-demo/coordinator/src/coordinator/mod.rs b/taptrade-cli-demo/coordinator/src/coordinator/mod.rs index 848eab9..4d51275 100755 --- a/taptrade-cli-demo/coordinator/src/coordinator/mod.rs +++ b/taptrade-cli-demo/coordinator/src/coordinator/mod.rs @@ -1 +1,3 @@ -pub mod verify_bond; \ No newline at end of file +pub mod monitoring; + +use super::*; diff --git a/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs b/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs index 555d5c7..8b399e0 100644 --- a/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs +++ b/taptrade-cli-demo/coordinator/src/coordinator/monitoring.rs @@ -2,3 +2,40 @@ // continoously verifies the bond inputs (mempool and chain), maybe with some caching in a hashmap to // prevent querying the db all the time. // Also needs to implement punishment logic in case a fraud is detected. +use super::*; + +pub enum Table { + Orderbook, + ActiveTrades, +} + +pub struct MonitoringBond { + pub bond_tx_hex: String, + pub trade_id_hex: String, + pub requirements: BondRequirements, + pub table: Table, +} + +pub async fn monitor_bonds( + coordinator_db: &CoordinatorDB, + coordinator_wallet: &CoordinatorWallet, +) -> Result<()> { + loop { + // fetch all bonds + let bonds = coordinator_db.fetch_all_bonds().await?; + + // verify all bonds and initiate punishment if necessary + for bond in bonds { + if let Err(e) = coordinator_wallet + .validate_bond_tx_hex(&bond.1.bond_tx_hex, &bond.1.requirements) + .await + { + // punish the violator (publish bond, remove offer from db/orderbook) + panic!("Implement bond violation punishment logic: {:?}", e); + } + } + + // sleep for a while + tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; + } +} diff --git a/taptrade-cli-demo/coordinator/src/database/db_tests.rs b/taptrade-cli-demo/coordinator/src/database/db_tests.rs new file mode 100644 index 0000000..ef7ed74 --- /dev/null +++ b/taptrade-cli-demo/coordinator/src/database/db_tests.rs @@ -0,0 +1,432 @@ +use super::*; +#[cfg(test)] +use anyhow::Ok; +async fn create_coordinator() -> Result { + // Set up the in-memory database + env::set_var("DATABASE_PATH", ":memory:"); + + // Initialize the database + let database = CoordinatorDB::init().await?; + Ok(database) +} +#[tokio::test] +async fn test_init() -> Result<()> { + let database = create_coordinator().await?; + // Verify the table creation + let table_exists = + sqlx::query("SELECT name FROM sqlite_master WHERE type='table' AND name='maker_requests'") + .fetch_optional(&*database.db_pool) + .await? + .is_some(); + assert!(table_exists, "The maker_requests table should exist."); + Ok(()) +} + +#[tokio::test] +async fn test_insert_new_maker_request() -> Result<()> { + let database = create_coordinator().await?; + + // Create a sample order request and bond requirement response + let order_request = OrderRequest { + robohash_hex: "a3f1f1f0e2f3f4f5".to_string(), + is_buy_order: true, + amount_satoshi: 1000, + bond_ratio: 50, + offer_duration_ts: 1234567890, + }; + + let bond_requirement_response = BondRequirementResponse { + bond_address: "1BitcoinAddress".to_string(), + locking_amount_sat: 500, + }; + + // Insert the new maker request + database + .insert_new_maker_request(&order_request, &bond_requirement_response) + .await?; + + // Verify the insertion + let row = sqlx::query("SELECT * FROM maker_requests WHERE robohash = ?") + .bind(hex::decode(&order_request.robohash_hex)?) + .fetch_one(&*database.db_pool) + .await?; + + assert!(row.get::("is_buy_order")); + assert_eq!(row.get::("amount_sat"), 1000); + assert_eq!(row.get::("bond_ratio"), 50); + assert_eq!(row.get::("offer_duration_ts"), 1234567890); + assert_eq!(row.get::("bond_address"), "1BitcoinAddress"); + assert_eq!(row.get::("bond_amount_sat"), 500); + + Ok(()) +} + +#[tokio::test] +async fn test_fetch_bond_requirements() -> Result<()> { + let database = create_coordinator().await?; + + // Create a sample order request and insert it into the database + let robohash_hex = "a3f1f1f0e2f3f4f5"; + let order_request = ( + hex::decode(robohash_hex).unwrap(), + true, // is_buy_order + 1000, // amount_satoshi + 50, // bond_ratio + 1234567890, // offer_duration_ts + "1BitcoinAddress".to_string(), // bond_address + 500, // bond_amount_sat + ); + + sqlx::query( + "INSERT INTO maker_requests (robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat) + VALUES (?, ?, ?, ?, ?, ?, ?)", + ) + .bind(order_request.0.clone()) + .bind(order_request.1) + .bind(order_request.2) + .bind(order_request.3) + .bind(order_request.4) + .bind(order_request.5.clone()) + .bind(order_request.6) + .execute(&*database.db_pool) + .await?; + + // Fetch and delete the order request + let fetched_offer = database + .fetch_bond_requirements(&robohash_hex.to_string()) + .await?; + + // Verify the result + let expected = BondRequirements { + bond_address: "1BitcoinAddress".to_string(), + locking_amount_sat: 500_u64, + min_input_sum_sat: 1000_u64, + }; + assert_eq!(fetched_offer, expected); + + Ok(()) +} + +#[tokio::test] +async fn test_fetch_and_delete_offer_from_bond_table() -> Result<()> { + // Set up the in-memory database + let database = create_coordinator().await?; + + // Create a sample order request and insert it into the database + let robohash_hex = "a3f1f1f0e2f3f4f5"; + let order_request = ( + hex::decode(robohash_hex).unwrap(), + true, // is_buy_order + 1000, // amount_satoshi + 50, // bond_ratio + 1234567890, // offer_duration_ts + "1BitcoinAddress".to_string(), // bond_address + 500, // bond_amount_sat + ); + + sqlx::query( + "INSERT INTO maker_requests (robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat) + VALUES (?, ?, ?, ?, ?, ?, ?)", + ) + .bind(order_request.0.clone()) + .bind(order_request.1) + .bind(order_request.2) + .bind(order_request.3) + .bind(order_request.4) + .bind(order_request.5.clone()) + .bind(order_request.6) + .execute(&*database.db_pool) + .await?; + + // Fetch and delete the order request + let fetched_offer = database + .fetch_and_delete_offer_from_bond_table(robohash_hex) + .await?; + + // Verify the fetched offer + let expected_offer = AwaitingBondOffer { + robohash_hex: robohash_hex.to_string(), + is_buy_order: order_request.1, + amount_satoshi: order_request.2 as u64, + bond_ratio: order_request.3, + offer_duration_ts: order_request.4 as u64, + bond_address: order_request.5, + bond_amount_sat: order_request.6 as u64, + }; + assert_eq!(fetched_offer, expected_offer); + + // Verify the record is deleted + let result = sqlx::query("SELECT * FROM maker_requests WHERE robohash = ?") + .bind(hex::decode(robohash_hex)?) + .fetch_optional(&*database.db_pool) + .await?; + assert!(result.is_none()); + + Ok(()) +} + +#[tokio::test] +async fn test_move_offer_to_active() -> Result<()> { + // Create a temporary SQLite database + let database = create_coordinator().await?; + + // Insert a test entry into maker_requests + let robohash_hex = "a3f1f1f0e2f3f4f5"; + let order_request = ( + hex::decode(robohash_hex).unwrap(), + true, // is_buy_order + 1000, // amount_satoshi + 50, // bond_ratio + 1234567890, // offer_duration_ts + "1BitcoinAddress".to_string(), // bond_address + 500, // bond_amount_sat + ); + + sqlx::query( + "INSERT INTO maker_requests (robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat) + VALUES (?, ?, ?, ?, ?, ?, ?)", + ) + .bind(order_request.0.clone()) + .bind(order_request.1) + .bind(order_request.2) + .bind(order_request.3) + .bind(order_request.4) + .bind(order_request.5.clone()) + .bind(order_request.6) + .execute(&*database.db_pool) + .await?; + + // Create a sample BondSubmissionRequest + let bond_submission_request = BondSubmissionRequest { + robohash_hex: robohash_hex.to_string(), + signed_bond_hex: "signedBondHex".to_string(), + payout_address: "1PayoutAddress".to_string(), + musig_pub_nonce_hex: "musigPubNonceHex".to_string(), + musig_pubkey_hex: "musigPubkeyHex".to_string(), + }; + + // Call the move_offer_to_active function + let offer_id = "sample_offer_id".to_string(); + let taker_bond_address = "1TakerBondAddress".to_string(); + let result = database + .move_offer_to_active(&bond_submission_request, &offer_id, taker_bond_address) + .await?; + + // Verify the result + assert_eq!(result, 1234567890); // Verify that the offer_duration_ts is correct + + // Verify that the entry was moved to active_maker_offers + let active_offer = sqlx::query_as::<_, (String, Vec, bool, i64, i64, i64, String, i64, String, String, String, String)> ( + "SELECT offer_id, robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat, bond_tx_hex, payout_address, musig_pub_nonce_hex, musig_pubkey_hex + FROM active_maker_offers WHERE offer_id = ?", + ) + .bind(offer_id) + .fetch_one(&*database.db_pool) + .await?; + + assert_eq!(active_offer.0, "sample_offer_id".to_string()); + assert_eq!(hex::encode(active_offer.1), robohash_hex); + assert!(active_offer.2); + assert_eq!(active_offer.3, 1000); + assert_eq!(active_offer.4, 50); + assert_eq!(active_offer.5, 1234567890); + assert_eq!(active_offer.6, "1BitcoinAddress".to_string()); + assert_eq!(active_offer.7, 500); + assert_eq!(active_offer.8, "signedBondHex".to_string()); + assert_eq!(active_offer.9, "1PayoutAddress".to_string()); + assert_eq!(active_offer.10, "musigPubNonceHex".to_string()); + assert_eq!(active_offer.11, "musigPubkeyHex".to_string()); + + Ok(()) +} + +#[tokio::test] +async fn test_fetch_suitable_offers() -> Result<()> { + let database = create_coordinator().await?; + // Insert test entries into active_maker_offers + let offers = vec![ + ( + "offer_id_1", + true, // is_buy_order + 15000, // amount_sat + 100, // bond_ratio + 1234567890, // offer_duration_ts + "1BondAddress".to_string(), // bond_address + 50, // bond_amount_sat + "signedBondHex".to_string(), + "1PayoutAddress".to_string(), + "musigPubNonceHex".to_string(), + "musigPubkeyHex".to_string(), + "1TakerBondAddress".to_string(), + ), + ( + "offer_id_2", + true, // is_buy_order + 1500, // amount_sat + 200, // bond_ratio + 1234567891, // offer_duration_ts + "2BondAddress".to_string(), // bond_address + 100, // bond_amount_sat + "signedBondHex2".to_string(), + "2PayoutAddress".to_string(), + "musigPubNonceHex2".to_string(), + "musigPubkeyHex2".to_string(), + "2TakerBondAddress".to_string(), + ), + ]; + + for offer in offers { + sqlx::query( + "INSERT INTO active_maker_offers (offer_id, robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat, bond_tx_hex, payout_address, musig_pub_nonce_hex, musig_pubkey_hex, taker_bond_address) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(offer.0) + .bind(hex::decode("a3f1f1f0e2f3f4f5").unwrap()) // Example robohash + .bind(offer.1) + .bind(offer.2) + .bind(offer.3) + .bind(offer.4) + .bind(offer.5.clone()) + .bind(offer.6) + .bind(offer.7.clone()) + .bind(offer.8.clone()) + .bind(offer.9.clone()) + .bind(offer.10.clone()) + .bind(offer.11.clone()) + .execute(&*database.db_pool) + .await?; + } + + // Create a sample OffersRequest + let offers_request = OffersRequest { + buy_offers: true, + amount_min_sat: 1000, + amount_max_sat: 2000, + }; + + // Call the fetch_suitable_offers function + let result = database.fetch_suitable_offers(&offers_request).await?; + + println!("{:?}", result); + // Verify the result + assert!(result.is_some()); + let available_offers = result.unwrap(); + assert_eq!(available_offers.len(), 1); + let offer = &available_offers[0]; + assert_eq!(offer.offer_id_hex, "offer_id_2"); + assert_eq!(offer.amount_sat, 1500); + assert_eq!(offer.required_bond_amount_sat, 100); + assert_eq!(offer.bond_locking_address, "2TakerBondAddress"); + + Ok(()) +} + +#[tokio::test] +async fn test_fetch_taker_bond_requirements() -> Result<()> { + let database = create_coordinator().await?; + + // Insert a test entry into active_maker_offers + let offer_id_hex = "offer_id_1"; + let taker_bond_address = "1TakerBondAddress"; + let bond_amount_sat = 100; + + sqlx::query( + "INSERT INTO active_maker_offers (offer_id, robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat, bond_tx_hex, payout_address, musig_pub_nonce_hex, musig_pubkey_hex, taker_bond_address) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(offer_id_hex) + .bind(hex::decode("a3f1f1f0e2f3f4f5").unwrap()) // Example robohash + .bind(true) // is_buy_order + .bind(1500) // amount_sat + .bind(50) // bond_ratio + .bind(1234567890) // offer_duration_ts + .bind("1BondAddress") + .bind(bond_amount_sat) + .bind("signedBondHex") + .bind("1PayoutAddress") + .bind("musigPubNonceHex") + .bind("musigPubkeyHex") + .bind(taker_bond_address) + .execute(&*database.db_pool) + .await?; + + // Call the fetch_taker_bond_requirements function + let result = database + .fetch_taker_bond_requirements(&offer_id_hex.to_string()) + .await?; + + // Verify the result + assert_eq!(result.bond_address, taker_bond_address); + assert_eq!(result.locking_amount_sat, bond_amount_sat as u64); + + Ok(()) +} + +#[tokio::test] +async fn test_fetch_and_delete_offer_from_public_offers_table() -> Result<()> { + let database = create_coordinator().await?; + + // Insert a test entry into active_maker_offers + let offer_id_hex = "offer_id_1"; + let robohash = hex::decode("a3f1f1f0e2f3f4f5").unwrap(); // Example robohash + let is_buy_order = true; + let amount_sat = 1000; + let bond_ratio = 50; + let offer_duration_ts = 1234567890; + let bond_address = "1BondAddress".to_string(); + let bond_amount_sat = 500; + let bond_tx_hex = "signedBondHex".to_string(); + let payout_address = "1PayoutAddress".to_string(); + let musig_pub_nonce_hex = "musigPubNonceHex".to_string(); + let musig_pubkey_hex = "musigPubkeyHex".to_string(); + + sqlx::query( + "INSERT INTO active_maker_offers (offer_id, robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat, bond_tx_hex, payout_address, musig_pub_nonce_hex, musig_pubkey_hex) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(offer_id_hex) + .bind(robohash.clone()) + .bind(is_buy_order) + .bind(amount_sat) + .bind(bond_ratio) + .bind(offer_duration_ts) + .bind(bond_address.clone()) + .bind(bond_amount_sat) + .bind(bond_tx_hex.clone()) + .bind(payout_address.clone()) + .bind(musig_pub_nonce_hex.clone()) + .bind(musig_pubkey_hex.clone()) + .execute(&*database.db_pool) + .await?; + + // Call the fetch_and_delete_offer_from_public_offers_table function + let result = database + .fetch_and_delete_offer_from_public_offers_table(offer_id_hex) + .await?; + + // Verify the result + assert_eq!(result.offer_id, offer_id_hex); + assert_eq!(result.robohash_maker, robohash); + assert_eq!(result.is_buy_order, is_buy_order); + assert_eq!(result.amount_sat, amount_sat); + assert_eq!(result.bond_ratio, bond_ratio); + assert_eq!(result.offer_duration_ts, offer_duration_ts); + assert_eq!(result.bond_address_maker, bond_address); + assert_eq!(result.bond_amount_sat, bond_amount_sat); + assert_eq!(result.bond_tx_hex_maker, bond_tx_hex); + assert_eq!(result.payout_address_maker, payout_address); + assert_eq!(result.musig_pub_nonce_hex_maker, musig_pub_nonce_hex); + assert_eq!(result.musig_pubkey_hex_maker, musig_pubkey_hex); + + // Verify the deletion + let remaining_offers = + sqlx::query("SELECT COUNT(*) FROM active_maker_offers WHERE offer_id = ?") + .bind(offer_id_hex) + .fetch_one(&*database.db_pool) + .await?; + + let remaining_offers_count: i64 = remaining_offers.try_get(0)?; + assert_eq!(remaining_offers_count, 0); + + Ok(()) +} diff --git a/taptrade-cli-demo/coordinator/src/database/mod.rs b/taptrade-cli-demo/coordinator/src/database/mod.rs index c88243d..aa47617 100644 --- a/taptrade-cli-demo/coordinator/src/database/mod.rs +++ b/taptrade-cli-demo/coordinator/src/database/mod.rs @@ -1,8 +1,11 @@ +pub mod db_tests; + use anyhow::Context; +use futures_util::{lock, StreamExt}; use super::*; use sqlx::{sqlite::SqlitePoolOptions, Pool, Row, Sqlite}; -use std::env; +use std::{collections::HashMap, env}; #[derive(Clone, Debug)] pub struct CoordinatorDB { @@ -21,6 +24,7 @@ struct AwaitingBondOffer { bond_amount_sat: u64, } +#[derive(PartialEq, Debug)] struct AwaitingTakerOffer { offer_id: String, robohash_maker: Vec, @@ -377,440 +381,78 @@ impl CoordinatorDB { Err(_) => Ok(None), } } -} -#[cfg(test)] -mod tests { - use anyhow::Ok; - - use super::*; - async fn create_coordinator() -> Result { - // Set up the in-memory database - env::set_var("DATABASE_PATH", ":memory:"); - - // Initialize the database - let database = CoordinatorDB::init().await?; - Ok(database) - } - #[tokio::test] - async fn test_init() -> Result<()> { - let database = create_coordinator().await?; - // Verify the table creation - let table_exists = sqlx::query( - "SELECT name FROM sqlite_master WHERE type='table' AND name='maker_requests'", + // returns a hashmap of RoboHash, MonitoringBond for the monitoring loop + // in case this gets a bottleneck (db too large for heap) we can implement in place checking + pub async fn fetch_all_bonds(&self) -> Result, MonitoringBond>> { + let mut bonds = HashMap::new(); + let mut rows_orderbook = sqlx::query( + "SELECT offer_id, robohash, bond_address, bond_amount_sat, amount_sat, bond_tx_hex FROM active_maker_offers", ) - .fetch_optional(&*database.db_pool) - .await? - .is_some(); - assert!(table_exists, "The maker_requests table should exist."); - Ok(()) - } + .fetch(&*self.db_pool); + while let Some(row) = rows_orderbook.next().await { + let row = row?; - #[tokio::test] - async fn test_insert_new_maker_request() -> Result<()> { - let database = create_coordinator().await?; + let robohash: Vec = row.get("robohash"); + let requirements = BondRequirements { + bond_address: row.get("bond_address"), + locking_amount_sat: row.get::("bond_amount_sat") as u64, + min_input_sum_sat: row.get::("amount_sat") as u64, + }; - // Create a sample order request and bond requirement response - let order_request = OrderRequest { - robohash_hex: "a3f1f1f0e2f3f4f5".to_string(), - is_buy_order: true, - amount_satoshi: 1000, - bond_ratio: 50, - offer_duration_ts: 1234567890, - }; - - let bond_requirement_response = BondRequirementResponse { - bond_address: "1BitcoinAddress".to_string(), - locking_amount_sat: 500, - }; - - // Insert the new maker request - database - .insert_new_maker_request(&order_request, &bond_requirement_response) - .await?; - - // Verify the insertion - let row = sqlx::query("SELECT * FROM maker_requests WHERE robohash = ?") - .bind(hex::decode(&order_request.robohash_hex)?) - .fetch_one(&*database.db_pool) - .await?; - - assert!(row.get::("is_buy_order")); - assert_eq!(row.get::("amount_sat"), 1000); - assert_eq!(row.get::("bond_ratio"), 50); - assert_eq!(row.get::("offer_duration_ts"), 1234567890); - assert_eq!(row.get::("bond_address"), "1BitcoinAddress"); - assert_eq!(row.get::("bond_amount_sat"), 500); - - Ok(()) - } - - #[tokio::test] - async fn test_fetch_maker_request() -> Result<()> { - let database = create_coordinator().await?; - - // Create a sample order request and insert it into the database - let robohash_hex = "a3f1f1f0e2f3f4f5"; - let order_request = ( - hex::decode(robohash_hex).unwrap(), - true, // is_buy_order - 1000, // amount_satoshi - 50, // bond_ratio - 1234567890, // offer_duration_ts - "1BitcoinAddress".to_string(), // bond_address - 500, // bond_amount_sat - ); - - sqlx::query( - "INSERT INTO maker_requests (robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat) - VALUES (?, ?, ?, ?, ?, ?, ?)", - ) - .bind(order_request.0.clone()) - .bind(order_request.1) - .bind(order_request.2) - .bind(order_request.3) - .bind(order_request.4) - .bind(order_request.5.clone()) - .bind(order_request.6) - .execute(&*database.db_pool) - .await?; - - // Fetch and delete the order request - let fetched_offer = database - .fetch_maker_request(&robohash_hex.to_string()) - .await?; - - // Verify the result - let expected = BondRequirementResponse { - bond_address: "1BitcoinAddress".to_string(), - locking_amount_sat: 500_u64, - }; - assert_eq!(fetched_offer, expected); - - Ok(()) - } - - #[tokio::test] - async fn test_fetch_and_delete_offer_from_bond_table() -> Result<()> { - // Set up the in-memory database - let database = create_coordinator().await?; - - // Create a sample order request and insert it into the database - let robohash_hex = "a3f1f1f0e2f3f4f5"; - let order_request = ( - hex::decode(robohash_hex).unwrap(), - true, // is_buy_order - 1000, // amount_satoshi - 50, // bond_ratio - 1234567890, // offer_duration_ts - "1BitcoinAddress".to_string(), // bond_address - 500, // bond_amount_sat - ); - - sqlx::query( - "INSERT INTO maker_requests (robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat) - VALUES (?, ?, ?, ?, ?, ?, ?)", - ) - .bind(order_request.0.clone()) - .bind(order_request.1) - .bind(order_request.2) - .bind(order_request.3) - .bind(order_request.4) - .bind(order_request.5.clone()) - .bind(order_request.6) - .execute(&*database.db_pool) - .await?; - - // Fetch and delete the order request - let fetched_offer = database - .fetch_and_delete_offer_from_bond_table(robohash_hex) - .await?; - - // Verify the fetched offer - let expected_offer = AwaitingBondOffer { - robohash_hex: robohash_hex.to_string(), - is_buy_order: order_request.1, - amount_satoshi: order_request.2 as u64, - bond_ratio: order_request.3, - offer_duration_ts: order_request.4 as u64, - bond_address: order_request.5, - bond_amount_sat: order_request.6 as u64, - }; - assert_eq!(fetched_offer, expected_offer); - - // Verify the record is deleted - let result = sqlx::query("SELECT * FROM maker_requests WHERE robohash = ?") - .bind(hex::decode(robohash_hex)?) - .fetch_optional(&*database.db_pool) - .await?; - assert!(result.is_none()); - - Ok(()) - } - - #[tokio::test] - async fn test_move_offer_to_active() -> Result<()> { - // Create a temporary SQLite database - let database = create_coordinator().await?; - - // Insert a test entry into maker_requests - let robohash_hex = "a3f1f1f0e2f3f4f5"; - let order_request = ( - hex::decode(robohash_hex).unwrap(), - true, // is_buy_order - 1000, // amount_satoshi - 50, // bond_ratio - 1234567890, // offer_duration_ts - "1BitcoinAddress".to_string(), // bond_address - 500, // bond_amount_sat - ); - - sqlx::query( - "INSERT INTO maker_requests (robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat) - VALUES (?, ?, ?, ?, ?, ?, ?)", - ) - .bind(order_request.0.clone()) - .bind(order_request.1) - .bind(order_request.2) - .bind(order_request.3) - .bind(order_request.4) - .bind(order_request.5.clone()) - .bind(order_request.6) - .execute(&*database.db_pool) - .await?; - - // Create a sample BondSubmissionRequest - let bond_submission_request = BondSubmissionRequest { - robohash_hex: robohash_hex.to_string(), - signed_bond_hex: "signedBondHex".to_string(), - payout_address: "1PayoutAddress".to_string(), - musig_pub_nonce_hex: "musigPubNonceHex".to_string(), - musig_pubkey_hex: "musigPubkeyHex".to_string(), - }; - - // Call the move_offer_to_active function - let offer_id = "sample_offer_id".to_string(); - let taker_bond_address = "1TakerBondAddress".to_string(); - let result = database - .move_offer_to_active(&bond_submission_request, &offer_id, taker_bond_address) - .await?; - - // Verify the result - assert_eq!(result, 1234567890); // Verify that the offer_duration_ts is correct - - // Verify that the entry was moved to active_maker_offers - let active_offer = sqlx::query_as::<_, (String, Vec, bool, i64, i64, i64, String, i64, String, String, String, String)> ( - "SELECT offer_id, robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat, bond_tx_hex, payout_address, musig_pub_nonce_hex, musig_pubkey_hex - FROM active_maker_offers WHERE offer_id = ?", - ) - .bind(offer_id) - .fetch_one(&*database.db_pool) - .await?; - - assert_eq!(active_offer.0, "sample_offer_id".to_string()); - assert_eq!(hex::encode(active_offer.1), robohash_hex); - assert!(active_offer.2); - assert_eq!(active_offer.3, 1000); - assert_eq!(active_offer.4, 50); - assert_eq!(active_offer.5, 1234567890); - assert_eq!(active_offer.6, "1BitcoinAddress".to_string()); - assert_eq!(active_offer.7, 500); - assert_eq!(active_offer.8, "signedBondHex".to_string()); - assert_eq!(active_offer.9, "1PayoutAddress".to_string()); - assert_eq!(active_offer.10, "musigPubNonceHex".to_string()); - assert_eq!(active_offer.11, "musigPubkeyHex".to_string()); - - Ok(()) - } - - #[tokio::test] - async fn test_fetch_suitable_offers() -> Result<()> { - let database = create_coordinator().await?; - // Insert test entries into active_maker_offers - let offers = vec![ - ( - "offer_id_1", - true, // is_buy_order - 15000, // amount_sat - 100, // bond_ratio - 1234567890, // offer_duration_ts - "1BondAddress".to_string(), // bond_address - 50, // bond_amount_sat - "signedBondHex".to_string(), - "1PayoutAddress".to_string(), - "musigPubNonceHex".to_string(), - "musigPubkeyHex".to_string(), - "1TakerBondAddress".to_string(), - ), - ( - "offer_id_2", - true, // is_buy_order - 1500, // amount_sat - 200, // bond_ratio - 1234567891, // offer_duration_ts - "2BondAddress".to_string(), // bond_address - 100, // bond_amount_sat - "signedBondHex2".to_string(), - "2PayoutAddress".to_string(), - "musigPubNonceHex2".to_string(), - "musigPubkeyHex2".to_string(), - "2TakerBondAddress".to_string(), - ), - ]; - - for offer in offers { - sqlx::query( - "INSERT INTO active_maker_offers (offer_id, robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat, bond_tx_hex, payout_address, musig_pub_nonce_hex, musig_pubkey_hex, taker_bond_address) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - ) - .bind(offer.0) - .bind(hex::decode("a3f1f1f0e2f3f4f5").unwrap()) // Example robohash - .bind(offer.1) - .bind(offer.2) - .bind(offer.3) - .bind(offer.4) - .bind(offer.5.clone()) - .bind(offer.6) - .bind(offer.7.clone()) - .bind(offer.8.clone()) - .bind(offer.9.clone()) - .bind(offer.10.clone()) - .bind(offer.11.clone()) - .execute(&*database.db_pool) - .await?; + let bond = MonitoringBond { + bond_tx_hex: row.get("bond_tx_hex"), + trade_id_hex: row.get("offer_id"), + requirements, + table: Table::Orderbook, + }; + bonds.insert(robohash, bond); } - // Create a sample OffersRequest - let offers_request = OffersRequest { - buy_offers: true, - amount_min_sat: 1000, - amount_max_sat: 2000, - }; + let mut rows_taken = sqlx::query( + "SELECT offer_id, robohash_maker, robohash_taker, + bond_address_maker, bond_address_taker, bond_amount_sat, amount_sat, bond_tx_hex_maker, bond_tx_hex_taker + FROM taken_offers", + ) + .fetch(&*self.db_pool); - // Call the fetch_suitable_offers function - let result = database.fetch_suitable_offers(&offers_request).await?; + while let Some(row) = rows_taken.next().await { + let row = row?; - println!("{:?}", result); - // Verify the result - assert!(result.is_some()); - let available_offers = result.unwrap(); - assert_eq!(available_offers.len(), 1); - let offer = &available_offers[0]; - assert_eq!(offer.offer_id_hex, "offer_id_2"); - assert_eq!(offer.amount_sat, 1500); - assert_eq!(offer.required_bond_amount_sat, 100); - assert_eq!(offer.bond_locking_address, "2TakerBondAddress"); + let robohash_maker: Vec = row.get("robohash_maker"); + let robohash_taker: Vec = row.get("robohash_taker"); + let locking_amount_sat = row.get::("bond_amount_sat") as u64; + let min_input_sum_sat = row.get::("amount_sat") as u64; + let trade_id_hex: String = row.get("offer_id"); - Ok(()) - } + let requirements_maker = BondRequirements { + bond_address: row.get("bond_address_maker"), + locking_amount_sat, + min_input_sum_sat, + }; - #[tokio::test] - async fn test_fetch_taker_bond_requirements() -> Result<()> { - let database = create_coordinator().await?; + let bond_maker = MonitoringBond { + bond_tx_hex: row.get("bond_tx_hex_maker"), + trade_id_hex: trade_id_hex.clone(), + requirements: requirements_maker, + table: Table::ActiveTrades, + }; + bonds.insert(robohash_maker, bond_maker); - // Insert a test entry into active_maker_offers - let offer_id_hex = "offer_id_1"; - let taker_bond_address = "1TakerBondAddress"; - let bond_amount_sat = 100; + let requirements_maker = BondRequirements { + bond_address: row.get("bond_address_taker"), + locking_amount_sat, + min_input_sum_sat, + }; - sqlx::query( - "INSERT INTO active_maker_offers (offer_id, robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat, bond_tx_hex, payout_address, musig_pub_nonce_hex, musig_pubkey_hex, taker_bond_address) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - ) - .bind(offer_id_hex) - .bind(hex::decode("a3f1f1f0e2f3f4f5").unwrap()) // Example robohash - .bind(true) // is_buy_order - .bind(1500) // amount_sat - .bind(50) // bond_ratio - .bind(1234567890) // offer_duration_ts - .bind("1BondAddress") - .bind(bond_amount_sat) - .bind("signedBondHex") - .bind("1PayoutAddress") - .bind("musigPubNonceHex") - .bind("musigPubkeyHex") - .bind(taker_bond_address) - .execute(&*database.db_pool) - .await?; - - // Call the fetch_taker_bond_requirements function - let result = database - .fetch_taker_bond_requirements(&offer_id_hex.to_string()) - .await?; - - // Verify the result - assert_eq!(result.bond_address, taker_bond_address); - assert_eq!(result.locking_amount_sat, bond_amount_sat as u64); - - Ok(()) - } - - #[tokio::test] - async fn test_fetch_and_delete_offer_from_public_offers_table() -> Result<()> { - let database = create_coordinator().await?; - - // Insert a test entry into active_maker_offers - let offer_id_hex = "offer_id_1"; - let robohash = hex::decode("a3f1f1f0e2f3f4f5").unwrap(); // Example robohash - let is_buy_order = true; - let amount_sat = 1000; - let bond_ratio = 50; - let offer_duration_ts = 1234567890; - let bond_address = "1BondAddress".to_string(); - let bond_amount_sat = 500; - let bond_tx_hex = "signedBondHex".to_string(); - let payout_address = "1PayoutAddress".to_string(); - let musig_pub_nonce_hex = "musigPubNonceHex".to_string(); - let musig_pubkey_hex = "musigPubkeyHex".to_string(); - - sqlx::query( - "INSERT INTO active_maker_offers (offer_id, robohash, is_buy_order, amount_sat, bond_ratio, offer_duration_ts, bond_address, bond_amount_sat, bond_tx_hex, payout_address, musig_pub_nonce_hex, musig_pubkey_hex) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", - ) - .bind(offer_id_hex) - .bind(robohash.clone()) - .bind(is_buy_order) - .bind(amount_sat) - .bind(bond_ratio) - .bind(offer_duration_ts) - .bind(bond_address.clone()) - .bind(bond_amount_sat) - .bind(bond_tx_hex.clone()) - .bind(payout_address.clone()) - .bind(musig_pub_nonce_hex.clone()) - .bind(musig_pubkey_hex.clone()) - .execute(&*database.db_pool) - .await?; - - // Call the fetch_and_delete_offer_from_public_offers_table function - let result = database - .fetch_and_delete_offer_from_public_offers_table(offer_id_hex) - .await?; - - // Verify the result - assert_eq!(result.offer_id, offer_id_hex); - assert_eq!(result.robohash_maker, robohash); - assert_eq!(result.is_buy_order, is_buy_order); - assert_eq!(result.amount_sat, amount_sat); - assert_eq!(result.bond_ratio, bond_ratio); - assert_eq!(result.offer_duration_ts, offer_duration_ts); - assert_eq!(result.bond_address_maker, bond_address); - assert_eq!(result.bond_amount_sat, bond_amount_sat); - assert_eq!(result.bond_tx_hex_maker, bond_tx_hex); - assert_eq!(result.payout_address_maker, payout_address); - assert_eq!(result.musig_pub_nonce_hex_maker, musig_pub_nonce_hex); - assert_eq!(result.musig_pubkey_hex_maker, musig_pubkey_hex); - - // Verify the deletion - let remaining_offers = - sqlx::query("SELECT COUNT(*) FROM active_maker_offers WHERE offer_id = ?") - .bind(offer_id_hex) - .fetch_one(&*database.db_pool) - .await?; - - let remaining_offers_count: i64 = remaining_offers.try_get(0)?; - assert_eq!(remaining_offers_count, 0); - - Ok(()) + let bond_maker = MonitoringBond { + bond_tx_hex: row.get("bond_tx_hex_taker"), + trade_id_hex, + requirements: requirements_maker, + table: Table::ActiveTrades, + }; + bonds.insert(robohash_taker, bond_maker); + } + Ok(bonds) } } diff --git a/taptrade-cli-demo/coordinator/src/main.rs b/taptrade-cli-demo/coordinator/src/main.rs index afae419..8f6db87 100755 --- a/taptrade-cli-demo/coordinator/src/main.rs +++ b/taptrade-cli-demo/coordinator/src/main.rs @@ -1,10 +1,12 @@ mod communication; -// mod coordinator; +mod coordinator; mod database; mod wallet; use anyhow::{anyhow, Result}; use communication::{api::*, api_server}; +use coordinator::monitoring::monitor_bonds; +use coordinator::monitoring::*; use database::CoordinatorDB; use dotenv::dotenv; use std::time::{SystemTime, UNIX_EPOCH}; @@ -21,6 +23,10 @@ async fn main() -> Result<()> { let coordinator_db = CoordinatorDB::init().await?; let wallet = CoordinatorWallet::init()?; + // begin monitoring bonds + monitor_bonds(&coordinator_db, &wallet).await?; + + // Start the API server api_server(coordinator_db, wallet).await?; Ok(()) } diff --git a/taptrade-cli-demo/coordinator/src/wallet/mod.rs b/taptrade-cli-demo/coordinator/src/wallet/mod.rs index e190963..5c27430 100644 --- a/taptrade-cli-demo/coordinator/src/wallet/mod.rs +++ b/taptrade-cli-demo/coordinator/src/wallet/mod.rs @@ -20,6 +20,7 @@ pub struct CoordinatorWallet { // database: Arc>, } +#[derive(PartialEq, Debug)] pub struct BondRequirements { pub bond_address: String, pub locking_amount_sat: u64, @@ -63,7 +64,7 @@ impl CoordinatorWallet { pub async fn validate_bond_tx_hex( &self, bond: &String, - requirements: BondRequirements, + requirements: &BondRequirements, ) -> Result<()> { let input_sum: u64; let tx: Transaction = deserialize(&hex::decode(bond)?)?;