import json
import os
import random
import re
import urllib.parse

import pandas as pd
import requests
from PIL import Image
from websockets.sync.client import connect

import conf
#from SimpleX.utils import IsUrlValid

PURPLE = '\033[35;40m'
BOLD_PURPLE = '\033[35;40;1m'
RED = '\033[31;40m'
BOLD_RED = '\033[31;40;1m'
RESET = '\033[m'

# Patterns used by the CSV field validators; compiled once at import time.
_SCORE_PATTERN = re.compile("^[0-9.,]+$")
_DESCRIPTION_PATTERN = re.compile(r"^[A-Za-z0-9-.,' \"\(\)\/]+$")
_CATEGORY_PATTERN = re.compile("^[A-Za-z0-9 ]+$")


def get_current_instance():
    """
    Get the current host instance

    Returns:
        str: the content of ~/.darknet_participant_url with trailing
        whitespace removed, or "" when that file does not exist
    """
    # expanduser gives the current user directory
    instance_file = os.path.expanduser("~") + '/.darknet_participant_url'
    if os.path.exists(instance_file):
        with open(instance_file) as f:
            return f.read().rstrip()
    return ""


#Set the local dir on script run
conf.LOCAL_INSTANCE = get_current_instance()
conf.LOCAL_DIR = conf.PARTICIPANT_DIR + conf.LOCAL_INSTANCE + '/'


###################### Validations ######################

def IsSimplexChatroomValid(url: str) -> bool:
    """
    Recognizes Simplex Chatroom link.
    Returns True if URL matches conf.SIMPLEX_CHATROOM_PATTERN, False otherwise
    """
    return bool(conf.SIMPLEX_CHATROOM_PATTERN.match(url))


def RecognizeSimplexType(url: str) -> str:
    """
    Recognizes Simplex Server URL.
    Returns the first group captured by conf.SIMPLEX_SERVER_PATTERN
    (callers compare it against 'smp' and 'xftp'), or 'invalid' when the
    pattern does not match
    """
    match = conf.SIMPLEX_SERVER_PATTERN.match(url)
    if match:
        return match.group(1)
    return 'invalid'


# stub function
def IsXFTPServerValid(url: str) -> bool:
    """
    Returns True if URL is a valid SimpleX XFTP Server URL
    False otherwise
    """
    return RecognizeSimplexType(url) == 'xftp'


# stub function
def IsSMPServerValid(url: str) -> bool:
    """
    Returns True if URL is a valid SimpleX SMP Server URL
    False otherwise
    """
    return RecognizeSimplexType(url) == 'smp'


def IsClearnetLinkValid(url: str) -> bool:
    """
    Returns True if URL matches conf.CLEARNET_URL_PATTERN
    False otherwise
    """
    return bool(conf.CLEARNET_URL_PATTERN.match(url))


def IsOnionLinkValid(url: str) -> bool:
    """
    Returns True if URL matches conf.ONION_URL_PATTERN
    False otherwise
    """
    return bool(conf.ONION_URL_PATTERN.match(url))


def RecognizeURLType(url: str) -> str:
    """
    Recognizes URL type, can return:
        - chatroom - SimpleX chatroom
        - xftp - XFTP SimpleX server
        - smp - SMP SimpleX server
        - onion - onion URL
        - clearnet - valid clearnet url
        - invalid - none of the above (probably invalid)
    """
    # order is important here
    # (ex. simplex chatroom is also valid clearnet link)
    if IsSimplexChatroomValid(url):
        return 'chatroom'
    if IsXFTPServerValid(url):
        return 'xftp'
    if IsSMPServerValid(url):
        return 'smp'
    if IsOnionLinkValid(url):
        return 'onion'
    if IsClearnetLinkValid(url):
        return 'clearnet'
    return 'invalid'


def IsURLValid(url: str) -> bool:
    """
    Checks if given URL is valid (RecognizeURLType recognizes it)
    """
    return RecognizeURLType(url) != 'invalid'


def CheckUrl(url):
    """
    Checks if URL is actually reachable via Tor

    Sends a GET request through the SOCKS proxy on 127.0.0.1:9050 with a
    5 second timeout.

    Returns
    -------
    bool
        True if the response status is 200, False for any other status or
        when the request fails
    """
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    try:
        status = requests.get(url, proxies=proxies, timeout=5).status_code
    except requests.exceptions.RequestException:
        # Covers connection errors and timeouts as well as malformed URLs,
        # so a bad row cannot crash the caller.
        return False
    return status == 200


###TODO: should replace checkUrl
# checks if all the webring participants are reachable
def is_participant_reachable(instance):
    """
    Checks if all URL files are actually reachable via Tor

    Parameters
    ----------
    instance : str
        The participant onion address

    Returns
    -------
    Bool
        False if any file is unreachable, True if all are reachable
    """
    url = generate_participant_url(instance)

    # Checks all files on a webring participant , if all reached returns true
    for file_name in conf.CSV_FILES:
        try:
            status = requests.get(f'{url}{file_name}', proxies=conf.PROXIES, timeout=10).status_code
        except Exception:
            return False
        if status != 200:
            return False

    return True


#### PROTECTIONS AGAINST MALICIOUS CSV INPUTS ####

def IsBannerValid(path: str) -> bool:
    """
    Checks if the banner.png file has the correct dimensions (240x60)
    and is not larger than 5 MB
    """
    try:
        # The context manager closes the underlying file handle.
        with Image.open(path) as im:
            width, height = im.size
    except Exception:
        print("ERROR, EXCEPTION")
        return False

    if width != 240 or height != 60:
        print("INVALID BANNER DIMENSIONS, HEIGHT=", height, " WIDTH=", width)
        return False

    filesizeMB = os.path.getsize(path)/1024/1024
    if filesizeMB > 5:
        print("Banner filesize too large (>5Mb): ", filesizeMB, "MB")
        return False
    return True


def IsStatusValid(status: str) -> bool:
    """
    Checks if status, once stripped, is one of 'YES', 'NO' or ''.
    """
    return status.strip() in ('YES', 'NO', '')


def IsScoreValid(score: str) -> bool:
    """
    Check the Score is only "^[0-9.,]+$" with 8 max chars.

    The value is converted with str() and stripped first; an empty value
    and 'nan' (a missing pandas cell converted to str) are accepted.
    """
    score = str(score).strip()
    if score in ('', 'nan'):
        return True
    if len(score) > 8:
        return False
    return _SCORE_PATTERN.fullmatch(score) is not None


def IsDescriptionValid(desc: str) -> bool:
    """
    Check the description only contains [A-Za-z0-9-.,' "()/] with 256 max chars.

    The value is converted with str() and stripped first. An empty
    description is accepted; the placeholder "DEFAULT" is rejected.
    """
    desc = str(desc).strip()
    if desc == "":
        return True
    if desc == "DEFAULT" or len(desc) > 256:
        return False
    return _DESCRIPTION_PATTERN.fullmatch(desc) is not None


def IsCategoryValid(categories: list[str]) -> bool:
    """
    Check the categories are only [a-zA-Z0-9 ] with 64 max chars.

    Every category is stripped and validated. A plain string is treated as
    a single category so it is not validated character by character.
    Returns False when there is nothing to validate.
    """
    if isinstance(categories, str):
        categories = [categories]

    validated_any = False
    for category in categories:
        category = category.strip()
        if len(category) > 64 or _CATEGORY_PATTERN.fullmatch(category) is None:
            return False
        validated_any = True
    # An empty collection stays falsy, as it was before.
    return validated_any


def IsNameValid(name: str) -> bool:
    """
    Check the stripped name fully matches conf.VALID_NAME_PATTERN.
    Returns False for values that cannot be stripped or matched.
    """
    try:
        return bool(conf.VALID_NAME_PATTERN.fullmatch(name.strip()))
    except Exception:
        return False


def send_server_checks(url: str) -> tuple[dict, str, object]:
    """
    Sends a server test request to the sxc websocket on ws://localhost:3030.

    Returns the decoded JSON response, its response type
    (response["resp"]["type"]) and the 'testFailure' entry of the response,
    or None when the response has no such entry.
    """
    with connect("ws://localhost:3030") as websocket:
        query = f"/_server test 1 {url}"
        command = {
            'corrId': f"id{random.randint(0,999999)}",
            'cmd': query,
        }
        websocket.send(json.dumps(command))
        message = websocket.recv()
        response = json.loads(message)
        resp_type = response["resp"]["type"]
        failed_response = response['resp'].get('testFailure')

        return (response, resp_type, failed_response)


def is_row_valid(row):
    """
    Validates dataframe row to check if all field are valid

    Parameters
    ----------
    row : dict
        Dataframe row

    Returns
    -------
    Bool
        True if row is valid, False if row isn't valid or a field is
        missing or cannot be validated
    """
    try:
        return (
            IsURLValid(row['Instance']) and
            IsCategoryValid(row['Category']) and
            IsNameValid(row['Name']) and
            IsURLValid(row['URL']) and
            IsStatusValid(row['Sensitive']) and
            IsDescriptionValid(row['Description']) and
            IsStatusValid(row['Status']) and
            IsScoreValid(row['Score'])
        )
    except Exception:
        return False


###################### General ######################

def renew_csv(df, participant_url):
    """
    Removes all rows that are not generated by the local instance

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe we want to renew
    participant_url : str
        the instance url

    Returns:
    --------
    pd.DataFrame
        The rows whose 'Instance' column equals participant_url
    """
    return df[df['Instance'] == participant_url]


def merge_verification_df(receiving_df, merging_df):
    """
    Merges 2 dataframes of type verified or unverified (do not merge duplications by name or url)

    Parameters
    ----------
    receiving_df : pd.DataFrame
        Dataframe we want to receive the data
    merging_df : pd.DataFrame
        Dataframe we want to merge into the receiving dataframe

    Returns:
    --------
    pd.DataFrame
        The combined dataframe; receiving_df unchanged when the merge fails
    """
    try:
        already_known = (
            merging_df['URL'].isin(receiving_df['URL'])
            | merging_df['Name'].isin(receiving_df['Name'])
        )
        filtered_df = merging_df[~already_known]

        if filtered_df.empty:
            return receiving_df
        if receiving_df.empty:
            return filtered_df
        return pd.concat([receiving_df, filtered_df], ignore_index=True)

    except Exception:
        return receiving_df


def sort_instances(df, sort_by, preferred=None):
    """
    Sorts dataframe

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe to sort (it is not modified)
    sort_by : str
        The column to sort by
    preferred(optional) : str
        When given, rows whose sort_by value equals it are placed at the
        start of the dataframe

    Returns
    -------
    pd.DataFrame
        The sorted dataframe, or df unchanged when sorting fails
    """
    try:
        if preferred:
            # assign() works on a copy, so the temporary column never
            # appears in the caller's dataframe.
            ranked = df.assign(priority=(df[sort_by] == preferred).astype(int))
            df = ranked.sort_values(by=['priority', sort_by], ascending=[False, True]).drop(columns='priority')
        else:
            df = df.sort_values(by=sort_by)

    except Exception:
        print_colors('[-] Sorting failed',is_error=True)

    return df


def remove_duplications(df):
    """
    Remove url and name duplications from the dataframe

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe to remove duplications from

    Returns
    -------
    pd.DataFrame
        The dataframe after all duplications were removed (first occurrence kept)
    """
    try:
        df = df.drop_duplicates(subset='Name', keep='first')
        df = df.drop_duplicates(subset='URL', keep='first')

    except Exception:
        print_colors('[-] Removing duplication failed',is_error=True)

    return df


def remove_cross_dataframe_replications(main_df, sub_df):
    """
    Remove replications from sub_df that exist in main_df

    Parameters
    ----------
    main_df : pd.DataFrame
        The dataframe to keep replications
    sub_df : DataFrame
        The dataframe to remove replications

    Returns
    -------
    pd.DataFrame
        The main_df with removed duplications
    pd.DataFrame
        The sub_df with removed duplications and removed replications
    """
    try:
        main_df = remove_duplications(main_df)
        sub_df = remove_duplications(sub_df)

        mask = sub_df['URL'].isin(main_df['URL']) | sub_df['Name'].isin(main_df['Name'])
        sub_df = sub_df[~mask]

    except Exception:
        print_colors('[-] Removing cross dataframe duplications failed',is_error=True)

    return main_df, sub_df


def add_word_to_blacklist(word):
    """
    Add a new word to the blacklist

    Parameters
    ----------
    word : str
        The new word we want to add to the blacklist

    Returns
    -------
    pd.DataFrame
        The local blacklist dataframe after the operation
    """
    local_blacklist_df = pd.DataFrame()
    try:
        local_blacklist_df = get_local_blacklist()
        if word not in local_blacklist_df['blacklisted-words'].values:
            local_blacklist_df.loc[len(local_blacklist_df)] = [word]
            save_local_blacklist(local_blacklist_df)
        else:
            print_colors('[+] Word already exists in the blacklist')

    except Exception:
        print_colors('[-] Adding word to the blacklist failed',is_error=True)

    return local_blacklist_df


def remove_word_from_blacklist(word):
    """
    Remove a word from the blacklist

    Parameters
    ----------
    word : str
        The word we want to remove from the blacklist

    Returns
    -------
    pd.DataFrame
        The local blacklist dataframe after the operation
    """
    local_blacklist_df = pd.DataFrame()
    try:
        local_blacklist_df = get_local_blacklist()
        if word in local_blacklist_df['blacklisted-words'].values:
            local_blacklist_df = local_blacklist_df[local_blacklist_df['blacklisted-words'] != word]
            save_local_blacklist(local_blacklist_df)
        else:
            print_colors('[+] Word wasn\'t found on the blacklist')

    except Exception:
        print_colors('[-] Removing word from the blacklist failed',is_error=True)

    return local_blacklist_df


def transfer_rows_by_instance(target_df, source_df, participant_instance):
    """
    Transfer rows from one dataframe to another by instance condition

    Parameters
    ----------
    target_df pd.DataFrame
        The dataframe i want to copy into
    source_df pd.DataFrame
        The dataframe i want to cut out of
    participant_instance : str
        The participant's instance onion address

    Returns
    -------
    pd.DataFrame
        The target_df with the new rows
    pd.DataFrame
        The source_df with the removed rows
    """
    try:
        mask = source_df['Instance'] == participant_instance

        target_df = pd.concat([target_df, source_df[mask]])
        source_df = source_df[~mask]

    except Exception:
        print_colors('[-] Transferring rows by instance failed',is_error=True)

    return target_df, source_df


def save_local_blacklist(blacklist_df):
    """
    Saves the local blacklist

    Parameters
    ----------
    blacklist_df : pd.DataFrame
        Dataframe of the blacklist

    Returns
    -------
    bool
        True if successful, False if not
    """
    try:
        # save_dataframe reports failure through its return value.
        saved = save_dataframe(blacklist_df, f'{conf.LOCAL_DIR}blacklist.csv')
    except Exception:
        saved = False

    if not saved:
        print_colors('[-] Saving blacklist failed',is_error=True)
    return saved


def _save_verified_and_unverified(verified_df, unverified_df, directory):
    """Write both csv files under directory and report whether both were saved."""
    try:
        # Both files are always attempted, even when the first one fails.
        verified_saved = save_dataframe(verified_df, f'{directory}verified.csv')
        unverified_saved = save_dataframe(unverified_df, f'{directory}unverified.csv')
    except Exception:
        verified_saved = unverified_saved = False

    if verified_saved and unverified_saved:
        print_colors('[+] Verified and unverified saved successfully')
        return True

    print_colors('[-] Saving verified and unverified failed',is_error=True)
    return False


###TODO: can later remove the inputs and have a "global" local verified and unverified or a class of the local(lantern host) participant
def save_local_verified_and_unverified(verified_df, unverified_df):
    """
    Saves the local verified and unverified

    Parameters
    ----------
    verified_df : pd.DataFrame
        Local verified rows dataframe
    unverified_df : DataFrame
        Local unverified rows dataframe

    Returns
    -------
    bool
        True if both files were saved, False if not
    """
    try:
        local_dir = conf.LOCAL_DIR
    except Exception:
        print_colors('[-] Saving verified and unverified failed',is_error=True)
        return False
    return _save_verified_and_unverified(verified_df, unverified_df, local_dir)


def save_local_participant_verified_and_unverified(verified_df, unverified_df, participant):
    """
    Saves the local verified and unverified of a participant

    Parameters
    ----------
    verified_df pd.DataFrame
        Local verified rows dataframe
    unverified_df pd.DataFrame
        Local unverified rows dataframe
    participant : str
        Participant's onion local path

    Returns
    -------
    bool
        True if both files were saved, False if not
    """
    return _save_verified_and_unverified(verified_df, unverified_df, participant)


def save_dataframe(df, path):
    """
    Saves a dataframe

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe wants to be saved
    path : str
        Local path for the dataframe

    Returns
    -------
    bool
        True if saved, False if not
    """
    try:
        df.to_csv(path, index=False)
        return True

    except Exception:
        return False


###################### Getters/Generators ######################

def generate_participant_url(participant):
    """
    Generates url of the webring participant

    Parameters
    ----------
    participant : str
        Participant's onion address/instance

    Returns
    -------
    str
        The url of the webring participant
    """
    return f'http://{participant}/participants/{participant}/'


def generate_local_participant_dir(participant):
    """
    Generates local files path of the webring participant

    Parameters
    ----------
    participant : str
        Participant's onion address/instance

    Returns
    -------
    str
        The local path of the webring participant's files
    """
    return f'{conf.PARTICIPANT_DIR}{participant}/'


def get_participant_local_verified_and_unverified(participant):
    """
    Reads the local verified csv and the local unverified csv of a participant

    Parameters
    ----------
    participant : str
        Participant's local files path

    Returns
    -------
    pd.DataFrame
        verified.csv as dataframe
    pd.DataFrame
        unverified.csv as dataframe

    Two empty dataframes are returned when either file is missing.
    """
    try:
        verified_df = pd.read_csv(f'{participant}verified.csv')

    except FileNotFoundError:
        print_colors("[-] File not found: verified.csv", is_error=True)
        return pd.DataFrame(), pd.DataFrame()

    try:
        unverified_df = pd.read_csv(f'{participant}unverified.csv')

    except FileNotFoundError:
        print_colors("[-] Participant File not found: unverified.csv", is_error=True)
        return pd.DataFrame(), pd.DataFrame()

    return verified_df, unverified_df


def get_official_participants():
    """
    Reads all the official webring participants

    Returns
    -------
    list
        The stripped, non-blank lines of conf.OFFICIAL_PARTICIPANTS_FILE that
        do not contain the local instance; an empty list when the file
        cannot be read
    """
    try:
        with open(conf.OFFICIAL_PARTICIPANTS_FILE, 'r') as file:
            # NOTE(review): when conf.LOCAL_INSTANCE is "" this filter drops
            # every line, because "" is a substring of any string. Confirm
            # whether that is intended for an unconfigured instance.
            return [
                line.strip()
                for line in file
                if line.strip() and conf.LOCAL_INSTANCE not in line
            ]

    except Exception:
        print_colors('[-] Couldn\'t read official webring participants file',is_error=True )
        return []


def get_local_blacklist():
    """
    Reads the local blacklist

    Returns
    -------
    blacklist_df : pd.DataFrame
        Dataframe of the blacklist, empty when the file is missing or unreadable
    """
    try:
        return pd.read_csv(f'{conf.LOCAL_DIR}blacklist.csv')

    except FileNotFoundError:
        print_colors("[-] File not found: blacklist.csv", is_error=True)

    except Exception:
        print_colors('[-] Failed reading the blacklist words file',is_error=True)

    return pd.DataFrame()


def get_local_sensitive():
    """
    Reads the local sensitive words

    Returns
    -------
    sensitive_df : pd.DataFrame
        Dataframe read from sensitive.csv, empty when the file is missing
        or unreadable
    """
    try:
        return pd.read_csv(f'{conf.LOCAL_DIR}sensitive.csv')

    except FileNotFoundError:
        print_colors("[-] File not found: sensitive.csv", is_error=True)

    except Exception:
        print_colors('[-] Failed reading the sensitive words file',is_error=True)

    return pd.DataFrame()


def get_local_verified_and_unverified():
    """
    Reads the local verified csv and the local unverified csv of the instance

    Returns
    -------
    verified_df : pd.DataFrame
        verified.csv as dataframe
    unverified_df : pd.DataFrame
        unverified.csv as dataframe

    Two empty dataframes are returned when either file is missing or
    cannot be read.
    """
    verified_df = None
    unverified_df = None
    try:
        try:
            verified_df = pd.read_csv(f'{conf.LOCAL_DIR}verified.csv')

        except FileNotFoundError:
            print_colors("[-] File not found: verified.csv", is_error=True)

        try:
            unverified_df = pd.read_csv(f'{conf.LOCAL_DIR}unverified.csv')

        except FileNotFoundError:
            print_colors("[-] File not found: unverified.csv", is_error=True)

    except Exception:
        print_colors('[-] Failed reading the verified and unverified files',is_error=True)
        return pd.DataFrame(), pd.DataFrame()

    if verified_df is None or unverified_df is None:
        return pd.DataFrame(), pd.DataFrame()

    return verified_df, unverified_df


def get_local_webring_participants(current_instance):
    """
    Make sure the official participants are registered in the webring csv file

    Official participants missing from the local webring file are appended
    with default values and the file is saved again.

    Parameters
    ----------
    current_instance : str
        The current local instance url

    Returns
    -------
    pd.DataFrame
        The local webring participants dataframe, empty when it cannot be read
    """
    try:
        webring_df = pd.read_csv(conf.LOCAL_DIR + conf.WEBRING_CSV_FILE)

        # finds any missing official webrings in the local webring file
        missing_participants = set(get_official_participants()) - set(webring_df['URL'])

        # Sorted so the saved file does not depend on set iteration order.
        new_rows = [
            {'Name': '','URL': participant,'Description': '','Trusted': 'NO','Status': '','Score': '', 'Blacklisted': 'NO'}
            for participant in sorted(missing_participants)
            if participant != current_instance
        ]
        if new_rows:
            webring_df = pd.concat([webring_df, pd.DataFrame(new_rows)], ignore_index=True)

        save_dataframe(webring_df, conf.LOCAL_DIR + conf.WEBRING_CSV_FILE)

        return webring_df

    except Exception:
        print_colors('[-] failed reading webring participants file',is_error=True )
        return pd.DataFrame()


def print_colors(s:str=' ', bold:bool=False, is_error:bool = False, default:bool=False):
    """
    Helper function to print with colors

    Errors are printed in red (bold red when bold is also set), bold text
    in bold purple, default text without any color and everything else
    in purple.
    """
    # The combined case is tested first; otherwise it can never be reached.
    if is_error and bold:
        print(f"{BOLD_RED}{s}{RESET}")
    elif is_error:
        print(f"{RED}{s}{RESET}")
    elif bold:
        print(f"{BOLD_PURPLE}{s}{RESET}")
    elif default:
        print(f'{s}')
    else:
        print(f"{PURPLE}{s}{RESET}")