import json
import os
import random
import re
import urllib.parse

import pandas as pd
import requests
from PIL import Image
from websockets.sync.client import connect

import conf

# ANSI color escape codes used by print_colors()
PURPLE = '\033[35;40m'
BOLD_PURPLE = '\033[35;40;1m'
RED = '\033[31;40m'
BOLD_RED = '\033[31;40;1m'
RESET = '\033[m'


def get_current_instance():
    """
    Reads the local participant onion address from the user's home directory.

    Returns:
        str: the local instance onion url
    """
    # expanduser gives the current user directory
    instance_file = os.path.expanduser("~") + '/.darknet_participant_url'
    with open(instance_file) as f:
        return f.read().rstrip()


# Set the local dir on script run
conf.LOCAL_DIR = conf.PARTICIPANT_DIR + get_current_instance() + '/'


###################### Validations ######################

def IsSimplexChatroomValid(url: str) -> bool:
    """
    Recognizes a SimpleX Chatroom link.
    Returns True if URL is a SimpleX chatroom, False otherwise.
    """
    return bool(conf.SIMPLEX_CHATROOM_PATTERN.match(url))


def RecognizeSimplexType(url: str) -> str:
    """
    Recognizes a SimpleX Server URL; returns 'smp', 'xftp' or 'invalid'.
    """
    match = conf.SIMPLEX_SERVER_PATTERN.match(url)
    return match.group(1) if match else 'invalid'


# stub function
def IsXFTPServerValid(url: str) -> bool:
    """
    Returns True if URL is a valid SimpleX XFTP Server URL, False otherwise.
    """
    # BUGFIX: was conf.RecognizeSimplexType() — the helper lives in this
    # module, not in conf, so the old call raised AttributeError.
    return RecognizeSimplexType(url) == 'xftp'


# stub function
def IsSMPServerValid(url: str) -> bool:
    """
    Returns True if URL is a valid SimpleX SMP Server URL, False otherwise.
    """
    # BUGFIX: was conf.RecognizeSimplexType() (AttributeError at runtime).
    return RecognizeSimplexType(url) == 'smp'


def IsClearnetLinkValid(url: str) -> bool:
    """
    Returns True if URL is a valid clearnet URL, False otherwise.
    """
    return bool(conf.CLEARNET_URL_PATTERN.match(url))


def IsOnionLinkValid(url: str) -> bool:
    """
    Returns True if URL is a valid onion URL, False otherwise.
    """
    return bool(conf.ONION_URL_PATTERN.match(url))


def RecognizeURLType(url: str) -> str:
    """
    Recognizes URL type, can return:
        - chatroom - SimpleX chatroom
        - xftp     - XFTP SimpleX server
        - smp      - SMP SimpleX server
        - onion    - onion URL
        - clearnet - valid clearnet url
        - invalid  - none of the above (probably invalid)
    """
    # order is important here
    # (ex. simplex chatroom is also valid clearnet link)
    if IsSimplexChatroomValid(url):
        return 'chatroom'
    if IsXFTPServerValid(url):
        return 'xftp'
    if IsSMPServerValid(url):
        return 'smp'
    if IsOnionLinkValid(url):
        return 'onion'
    if IsClearnetLinkValid(url):
        return 'clearnet'
    return 'invalid'


def IsURLValid(url: str) -> bool:
    """
    Checks if given URL is valid (RecognizeURLType recognizes it).
    """
    return RecognizeURLType(url) != 'invalid'


def CheckUrl(url):
    """
    Checks if URL is actually reachable via Tor.

    Returns:
        Boolean: True only when the request over the local Tor SOCKS proxy
        answers with HTTP 200.
    """
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    try:
        status = requests.get(url, proxies=proxies, timeout=5).status_code
        return status == 200
    except (requests.ConnectionError, requests.exceptions.ReadTimeout):
        return False


###TODO: should replace checkUrl
# checks if all the webring participants are reachable
def is_participant_reachable(instance):
    """
    Checks if all URL files are actually reachable via Tor.

    Parameters:
        instance (str): The participant onion address

    Returns:
        Boolean: False if any file is unreachable, True if all are reachable
    """
    url = generate_participant_url(instance)
    # Checks all files on a webring participant, if all reached returns true
    for file_name in conf.CSV_FILES:
        try:
            status = requests.get(f'{url}{file_name}', proxies=conf.PROXIES, timeout=10).status_code
            if status != 200:
                return False
        except Exception:
            return False
    return True


#### PROTECTIONS AGAINST MALICIOUS CSV INPUTS ####

def IsBannerValid(path: str) -> bool:
    """
    Checks if the banner.png file has the correct dimensions (240x60)
    and is not larger than 5MB.
    """
    try:
        # BUGFIX: the image handle was never closed; the context manager
        # releases it even on the early-return paths.
        with Image.open(path) as im:
            width, height = im.size
    except Exception:
        print("ERROR, EXCEPTION")
        return False
    if width != 240 or height != 60:
        print("INVALID BANNER DIMENSIONS, HEIGHT=", height, " WIDTH=", width)
        return False
    filesizeMB = os.path.getsize(path) / 1024 / 1024
    if filesizeMB > 5:
        print("Banner filesize too large (>5Mb): ", os.path.getsize(path) / 1024 / 1024, "MB")
        return False
    return True


def IsStatusValid(status: str) -> bool:
    """
    Checks if status contains only ['YES','NO',''] after stripping.
    """
    return status.strip() in ['YES', 'NO', '']


def IsScoreValid(score: str) -> bool:
    """
    Check the Score is only "^[0-9.,]+$" with 8 max chars.
    Empty and 'nan' values are accepted.
    """
    pattern = re.compile("^[0-9.,]+$")
    # BUGFIX: str.strip() returns a new string; the result was discarded
    score = str(score).strip()
    if score in ['', 'nan']:
        return True
    if pattern.fullmatch(score) is None:
        return False
    if len(score) > 8:
        return False
    return True


def IsDescriptionValid(desc: str) -> bool:
    """
    Check the description is only [A-Za-z0-9-.,' "()/ ] with 256 max chars
    and is not the placeholder "DEFAULT".
    """
    if desc == "":
        return True
    pattern = re.compile(r"^[A-Za-z0-9-.,' \"\(\)\/]+$")
    # BUGFIX: str.strip() returns a new string; the result was discarded
    desc = str(desc).strip()
    if pattern.fullmatch(desc) is None:
        return False
    if desc == "DEFAULT":
        return False
    if len(desc) > 256:
        return False
    return True


def IsCategoryValid(categories: list[str]) -> bool:
    """
    Check the categories are only [A-Za-z0-9 ] with 64 max chars each.
    An empty list is rejected.
    """
    # an empty list was implicitly falsy before (the loop fell through
    # returning None); keep that outcome explicit
    if not categories:
        return False
    pattern = re.compile("^[A-Za-z0-9 ]+$")
    for category in categories:
        # BUGFIX: the strip result was discarded, and an early
        # `return True` accepted the whole list after only the first entry
        category = category.strip()
        if pattern.fullmatch(category) is None:
            return False
        if len(category) > 64:
            return False
    return True


def IsNameValid(name: str) -> bool:
    """
    Check the parameter name only contains [a-zA-Z0-9] and is 64 chars long.
    """
    try:
        # BUGFIX: VALID_NAME_PATTERN was referenced as a bare undefined name
        # (NameError swallowed below, so every name validated False);
        # presumably the pattern lives in conf like the other validation
        # patterns — TODO confirm
        return bool(conf.VALID_NAME_PATTERN.fullmatch(name.strip()))
    except Exception:
        return False


def send_server_checks(url: str) -> tuple[str, str, str]:
    """
    Sends requests to sxc websocket and returns response, response type
    and testFailure or None.
    """
    with connect("ws://localhost:3030") as websocket:
        query = f"/_server test 1 {url}"
        command = {
            'corrId': f"id{random.randint(0, 999999)}",
            'cmd': query,
        }
        websocket.send(json.dumps(command))
        message = websocket.recv()
        response = json.loads(message)
        resp_type = response["resp"]["type"]
        failed_response = response['resp'].get('testFailure')
    return (response, resp_type, failed_response)


def is_row_valid(row):
    """
    Validates dataframe row to check if all fields are valid.

    Parameters:
        row (dict): dataframe row

    Returns:
        Boolean: True if row is valid, False if row isn't valid
    """
    try:
        # BUGFIX: was IsUrlValid — an undefined name (its import is commented
        # out), so the except clause made every row invalid; this module
        # defines IsURLValid
        return (
            IsURLValid(row['Instance'])
            and IsCategoryValid(row['Category'])
            and IsNameValid(row['Name'])
            and IsURLValid(row['URL'])
            and IsStatusValid(row['Sensitive'])
            and IsDescriptionValid(row['Description'])
            and IsStatusValid(row['Status'])
            and IsScoreValid(row['Score'])
        )
    except Exception:
        return False


###################### General ######################

def merge_verification_df(receiving_df, merging_df):
    """
    Merges 2 dataframes of type verified or unverified
    (does not merge rows whose name or url duplicate the receiving side).

    Parameters:
        receiving_df (Dataframe): dataframe we want to receive the data
        merging_df (Dataframe): dataframe we want to merge into the receiving dataframe

    Returns:
        Dataframe: the combined dataframe will be returned
    """
    try:
        filtered_df = merging_df[~((merging_df['URL'].isin(receiving_df['URL'])) |
                                   merging_df['Name'].isin(receiving_df['Name']))]
        if filtered_df.empty:
            return receiving_df
        elif receiving_df.empty:
            return filtered_df
        else:
            return pd.concat([receiving_df, filtered_df], ignore_index=True)
    except Exception:
        # best-effort merge: on any failure keep the receiving side untouched
        return receiving_df


def remove_duplications(df):
    """
    Removes url and name duplications from the dataframe.

    Parameters:
        df (Dataframe): the dataframe to remove duplications from

    Returns:
        Dataframe: the dataframe after all duplications were removed
    """
    try:
        df = df.drop_duplicates(subset='Name')
        df = df.drop_duplicates(subset='URL')
    except Exception:
        pass
    return df


###TODO: can later remove the inputs and have a "global" local verified and
# unverified or a class of the local(lantern host) participant
def save_local_verified_and_unverified(verified_df, unverified_df):
    """
    Saves the local verified and unverified csv files.

    Parameters:
        verified_df (Dataframe): local verified rows dataframe
        unverified_df (Dataframe): local unverified rows dataframe

    Returns:
        Boolean: True on success, False on failure
    """
    try:
        current_instance = get_current_instance() + '/'
        verified_df.to_csv(f'{conf.PARTICIPANT_DIR}{current_instance}verified.csv', index=False)
        unverified_df.to_csv(f'{conf.PARTICIPANT_DIR}{current_instance}unverified.csv', index=False)
        return True
    except Exception:
        print_colors('[-] Saving verified and unverified failed', is_error=True)
        return False


###################### Getters/Generators ######################

def generate_participant_url(participant):
    """
    Generates url of the webring participant.

    Parameters:
        participant(str): participant's onion address/instance

    Returns:
        str: the url of the webring participant
    """
    return f'http://{participant}/participants/{participant}/'


def generate_local_participant_dir(participant):
    """
    Generates local files path of the webring participant.

    Parameters:
        participant(str): participant's onion address/instance

    Returns:
        str: the local path of the webring participant's files
    """
    return f'{conf.PARTICIPANT_DIR}{participant}/'


def get_official_participants():
    """
    Reads all the official webring participants.

    Returns:
        list: list of all the official webring participants
        (the local instance itself is filtered out)
    """
    try:
        current_instance = get_current_instance()
        with open(conf.OFFICIAL_PARTICIPANTS_FILE, 'r') as file:
            return [line.strip() for line in file if current_instance not in line]
    except Exception:
        print_colors('[-] Couldn\'t read official webring participants file', is_error=True)


def get_local_blacklist_and_sensitive():
    """
    Reads the local blacklisted words and the local sensitive words.

    Returns:
        blacklist(list): list of all the words that are blacklisted
        sensitive_list(list): list of all the words that are sensitive
    """
    try:
        current_instance = get_current_instance() + '/'
        blacklist_df = pd.read_csv(f'{conf.PARTICIPANT_DIR}{current_instance}blacklist.csv')
        blacklist = blacklist_df.iloc[:, 0].tolist()
        sensitive_df = pd.read_csv(f'{conf.PARTICIPANT_DIR}{current_instance}sensitive.csv')
        sensitive_list = sensitive_df.iloc[:, 0].tolist()
        return blacklist, sensitive_list
    except Exception:
        print_colors('[-] Failed reading the blacklist and sensitive words file', is_error=True)
        return [], []


def get_local_verified_and_unverified():
    """
    Reads the local verified csv and the local unverified csv.

    Returns:
        verified_df(Dataframe): verified.csv as dataframe
        unverified_df(Dataframe): unverified.csv as dataframe
    """
    try:
        current_instance = get_current_instance() + '/'
        verified_df = pd.read_csv(f'{conf.PARTICIPANT_DIR}{current_instance}verified.csv')
        unverified_df = pd.read_csv(f'{conf.PARTICIPANT_DIR}{current_instance}unverified.csv')
        return verified_df, unverified_df
    except Exception:
        print_colors('[-] Failed reading the verified and unverified files', is_error=True)
        return pd.DataFrame(), pd.DataFrame()


def get_local_webring_participants():
    """
    Makes sure the official participants are registered in the webring csv file.

    Returns:
        Dataframe: the verified local webring participants dataframe
    """
    try:
        webring_df = pd.read_csv(conf.LOCAL_DIR + conf.WEBRING_CSV_FILE)
        # finds any missing official webrings in the local webring file
        missing_participants = set(get_official_participants()) - set(webring_df['URL'])
        for participant in missing_participants:
            new_row = [{'Name': '', 'URL': participant, 'Description': '',
                        'Trusted': 'NO', 'Status': '', 'Score': ''}]
            webring_df = pd.concat([webring_df, pd.DataFrame(new_row)], ignore_index=True)
        webring_df.to_csv(conf.LOCAL_DIR + conf.WEBRING_CSV_FILE, index=False)
        return webring_df
    except Exception:
        print_colors('[-] failed reading webring participants file', is_error=True)
        return pd.DataFrame()


def print_colors(s: str = ' ', bold: bool = False, is_error: bool = False, default: bool = False):
    """
    Helper function to print with colors.
    """
    # BUGFIX: `is_error and bold` was checked after the plain `is_error`
    # branch and so was unreachable; check the most specific condition first
    if is_error and bold:
        print(f"{BOLD_RED}{s}{RESET}")
    elif is_error:
        print(f"{RED}{s}{RESET}")
    elif bold:
        print(f"{BOLD_PURPLE}{s}{RESET}")
    elif default:
        print(f'{s}')
    else:
        print(f"{PURPLE}{s}{RESET}")