issue 26: added simplex server status checker in uptimechecker.py

This commit is contained in:
SovereigntyIsNotFreedom 2025-05-17 12:57:35 +01:00
parent a4d34775c5
commit f5913a9cef
2 changed files with 255 additions and 90 deletions

View file

@ -1,13 +1,17 @@
from dotenv import load_dotenv from dotenv import load_dotenv
import asyncio
import os,re,pwd import os,re,pwd
import csv import csv
import random
import requests import requests
import json import json
import pandas as pd import pandas as pd
import glob import glob
from utils import IsSimpleXServerValid
from websockets.sync.client import connect
script_abs_path = os.path.dirname(os.path.abspath(__file__)) script_abs_path = os.path.dirname(os.path.abspath(__file__))
env_path = os.path.join(script_abs_path+"/.env") env_path = os.path.join(script_abs_path+"/.env")
@ -24,101 +28,168 @@ tor_port = os.getenv("TOR_PORT")
#apt install python3-pandas python3-requests python3-socks #apt install python3-pandas python3-requests python3-socks
def main(): def main():
print('[+] ONION UPTIME CHECKER') print('[+] ONION UPTIME CHECKER')
# TODO get the instance name and exit if its not there # TODO get the instance name and exit if its not there
rootpath='/srv/darknet-lantern/' rootpath='/srv/darknet-lantern/'
urlpath=pwd.getpwuid(os.getuid()).pw_dir+"/.darknet_participant_url" urlpath=pwd.getpwuid(os.getuid()).pw_dir+"/.darknet_participant_url"
#print(urlpath) #print(urlpath)
# check if ~/.darknet_participant_url exists, # check if ~/.darknet_participant_url exists,
# if exists, instance= the content of ~/.darknet_participant_url (which is the url: such as uptime.nowherejez...onion) # if exists, instance= the content of ~/.darknet_participant_url (which is the url: such as uptime.nowherejez...onion)
isitvalid="n" isitvalid="n"
while isitvalid != "y": while isitvalid != "y":
if os.path.isfile(urlpath): if os.path.isfile(urlpath):
with open(urlpath) as f: with open(urlpath) as f:
instance = f.read().rstrip() instance = f.read().rstrip()
# check if the instance URL domain is valid # check if the instance URL domain is valid
if IsOnionValid(instance): if IsOnionValid(instance):
print("[+] Instance Name:",instance,IsOnionValid(instance)) print("[+] Instance Name:",instance,IsOnionValid(instance))
isitvalid="y" isitvalid="y"
else: else:
print('[-] Invalid instance name in ~/.darknet_participant_url:', instance) print('[-] Invalid instance name in ~/.darknet_participant_url:', instance)
return False return False
else: else:
print("[-] Instance path doesn't exist yet, run darknet_exploration.py to set it up" ) print("[-] Instance path doesn't exist yet, run darknet_exploration.py to set it up" )
return False return False
proxies = { proxies = {
'http': f'{tor_host}:{tor_port}', 'http': f'{tor_host}:{tor_port}',
'https': f'{tor_host}:{tor_port}' 'https': f'{tor_host}:{tor_port}'
} }
instancepath=rootpath+'www/participants/'+instance+'/' instancepath=rootpath+'www/participants/'+instance+'/'
csvfiles2check=['verified.csv','unverified.csv','webring-participants.csv'] csvfiles2check=['verified.csv','unverified.csv','webring-participants.csv']
csvfiles2sortcat=['verified.csv','unverified.csv'] csvfiles2sortcat=['verified.csv','unverified.csv']
#for csvfile in glob.glob("/srv/darknet-lantern/www/links/*.csv"): #for csvfile in glob.glob("/srv/darknet-lantern/www/links/*.csv"):
for csvfilename in csvfiles2check: for csvfilename in csvfiles2check:
csvfile = instancepath+csvfilename csvfile = instancepath+csvfilename
print('[+] Reading the CSV File:', csvfile) print('[+] Reading the CSV File:', csvfile)
df = pd.read_csv(csvfile) df = pd.read_csv(csvfile)
print(df[['Name','URL']]) print(df[['Name','URL']])
print('[+] Checking if each .onion link is reachable:') print('[+] Checking if each .onion link is reachable:')
#for i in range(df.index.stop): #for i in range(df.index.stop):
for i in df.index: for i in df.index:
print("[+] Editing the uptime score") print("[+] Editing the uptime score")
#if empty, set to 100 #if empty, set to 100
if pd.isnull(df.at[i,"Score"]): if pd.isnull(df.at[i,"Score"]):
df.at[i,"Score"] = 100 df.at[i,"Score"] = 100
print(i) print(i)
#print(df.at[i,"URL"]) #print(df.at[i,"URL"])
url=df.at[i,"URL"] url=df.at[i,"URL"]
try: try:
index1 = url.find("http://") index1 = url.find("http://")
index2 = url.find("https://") index2 = url.find("https://")
if index1 == -1 and index2 == -1:
url = "http://"+url
status = requests.get(url,proxies=proxies, timeout=5).status_code
print('[+]',url,status) if url.startswith("smp://") or url.startswith("xftp://"):
if status != 502: if IsSimpleXServerValid(url):
print(url,"✔️") with connect(f"ws://localhost:3030") as websocket:
df.at[i,"Status"]="YES" if url.startswith("smp"):
#if uptime <100 do +1 to the value query = f"/_server test 1 {url}"
if df.at[i,"Score"] < 100: command = {
df.at[i,"Score"] = df.at[i,"Score"] + 1 'corrId': f"id{random.randint(0,999999)}",
else: 'cmd': query,
print(url,"") }
df.at[i,"Status"]="NO" websocket.send(json.dumps(command))
#if uptime >0 do -1 to the value message = websocket.recv()
if df.at[i,"Score"] > 0: response = json.loads(message)
df.at[i,"Score"] = df.at[i,"Score"] - 1 failed_response = response['resp'].get('testFailure')
except requests.ConnectionError as e:
#print(e)
print(url,"")
df.at[i,"Status"]="NO"
#if uptime >0 do -1 to the value
if df.at[i,"Score"] > 0:
df.at[i,"Score"] = df.at[i,"Score"] - 1
except requests.exceptions.ReadTimeout as e:
#print(e)
print(url,"")
df.at[i,"Status"]="NO"
#if uptime >0 do -1 to the value
if df.at[i,"Score"] > 0:
df.at[i,"Score"] = df.at[i,"Score"] - 1
df2 = df.sort_values(by=["Score"], ascending=False)
#sort by category if you are verified/unverified.csv
if csvfilename in csvfiles2sortcat:
df2 = df.sort_values(by=["Category"], ascending=True)
#print(df2)
df2.to_csv(csvfile, index=False)
if failed_response is None:
print(url, "✔️")
df.at[i, "Status"]="YES"
if df.at[i, "Score"] < 100:
df.at[i,"Score"] = df.at[i,"Score"] + 1
else:
print(url,"")
df.at[i,"Status"]="NO"
#if uptime >0 do -1 to the value
if df.at[i,"Score"] > 0:
df.at[i,"Score"] = df.at[i,"Score"] - 1
else:
query = f"/_server test 1 {url}"
command = {
'corrId': f"id{random.randint(0,999999)}",
'cmd': query,
}
websocket.send(json.dumps(command))
message = websocket.recv()
response = json.loads(message)
failed_response = response['resp']['testFailure']
if failed_response is None:
print(url, "✔️")
df.at[i, "Status"]="YES"
if df.at[i, "Score"] < 100:
df.at[i,"Score"] = df.at[i,"Score"] + 1
else:
print(url,"")
df.at[i,"Status"]="NO"
#if uptime >0 do -1 to the value
if df.at[i,"Score"] > 0:
df.at[i,"Score"] = df.at[i,"Score"] - 1
else:
if index1 == -1 and index2 == -1:
url = "http://"+url
status = requests.get(url,proxies=proxies, timeout=5).status_code
print('[+]',url,status)
if status != 502:
print(url,"✔️")
df.at[i,"Status"]="YES"
#if uptime <100 do +1 to the value
if df.at[i,"Score"] < 100:
df.at[i,"Score"] = df.at[i,"Score"] + 1
else:
print(url,"")
df.at[i,"Status"]="NO"
#if uptime >0 do -1 to the value
if df.at[i,"Score"] > 0:
df.at[i,"Score"] = df.at[i,"Score"] - 1
except requests.ConnectionError as e:
#print(e)
print(url,"")
df.at[i,"Status"]="NO"
#if uptime >0 do -1 to the value
if df.at[i,"Score"] > 0:
df.at[i,"Score"] = df.at[i,"Score"] - 1
except requests.exceptions.ReadTimeout as e:
#print(e)
print(url,"")
df.at[i,"Status"]="NO"
#if uptime >0 do -1 to the value
if df.at[i,"Score"] > 0:
df.at[i,"Score"] = df.at[i,"Score"] - 1
except ConnectionRefusedError:
print(url,"✔️")
df.at[i, "Status"]="YES"
if df.at[i, "Score"] < 100:
df.at[i,"Score"] = df.at[i,"Score"] + 1
df2 = df.sort_values(by=["Score"], ascending=False)
#sort by category if you are verified/unverified.csv
if csvfilename in csvfiles2sortcat:
df2 = df.sort_values(by=["Category"], ascending=True)
#print(df2)
df2.to_csv(csvfile, index=False)
def IsUrlValid(url:str)->bool: def IsUrlValid(url:str)->bool:
""" """
@ -193,7 +264,6 @@ def IsOnionValid(url: str)-> bool:
except Exception as e: except Exception as e:
print(f"Error: {e}") print(f"Error: {e}")
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View file

@ -236,6 +236,43 @@ def IsCategoryValid(categories: list)-> bool:
else: else:
return True return True
def IsSimpleXServerValid(url: str) -> bool:
pattern = re.compile('[0-9A-Za-z-_]*')
url = url.strip()
try:
if url.startswith(('smp://', 'xftp://')):
# Remove the protocol part
proless = url.split('//', 1)[-1]
# Split the fingerprint and hostname
parts = proless.split('@')
if len(parts) != 2:
return False # Must have exactly one '@' character
fingerprint = parts[0]
hostname = parts[1].split(',')[0] # Get the hostname before any comma
# Check fingerprint length and pattern
if len(fingerprint) == 44 and pattern.match(fingerprint):
# Validate the hostname
result = IsSimpleXUrlValid(hostname)
if result:
# Check for an optional comma and a valid onion domain
if ',' in proless:
onion_part = proless.split(',')[1].strip()
if not hostname_pattern.match(onion_part):
return False
return True
return False
except Exception as e:
print(e)
# Any error will be a false
return False
def IsNameValid(name: str)->bool: def IsNameValid(name: str)->bool:
""" """
Check the parameter name only contains [a-zA-Z0-9 ] and is 64 chars long. Check the parameter name only contains [a-zA-Z0-9 ] and is 64 chars long.
@ -268,3 +305,61 @@ def print_colors(s:str=' ', bold:bool=False, is_error:bool = False, default:bool
else: else:
print(f"{PURPLE}{s}{RESET}") print(f"{PURPLE}{s}{RESET}")
def IsSimpleXOnionValid(url: str)-> bool:
"""
Checks if the domain(param) is a valid onion domain and return True else False.
"""
try:
pattern = re.compile(r"^[A-Za-z0-9:/._%-=#?&@]+(.onion)$")
url_pattern = re.compile(r"^(\w+:)?(?://)?(\w+\.)?[a-z2-7]{56}\.onion")
url = url.strip().removesuffix('/')
if url.startswith('http://'):
domain = url.split('/')[2]
if pattern.fullmatch(domain) is not None:
if len(domain.split('.')) > 3:
return False
else:
if len(domain) < 62:
return False
return True
elif pattern.fullmatch(domain) is None:
return False
else:
return False
else:
#TODO : edit the url to make sure it has http:// at the beginning, in case if it's missing? (problem is that it only returns true or false)
if url_pattern.match(url) is not None:
if len(url.split('.')) > 3:
return False
else:
if len(url) < 62:
return False
return True
elif url_pattern.match(url) is None:
return False
else:
return False
except Exception as e:
return False
def IsSimpleXUrlValid(url:str)->bool:
"""
Check if url is valid both dark net end clearnet.
"""
pattern = re.compile(r"^[A-Za-z0-9:/._%-=#?&@]+$")
onion_pattern = re.compile(r"^(\w+:)?(?://)?(\w+\.)?[a-z2-7]{56}\.onion")
url = str(url)
if len(url) < 4:
return False
if onion_pattern.match(url) is not None:
return IsSimpleXOnionValid(url)
else:
if not url.__contains__('.'):
return False
if pattern.fullmatch(url) is None:
return False
return True