diff --git a/app/app.py b/app/app.py deleted file mode 100644 index 1d8337e..0000000 --- a/app/app.py +++ /dev/null @@ -1,314 +0,0 @@ -# app.py -import os -import sys -import asyncio -import hashlib -import uuid -import sqlite3 -import requests -import time -import json -from datetime import datetime -from nostr_sdk import Keys, Client, EventBuilder, NostrSigner, Kind, Tag, NostrSdkError - -log_file_path = '/app/log/app.log' -db_file_name = '/app/data/nostr_sync.db' -order_expiration = 1 * 60 * 60 - -def print_log(*args, **kwargs): - message = ' '.join(map(str, args)) - with open(log_file_path, 'a') as log_file: - log_file.write(f"{datetime.now()}: {message}\n") - -def prepare_db(): - conn = sqlite3.connect(db_file_name) - - cursor = conn.cursor() - create_table_query = ''' - CREATE TABLE IF NOT EXISTS orders ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - identifier TEXT NOT NULL, - first_seen INTEGER NOT NULL, - iteration INTEGER NOT NULL, - origin TEXT NOT NULL - ); - ''' - create_index_query = ''' - CREATE INDEX IF NOT EXISTS idx_identifier_origin ON orders (iteration, origin, identifier); - ''' - cursor.execute(create_table_query) - cursor.execute(create_table_query) - conn.commit() - conn.close() - -def insert_order(cursor, identifier, first_seen, iteration, origin): - insert_query = ''' - INSERT INTO orders (identifier, first_seen, iteration, origin) - VALUES (?, ?, ?, ?); - ''' - cursor.execute(insert_query, (identifier, first_seen, iteration, origin)) - -def max_iteration(conn, origin): - cursor = conn.cursor() - cursor.execute("SELECT COALESCE(MAX(iteration), 0) FROM orders WHERE origin = ?;", (origin,)) - return cursor.fetchone()[0] - -def exists_iteration(conn, identifier, origin): - cursor = conn.cursor() - expiration = time.time() - order_expiration - - cursor.execute(""" - SELECT EXISTS( - SELECT 1 - FROM orders - WHERE identifier = ? - AND origin = ? - AND first_seen >= ? - ); - """, (identifier, origin, expiration)) - return cursor.fetchone()[0] == 1 - -def delete_records_by_iteration(cursor, iteration, origin): - cursor.execute("DELETE FROM orders WHERE iteration <= ? AND origin = ?;", (iteration, origin,)) - -def update_iteration(cursor, identifier, origin, iteration): - cursor.execute("UPDATE orders SET iteration = ? WHERE identifier = ? AND origin = ?;", (iteration, identifier, origin)) - -def get_all_orders_by_iteration(conn, iteration, origin): - cursor = conn.cursor() - query = ''' - SELECT * FROM orders WHERE iteration = ? AND origin = ?; - ''' - cursor.execute(query, (iteration, origin,)) - return cursor.fetchall() - -def fetch_peach_orders(): - base_url = "https://api.peachbitcoin.com/v1/offer/search" - page = 0 - size = 500 - orders = [] - headers = { - 'Content-Type': 'application/json' - } - - while True: - url = f"{base_url}?page={page}&size={size}" - print_log(f"Fetching {url}") - response = requests.post(url, headers=headers) - - if response.status_code != 200: - print_log(f"Error fetching data: {response.status_code}") - break - - data = response.json() - - if data.get("offers") is None: - print_log("Failed to fetch offers.") - break - - orders.extend(data.get("offers", [])) - - # If the number of offers returned is less than the limit, we are done - if len(data.get("offers", [])) < size: - break - - # Increment the offset for the next request - page += 1 - - - print_log(f"Found {len(orders)} Peach orders") - - return orders - -def parse_peach_to_nostr(order, keys, status): - identifier = order.get('id') - - timestamp_in_2_hours = time.time() + order_expiration - - events = [] - - tags = [ - Tag.parse(["name", order.get("user").get("id")]), - Tag.parse(["k", "sell" if order.get("type") == "ask" else "buy"]), - Tag.parse(["s", status]), - Tag.parse(["source", f""]), # TODO - Tag.parse(["rating", json.dumps({ - "total_reviews": order.get("user").get("ratingCount"), - "total_rating": order.get("user").get("rating"), - })]), - Tag.parse(["network", "mainnet"]), - Tag.parse(["layer", "onchain"]), - Tag.parse([ - "expiration", - str(int(timestamp_in_2_hours)) - ]), - Tag.parse(["y", "peach"]), - Tag.parse(["z", "order"]) - ] - - amount = order.get("amount") - if isinstance(amount, list): - Tag.parse(["amt", str(amount[0]), str(amount[1])]) - else: - Tag.parse(["amt", str(amount)]) - - for currency, methods in order.get("meansOfPayment", {}).items(): - tags.append(Tag.parse(["d", identifier + currency])) - tags.append(Tag.parse(["pm"] + methods)) - tags.append(Tag.parse(["f", currency])) - - event = EventBuilder( - Kind(38383), - "", - tags, - ).to_event(keys) - events.append(event) - - return events - -def fetch_hodlhodl_orders(): - base_url = "https://hodlhodl.com/api/v1/offers" - offset = 0 - limit = 100 - orders = [] - headers = { - 'Content-Type': 'application/json' - } - - while True: - url = f"{base_url}?pagination[limit]={limit}&pagination[offset]={offset}" - print_log(f"Fetching {url}") - response = requests.get(url, headers=headers) - - if response.status_code != 200: - print_log(f"Error fetching data: {response.status_code}") - break - - data = response.json() - - if data.get("status") != "success": - print_log("Failed to fetch offers.") - break - - orders.extend(data.get("offers", [])) - - # If the number of offers returned is less than the limit, we are done - if len(data.get("offers", [])) < limit: - break - - # Increment the offset for the next request - offset += limit - - - print_log(f"Found {len(orders)} HodlHodl orders") - - return orders - -def parse_hodlhodl_to_nostr(order, keys, status): - identifier = str(order.get('id')) - - timestamp_in_2_hours = time.time() + order_expiration - - payment_method_names = [] - for instruction in order.get("payment_method_instructions", []): - payment_method_names.append(instruction.get("payment_method_name")) - - tags = [ - Tag.parse(["d", identifier]), - Tag.parse(["name", order.get("trader").get("login")]), - Tag.parse(["k", order.get("side")]), - Tag.parse(["f", order.get("currency_code")]), - Tag.parse(["s", status]), - Tag.parse(["amt", str(order.get('max_amount_sats'))]), - Tag.parse(["fa", str(order.get('min_amount')), str(order.get('max_amount'))]), - Tag.parse(["pm"] + payment_method_names), - Tag.parse(["source", f"https://hodlhodl.com/offers/{identifier}"]), - Tag.parse(["rating", json.dumps({ - "total_reviews": order.get("trader").get("trades_count"), - "total_rating": order.get("trader").get("rating"), - })]), - Tag.parse(["network", "mainnet"]), - Tag.parse(["layer", "onchain"]), - Tag.parse([ - "expiration", - str(int(timestamp_in_2_hours)) - ]), - Tag.parse(["y", "hodlhodl"]), - Tag.parse(["z", "order"]) - ] - - event = EventBuilder( - Kind(38383), - "", - tags, - ).to_event(keys) - - return [event] - -async def publish_to_nostr(orders, origin, parser, nsec): - conn = sqlite3.connect(db_file_name) - - last_iteration = max_iteration(conn, origin) - - print_log(f"Iteration {origin}: {last_iteration + 1}") - - # Initialize with coordinator Keys - keys = Keys.parse(nsec) - signer = NostrSigner.keys(keys) - client = Client(signer) - - # Add relays and connect - await client.add_relay("ws://localhost") - await client.connect() - - print_log(f"Iteration {origin}: Checking Orders") - - cursor = conn.cursor() - for order in orders: - identifier = str(order.get('id')) - if exists_iteration(conn, identifier, origin): - # keep alive existing orders - print_log(f"Iteration {origin}: {last_iteration + 1} - Order stay alive: {identifier}") - update_iteration(cursor, identifier, origin, last_iteration + 1) - else: - # Publish new orders - try: - events = parser(order, keys, "pending") - for event in events: - print_log(f"Iteration {origin}: {last_iteration + 1} - Nostr event sent: {event.as_json()}") - try: - await client.send_event(event) - except NostrSdkError as e: - print_log(f"Iteration {origin}: {last_iteration + 1} - Event already published") - - if (len(events) > 0): - insert_order(cursor, identifier, str(events[0].created_at().as_secs()), last_iteration + 1, origin) - except Exception as e: - print_log(f"Iteration {origin}: {last_iteration + 1} - Error parsing {e} : {order}") - - conn.commit() - - print_log(f"Iteration {origin}: Cleaning Orders") - delete_records_by_iteration(conn, last_iteration, origin) - - conn.commit() - conn.close() - -async def main(): - print_log(f"START LOOP") - prepare_db() - while True: - print_log(f"HODL HODL") - hodlhodl_orders = fetch_hodlhodl_orders() - await publish_to_nostr(hodlhodl_orders, 'hodlhodl', parse_hodlhodl_to_nostr, os.environ.get('HODLHODL_NOSTR_NSEC')) - - print_log(f"PEACH") - peach_orders = fetch_peach_orders() - await publish_to_nostr(peach_orders, 'peach', parse_peach_to_nostr, os.environ.get('PEACH_NOSTR_NSEC')) - - print_log(f"DONE GOING TO SLEEP") - await asyncio.sleep(300) # Wait for 5 minutes - -if __name__ == "__main__": - print_log(f"START") - asyncio.run(main()) diff --git a/app/db.py b/app/db.py new file mode 100644 index 0000000..31fd2ab --- /dev/null +++ b/app/db.py @@ -0,0 +1,58 @@ +# db.py + +import sqlite3 +import time + +db_file_name = '/app/data/nostr_sync.db' + +order_expiration = 1 * 60 * 60 + +def prepare_db(): + conn = sqlite3.connect(db_file_name) + cursor = conn.cursor() + create_table_query = ''' + CREATE TABLE IF NOT EXISTS orders ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + identifier TEXT NOT NULL, + first_seen INTEGER NOT NULL, + iteration INTEGER NOT NULL, + origin TEXT NOT NULL + ); + ''' + cursor.execute(create_table_query) + cursor.execute(create_table_query) + conn.commit() + conn.close() + +def insert_order(cursor, identifier, first_seen, iteration, origin): + insert_query = ''' + INSERT INTO orders (identifier, first_seen, iteration, origin) + VALUES (?, ?, ?, ?); + ''' + cursor.execute(insert_query, (identifier, first_seen, iteration, origin)) + +def max_iteration(conn, origin): + cursor = conn.cursor() + cursor.execute("SELECT COALESCE(MAX(iteration), 0) FROM orders WHERE origin = ?;", (origin,)) + return cursor.fetchone()[0] + +def exists_iteration(conn, identifier, origin): + cursor = conn.cursor() + expiration = time.time() - order_expiration + + cursor.execute(""" + SELECT EXISTS( + SELECT 1 + FROM orders + WHERE identifier = ? + AND origin = ? + AND first_seen >= ? + ); + """, (identifier, origin, expiration)) + return cursor.fetchone()[0] == 1 + +def delete_records_by_iteration(cursor, iteration, origin): + cursor.execute("DELETE FROM orders WHERE iteration <= ? AND origin = ?;", (iteration, origin,)) + +def update_iteration(cursor, identifier, origin, iteration): + cursor.execute("UPDATE orders SET iteration = ? WHERE identifier = ? AND origin = ?;", (iteration, identifier, origin)) diff --git a/app/entrypoint.sh b/app/entrypoint.sh index de96d65..c682db0 100644 --- a/app/entrypoint.sh +++ b/app/entrypoint.sh @@ -1,3 +1,3 @@ #!/bin/sh -python app.py & tail -f /app/log/app.log +python "$SCRIPT.py" & tail -f /app/log/app.log diff --git a/app/hodlhodl.py b/app/hodlhodl.py new file mode 100644 index 0000000..cd05bf9 --- /dev/null +++ b/app/hodlhodl.py @@ -0,0 +1,105 @@ +# hodlhodl.py + +import os +import asyncio +import requests +import time +import json +from db import prepare_db, db_file_name, order_expiration +from logs import print_log +from nostr import publish_to_nostr +from nostr_sdk import Keys, EventBuilder, Kind, Tag + +def fetch_hodlhodl_orders(): + base_url = "https://hodlhodl.com/api/v1/offers" + offset = 0 + limit = 100 + orders = [] + headers = { + 'Content-Type': 'application/json' + } + + while True: + url = f"{base_url}?pagination[limit]={limit}&pagination[offset]={offset}" + print_log(f"Fetching {url}") + response = requests.get(url, headers=headers) + + if response.status_code != 200: + print_log(f"Error fetching data: {response.status_code}") + break + + data = response.json() + + if data.get("status") != "success": + print_log("Failed to fetch offers.") + break + + orders.extend(data.get("offers", [])) + + # If the number of offers returned is less than the limit, we are done + if len(data.get("offers", [])) < limit: + break + + # Increment the offset for the next request + offset += limit + + + print_log(f"Found {len(orders)} HodlHodl orders") + + return orders + +def parse_hodlhodl_to_nostr(order, keys, status): + identifier = str(order.get('id')) + + timestamp_in_2_hours = time.time() + order_expiration + + payment_method_names = [] + for instruction in order.get("payment_method_instructions", []): + payment_method_names.append(instruction.get("payment_method_name")) + + tags = [ + Tag.parse(["d", identifier]), + Tag.parse(["name", order.get("trader").get("login")]), + Tag.parse(["k", order.get("side")]), + Tag.parse(["f", order.get("currency_code")]), + Tag.parse(["s", status]), + Tag.parse(["amt", str(order.get('max_amount_sats'))]), + Tag.parse(["fa", str(order.get('min_amount')), str(order.get('max_amount'))]), + Tag.parse(["pm"] + payment_method_names), + Tag.parse(["source", f"https://hodlhodl.com/offers/{identifier}"]), + Tag.parse(["rating", json.dumps({ + "total_reviews": order.get("trader").get("trades_count"), + "total_rating": order.get("trader").get("rating"), + })]), + Tag.parse(["network", "mainnet"]), + Tag.parse(["layer", "onchain"]), + Tag.parse([ + "expiration", + str(int(timestamp_in_2_hours)) + ]), + Tag.parse(["y", "hodlhodl"]), + Tag.parse(["z", "order"]) + ] + + event = EventBuilder( + Kind(38383), + "", + tags, + ).to_event(keys) + + return [event] + +async def main(): + print_log(f"START LOOP") + prepare_db() + while True: + print_log(f"HODL HODL") + hodlhodl_orders = fetch_hodlhodl_orders() + await publish_to_nostr(hodlhodl_orders, 'hodlhodl', parse_hodlhodl_to_nostr, os.environ.get('HODLHODL_NOSTR_NSEC')) + + print_log(f"DONE GOING TO SLEEP") + await asyncio.sleep(300) # Wait for 5 minutes + +if __name__ == "__main__": + print_log(f"START") + asyncio.run(main()) diff --git a/app/logs.py b/app/logs.py new file mode 100644 index 0000000..bb79d89 --- /dev/null +++ b/app/logs.py @@ -0,0 +1,11 @@ +# logs.py + +import sys +from datetime import datetime + +log_file_path = '/app/log/app.log' + +def print_log(*args, **kwargs): + message = ' '.join(map(str, args)) + with open(log_file_path, 'a') as log_file: + log_file.write(f"{datetime.now()}: {message}\n") \ No newline at end of file diff --git a/app/nostr.py b/app/nostr.py new file mode 100644 index 0000000..7ddd56d --- /dev/null +++ b/app/nostr.py @@ -0,0 +1,56 @@ +# nostr.py + +import hashlib +import sqlite3 +from logs import print_log +from nostr_sdk import Keys, Client, NostrSigner, NostrSdkError +from db import insert_order, max_iteration, exists_iteration, delete_records_by_iteration, update_iteration, db_file_name + +async def publish_to_nostr(orders, origin, parser, nsec): + conn = sqlite3.connect(db_file_name) + + last_iteration = max_iteration(conn, origin) + + print_log(f"Iteration {origin}: {last_iteration + 1}") + + # Initialize with coordinator Keys + keys = Keys.parse(nsec) + signer = NostrSigner.keys(keys) + client = Client(signer) + + # Add relays and connect + await client.add_relay("ws://localhost") + await client.connect() + + print_log(f"Iteration {origin}: Checking Orders") + + cursor = conn.cursor() + for order in orders: + identifier = str(order.get('id')) + if exists_iteration(conn, identifier, origin): + # keep alive existing orders + print_log(f"Iteration {origin}: {last_iteration + 1} - Order stay alive: {identifier}") + update_iteration(cursor, identifier, origin, last_iteration + 1) + else: + # Publish new orders + try: + events = parser(order, keys, "pending") + for event in events: + print_log(f"Iteration {origin}: {last_iteration + 1} - Nostr event sent: {event.as_json()}") + try: + await client.send_event(event) + except NostrSdkError as e: + print_log(f"Iteration {origin}: {last_iteration + 1} - Event already published") + + if (len(events) > 0): + insert_order(cursor, identifier, str(events[0].created_at().as_secs()), last_iteration + 1, origin) + except Exception as e: + print_log(f"Iteration {origin}: {last_iteration + 1} - Error parsing {e} : {order}") + + conn.commit() + + print_log(f"Iteration {origin}: Cleaning Orders") + delete_records_by_iteration(conn, last_iteration, origin) + + conn.commit() + conn.close() \ No newline at end of file diff --git a/app/peach.py b/app/peach.py new file mode 100644 index 0000000..5c96f4c --- /dev/null +++ b/app/peach.py @@ -0,0 +1,124 @@ +# peach.py + +import os +import asyncio +import requests +import time +import json +from db import prepare_db, db_file_name, order_expiration +from logs import print_log +from nostr import publish_to_nostr +from nostr_sdk import Keys, EventBuilder, Kind, Tag + +def fetch_peach_orders(): + base_url = "https://api.peachbitcoin.com/v1/offer/search" + page = 0 + size = 500 + orders = [] + headers = { + 'Content-Type': 'application/json' + } + + while True: + url = f"{base_url}?page={page}&size={size}" + print_log(f"Fetching {url}") + response = requests.post(url, headers=headers) + + if response.status_code != 200: + print_log(f"Error fetching data: {response.status_code}") + break + + data = response.json() + + if data.get("offers") is None: + print_log("Failed to fetch offers.") + break + + orders.extend(data.get("offers", [])) + + if len(data.get("offers", [])) < size: + break + + page += 1 + + print_log(f"Found {len(orders)} Peach orders") + return orders + +def parse_peach_to_nostr(order, keys, status): + identifier = order.get('id') + timestamp_in_2_hours = int(time.time() + order_expiration) + + order_type = "sell" if order.get("type") == "ask" else "buy" + + rating_data = { + "total_reviews": order.get("user", {}).get("ratingCount", 0), + "total_rating": order.get("user", {}).get("rating", 0) + } + + amount = order.get("amount", 0) + premium = order.get("premium", 0) + prices = order.get("prices", {}) + + if isinstance(amount, list): + # Range: amt = 0, fa = 0 if we dont have info details + amt = "0" + fa = ["0"] + else: + amt = str(amount) + if prices: + first_currency = next(iter(prices)) + fa_value = prices[first_currency] + fa = [str(fa_value)] + else: + fa = ["0"] + + source_url = "" + network = "mainnet" + layer = "onchain" + bond = "0" + + events = [] + means_of_payment = order.get("meansOfPayment", {}) + for currency, methods in means_of_payment.items(): + tags = [ + Tag.parse(["d", str(identifier) + currency]), # d = order_id + currency + Tag.parse(["k", order_type]), + Tag.parse(["s", status]), + Tag.parse(["amt", amt]), + Tag.parse(["fa"] + fa), + Tag.parse(["premium", str(premium)]), + Tag.parse(["rating", json.dumps(rating_data)]), + Tag.parse(["source", source_url]), + Tag.parse(["network", network]), + Tag.parse(["layer", layer]), + Tag.parse(["name", order.get("user", {}).get("id", "")]), + Tag.parse(["bond", bond]), + Tag.parse(["expiration", str(timestamp_in_2_hours)]), + Tag.parse(["y", "peach"]), + Tag.parse(["z", "order"]), + Tag.parse(["f", currency]), + Tag.parse(["pm"] + methods) + ] + + event = EventBuilder( + Kind(38383), + "", + tags + ).to_event(keys) + events.append(event) + + return events + +async def main(): + print_log("START LOOP") + prepare_db() + while True: + print_log("PEACH") + peach_orders = fetch_peach_orders() + await publish_to_nostr(peach_orders, 'peach', parse_peach_to_nostr, os.environ.get('PEACH_NOSTR_NSEC')) + print_log("DONE GOING TO SLEEP") + await asyncio.sleep(300) + +if __name__ == "__main__": + print_log("START") + asyncio.run(main()) \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index ce1f8df..4223fc5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,13 +9,21 @@ services: - ./tor:/var/lib/tor ports: - 80:80 - scrapper: + scrapper_peach: + build: ./app + restart: unless-stopped + environment: + PEACH_NOSTR_NSEC: ${PEACH_NOSTR_NSEC} + SCRIPT: peach + volumes: + - ./app/data:/app/data:rw + network_mode: service:tor + scrapper_hodlhodl: build: ./app - container_name: scrapper restart: unless-stopped environment: HODLHODL_NOSTR_NSEC: ${HODLHODL_NOSTR_NSEC} - PEACH_NOSTR_NSEC: ${PEACH_NOSTR_NSEC} + SCRIPT: hodlhodl volumes: - ./app/data:/app/data:rw network_mode: service:tor