diff --git a/README.md b/README.md index c05cafd..4623104 100644 --- a/README.md +++ b/README.md @@ -3,5 +3,5 @@ Scrappers to publish to nostr orders from other platforms ## Setup -1. Set a value for `NOSTR_NSEC` +1. Set a value for `HODLHODL_NOSTR_NSEC` and `PEACH_NOSTR_NSEC` 1. Run `docker-compose up -d` diff --git a/app/Dockerfile b/app/Dockerfile index 495fc0a..a8a5be3 100644 --- a/app/Dockerfile +++ b/app/Dockerfile @@ -6,4 +6,9 @@ COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . -CMD ["python", "app.py"] +RUN chmod +x /app/entrypoint.sh + +RUN mkdir -p /app/log +RUN touch /app/log/app.log && chmod 0644 /app/log/app.log + +ENTRYPOINT ["/app/entrypoint.sh"] diff --git a/app/app.py b/app/app.py index 9262a42..1d8337e 100644 --- a/app/app.py +++ b/app/app.py @@ -1,5 +1,6 @@ # app.py import os +import sys import asyncio import hashlib import uuid @@ -8,10 +9,16 @@ import requests import time import json from datetime import datetime -from nostr_sdk import Keys, Client, EventBuilder, NostrSigner, Kind, Tag +from nostr_sdk import Keys, Client, EventBuilder, NostrSigner, Kind, Tag, NostrSdkError -db_file_name = './data/nostr_sync.db' -order_expiration = 2 * 60 * 60 +log_file_path = '/app/log/app.log' +db_file_name = '/app/data/nostr_sync.db' +order_expiration = 1 * 60 * 60 + +def print_log(*args, **kwargs): + message = ' '.join(map(str, args)) + with open(log_file_path, 'a') as log_file: + log_file.write(f"{datetime.now()}: {message}\n") def prepare_db(): conn = sqlite3.connect(db_file_name) @@ -27,21 +34,19 @@ def prepare_db(): ); ''' create_index_query = ''' - CREATE INDEX IF NOT EXISTS idx_identifier_origin ON orders (identifier, origin); + CREATE INDEX IF NOT EXISTS idx_identifier_origin ON orders (iteration, origin, identifier); ''' cursor.execute(create_table_query) cursor.execute(create_table_query) conn.commit() conn.close() -def insert_order(conn, identifier, first_seen, iteration, origin): - cursor = conn.cursor() +def insert_order(cursor, identifier, first_seen, iteration, origin): insert_query = ''' INSERT INTO orders (identifier, first_seen, iteration, origin) VALUES (?, ?, ?, ?); ''' cursor.execute(insert_query, (identifier, first_seen, iteration, origin)) - conn.commit() def max_iteration(conn, origin): cursor = conn.cursor() @@ -63,43 +68,126 @@ def exists_iteration(conn, identifier, origin): """, (identifier, origin, expiration)) return cursor.fetchone()[0] == 1 -def delete_records_by_iteration(conn, iteration, origin): - cursor = conn.cursor() +def delete_records_by_iteration(cursor, iteration, origin): cursor.execute("DELETE FROM orders WHERE iteration <= ? AND origin = ?;", (iteration, origin,)) - conn.commit() -def update_iteration(conn, identifier, origin, iteration): - cursor = conn.cursor() +def update_iteration(cursor, identifier, origin, iteration): cursor.execute("UPDATE orders SET iteration = ? WHERE identifier = ? AND origin = ?;", (iteration, identifier, origin)) - conn.commit() def get_all_orders_by_iteration(conn, iteration, origin): cursor = conn.cursor() - query = ''' SELECT * FROM orders WHERE iteration = ? AND origin = ?; ''' cursor.execute(query, (iteration, origin,)) return cursor.fetchall() +def fetch_peach_orders(): + base_url = "https://api.peachbitcoin.com/v1/offer/search" + page = 0 + size = 500 + orders = [] + headers = { + 'Content-Type': 'application/json' + } + + while True: + url = f"{base_url}?page={page}&size={size}" + print_log(f"Fetching {url}") + response = requests.post(url, headers=headers) + + if response.status_code != 200: + print_log(f"Error fetching data: {response.status_code}") + break + + data = response.json() + + if data.get("offers") is None: + print_log("Failed to fetch offers.") + break + + orders.extend(data.get("offers", [])) + + # If the number of offers returned is less than the limit, we are done + if len(data.get("offers", [])) < size: + break + + # Increment the offset for the next request + page += 1 + + + print_log(f"Found {len(orders)} Peach orders") + + return orders + +def parse_peach_to_nostr(order, keys, status): + identifier = order.get('id') + + timestamp_in_2_hours = time.time() + order_expiration + + events = [] + + tags = [ + Tag.parse(["name", order.get("user").get("id")]), + Tag.parse(["k", "sell" if order.get("type") == "ask" else "buy"]), + Tag.parse(["s", status]), + Tag.parse(["source", f""]), # TODO + Tag.parse(["rating", json.dumps({ + "total_reviews": order.get("user").get("ratingCount"), + "total_rating": order.get("user").get("rating"), + })]), + Tag.parse(["network", "mainnet"]), + Tag.parse(["layer", "onchain"]), + Tag.parse([ + "expiration", + str(int(timestamp_in_2_hours)) + ]), + Tag.parse(["y", "peach"]), + Tag.parse(["z", "order"]) + ] + + amount = order.get("amount") + if isinstance(amount, list): + Tag.parse(["amt", str(amount[0]), str(amount[1])]) + else: + Tag.parse(["amt", str(amount)]) + + for currency, methods in order.get("meansOfPayment", {}).items(): + tags.append(Tag.parse(["d", identifier + currency])) + tags.append(Tag.parse(["pm"] + methods)) + tags.append(Tag.parse(["f", currency])) + + event = EventBuilder( + Kind(38383), + "", + tags, + ).to_event(keys) + events.append(event) + + return events + def fetch_hodlhodl_orders(): base_url = "https://hodlhodl.com/api/v1/offers" offset = 0 - limit = 50 + limit = 100 orders = [] + headers = { + 'Content-Type': 'application/json' + } while True: url = f"{base_url}?pagination[limit]={limit}&pagination[offset]={offset}" - response = requests.get(url) + print_log(f"Fetching {url}") + response = requests.get(url, headers=headers) if response.status_code != 200: - print(f"Error fetching data: {response.status_code}") + print_log(f"Error fetching data: {response.status_code}") break data = response.json() if data.get("status") != "success": - print("Failed to fetch offers.") + print_log("Failed to fetch offers.") break orders.extend(data.get("offers", [])) @@ -112,12 +200,12 @@ def fetch_hodlhodl_orders(): offset += limit - print(f"Found {len(orders)} HodlHodl orders") + print_log(f"Found {len(orders)} HodlHodl orders") return orders def parse_hodlhodl_to_nostr(order, keys, status): - identifier = order.get('id') + identifier = str(order.get('id')) timestamp_in_2_hours = time.time() + order_expiration @@ -155,17 +243,17 @@ def parse_hodlhodl_to_nostr(order, keys, status): tags, ).to_event(keys) - return event + return [event] -async def publish_to_nostr(orders, origin, parser): +async def publish_to_nostr(orders, origin, parser, nsec): conn = sqlite3.connect(db_file_name) last_iteration = max_iteration(conn, origin) - print(f"Iteration {origin}: {last_iteration + 1}") + print_log(f"Iteration {origin}: {last_iteration + 1}") # Initialize with coordinator Keys - keys = Keys.parse(os.environ.get('NOSTR_NSEC')) + keys = Keys.parse(nsec) signer = NostrSigner.keys(keys) client = Client(signer) @@ -173,38 +261,54 @@ async def publish_to_nostr(orders, origin, parser): await client.add_relay("ws://localhost") await client.connect() - for order in orders: - identifier = order.get('id') + print_log(f"Iteration {origin}: Checking Orders") + cursor = conn.cursor() + for order in orders: + identifier = str(order.get('id')) if exists_iteration(conn, identifier, origin): # keep alive existing orders - update_iteration(conn, identifier, origin, last_iteration + 1) - print(f"Iteration {origin}: {last_iteration + 1} - Order stay alive: {identifier}") + print_log(f"Iteration {origin}: {last_iteration + 1} - Order stay alive: {identifier}") + update_iteration(cursor, identifier, origin, last_iteration + 1) else: # Publish new orders - event = parser(order, keys, "pending") - await client.send_event(event) + try: + events = parser(order, keys, "pending") + for event in events: + print_log(f"Iteration {origin}: {last_iteration + 1} - Nostr event sent: {event.as_json()}") + try: + await client.send_event(event) + except NostrSdkError as e: + print_log(f"Iteration {origin}: {last_iteration + 1} - Event already published") - print(f"Iteration {origin}: {last_iteration + 1} - Nostr event sent: {event.as_json()}") + if (len(events) > 0): + insert_order(cursor, identifier, str(events[0].created_at().as_secs()), last_iteration + 1, origin) + except Exception as e: + print_log(f"Iteration {origin}: {last_iteration + 1} - Error parsing {e} : {order}") - insert_order(conn, identifier, str(event.created_at().as_secs()), last_iteration + 1, origin) - - # Remove expired orders - for orders in get_all_orders_by_iteration(conn, last_iteration, origin): - event = parser(order, keys) - await client.send_event(event, "canceled") - - print(f"Iteration {origin}: {last_iteration + 1} - Order expired: {identifier}") + conn.commit() + print_log(f"Iteration {origin}: Cleaning Orders") delete_records_by_iteration(conn, last_iteration, origin) + + conn.commit() + conn.close() async def main(): - print(f"START") + print_log(f"START LOOP") prepare_db() while True: + print_log(f"HODL HODL") hodlhodl_orders = fetch_hodlhodl_orders() - await publish_to_nostr(hodlhodl_orders, 'hodlhodl', parse_hodlhodl_to_nostr) + await publish_to_nostr(hodlhodl_orders, 'hodlhodl', parse_hodlhodl_to_nostr, os.environ.get('HODLHODL_NOSTR_NSEC')) + + print_log(f"PEACH") + peach_orders = fetch_peach_orders() + await publish_to_nostr(peach_orders, 'peach', parse_peach_to_nostr, os.environ.get('PEACH_NOSTR_NSEC')) + + print_log(f"DONE GOING TO SLEEP") await asyncio.sleep(300) # Wait for 5 minutes if __name__ == "__main__": + print_log(f"START") asyncio.run(main()) diff --git a/app/entrypoint.sh b/app/entrypoint.sh new file mode 100644 index 0000000..de96d65 --- /dev/null +++ b/app/entrypoint.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +python app.py & tail -f /app/log/app.log diff --git a/docker-compose.yml b/docker-compose.yml index ee015ab..ce1f8df 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,7 +14,8 @@ services: container_name: scrapper restart: unless-stopped environment: - NOSTR_NSEC: ${NOSTR_NSEC} + HODLHODL_NOSTR_NSEC: ${HODLHODL_NOSTR_NSEC} + PEACH_NOSTR_NSEC: ${PEACH_NOSTR_NSEC} volumes: - ./app/data:/app/data:rw network_mode: service:tor