import json
import os
import random
import re
import urllib.parse

import pandas as pd
import requests
from PIL import Image
from websockets.sync.client import connect

import conf

# ANSI escape sequences used by print_colors()
PURPLE = '\033[35;40m'
BOLD_PURPLE = '\033[35;40;1m'
RED = '\033[31;40m'
BOLD_RED = '\033[31;40;1m'
RESET = '\033[m'


def get_current_instance():
    """
    Read the local participant's onion url from the user's home directory.

    Returns:
        str: the local instance onion url (trailing whitespace stripped)
    """
    # expanduser gives the current user directory
    instance_file = os.path.expanduser("~") + '/.darknet_participant_url'
    with open(instance_file) as f:
        return f.read().rstrip()


# Set the local dir on script run (module-level side effect, kept from original)
conf.LOCAL_DIR = conf.PARTICIPANT_DIR + get_current_instance() + '/'


###################### Validations ######################

def CheckUrl(url):
    """
    Check if a URL is actually reachable via Tor (SOCKS proxy on 127.0.0.1:9050).

    Returns:
        bool: True only when the request completes with HTTP 200
    """
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    try:
        status = requests.get(url, proxies=proxies, timeout=5).status_code
        return status == 200
    except (requests.ConnectionError, requests.exceptions.ReadTimeout):
        return False


###TODO: should replace checkUrl
# checks if all the webring participants are reachable
def is_participant_reachable(instance):
    """
    Check if all of a participant's CSV files are reachable via Tor.

    Parameters:
        instance (str): The participant onion address

    Returns:
        bool: False if any file is unreachable, True if all are reachable
    """
    url = generate_participant_url(instance)
    # Checks all files on a webring participant, if all reached returns true
    for file_name in conf.CSV_FILES:
        try:
            status = requests.get(f'{url}{file_name}', proxies=conf.PROXIES, timeout=10).status_code
            if status != 200:
                return False
        except Exception:
            return False
    return True


#### PROTECTIONS AGAINST MALICIOUS CSV INPUTS ####

def IsBannerValid(path: str) -> bool:
    """
    Check that the banner file is a 240x60 image no larger than 5 MB.

    Parameters:
        path (str): filesystem path to the banner image

    Returns:
        bool: True when dimensions and file size are within limits
    """
    try:
        # context manager releases the file handle even on early return
        with Image.open(path) as im:
            width, height = im.size
    except Exception:
        print("ERROR, EXCEPTION")
        return False
    if width != 240 or height != 60:
        print("INVALID BANNER DIMENSIONS, HEIGHT=", height, " WIDTH=", width)
        return False
    filesizeMB = os.path.getsize(path) / 1024 / 1024
    if filesizeMB > 5:
        print("Banner filesize too large (>5Mb): ", filesizeMB, "MB")
        return False
    return True


def IsOnionValid(url: str) -> bool:
    """
    Check if the domain (param) is a valid onion domain; return True, else False.

    Accepts either a bare domain or an http:// URL; v3 onion domains are
    62 chars long including the '.onion' suffix.
    """
    try:
        # NOTE: dot before 'onion' is escaped (original pattern used an
        # unescaped '.', which also matched e.g. 'abc-onion')
        pattern = re.compile(r"^[A-Za-z0-9.]+(\.onion)?$")
        url = url.strip().removesuffix('/')
        if url.startswith('http://'):
            domain = url.split('/')[2]
        else:
            # TODO: edit the url to make sure it has http:// at the beginning,
            # in case it's missing? (problem: function only returns bool)
            domain = url
        if pattern.fullmatch(domain) is None:
            return False
        if len(domain.split('.')) > 3:
            return False
        if len(domain) < 62:
            # too short to be a v3 onion address
            return False
        return True
    except Exception:
        return False


def IsSimpleXChatroomValid(url: str) -> bool:
    """Validate the SimpleX chatroom URL."""
    REQUIRED_SUBSTRING = "#/?v=2-7&smp=smp%3A%2F"

    # Step 1: Check if it starts with http://, https://, or simplex:/
    if url.startswith(('http://', 'https://', 'simplex:/')):
        # Step 1.5: If http:// or https://, check for valid clearnet or onion domain
        if url.startswith(('http://', 'https://')) and not IsUrlValid(url):
            return False
    elif not url.startswith('simplex:/'):
        return False  # Must start with one of the valid protocols

    # Step 2: Check for the presence of the required substring
    if REQUIRED_SUBSTRING not in url:
        return False  # Required substring not found

    # Step 3: Extract the part after "smp=smp%3A%2F"
    smp_start = url.find("smp=smp%3A%2F")
    if smp_start == -1:
        return False  # Required substring not found
    smp_start += len("smp=smp%3A%2F")
    smp_end = url.find("&", smp_start)
    if smp_end == -1:
        smp_end = len(url)  # Take until the end if no "&" is found
    smp_value = urllib.parse.unquote(url[smp_start:smp_end])  # Decode the URL-encoded string

    # Step 3.5: Check if the smp_value contains a valid hostname
    if '@' not in smp_value:
        return False  # Must contain '@' to separate fingerprint and hostname
    fingerprint, hostname = smp_value.split('@', 1)
    if not IsUrlValid(hostname):
        return False  # Invalid hostname

    # Step 4: Check for the presence of "%2F" in the original URL
    if "%2F" not in url:
        return False  # Required substring not found

    # If all checks pass, return True
    return True


def IsUrlValid(url: str) -> bool:
    """
    Check if url is valid for both darknet and clearnet.
    """
    pattern = re.compile(r"^[A-Za-z0-9:/._%-=#?&@]+$")
    onion_pattern = re.compile(r"^(\w+:)?(?://)?(\w+\.)?[a-z2-7]{56}\.onion")
    url = str(url)
    if len(url) < 4:
        return False
    if onion_pattern.match(url) is not None:
        return IsOnionValid(url)
    if '.' not in url:
        return False
    if ';' in url:
        return False  # required otherwise lantern thinks there are extra columns
    if pattern.fullmatch(url) is None:
        return False
    return True


def IsStatusValid(status: str) -> bool:
    """
    Check that status is one of ['YES','NO','✔️','❌',''].
    """
    pattern = ['YES', 'NO', '✔️', '❌', '']
    # strip result is now assigned (original discarded it, making it a no-op)
    status = str(status).strip()
    if status not in pattern:
        return False
    return True


def IsScoreValid(score: str) -> bool:
    """
    Check the score matches "^[0-9.,]+$" with 8 max chars (empty/nan allowed).
    """
    pattern = re.compile("^[0-9.,]+$")
    score = str(score).strip()
    if score in ['', 'nan']:
        return True
    if pattern.fullmatch(score) is None:
        return False
    if len(score) > 8:
        return False
    return True


def IsDescriptionValid(desc: str) -> bool:
    """
    Check the description only contains [A-Za-z0-9-.,' "()/ ] with 256 max chars.
    Empty string is valid; the literal "DEFAULT" is rejected.
    """
    if desc == "":
        return True
    pattern = re.compile(r"^[A-Za-z0-9-.,' \"()/]+$")
    desc = str(desc).strip()
    if pattern.fullmatch(desc) is None:
        return False
    if desc == "DEFAULT":
        return False
    if len(desc) > 256:
        return False
    return True


def IsCategoryValid(categories: list) -> bool:
    """
    Check every category only contains [A-Za-z0-9 ] with 64 max chars.

    BUGFIX: the original returned True after checking only the first
    category; all entries are now validated.
    """
    pattern = re.compile("^[A-Za-z0-9 ]+$")
    for category in categories:
        category = category.strip()
        if pattern.fullmatch(category) is None:
            return False
        if len(category) > 64:
            return False
    return True


def IsSimpleXServerValid(url: str) -> bool:
    """
    Validate a SimpleX smp:// or xftp:// server URL of the form
    protocol://<44-char fingerprint>@<hostname>[,<onion hostname>].
    """
    pattern = re.compile('[0-9A-Za-z-_]*')
    # BUGFIX: the original referenced an undefined 'hostname_pattern',
    # raising NameError whenever a comma was present
    onion_pattern = re.compile(r"^(\w+:)?(?://)?(\w+\.)?[a-z2-7]{56}\.onion")
    url = url.strip()
    try:
        if url.startswith(('smp://', 'xftp://')):
            # Remove the protocol part
            proless = url.split('//', 1)[-1]
            # Split the fingerprint and hostname
            parts = proless.split('@')
            if len(parts) != 2:
                return False  # Must have exactly one '@' character
            fingerprint = parts[0]
            hostname = parts[1].split(',')[0]  # Get the hostname before any comma
            # Check fingerprint length and pattern
            if len(fingerprint) == 44 and pattern.match(fingerprint):
                # Validate the hostname
                if IsSimpleXUrlValid(hostname):
                    # Check for an optional comma and a valid onion domain
                    if ',' in proless:
                        onion_part = proless.split(',')[1].strip()
                        if not onion_pattern.match(onion_part):
                            return False
                    return True
        return False
    except Exception as e:
        print(e)
        # Any error will be a false
        return False


def IsNameValid(name: str) -> bool:
    """
    Check the name only contains [A-Za-z0-9 ] and is at most 64 chars long.
    """
    try:
        name = str(name)
    except Exception:
        return False
    pattern = re.compile("^[A-Za-z0-9 ]+$")
    name = name.strip()
    if pattern.fullmatch(name) is None:
        return False
    if len(name) > 64:
        return False
    return True


def print_colors(s: str = ' ', bold: bool = False, is_error: bool = False, default: bool = False):
    """
    Helper function to print with colors.

    BUGFIX: the combined is_error+bold branch was unreachable in the
    original (shadowed by the plain is_error branch); it is checked first now.
    """
    if is_error and bold:
        print(f"{BOLD_RED}{s}{RESET}")
    elif is_error:
        print(f"{RED}{s}{RESET}")
    elif bold:
        print(f"{BOLD_PURPLE}{s}{RESET}")
    elif default:
        print(f'{s}')
    else:
        print(f"{PURPLE}{s}{RESET}")


def IsSimpleXOnionValid(url: str) -> bool:
    """
    Check if the domain (param) is a valid onion domain; return True, else False.
    """
    try:
        pattern = re.compile(r"^[A-Za-z0-9:/._%-=#?&@]+(.onion)$")
        url_pattern = re.compile(r"^(\w+:)?(?://)?(\w+\.)?[a-z2-7]{56}\.onion")
        url = url.strip().removesuffix('/')
        if url.startswith('http://'):
            domain = url.split('/')[2]
            if pattern.fullmatch(domain) is None:
                return False
            if len(domain.split('.')) > 3:
                return False
            if len(domain) < 62:
                return False
            return True
        # TODO: edit the url to make sure it has http:// at the beginning,
        # in case it's missing? (problem: function only returns bool)
        if url_pattern.match(url) is None:
            return False
        if len(url.split('.')) > 3:
            return False
        if len(url) < 62:
            return False
        return True
    except Exception:
        return False


def IsSimpleXUrlValid(url: str) -> bool:
    """
    Check if url is valid for both darknet and clearnet.
    """
    pattern = re.compile(r"^[A-Za-z0-9:/._%-=#?&@]+$")
    onion_pattern = re.compile(r"^(\w+:)?(?://)?(\w+\.)?[a-z2-7]{56}\.onion")
    url = str(url)
    if len(url) < 4:
        return False
    if onion_pattern.match(url) is not None:
        return IsSimpleXOnionValid(url)
    if '.' not in url:
        return False
    if pattern.fullmatch(url) is None:
        return False
    return True


def send_server_checks(url: str) -> tuple:
    """
    Send a test request to the SimpleX chat websocket and return the response.

    Returns:
        tuple: (response dict, response type str, testFailure payload or None)
    """
    with connect("ws://localhost:3030") as websocket:
        query = f"/_server test 1 {url}"
        command = {
            'corrId': f"id{random.randint(0, 999999)}",
            'cmd': query,
        }
        websocket.send(json.dumps(command))
        message = websocket.recv()
        response = json.loads(message)
        resp_type = response["resp"]["type"]
        failed_response = response['resp'].get('testFailure')
    return (response, resp_type, failed_response)


def is_row_valid(row):
    """
    Validate a dataframe row, checking that every field is valid.

    Parameters:
        row (dict): dataframe row

    Returns:
        bool: True if row is valid, False if it isn't
    """
    try:
        return (
            IsUrlValid(row['Instance'])
            and IsCategoryValid(row['Category'])
            and IsNameValid(row['Name'])
            and IsUrlValid(row['URL'])
            and IsStatusValid(row['Sensitive'])
            and IsDescriptionValid(row['Description'])
            and IsStatusValid(row['Status'])
            and IsScoreValid(row['Score'])
        )
    except Exception:
        return False


###################### General ######################

def merge_verification_df(receiving_df, merging_df):
    """
    Merge 2 dataframes of type verified or unverified
    (does not merge rows whose Name or URL already exists in the receiver).

    Parameters:
        receiving_df (Dataframe): dataframe receiving the data
        merging_df (Dataframe): dataframe merged into the receiving dataframe

    Returns:
        Dataframe: the combined dataframe
    """
    try:
        filtered_df = merging_df[~((merging_df['URL'].isin(receiving_df['URL'])) | merging_df['Name'].isin(receiving_df['Name']))]
        if filtered_df.empty:
            return receiving_df
        if receiving_df.empty:
            return filtered_df
        return pd.concat([receiving_df, filtered_df], ignore_index=True)
    except Exception:
        # best-effort: on any failure keep the receiver unchanged
        return receiving_df


def remove_duplications(df):
    """
    Remove Name and URL duplications from the dataframe.

    Parameters:
        df (Dataframe): the dataframe to deduplicate

    Returns:
        Dataframe: the dataframe after duplications were removed
    """
    try:
        df = df.drop_duplicates(subset='Name')
        df = df.drop_duplicates(subset='URL')
    except Exception:
        pass
    return df


def remove_cross_dataframe_replications(main_df, sub_df):
    """
    Deduplicate both dataframes, then drop from sub_df any row whose URL or
    Name already appears in main_df.

    BUGFIX: the original referenced undefined names (main_fd, df_a, df_b)
    and returned sub_df alone on success but a tuple on failure; it now
    always returns (main_df, sub_df).

    Returns:
        tuple: (main_df, filtered sub_df)
    """
    try:
        main_df = remove_duplications(main_df)
        sub_df = remove_duplications(sub_df)
        mask = sub_df['URL'].isin(main_df['URL']) | sub_df['Name'].isin(main_df['Name'])
        sub_df = sub_df[~mask]
    except Exception:
        # best-effort: fall through with whatever state we reached
        pass
    return main_df, sub_df


###TODO: can later remove the inputs and have a "global" local verified and unverified or a class of the local(lantern host) participant
def save_local_verified_and_unverified(verified_df, unverified_df):
    """
    Save the local verified and unverified dataframes to CSV.

    Parameters:
        verified_df (Dataframe): local verified rows dataframe
        unverified_df (Dataframe): local unverified rows dataframe

    Returns:
        bool: True on success, False on failure
    """
    try:
        current_instance = get_current_instance() + '/'
        verified_df.to_csv(f'{conf.PARTICIPANT_DIR}{current_instance}verified.csv', index=False)
        unverified_df.to_csv(f'{conf.PARTICIPANT_DIR}{current_instance}unverified.csv', index=False)
        return True
    except Exception:
        print_colors('[-] Saving verified and unverified failed', is_error=True)
        return False


###################### Getters/Generators ######################

def generate_participant_url(participant):
    """
    Generate the url of the webring participant.

    Parameters:
        participant (str): participant's onion address/instance

    Returns:
        str: the url of the webring participant
    """
    return f'http://{participant}/participants/{participant}/'


def generate_local_participant_dir(participant):
    """
    Generate the local files path of the webring participant.

    Parameters:
        participant (str): participant's onion address/instance

    Returns:
        str: the local path of the webring participant's files
    """
    return f'{conf.PARTICIPANT_DIR}{participant}/'


def get_participant_local_verified_and_unverified(participant):
    """
    Read the local verified csv and the local unverified csv of a participant.

    Parameters:
        participant (str): participant's local directory path prefix

    Returns:
        verified_df (Dataframe): verified.csv as dataframe (empty if missing)
        unverified_df (Dataframe): unverified.csv as dataframe (empty if missing)
    """
    # BUGFIX: locals were unbound when a file was missing, causing an
    # UnboundLocalError masked by the outer except; default to empty frames
    verified_df = pd.DataFrame()
    unverified_df = pd.DataFrame()
    try:
        try:
            verified_df = pd.read_csv(f'{participant}verified.csv')
        except FileNotFoundError:
            print_colors("[-] File not found: verified.csv", is_error=True)
        try:
            unverified_df = pd.read_csv(f'{participant}unverified.csv')
        except FileNotFoundError:
            print_colors("[-] Participant File not found: unverified.csv", is_error=True)
        return verified_df, unverified_df
    except Exception:
        print_colors('[-] Failed reading the verified and unverified files', is_error=True)
        return pd.DataFrame(), pd.DataFrame()


def get_official_participants():
    """
    Read all the official webring participants (excluding the local instance).

    Returns:
        list: list of all the official webring participants
    """
    try:
        current_instance = get_current_instance()
        with open(conf.OFFICIAL_PARTICIPANTS_FILE, 'r') as file:
            return [line.strip() for line in file if current_instance not in line]
    except Exception:
        print_colors('[-] Couldn\'t read official webring participants file', is_error=True)
        # return an iterable so callers like set(...) don't crash on None
        return []


def get_local_blacklist_and_sensitive():
    """
    Read the local blacklisted words and the local sensitive words.

    Returns:
        blacklist (list): list of all the words that are blacklisted
        sensitive_list (list): list of all the words that are sensitive
    """
    # default to empty lists so a missing file doesn't leave locals unbound
    blacklist = []
    sensitive_list = []
    try:
        current_instance = get_current_instance() + '/'
        try:
            blacklist_df = pd.read_csv(f'{conf.PARTICIPANT_DIR}{current_instance}blacklist.csv')
            blacklist = blacklist_df.iloc[:, 0].tolist()
        except FileNotFoundError:
            print_colors("[-] File not found: blacklist.csv", is_error=True)
        try:
            sensitive_df = pd.read_csv(f'{conf.PARTICIPANT_DIR}{current_instance}sensitive.csv')
            sensitive_list = sensitive_df.iloc[:, 0].tolist()
        except FileNotFoundError:
            print_colors("[-] File not found: sensitive.csv", is_error=True)
        return blacklist, sensitive_list
    except Exception:
        print_colors('[-] Failed reading the blacklist and sensitive words file', is_error=True)
        return [], []


def get_local_verified_and_unverified():
    """
    Read the local verified csv and the local unverified csv of the instance.

    Returns:
        verified_df (Dataframe): verified.csv as dataframe (empty if missing)
        unverified_df (Dataframe): unverified.csv as dataframe (empty if missing)
    """
    # default to empty frames so a missing file doesn't leave locals unbound
    verified_df = pd.DataFrame()
    unverified_df = pd.DataFrame()
    try:
        current_instance = get_current_instance() + '/'
        try:
            verified_df = pd.read_csv(f'{conf.PARTICIPANT_DIR}{current_instance}verified.csv')
        except FileNotFoundError:
            print_colors("[-] File not found: verified.csv", is_error=True)
        try:
            unverified_df = pd.read_csv(f'{conf.PARTICIPANT_DIR}{current_instance}unverified.csv')
        except FileNotFoundError:
            print_colors("[-] File not found: unverified.csv", is_error=True)
        return verified_df, unverified_df
    except Exception:
        print_colors('[-] Failed reading the verified and unverified files', is_error=True)
        return pd.DataFrame(), pd.DataFrame()


def get_local_webring_participants():
    """
    Make sure the official participants are registered in the webring csv file.

    Returns:
        Dataframe: the verified local webring participants dataframe
    """
    try:
        webring_df = pd.read_csv(conf.LOCAL_DIR + conf.WEBRING_CSV_FILE)
        # finds any missing official webrings in the local webring file
        missing_participants = set(get_official_participants()) - set(webring_df['URL'])
        for participant in missing_participants:
            new_row = [{'Name': '', 'URL': participant, 'Description': '', 'Trusted': 'NO', 'Status': '', 'Score': ''}]
            webring_df = pd.concat([webring_df, pd.DataFrame(new_row)], ignore_index=True)
        webring_df.to_csv(conf.LOCAL_DIR + conf.WEBRING_CSV_FILE, index=False)
        return webring_df
    except Exception:
        print_colors('[-] failed reading webring participants file', is_error=True)
        return pd.DataFrame()