"""Onion uptime checker.

Probes every link listed in this lantern instance's CSV files
(verified.csv, unverified.csv, webring-participants.csv) and maintains a
rolling 0-100 uptime "Score" plus a YES/NO "Status" column per link.
HTTP(S) onions are probed through the Tor SOCKS proxy; SimpleX smp://
and xftp:// servers are tested via the local SimpleX CLI websocket.
"""
from dotenv import load_dotenv
import asyncio
import os
import re
import pwd
import csv
import random
import requests
import json
import pandas as pd
import glob
from utils import IsSimpleXServerValid
from websockets.sync.client import connect

# Tor SOCKS proxy settings come from the .env file next to this script.
script_abs_path = os.path.dirname(os.path.abspath(__file__))
env_path = os.path.join(script_abs_path, ".env")
load_dotenv(dotenv_path=env_path)

tor_host = os.getenv("TOR_HOST")
tor_port = os.getenv("TOR_PORT")

#apt install python3-pandas python3-requests python3-socks


def _apply_result(df, i, is_up, url):
    """Record one probe result on row *i*: set Status and nudge Score by
    +/-1, clamped to [0, 100]."""
    if is_up:
        print(url, "✔️")
        df.at[i, "Status"] = "YES"
        if df.at[i, "Score"] < 100:
            df.at[i, "Score"] = df.at[i, "Score"] + 1
    else:
        print(url, "❌")
        df.at[i, "Status"] = "NO"
        if df.at[i, "Score"] > 0:
            df.at[i, "Score"] = df.at[i, "Score"] - 1


def _simplex_server_up(url):
    """Ask the local SimpleX CLI (websocket on port 3030) to test *url*.

    Returns True when the test reports no failure. Raises
    ConnectionRefusedError when the CLI is not running (handled by the
    caller). Works identically for smp:// and xftp:// servers.
    """
    with connect("ws://localhost:3030") as websocket:
        command = {
            'corrId': f"id{random.randint(0, 999999)}",
            'cmd': f"/_server test 1 {url}",
        }
        websocket.send(json.dumps(command))
        response = json.loads(websocket.recv())
        # .get(): on success the CLI omits 'testFailure' entirely.
        # The old xftp path indexed ['testFailure'] and so raised
        # KeyError on every *successful* test.
        return response['resp'].get('testFailure') is None


def main():
    print('[+] ONION UPTIME CHECKER')

    # TODO get the instance name and exit if its not there
    rootpath = '/srv/darknet-lantern/'
    urlpath = pwd.getpwuid(os.getuid()).pw_dir + "/.darknet_participant_url"

    # ~/.darknet_participant_url holds this instance's onion name
    # (e.g. uptime.nowherejez...onion); bail out if missing or invalid.
    if not os.path.isfile(urlpath):
        print("[-] Instance path doesn't exist yet, run darknet_exploration.py to set it up" )
        return False
    with open(urlpath) as f:
        instance = f.read().rstrip()
    validity = IsOnionValid(instance)  # call once, reuse (was called twice)
    if not validity:
        print('[-] Invalid instance name in ~/.darknet_participant_url:', instance)
        return False
    print("[+] Instance Name:", instance, validity)

    proxies = {
        'http': f'{tor_host}:{tor_port}',
        'https': f'{tor_host}:{tor_port}'
    }

    instancepath = rootpath + 'www/participants/' + instance + '/'
    csvfiles2check = ['verified.csv', 'unverified.csv', 'webring-participants.csv']
    csvfiles2sortcat = ['verified.csv', 'unverified.csv']

    for csvfilename in csvfiles2check:
        csvfile = instancepath + csvfilename
        print('[+] Reading the CSV File:', csvfile)
        df = pd.read_csv(csvfile)
        print(df[['Name', 'URL']])
        print('[+] Checking if each .onion link is reachable:')
        for i in df.index:
            print("[+] Editing the uptime score")
            # new entries start with a perfect score
            if pd.isnull(df.at[i, "Score"]):
                df.at[i, "Score"] = 100
            print(i)
            url = df.at[i, "URL"]
            try:
                if url.startswith(("smp://", "xftp://")):
                    # SimpleX relays can't be probed over HTTP; delegate to
                    # the local SimpleX CLI. Invalid SimpleX URLs are skipped
                    # (row left untouched), matching the previous behaviour.
                    if IsSimpleXServerValid(url):
                        _apply_result(df, i, _simplex_server_up(url), url)
                else:
                    if "http://" not in url and "https://" not in url:
                        url = "http://" + url
                    status = requests.get(url, proxies=proxies, timeout=5).status_code
                    print('[+]', url, status)
                    # 502 is what the proxy returns for an unreachable onion
                    _apply_result(df, i, status != 502, url)
            except (requests.ConnectionError, requests.exceptions.ReadTimeout):
                _apply_result(df, i, False, url)
            except ConnectionRefusedError:
                # The local SimpleX CLI isn't running: give the server the
                # benefit of the doubt rather than penalising it for our
                # own outage.
                _apply_result(df, i, True, url)

        df2 = df.sort_values(by=["Score"], ascending=False)
        # verified/unverified listings are grouped by category instead
        if csvfilename in csvfiles2sortcat:
            df2 = df.sort_values(by=["Category"], ascending=True)
        df2.to_csv(csvfile, index=False)
def IsSimpleXServerValid(url: str) -> bool:
    """Validate a SimpleX server address.

    Expected form: smp://<44-char fingerprint>@<hostname>[,<onion fallback>]
    (or the same with xftp://). Returns True when the address is
    well-formed, False otherwise (including on any internal error).
    """
    # base64url-ish charset for the server identity fingerprint
    fingerprint_pattern = re.compile('[0-9A-Za-z-_]*')
    url = url.strip()
    try:
        if not url.startswith(('smp://', 'xftp://')):
            return False
        # Remove the protocol part
        proless = url.split('//', 1)[-1]
        # Split the fingerprint and hostname
        parts = proless.split('@')
        if len(parts) != 2:
            return False  # Must have exactly one '@' character

        fingerprint = parts[0]
        hostname = parts[1].split(',')[0]  # Get the hostname before any comma

        # fullmatch is required: .match() with a '*' pattern always
        # succeeds on the empty prefix, so the old charset check was a no-op.
        if len(fingerprint) != 44 or not fingerprint_pattern.fullmatch(fingerprint):
            return False
        # Validate the hostname
        if not IsSimpleXUrlValid(hostname):
            return False
        # Optional fallback address after the comma must be a valid onion.
        # (The old code referenced an undefined name 'hostname_pattern'
        # here, which raised NameError and rejected every such URL.)
        if ',' in proless:
            onion_part = proless.split(',')[1].strip()
            if not IsSimpleXOnionValid(onion_part):
                return False
        return True
    except Exception as e:
        print(e)
        # Any error will be a false
        return False
def IsSimpleXOnionValid(url: str) -> bool:
    """
    Return True when url is a valid v3 .onion address for a SimpleX host,
    accepted either bare or with an http:// prefix; False otherwise.
    """
    try:
        domain_re = re.compile(r"^[A-Za-z0-9:/._%-=#?&@]+(.onion)$")
        bare_re = re.compile(r"^(\w+:)?(?://)?(\w+\.)?[a-z2-7]{56}\.onion")
        candidate = url.strip().removesuffix('/')
        if candidate.startswith('http://'):
            host = candidate.split('/')[2]
            if domain_re.fullmatch(host) is None:
                return False
            # at most one subdomain label, and long enough for a v3 address
            return len(host.split('.')) <= 3 and len(host) >= 62
        # bare form (no scheme): same label-count and length constraints
        if bare_re.match(candidate) is None:
            return False
        return len(candidate.split('.')) <= 3 and len(candidate) >= 62
    except Exception:
        # malformed input of any kind is simply invalid
        return False

def IsSimpleXUrlValid(url: str) -> bool:
    """
    Check whether url is a usable SimpleX hostname, clearnet or darknet.
    Onion-looking names are delegated to IsSimpleXOnionValid.
    """
    allowed_re = re.compile(r"^[A-Za-z0-9:/._%-=#?&@]+$")
    onion_re = re.compile(r"^(\w+:)?(?://)?(\w+\.)?[a-z2-7]{56}\.onion")
    text = str(url)
    if len(text) < 4:
        return False
    if onion_re.match(text) is not None:
        return IsSimpleXOnionValid(text)
    # clearnet: needs at least one dot and only URL-safe characters
    return '.' in text and allowed_re.fullmatch(text) is not None