Split services

koalasat
2024-12-19 12:08:20 +01:00
parent 76e0c64ab5
commit 84d06d08e8
8 changed files with 366 additions and 318 deletions


@@ -1,314 +0,0 @@
# app.py
import os
import sys
import asyncio
import hashlib
import uuid
import sqlite3
import requests
import time
import json
from datetime import datetime
from nostr_sdk import Keys, Client, EventBuilder, NostrSigner, Kind, Tag, NostrSdkError
log_file_path = '/app/log/app.log'
db_file_name = '/app/data/nostr_sync.db'
order_expiration = 1 * 60 * 60

def print_log(*args, **kwargs):
    message = ' '.join(map(str, args))
    with open(log_file_path, 'a') as log_file:
        log_file.write(f"{datetime.now()}: {message}\n")

def prepare_db():
    conn = sqlite3.connect(db_file_name)
    cursor = conn.cursor()
    create_table_query = '''
        CREATE TABLE IF NOT EXISTS orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            identifier TEXT NOT NULL,
            first_seen INTEGER NOT NULL,
            iteration INTEGER NOT NULL,
            origin TEXT NOT NULL
        );
    '''
    create_index_query = '''
        CREATE INDEX IF NOT EXISTS idx_identifier_origin ON orders (iteration, origin, identifier);
    '''
    cursor.execute(create_table_query)
    cursor.execute(create_index_query)
    conn.commit()
    conn.close()

def insert_order(cursor, identifier, first_seen, iteration, origin):
    insert_query = '''
        INSERT INTO orders (identifier, first_seen, iteration, origin)
        VALUES (?, ?, ?, ?);
    '''
    cursor.execute(insert_query, (identifier, first_seen, iteration, origin))

def max_iteration(conn, origin):
    cursor = conn.cursor()
    cursor.execute("SELECT COALESCE(MAX(iteration), 0) FROM orders WHERE origin = ?;", (origin,))
    return cursor.fetchone()[0]

def exists_iteration(conn, identifier, origin):
    cursor = conn.cursor()
    expiration = time.time() - order_expiration
    cursor.execute("""
        SELECT EXISTS(
            SELECT 1
            FROM orders
            WHERE identifier = ?
            AND origin = ?
            AND first_seen >= ?
        );
    """, (identifier, origin, expiration))
    return cursor.fetchone()[0] == 1

def delete_records_by_iteration(cursor, iteration, origin):
    cursor.execute("DELETE FROM orders WHERE iteration <= ? AND origin = ?;", (iteration, origin,))

def update_iteration(cursor, identifier, origin, iteration):
    cursor.execute("UPDATE orders SET iteration = ? WHERE identifier = ? AND origin = ?;", (iteration, identifier, origin))

def get_all_orders_by_iteration(conn, iteration, origin):
    cursor = conn.cursor()
    query = '''
        SELECT * FROM orders WHERE iteration = ? AND origin = ?;
    '''
    cursor.execute(query, (iteration, origin,))
    return cursor.fetchall()

def fetch_peach_orders():
    base_url = "https://api.peachbitcoin.com/v1/offer/search"
    page = 0
    size = 500
    orders = []
    headers = {
        'Content-Type': 'application/json'
    }
    while True:
        url = f"{base_url}?page={page}&size={size}"
        print_log(f"Fetching {url}")
        response = requests.post(url, headers=headers)
        if response.status_code != 200:
            print_log(f"Error fetching data: {response.status_code}")
            break
        data = response.json()
        if data.get("offers") is None:
            print_log("Failed to fetch offers.")
            break
        orders.extend(data.get("offers", []))
        # If the number of offers returned is less than the limit, we are done
        if len(data.get("offers", [])) < size:
            break
        # Increment the page for the next request
        page += 1
    print_log(f"Found {len(orders)} Peach orders")
    return orders

def parse_peach_to_nostr(order, keys, status):
    identifier = order.get('id')
    timestamp_in_2_hours = time.time() + order_expiration
    events = []
    tags = [
        Tag.parse(["name", order.get("user").get("id")]),
        Tag.parse(["k", "sell" if order.get("type") == "ask" else "buy"]),
        Tag.parse(["s", status]),
        Tag.parse(["source", f""]),  # TODO
        Tag.parse(["rating", json.dumps({
            "total_reviews": order.get("user").get("ratingCount"),
            "total_rating": order.get("user").get("rating"),
        })]),
        Tag.parse(["network", "mainnet"]),
        Tag.parse(["layer", "onchain"]),
        Tag.parse([
            "expiration",
            str(int(timestamp_in_2_hours))
        ]),
        Tag.parse(["y", "peach"]),
        Tag.parse(["z", "order"])
    ]
    amount = order.get("amount")
    if isinstance(amount, list):
        tags.append(Tag.parse(["amt", str(amount[0]), str(amount[1])]))
    else:
        tags.append(Tag.parse(["amt", str(amount)]))
    for currency, methods in order.get("meansOfPayment", {}).items():
        tags.append(Tag.parse(["d", identifier + currency]))
        tags.append(Tag.parse(["pm"] + methods))
        tags.append(Tag.parse(["f", currency]))
        event = EventBuilder(
            Kind(38383),
            "",
            tags,
        ).to_event(keys)
        events.append(event)
    return events

def fetch_hodlhodl_orders():
    base_url = "https://hodlhodl.com/api/v1/offers"
    offset = 0
    limit = 100
    orders = []
    headers = {
        'Content-Type': 'application/json'
    }
    while True:
        url = f"{base_url}?pagination[limit]={limit}&pagination[offset]={offset}"
        print_log(f"Fetching {url}")
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            print_log(f"Error fetching data: {response.status_code}")
            break
        data = response.json()
        if data.get("status") != "success":
            print_log("Failed to fetch offers.")
            break
        orders.extend(data.get("offers", []))
        # If the number of offers returned is less than the limit, we are done
        if len(data.get("offers", [])) < limit:
            break
        # Increment the offset for the next request
        offset += limit
    print_log(f"Found {len(orders)} HodlHodl orders")
    return orders

def parse_hodlhodl_to_nostr(order, keys, status):
    identifier = str(order.get('id'))
    timestamp_in_2_hours = time.time() + order_expiration
    payment_method_names = []
    for instruction in order.get("payment_method_instructions", []):
        payment_method_names.append(instruction.get("payment_method_name"))
    tags = [
        Tag.parse(["d", identifier]),
        Tag.parse(["name", order.get("trader").get("login")]),
        Tag.parse(["k", order.get("side")]),
        Tag.parse(["f", order.get("currency_code")]),
        Tag.parse(["s", status]),
        Tag.parse(["amt", str(order.get('max_amount_sats'))]),
        Tag.parse(["fa", str(order.get('min_amount')), str(order.get('max_amount'))]),
        Tag.parse(["pm"] + payment_method_names),
        Tag.parse(["source", f"https://hodlhodl.com/offers/{identifier}"]),
        Tag.parse(["rating", json.dumps({
            "total_reviews": order.get("trader").get("trades_count"),
            "total_rating": order.get("trader").get("rating"),
        })]),
        Tag.parse(["network", "mainnet"]),
        Tag.parse(["layer", "onchain"]),
        Tag.parse([
            "expiration",
            str(int(timestamp_in_2_hours))
        ]),
        Tag.parse(["y", "hodlhodl"]),
        Tag.parse(["z", "order"])
    ]
    event = EventBuilder(
        Kind(38383),
        "",
        tags,
    ).to_event(keys)
    return [event]

async def publish_to_nostr(orders, origin, parser, nsec):
    conn = sqlite3.connect(db_file_name)
    last_iteration = max_iteration(conn, origin)
    print_log(f"Iteration {origin}: {last_iteration + 1}")
    # Initialize with coordinator Keys
    keys = Keys.parse(nsec)
    signer = NostrSigner.keys(keys)
    client = Client(signer)
    # Add relays and connect
    await client.add_relay("ws://localhost")
    await client.connect()
    print_log(f"Iteration {origin}: Checking Orders")
    cursor = conn.cursor()
    for order in orders:
        identifier = str(order.get('id'))
        if exists_iteration(conn, identifier, origin):
            # Keep existing orders alive
            print_log(f"Iteration {origin}: {last_iteration + 1} - Order stays alive: {identifier}")
            update_iteration(cursor, identifier, origin, last_iteration + 1)
        else:
            # Publish new orders
            try:
                events = parser(order, keys, "pending")
                for event in events:
                    print_log(f"Iteration {origin}: {last_iteration + 1} - Nostr event sent: {event.as_json()}")
                    try:
                        await client.send_event(event)
                    except NostrSdkError:
                        print_log(f"Iteration {origin}: {last_iteration + 1} - Event already published")
                if len(events) > 0:
                    insert_order(cursor, identifier, str(events[0].created_at().as_secs()), last_iteration + 1, origin)
            except Exception as e:
                print_log(f"Iteration {origin}: {last_iteration + 1} - Error parsing {e} : {order}")
    conn.commit()
    print_log(f"Iteration {origin}: Cleaning Orders")
    delete_records_by_iteration(conn, last_iteration, origin)
    conn.commit()
    conn.close()

async def main():
    print_log("START LOOP")
    prepare_db()
    while True:
        print_log("HODL HODL")
        hodlhodl_orders = fetch_hodlhodl_orders()
        await publish_to_nostr(hodlhodl_orders, 'hodlhodl', parse_hodlhodl_to_nostr, os.environ.get('HODLHODL_NOSTR_NSEC'))
        print_log("PEACH")
        peach_orders = fetch_peach_orders()
        await publish_to_nostr(peach_orders, 'peach', parse_peach_to_nostr, os.environ.get('PEACH_NOSTR_NSEC'))
        print_log("DONE GOING TO SLEEP")
        await asyncio.sleep(300)  # Wait for 5 minutes

if __name__ == "__main__":
    print_log("START")
    asyncio.run(main())

app/db.py Normal file

@@ -0,0 +1,58 @@
# db.py
import sqlite3
import time
db_file_name = '/app/data/nostr_sync.db'
order_expiration = 1 * 60 * 60

def prepare_db():
    conn = sqlite3.connect(db_file_name)
    cursor = conn.cursor()
    create_table_query = '''
        CREATE TABLE IF NOT EXISTS orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            identifier TEXT NOT NULL,
            first_seen INTEGER NOT NULL,
            iteration INTEGER NOT NULL,
            origin TEXT NOT NULL
        );
    '''
    create_index_query = '''
        CREATE INDEX IF NOT EXISTS idx_identifier_origin ON orders (iteration, origin, identifier);
    '''
    cursor.execute(create_table_query)
    cursor.execute(create_index_query)
    conn.commit()
    conn.close()

def insert_order(cursor, identifier, first_seen, iteration, origin):
    insert_query = '''
        INSERT INTO orders (identifier, first_seen, iteration, origin)
        VALUES (?, ?, ?, ?);
    '''
    cursor.execute(insert_query, (identifier, first_seen, iteration, origin))

def max_iteration(conn, origin):
    cursor = conn.cursor()
    cursor.execute("SELECT COALESCE(MAX(iteration), 0) FROM orders WHERE origin = ?;", (origin,))
    return cursor.fetchone()[0]

def exists_iteration(conn, identifier, origin):
    cursor = conn.cursor()
    expiration = time.time() - order_expiration
    cursor.execute("""
        SELECT EXISTS(
            SELECT 1
            FROM orders
            WHERE identifier = ?
            AND origin = ?
            AND first_seen >= ?
        );
    """, (identifier, origin, expiration))
    return cursor.fetchone()[0] == 1

def delete_records_by_iteration(cursor, iteration, origin):
    cursor.execute("DELETE FROM orders WHERE iteration <= ? AND origin = ?;", (iteration, origin,))

def update_iteration(cursor, identifier, origin, iteration):
    cursor.execute("UPDATE orders SET iteration = ? WHERE identifier = ? AND origin = ?;", (iteration, identifier, origin))


@@ -1,3 +1,3 @@
 #!/bin/sh
-python app.py & tail -f /app/log/app.log
+python "$SCRIPT.py" & tail -f /app/log/app.log

app/hodlhodl.py Normal file

@@ -0,0 +1,105 @@
# hodlhodl.py
import os
import asyncio
import requests
import time
import json
from db import prepare_db, db_file_name, order_expiration
from logs import print_log
from nostr import publish_to_nostr
from nostr_sdk import Keys, EventBuilder, Kind, Tag

def fetch_hodlhodl_orders():
    base_url = "https://hodlhodl.com/api/v1/offers"
    offset = 0
    limit = 100
    orders = []
    headers = {
        'Content-Type': 'application/json'
    }
    while True:
        url = f"{base_url}?pagination[limit]={limit}&pagination[offset]={offset}"
        print_log(f"Fetching {url}")
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            print_log(f"Error fetching data: {response.status_code}")
            break
        data = response.json()
        if data.get("status") != "success":
            print_log("Failed to fetch offers.")
            break
        orders.extend(data.get("offers", []))
        # If the number of offers returned is less than the limit, we are done
        if len(data.get("offers", [])) < limit:
            break
        # Increment the offset for the next request
        offset += limit
    print_log(f"Found {len(orders)} HodlHodl orders")
    return orders

def parse_hodlhodl_to_nostr(order, keys, status):
    identifier = str(order.get('id'))
    timestamp_in_2_hours = time.time() + order_expiration
    payment_method_names = []
    for instruction in order.get("payment_method_instructions", []):
        payment_method_names.append(instruction.get("payment_method_name"))
    tags = [
        Tag.parse(["d", identifier]),
        Tag.parse(["name", order.get("trader").get("login")]),
        Tag.parse(["k", order.get("side")]),
        Tag.parse(["f", order.get("currency_code")]),
        Tag.parse(["s", status]),
        Tag.parse(["amt", str(order.get('max_amount_sats'))]),
        Tag.parse(["fa", str(order.get('min_amount')), str(order.get('max_amount'))]),
        Tag.parse(["pm"] + payment_method_names),
        Tag.parse(["source", f"https://hodlhodl.com/offers/{identifier}"]),
        Tag.parse(["rating", json.dumps({
            "total_reviews": order.get("trader").get("trades_count"),
            "total_rating": order.get("trader").get("rating"),
        })]),
        Tag.parse(["network", "mainnet"]),
        Tag.parse(["layer", "onchain"]),
        Tag.parse([
            "expiration",
            str(int(timestamp_in_2_hours))
        ]),
        Tag.parse(["y", "hodlhodl"]),
        Tag.parse(["z", "order"])
    ]
    event = EventBuilder(
        Kind(38383),
        "",
        tags,
    ).to_event(keys)
    return [event]

async def main():
    print_log("START LOOP")
    prepare_db()
    while True:
        print_log("HODL HODL")
        hodlhodl_orders = fetch_hodlhodl_orders()
        await publish_to_nostr(hodlhodl_orders, 'hodlhodl', parse_hodlhodl_to_nostr, os.environ.get('HODLHODL_NOSTR_NSEC'))
        print_log("DONE GOING TO SLEEP")
        await asyncio.sleep(300)  # Wait for 5 minutes

if __name__ == "__main__":
    print_log("START")
    asyncio.run(main())

app/logs.py Normal file

@@ -0,0 +1,11 @@
# logs.py
import sys
from datetime import datetime
log_file_path = '/app/log/app.log'

def print_log(*args, **kwargs):
    message = ' '.join(map(str, args))
    with open(log_file_path, 'a') as log_file:
        log_file.write(f"{datetime.now()}: {message}\n")

app/nostr.py Normal file

@@ -0,0 +1,56 @@
# nostr.py
import hashlib
import sqlite3
from logs import print_log
from nostr_sdk import Keys, Client, NostrSigner, NostrSdkError
from db import insert_order, max_iteration, exists_iteration, delete_records_by_iteration, update_iteration, db_file_name

async def publish_to_nostr(orders, origin, parser, nsec):
    conn = sqlite3.connect(db_file_name)
    last_iteration = max_iteration(conn, origin)
    print_log(f"Iteration {origin}: {last_iteration + 1}")
    # Initialize with coordinator Keys
    keys = Keys.parse(nsec)
    signer = NostrSigner.keys(keys)
    client = Client(signer)
    # Add relays and connect
    await client.add_relay("ws://localhost")
    await client.connect()
    print_log(f"Iteration {origin}: Checking Orders")
    cursor = conn.cursor()
    for order in orders:
        identifier = str(order.get('id'))
        if exists_iteration(conn, identifier, origin):
            # Keep existing orders alive
            print_log(f"Iteration {origin}: {last_iteration + 1} - Order stays alive: {identifier}")
            update_iteration(cursor, identifier, origin, last_iteration + 1)
        else:
            # Publish new orders
            try:
                events = parser(order, keys, "pending")
                for event in events:
                    print_log(f"Iteration {origin}: {last_iteration + 1} - Nostr event sent: {event.as_json()}")
                    try:
                        await client.send_event(event)
                    except NostrSdkError:
                        print_log(f"Iteration {origin}: {last_iteration + 1} - Event already published")
                if len(events) > 0:
                    insert_order(cursor, identifier, str(events[0].created_at().as_secs()), last_iteration + 1, origin)
            except Exception as e:
                print_log(f"Iteration {origin}: {last_iteration + 1} - Error parsing {e} : {order}")
    conn.commit()
    print_log(f"Iteration {origin}: Cleaning Orders")
    delete_records_by_iteration(conn, last_iteration, origin)
    conn.commit()
    conn.close()
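
For reference, publish_to_nostr only assumes that each order dict carries an 'id' key and that the parser callback returns a list of events signed with `keys`. A sketch of the contract a future source module would satisfy (the example names and env var are hypothetical; peach.py and hodlhodl.py are the two real implementations):

    import asyncio
    import os

    from db import prepare_db
    from nostr import publish_to_nostr

    def fetch_example_orders():
        # must return a list of dicts, each with an 'id' used for deduplication
        return []

    def parse_example_to_nostr(order, keys, status):
        # must return a list of events signed with `keys`; the first event's
        # created_at becomes the order's first_seen timestamp
        return []

    async def main():
        prepare_db()
        while True:
            await publish_to_nostr(fetch_example_orders(), 'example',
                                   parse_example_to_nostr,
                                   os.environ.get('EXAMPLE_NOSTR_NSEC'))
            await asyncio.sleep(300)

    if __name__ == "__main__":
        asyncio.run(main())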

app/peach.py Normal file

@@ -0,0 +1,124 @@
# peach.py
import os
import asyncio
import requests
import time
import json
from db import prepare_db, db_file_name, order_expiration
from logs import print_log
from nostr import publish_to_nostr
from nostr_sdk import Keys, EventBuilder, Kind, Tag

def fetch_peach_orders():
    base_url = "https://api.peachbitcoin.com/v1/offer/search"
    page = 0
    size = 500
    orders = []
    headers = {
        'Content-Type': 'application/json'
    }
    while True:
        url = f"{base_url}?page={page}&size={size}"
        print_log(f"Fetching {url}")
        response = requests.post(url, headers=headers)
        if response.status_code != 200:
            print_log(f"Error fetching data: {response.status_code}")
            break
        data = response.json()
        if data.get("offers") is None:
            print_log("Failed to fetch offers.")
            break
        orders.extend(data.get("offers", []))
        if len(data.get("offers", [])) < size:
            break
        page += 1
    print_log(f"Found {len(orders)} Peach orders")
    return orders

def parse_peach_to_nostr(order, keys, status):
    identifier = order.get('id')
    timestamp_in_2_hours = int(time.time() + order_expiration)
    order_type = "sell" if order.get("type") == "ask" else "buy"
    rating_data = {
        "total_reviews": order.get("user", {}).get("ratingCount", 0),
        "total_rating": order.get("user", {}).get("rating", 0)
    }
    amount = order.get("amount", 0)
    premium = order.get("premium", 0)
    prices = order.get("prices", {})
    if isinstance(amount, list):
        # Range offer: report amt = 0 and fa = 0 since we don't have the details
        amt = "0"
        fa = ["0"]
    else:
        amt = str(amount)
        if prices:
            first_currency = next(iter(prices))
            fa_value = prices[first_currency]
            fa = [str(fa_value)]
        else:
            fa = ["0"]
    source_url = ""
    network = "mainnet"
    layer = "onchain"
    bond = "0"
    events = []
    means_of_payment = order.get("meansOfPayment", {})
    for currency, methods in means_of_payment.items():
        tags = [
            Tag.parse(["d", str(identifier) + currency]),  # d = order_id + currency
            Tag.parse(["k", order_type]),
            Tag.parse(["s", status]),
            Tag.parse(["amt", amt]),
            Tag.parse(["fa"] + fa),
            Tag.parse(["premium", str(premium)]),
            Tag.parse(["rating", json.dumps(rating_data)]),
            Tag.parse(["source", source_url]),
            Tag.parse(["network", network]),
            Tag.parse(["layer", layer]),
            Tag.parse(["name", order.get("user", {}).get("id", "")]),
            Tag.parse(["bond", bond]),
            Tag.parse(["expiration", str(timestamp_in_2_hours)]),
            Tag.parse(["y", "peach"]),
            Tag.parse(["z", "order"]),
            Tag.parse(["f", currency]),
            Tag.parse(["pm"] + methods)
        ]
        event = EventBuilder(
            Kind(38383),
            "",
            tags
        ).to_event(keys)
        events.append(event)
    return events

async def main():
    print_log("START LOOP")
    prepare_db()
    while True:
        print_log("PEACH")
        peach_orders = fetch_peach_orders()
        await publish_to_nostr(peach_orders, 'peach', parse_peach_to_nostr, os.environ.get('PEACH_NOSTR_NSEC'))
        print_log("DONE GOING TO SLEEP")
        await asyncio.sleep(300)

if __name__ == "__main__":
    print_log("START")
    asyncio.run(main())


@@ -9,13 +9,21 @@ services:
       - ./tor:/var/lib/tor
     ports:
       - 80:80
-  scrapper:
+  scrapper_peach:
+    build: ./app
+    restart: unless-stopped
+    environment:
+      PEACH_NOSTR_NSEC: ${PEACH_NOSTR_NSEC}
+      SCRIPT: peach
+    volumes:
+      - ./app/data:/app/data:rw
+    network_mode: service:tor
+  scrapper_hodlhodl:
     build: ./app
-    container_name: scrapper
     restart: unless-stopped
     environment:
       HODLHODL_NOSTR_NSEC: ${HODLHODL_NOSTR_NSEC}
-      PEACH_NOSTR_NSEC: ${PEACH_NOSTR_NSEC}
+      SCRIPT: hodlhodl
     volumes:
       - ./app/data:/app/data:rw
     network_mode: service:tor
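
With the split, each scrapper receives only its own key plus a SCRIPT value that the container's start script expands to the module it should run. Both services still expect the keys at deploy time, for example from a .env file next to the compose file (placeholder values shown; the real nsec keys stay secret):

    PEACH_NOSTR_NSEC=nsec1...
    HODLHODL_NOSTR_NSEC=nsec1...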