Merge pull request 'main' (#61) from SovereigntyIsNotFreedom/darknet-lantern:main into main

Reviewed-on: http://git.nowherejezfoltodf4jiyl6r56jnzintap5vyjlia7fkirfsnfizflqd.onion/nihilist/darknet-lantern/pulls/61
This commit is contained in:
nihilist 2025-05-22 18:22:24 +02:00
commit 0adba9be1f
2 changed files with 258 additions and 90 deletions

View file

@ -1,11 +1,15 @@
from dotenv import load_dotenv from dotenv import load_dotenv
import os,re,pwd import os,re,pwd
import csv import csv
import random
import requests import requests
import json import json
import pandas as pd import pandas as pd
import glob import glob
from utils import IsSimpleXServerValid, send_server_checks
@ -24,6 +28,7 @@ tor_port = os.getenv("TOR_PORT")
#apt install python3-pandas python3-requests python3-socks #apt install python3-pandas python3-requests python3-socks
def main(): def main():
print('[+] ONION UPTIME CHECKER') print('[+] ONION UPTIME CHECKER')
# TODO get the instance name and exit if its not there # TODO get the instance name and exit if its not there
@ -81,8 +86,50 @@ def main():
try: try:
index1 = url.find("http://") index1 = url.find("http://")
index2 = url.find("https://") index2 = url.find("https://")
if url.startswith("smp://") or url.startswith("xftp://"):
if IsSimpleXServerValid(url):
if url.startswith("smp"):
resp,resp_type,failed_response = send_server_checks(url)
if resp_type in ["chatError", "contactSubSummary"]:
resp, resp_type,failed_response = send_server_checks(url)
if failed_response is None:
print(url, "✔️")
df.at[i, "Status"]="YES"
if df.at[i, "Score"] < 100:
df.at[i,"Score"] = df.at[i,"Score"] + 1
else:
print(url,"")
df.at[i,"Status"]="NO"
#if uptime >0 do -1 to the value
if df.at[i,"Score"] > 0:
df.at[i,"Score"] = df.at[i,"Score"] - 1
else:
resp,resp_type,failed_response = send_server_checks(url)
if resp_type in ["chatError", "contactSubSummary"]:
resp, resp_type,failed_response = send_server_checks(url)
if failed_response is None:
print(url, "✔️")
df.at[i, "Status"]="YES"
if df.at[i, "Score"] < 100:
df.at[i,"Score"] = df.at[i,"Score"] + 1
else:
print(url,"")
df.at[i,"Status"]="NO"
#if uptime >0 do -1 to the value
if df.at[i,"Score"] > 0:
df.at[i,"Score"] = df.at[i,"Score"] - 1
else:
if index1 == -1 and index2 == -1: if index1 == -1 and index2 == -1:
url = "http://"+url url = "http://"+url
status = requests.get(url,proxies=proxies, timeout=5).status_code status = requests.get(url,proxies=proxies, timeout=5).status_code
print('[+]',url,status) print('[+]',url,status)
if status != 502: if status != 502:
@ -97,6 +144,7 @@ def main():
#if uptime >0 do -1 to the value #if uptime >0 do -1 to the value
if df.at[i,"Score"] > 0: if df.at[i,"Score"] > 0:
df.at[i,"Score"] = df.at[i,"Score"] - 1 df.at[i,"Score"] = df.at[i,"Score"] - 1
except requests.ConnectionError as e: except requests.ConnectionError as e:
#print(e) #print(e)
print(url,"") print(url,"")
@ -111,6 +159,12 @@ def main():
#if uptime >0 do -1 to the value #if uptime >0 do -1 to the value
if df.at[i,"Score"] > 0: if df.at[i,"Score"] > 0:
df.at[i,"Score"] = df.at[i,"Score"] - 1 df.at[i,"Score"] = df.at[i,"Score"] - 1
except ConnectionRefusedError:
print(url,"✔️")
df.at[i, "Status"]="YES"
if df.at[i, "Score"] < 100:
df.at[i,"Score"] = df.at[i,"Score"] + 1
df2 = df.sort_values(by=["Score"], ascending=False) df2 = df.sort_values(by=["Score"], ascending=False)
#sort by category if you are verified/unverified.csv #sort by category if you are verified/unverified.csv
@ -194,6 +248,5 @@ def IsOnionValid(url: str)-> bool:
print(f"Error: {e}") print(f"Error: {e}")
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View file

@ -1,9 +1,12 @@
import random
import re import re
import os import os
import requests import requests
from PIL import Image from PIL import Image
import json
#from SimpleX.utils import IsUrlValid #from SimpleX.utils import IsUrlValid
import urllib.parse import urllib.parse
from websockets.sync.client import connect
PURPLE = '\033[35;40m' PURPLE = '\033[35;40m'
@ -236,6 +239,43 @@ def IsCategoryValid(categories: list)-> bool:
else: else:
return True return True
def IsSimpleXServerValid(url: str) -> bool:
pattern = re.compile('[0-9A-Za-z-_]*')
url = url.strip()
try:
if url.startswith(('smp://', 'xftp://')):
# Remove the protocol part
proless = url.split('//', 1)[-1]
# Split the fingerprint and hostname
parts = proless.split('@')
if len(parts) != 2:
return False # Must have exactly one '@' character
fingerprint = parts[0]
hostname = parts[1].split(',')[0] # Get the hostname before any comma
# Check fingerprint length and pattern
if len(fingerprint) == 44 and pattern.match(fingerprint):
# Validate the hostname
result = IsSimpleXUrlValid(hostname)
if result:
# Check for an optional comma and a valid onion domain
if ',' in proless:
onion_part = proless.split(',')[1].strip()
if not hostname_pattern.match(onion_part):
return False
return True
return False
except Exception as e:
print(e)
# Any error will be a false
return False
def IsNameValid(name: str)->bool: def IsNameValid(name: str)->bool:
""" """
Check the parameter name only contains [a-zA-Z0-9 ] and is 64 chars long. Check the parameter name only contains [a-zA-Z0-9 ] and is 64 chars long.
@ -268,3 +308,78 @@ def print_colors(s:str=' ', bold:bool=False, is_error:bool = False, default:bool
else: else:
print(f"{PURPLE}{s}{RESET}") print(f"{PURPLE}{s}{RESET}")
def IsSimpleXOnionValid(url: str)-> bool:
"""
Checks if the domain(param) is a valid onion domain and return True else False.
"""
try:
pattern = re.compile(r"^[A-Za-z0-9:/._%-=#?&@]+(.onion)$")
url_pattern = re.compile(r"^(\w+:)?(?://)?(\w+\.)?[a-z2-7]{56}\.onion")
url = url.strip().removesuffix('/')
if url.startswith('http://'):
domain = url.split('/')[2]
if pattern.fullmatch(domain) is not None:
if len(domain.split('.')) > 3:
return False
else:
if len(domain) < 62:
return False
return True
elif pattern.fullmatch(domain) is None:
return False
else:
return False
else:
#TODO : edit the url to make sure it has http:// at the beginning, in case if it's missing? (problem is that it only returns true or false)
if url_pattern.match(url) is not None:
if len(url.split('.')) > 3:
return False
else:
if len(url) < 62:
return False
return True
elif url_pattern.match(url) is None:
return False
else:
return False
except Exception as e:
return False
def IsSimpleXUrlValid(url:str)->bool:
"""
Check if url is valid both dark net end clearnet.
"""
pattern = re.compile(r"^[A-Za-z0-9:/._%-=#?&@]+$")
onion_pattern = re.compile(r"^(\w+:)?(?://)?(\w+\.)?[a-z2-7]{56}\.onion")
url = str(url)
if len(url) < 4:
return False
if onion_pattern.match(url) is not None:
return IsSimpleXOnionValid(url)
else:
if not url.__contains__('.'):
return False
if pattern.fullmatch(url) is None:
return False
return True
def send_server_checks(url:str) -> ():
"""
Sends requests to sxc websocket and retuns
response, response type and testFailure or None.
"""
with connect(f"ws://localhost:3030") as websocket:
query = f"/_server test 1 {url}"
command = {
'corrId': f"id{random.randint(0,999999)}",
'cmd': query,
}
websocket.send(json.dumps(command))
message = websocket.recv()
response = json.loads(message)
resp_type = response["resp"]["type"]
failed_response = response['resp'].get('testFailure')
return (response, resp_type, failed_response)