diff --git a/scripts/conf.py b/scripts/conf.py
index ed6b5eb..3c2728e 100644
--- a/scripts/conf.py
+++ b/scripts/conf.py
@@ -19,4 +19,39 @@ CSV_FILES = [
     'blacklist.csv',
     'sensitive.csv',
     'webring-participants.csv'
-]
\ No newline at end of file
+]
+
+import re
+
+
+############ REGEX ############
+
+# name should contain only 1-64 alphanumeric characters
+VALID_NAME_PATTERN = re.compile(r"^[A-Za-z0-9]{1,64}$")
+
+# pattern for regular urls (https://stackoverflow.com/a/3809435)
+CLEARNET_URL_PATTERN = re.compile(
+    r"https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]"
+    r"{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)"
+)
+
+# pattern for onion urls (56 base32 characters + .onion)
+# it also matches without http(s)://, so a bare hostname passes too
+ONION_URL_PATTERN = re.compile(
+    r"^(https?:\/\/)?([a-zA-Z0-9-]+\.)*[a-z2-7-]{56}\.onion[^\s]*$"
+)
+
+# pattern for simplex chatroom links
+SIMPLEX_CHATROOM_PATTERN = re.compile(
+    r"(?:https?:\/\/(?:simplex\.chat|[^\/]+)|simplex:)\/(?:contact|invitation)#\/\?v=[\d-]+"
+    r"&smp=[^&]+(?:&[^=]+=[^&]*)*(?:&data=\{[^}]*\})?"
+)
+
+# pattern for smp or xftp simplex servers ((smp|xftp):// 44-char key @ host [:port])
+SIMPLEX_SERVER_PATTERN = re.compile(
+    r"^(smp|xftp):\/\/([a-zA-Z0-9\-_+=]{44})@([a-z2-7]{56}\.onion|"
+    r"([a-zA-Z0-9\-\.]+\.[a-zA-Z0-9\-\.]+))"
+    r"(?::(?:[1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|"
+    r"65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5]))?$"
+)
\ No newline at end of file
diff --git a/scripts/crawler.py b/scripts/crawler.py
index c7446ea..b4a9650 100644
--- a/scripts/crawler.py
+++ b/scripts/crawler.py
@@ -11,8 +11,10 @@ import re
 import sys
 
 sys.path.append("..")
-from utils import print_colors, IsUrlValid
-from SimpleX.regex_simplexlinks import IsSimpleXChatroomValid, IsSimpleXServerValid
+from utils import (
+    print_colors, IsURLValid, IsSimplexChatroomValid, RecognizeURLType
+)
+#from SimpleX.regex_simplexlinks import IsSimpleXChatroomValid, IsSimpleXServerValid
 from dotenv import load_dotenv
 
 # Make default parameters for arguments
@@ -107,12 +109,12 @@ def add_urls(urls):
     global output_file
     for url in urls:
         parsed_url = urllib.parse.urlparse(url)
-        if IsSimpleXChatroomValid(url) and not (output_file['URL'] == url).any():
+        if IsSimplexChatroomValid(url) and not (output_file['URL'] == url).any():
            output_file.loc[-1] = ["", url, "", "SimpleX Chatroom"]
            output_file.index += 1
            output_file = output_file.sort_index()
            continue
-        elif IsSimpleXServerValid(url) and not (output_file['URL'] == url).any():
+        elif RecognizeURLType(url) in ('smp', 'xftp') and not (output_file['URL'] == url).any():
            output_file.loc[-1] = ["", url, "", "SimpleX Server"]
            output_file.index += 1
            output_file = output_file.sort_index()
@@ -164,13 +166,13 @@ def extract_urls_html(url, text):
         print_colors(f'[D] Joined URL: {joined_url}')
 
         # Capture SimpleX URLs
-        if IsSimpleXChatroomValid(joined_url) or IsSimpleXServerValid(joined_url):
+        if RecognizeURLType(joined_url) in ('smp', 'xftp', 'chatroom'):
             if url not in result.thirdp_urls:
                 result.thirdp_urls.append(joined_url)
             continue
 
         # Check if the URL is a .onion link or not even a web link
-        if not IsUrlValid(joined_url):
+        if not IsURLValid(joined_url):
             continue
 
         print_colors(f'[+] Found url: {joined_url}')
@@ -266,4 +268,3 @@ for i, url in enumerate(vcsv_urls):
         crawl_url(url)
 
 crawler_file.to_csv(args.crawler_file, index=False)
 output_file.to_csv(args.output, index=False)
-
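The crawler's classification above is driven entirely by the `conf` patterns through `utils.RecognizeURLType` (added later in this diff). A minimal smoke test of the intended dispatch, run from `scripts/`; the sample links are fabricated for illustration:

```python
# Illustrative only: the URLs are made up, and the expected labels assume
# conf.py's patterns and utils.RecognizeURLType as defined in this diff.
from utils import RecognizeURLType

samples = {
    "http://lantern.nowherejezfoltodf4jiyl6r56jnzintap5vyjlia7fkirfsnfizflqd.onion": "onion",
    "https://example.com/some/page": "clearnet",
    "smp://" + "A" * 44 + "@example.onion": "smp",  # hypothetical 44-char key
}
for url, expected in samples.items():
    assert RecognizeURLType(url) == expected, url
```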
diff --git a/scripts/lantern.py b/scripts/lantern.py
index 5e2fae6..2bd1454 100644
--- a/scripts/lantern.py
+++ b/scripts/lantern.py
@@ -1,4 +1,3 @@
-###TODO: importing * is bad practice should import just utils and use it like in lantern_logic.py
 from utils import *
 import logic.lantern_logic as lantern
 from dotenv import load_dotenv
@@ -52,10 +51,6 @@ def main():
                 print("Official participant ",line.strip() , "'s directory doesnt exist, creating it")
                 os.makedirs(participantdir)
 
-
-
-
-
     print_colors("""
 ;                           ED.
@@ -97,8 +92,8 @@ def main():
     if os.path.isfile(urlpath):
         with open(urlpath) as f:
             instance = f.read().rstrip()
-            if IsOnionValid(instance):
-                print_colors(f"[+] Instance Name: {instance}. Valid:{IsOnionValid(instance)}")
+            if IsOnionLinkValid(instance):
+                print_colors(f"[+] Instance Name: {instance}. Valid:{IsOnionLinkValid(instance)}")
                 break
             else:
                 print_colors(f'[-] Invalid instance name in ~/.darknet_participant_url: {instance}',is_error=True )
@@ -107,8 +102,8 @@ def main():
         print_colors("[+] Instance Path doesn't exist yet")
         print_colors(f"Your url will be saved here {urlpath}")
         instance = input("What is your Instance domain?(ex: lantern.nowherejezfoltodf4jiyl6r56jnzintap5vyjlia7fkirfsnfizflqd.onion): ")
-        if IsOnionValid(instance):
-            print_colors(f"[+] Instance Name: {instance}. Valid: {IsUrlValid(instance)}")
+        if IsOnionLinkValid(instance):
+            print_colors(f"[+] Instance Name: {instance}. Valid: {IsOnionLinkValid(instance)}")
             instancepath=rootpath+'www/participants/'+instance
         else:
             print_colors(f'[-] Invalid instance name in ~/.darknet_participant_url: {instance}', is_error=True )
@@ -215,9 +210,9 @@ Maintenance:
             while(IsCategoryValid(category) is not True):
                 category = input("What is the website Category? ")
             # the url of the website (required) + check if its valid
-            url=''
-            while(IsUrlValid(url) is not True and IsSimpleXChatroomValid(url) is not True):
-                url=input("What is the website URL ? ")
+            url = ''
+            while not IsURLValid(url):
+                url = input("What is the website URL ? ")
 
             # a quick description (optional) + check if its valid
             desc='DEFAULT'
@@ -250,7 +245,7 @@ Maintenance:
                 uvdf = uvdf.sort_values(by=["Category","Score"], ascending=[True,False]) # sorting categories
                 print_colors("[+] New row added! now writing the csv file")
             else:
-                print("Adding new row in verified.csv since descriptioln is not empty")
+                print("Adding new row in verified.csv since description is not empty")
                 vdf.loc[-1] = newrow # adding a row
                 vdf = vdf.sort_values(by=["Category","Score"], ascending=[True,False]) # sorting categories
                 print_colors("[+] New row added! now writing the csv file")
@@ -461,7 +456,7 @@ Maintenance:
                             value = input("What is the new name of the website? ")
                         vdf.at[index,'Name']=value
                     elif i == 3: # column URL
-                        while(IsUrlValid(value) is not True or value == ''):
+                        while(IsURLValid(value) is not True or value == ''):
                             value = input("What is the new URL of the website? ")
                         vdf.at[index,'URL']=value
                     elif i == 4: # column Sensitive
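The @@ -215 hunk collapses the old two-validator condition into a single `IsURLValid` gate, so one prompt loop now accepts onion, clearnet, and SimpleX links alike. A non-interactive sketch of that loop, assuming `utils.IsURLValid` from this diff and with `input()` replaced by an iterator of invented answers:

```python
# Hedged sketch: the candidate inputs are fabricated examples.
from utils import IsURLValid

answers = iter(["not a url", "example.com", "http://example.com"])
url = ''
while not IsURLValid(url):
    url = next(answers)  # stands in for input("What is the website URL ? ")
print(url)  # -> http://example.com, the first answer that validates
```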
") uvdf.at[index,'URL']=value elif i == 4: # column Sensitive @@ -591,7 +586,7 @@ Maintenance: case 5: print_colors("[+] Add a new webring participant (and download their files into their directory (without trusting them yet!))") webring_participant_url = '' - while(IsOnionValid(webring_participant_url) is not True): + while(IsOnionLinkValid(webring_participant_url) is not True): webring_participant_url = input("What is the onion domain of the new webring participant? (ex: lantern.nowherejezfoltodf4jiyl6r56jnzintap5vyjlia7fkirfsnfizflqd.onion) ") participantdir=rootpath+'www/participants/'+webring_participant_url if os.path.isdir(participantdir): @@ -695,7 +690,7 @@ Maintenance: csvdf.to_csv(csvfilepath, index=False) ### SANITY CHECK 1: Mark all the rows that have incorrect formatting for deletion### - if IsUrlValid(csvdf.at[i, 'Instance']) is False or IsCategoryValid(csvdf.at[i, 'Category']) is False or IsNameValid(csvdf.at[i, 'Name']) is False or IsUrlValid(csvdf.at[i, 'URL']) is False or IsStatusValid(csvdf.at[i, 'Sensitive']) is False or IsDescriptionValid(csvdf.at[i, 'Description']) is False or IsStatusValid(csvdf.at[i, 'Status']) is False or IsScoreValid(csvdf.at[i, 'Score']) is False: + if IsURLValid(csvdf.at[i, 'Instance']) is False or IsCategoryValid(csvdf.at[i, 'Category']) is False or IsNameValid(csvdf.at[i, 'Name']) is False or IsURLValid(csvdf.at[i, 'URL']) is False or IsStatusValid(csvdf.at[i, 'Sensitive']) is False or IsDescriptionValid(csvdf.at[i, 'Description']) is False or IsStatusValid(csvdf.at[i, 'Status']) is False or IsScoreValid(csvdf.at[i, 'Score']) is False: #mark the row for deletion as it has invalid inputs if i not in rows2delete: print_colors(f"Marking row {i} for deletion, as it has invalid inputs") @@ -875,7 +870,7 @@ Maintenance: break else: print_colors("[+] checking if the Word/URL is valid: ") - if IsUrlValid(word) or IsOnionValid(word) or IsDescriptionValid(word): + if IsURLValid(word) or IsDescriptionValid(word): print_colors('[+] Word/URL is valid, adding the word into the sensitive wordlist') newrow=[word] print_colors(f"[+] NEWROW= {newrow}") @@ -944,7 +939,7 @@ Maintenance: break else: print_colors("[+] Checking if the Word/URL is valid: ") - if IsUrlValid(word) or IsOnionValid(word) or IsDescriptionValid(word): + if IsURLValid(word) or IsDescriptionValid(word): print_colors('[+] Word/URL is valid, adding the word into the blacklist') newrow=[word] print_colors(f"[+] NEWROW= {newrow}") @@ -1061,11 +1056,11 @@ Maintenance: csvdf.to_csv(csvfilepath, index=False) ### SANITY CHECK 1: Mark all the rows that have incorrect formatting for deletion### - if IsUrlValid(csvdf.at[i, 'Instance']) is False or IsCategoryValid(csvdf.at[i, 'Category']) is False or IsNameValid(csvdf.at[i, 'Name']) is False or IsUrlValid(csvdf.at[i, 'URL']) is False or IsStatusValid(csvdf.at[i, 'Sensitive']) is False or IsDescriptionValid(csvdf.at[i, 'Description']) is False or IsStatusValid(csvdf.at[i, 'Status']) is False or IsScoreValid(csvdf.at[i, 'Score']) is False: + if IsURLValid(csvdf.at[i, 'Instance']) is False or IsCategoryValid(csvdf.at[i, 'Category']) is False or IsNameValid(csvdf.at[i, 'Name']) is False or IsURLValid(csvdf.at[i, 'URL']) is False or IsStatusValid(csvdf.at[i, 'Sensitive']) is False or IsDescriptionValid(csvdf.at[i, 'Description']) is False or IsStatusValid(csvdf.at[i, 'Status']) is False or IsScoreValid(csvdf.at[i, 'Score']) is False: if i not in rows2delete: print_colors(f"Marking row {i} for deletion, as it has invalid inputs") #print_colors(f"{row}") - 
@@ -1061,11 +1056,11 @@ Maintenance:
                     csvdf.to_csv(csvfilepath, index=False)
 
                     ### SANITY CHECK 1: Mark all the rows that have incorrect formatting for deletion###
-                    if IsUrlValid(csvdf.at[i, 'Instance']) is False or IsCategoryValid(csvdf.at[i, 'Category']) is False or IsNameValid(csvdf.at[i, 'Name']) is False or IsUrlValid(csvdf.at[i, 'URL']) is False or IsStatusValid(csvdf.at[i, 'Sensitive']) is False or IsDescriptionValid(csvdf.at[i, 'Description']) is False or IsStatusValid(csvdf.at[i, 'Status']) is False or IsScoreValid(csvdf.at[i, 'Score']) is False:
+                    if IsURLValid(csvdf.at[i, 'Instance']) is False or IsCategoryValid(csvdf.at[i, 'Category']) is False or IsNameValid(csvdf.at[i, 'Name']) is False or IsURLValid(csvdf.at[i, 'URL']) is False or IsStatusValid(csvdf.at[i, 'Sensitive']) is False or IsDescriptionValid(csvdf.at[i, 'Description']) is False or IsStatusValid(csvdf.at[i, 'Status']) is False or IsScoreValid(csvdf.at[i, 'Score']) is False:
                         if i not in rows2delete:
                             print_colors(f"Marking row {i} for deletion, as it has invalid inputs")
                             #print_colors(f"{row}")
-                            print(IsUrlValid(csvdf.at[i, 'Instance']), IsCategoryValid(csvdf.at[i, 'Category']), IsNameValid(csvdf.at[i, 'Name']), IsUrlValid(csvdf.at[i, 'URL']), IsStatusValid(csvdf.at[i, 'Sensitive']), IsDescriptionValid(csvdf.at[i, 'Description']), IsStatusValid(csvdf.at[i, 'Status']), IsScoreValid(csvdf.at[i, 'Score']))
+                            print(IsURLValid(csvdf.at[i, 'Instance']), IsCategoryValid(csvdf.at[i, 'Category']), IsNameValid(csvdf.at[i, 'Name']), IsURLValid(csvdf.at[i, 'URL']), IsStatusValid(csvdf.at[i, 'Sensitive']), IsDescriptionValid(csvdf.at[i, 'Description']), IsStatusValid(csvdf.at[i, 'Status']), IsScoreValid(csvdf.at[i, 'Score']))
                             rows2delete.append(i)
                         read=input("Continue?")
@@ -1167,9 +1162,6 @@ Maintenance:
                             print_colors("Invalid Number",is_error=True)
                             continue
 
-
-
-
                     except Exception as e:
                         print_colors(f'Try again {e}',is_error=True)
                         break
@@ -1178,7 +1170,6 @@ Maintenance:
                     print_colors("No more submissions to review, exiting.")
                     break
 
-
             case 12:
                 # review the crawled websites
                 try:
@@ -1266,12 +1257,12 @@ Maintenance:
                                 crawled_df.to_csv(crawled_file_abs_path, index=False)
 
                             elif number == 3:
-                                # Delete from crawled_onion.csv 
+                                # Delete from crawled_onion.csv
                                 crawled_df.drop(index=i,inplace=True)
                                 crawled_df.to_csv(crawled_file_abs_path, index=False)
 
                             elif number == 4:
-                                # Add to blacklist.csv 
+                                # Add to blacklist.csv
                                 newrow=[link]
 
                                 blacklist_df.loc[-1] = newrow # adding a row
@@ -1289,15 +1280,10 @@ Maintenance:
                                 print_colors("Invalid Number",is_error=True)
                                 continue
 
-
-
-
-
-
                     except Exception as e:
                         print_colors(f'Try again {e}',is_error=True)
-                        break 
-
+                        break
+
                     finally:
                         print_colors("No more crawled websites to review, exiting.")
                         break
diff --git a/scripts/uptimechecker.py b/scripts/uptimechecker.py
index d788eec..a27193b 100644
--- a/scripts/uptimechecker.py
+++ b/scripts/uptimechecker.py
@@ -8,7 +8,7 @@ import requests
 import json
 import pandas as pd
 import glob
-from utils import IsSimpleXServerValid, send_server_checks
+from utils import RecognizeURLType, IsOnionLinkValid, send_server_checks
 
 
@@ -46,8 +46,8 @@ def main():
             with open(urlpath) as f:
                 instance = f.read().rstrip()
                 # check if the instance URL domain is valid
-                if IsOnionValid(instance):
-                    print("[+] Instance Name:",instance,IsOnionValid(instance))
+                if IsOnionLinkValid(instance):
+                    print("[+] Instance Name:",instance,IsOnionLinkValid(instance))
                     isitvalid="y"
                 else:
                     print('[-] Invalid instance name in ~/.darknet_participant_url:', instance)
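`IsOnionLinkValid` now validates against `conf.ONION_URL_PATTERN`, which requires a full 56-character base32 v3 hostname ending in `.onion`. A quick check against the instance format the scripts prompt for (the first name is the example used in lantern.py's prompt; the second is deliberately invalid):

```python
from utils import IsOnionLinkValid

print(IsOnionLinkValid(
    "lantern.nowherejezfoltodf4jiyl6r56jnzintap5vyjlia7fkirfsnfizflqd.onion"))  # True
print(IsOnionLinkValid("example.com"))  # False: not a 56-char base32 .onion
```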
df.at[i,"Score"] > 0: - df.at[i,"Score"] = df.at[i,"Score"] - 1 + urltype = RecognizeURLType(url) + if urltype == 'smp': + resp,resp_type,failed_response = send_server_checks(url) + + if resp_type in ["chatError", "contactSubSummary"]: + resp, resp_type,failed_response = send_server_checks(url) + + if failed_response is None: + print(url, "✔️") + df.at[i, "Status"]="YES" + if df.at[i, "Score"] < 100: + df.at[i,"Score"] = df.at[i,"Score"] + 1 + else: + print(url,"❌") + df.at[i,"Status"]="NO" + #if uptime >0 do -1 to the value + if df.at[i,"Score"] > 0: + df.at[i,"Score"] = df.at[i,"Score"] - 1 + elif urltype == 'xftp': + resp,resp_type,failed_response = send_server_checks(url) + + if resp_type in ["chatError", "contactSubSummary"]: + resp, resp_type,failed_response = send_server_checks(url) + + if failed_response is None: + print(url, "✔️") + df.at[i, "Status"]="YES" + if df.at[i, "Score"] < 100: + df.at[i,"Score"] = df.at[i,"Score"] + 1 + else: + print(url,"❌") + df.at[i,"Status"]="NO" + #if uptime >0 do -1 to the value + if df.at[i,"Score"] > 0: + df.at[i,"Score"] = df.at[i,"Score"] - 1 else: if index1 == -1 and index2 == -1: @@ -145,14 +143,14 @@ def main(): if df.at[i,"Score"] > 0: df.at[i,"Score"] = df.at[i,"Score"] - 1 - except requests.ConnectionError as e: + except requests.ConnectionError: #print(e) print(url,"❌") df.at[i,"Status"]="NO" #if uptime >0 do -1 to the value if df.at[i,"Score"] > 0: df.at[i,"Score"] = df.at[i,"Score"] - 1 - except requests.exceptions.ReadTimeout as e: + except requests.exceptions.ReadTimeout: #print(e) print(url,"❌") df.at[i,"Status"]="NO" @@ -173,80 +171,6 @@ def main(): #print(df2) df2.to_csv(csvfile, index=False) - -def IsUrlValid(url:str)->bool: - """ - Check if url is valid both dark net end clearnet. - """ - # check if the characters are only [a-zA-Z0-9.:/] with maximum 128 chars max? - # check that it is only http(s)://wordA.wordB or http(s)://WordC.WordB.WordC, (onion or not), clearnet is fine too (double check if those are fine!) - # if OK return True - #if not : return False - pattern = re.compile("^[A-Za-z0-9:/.]+$") - url = str(url) - if url.endswith('.onion'): - return IsOnionValid(url) - else: - if not url.__contains__('.'): - #print("No (DOT) in clearnet url") - return False - if pattern.fullmatch(url) is None: - #print('Url contains invalid chars') - return False - return True - -def IsOnionValid(url: str)-> bool: - """ - Checks if the domain(param) is a valid onion domain and return True else False. - """ - # check if the characters are only [a-zA-Z0-9.] with maximum 128 chars max? - # check that it is only url.onion or subdomain.url.onion, - # if OK return True - #if not : return False - try: - pattern = re.compile("^[A-Za-z0-9.]+(\.onion)?$") - url = url.strip().removesuffix('/') - if url.startswith('http://'): - #print('URL starts with http') - # Removes the http:// - domain = url.split('/')[2] - if pattern.fullmatch(domain) is not None: - if len(domain.split('.')) > 3: - n_subdomians = len(domain.split('.')) - # Checks if there is more than 1 subdomain. "subdomain.url.onion" only - #print(f"This domain have more than one subdomain. 
@@ -173,80 +171,6 @@ def main():
             #print(df2)
             df2.to_csv(csvfile, index=False)
 
-
-def IsUrlValid(url:str)->bool:
-    """
-    Check if url is valid both dark net end clearnet.
-    """
-    # check if the characters are only [a-zA-Z0-9.:/] with maximum 128 chars max?
-    # check that it is only http(s)://wordA.wordB or http(s)://WordC.WordB.WordC, (onion or not), clearnet is fine too (double check if those are fine!)
-    # if OK return True
-    #if not : return False
-    pattern = re.compile("^[A-Za-z0-9:/.]+$")
-    url = str(url)
-    if url.endswith('.onion'):
-        return IsOnionValid(url)
-    else:
-        if not url.__contains__('.'):
-            #print("No (DOT) in clearnet url")
-            return False
-        if pattern.fullmatch(url) is None:
-            #print('Url contains invalid chars')
-            return False
-        return True
-
-def IsOnionValid(url: str)-> bool:
-    """
-    Checks if the domain(param) is a valid onion domain and return True else False.
-    """
-    # check if the characters are only [a-zA-Z0-9.] with maximum 128 chars max?
-    # check that it is only url.onion or subdomain.url.onion,
-    # if OK return True
-    #if not : return False
-    try:
-        pattern = re.compile("^[A-Za-z0-9.]+(\.onion)?$")
-        url = url.strip().removesuffix('/')
-        if url.startswith('http://'):
-            #print('URL starts with http')
-            # Removes the http://
-            domain = url.split('/')[2]
-            if pattern.fullmatch(domain) is not None:
-                if len(domain.split('.')) > 3:
-                    n_subdomians = len(domain.split('.'))
-                    # Checks if there is more than 1 subdomain. "subdomain.url.onion" only
-                    #print(f"This domain have more than one subdomain. There are {n_subdomians} subdomains")
-                    return False
-                else:
-                    if len(domain) < 62:
-                        #print("Domain length is less than 62.")
-                        return False
-                    return True
-            elif pattern.fullmatch(domain) is None:
-                #print("Domain contains invalid character.")
-                #print(domain)
-                return False
-            else:
-                #print("Domain not valid")
-                return False
-        else:
-            #TODO : edit the url to make sure it has http:// at the beginning, in case if it's missing? (problem is that it only returns true or false)
-            #print("URL doesn't start http")
-            if pattern.fullmatch(url) is not None:
-                if len(url.split('.')) > 3:
-                    n_subdomians = len(url.split('.'))
-                    # Checks if there is more than 1 subdomain. "subdomain.url.onion" only
-                    return False
-                else:
-                    if len(url) < 62:
-                        return False
-                    return True
-            elif pattern.fullmatch(url) is None:
-                return False
-            else:
-                return False
-    except Exception as e:
-        print(f"Error: {e}")
-
 if __name__ == '__main__':
     main()
diff --git a/scripts/utils.py b/scripts/utils.py
index f31340e..e33482d 100644
--- a/scripts/utils.py
+++ b/scripts/utils.py
@@ -10,13 +10,13 @@ from websockets.sync.client import connect
 import conf
 import pandas as pd
 
-PURPLE = '\033[35;40m'
+
+PURPLE = '\033[35;40m'
 BOLD_PURPLE = '\033[35;40;1m'
 RED = '\033[31;40m'
 BOLD_RED = '\033[31;40;1m'
 RESET = '\033[m'
 
-
 def get_current_instance():
     """
     Checks if all URL files are actually reachable via Tor
@@ -36,24 +36,100 @@ conf.LOCAL_DIR = conf.PARTICIPANT_DIR + get_current_instance() + '/'
 
 ###################### Validations ######################
 
+def IsSimplexChatroomValid(url: str) -> bool:
+    """
+    Recognizes a SimpleX chatroom link.
+    Returns True if the URL is a SimpleX chatroom,
+    False otherwise
+    """
+    return bool(conf.SIMPLEX_CHATROOM_PATTERN.match(url))
+
+def RecognizeSimplexType(url: str) -> str:
+    """
+    Recognizes a SimpleX server URL; returns smp, xftp or invalid
+    """
+    match = conf.SIMPLEX_SERVER_PATTERN.match(url)
+    if match:
+        return match.group(1)
+    else:
+        return 'invalid'
+
+# convenience wrapper around RecognizeSimplexType
+def IsXFTPServerValid(url: str) -> bool:
+    """
+    Returns True if URL is a valid SimpleX XFTP server URL,
+    False otherwise
+    """
+    return RecognizeSimplexType(url) == 'xftp'
+
+# convenience wrapper around RecognizeSimplexType
+def IsSMPServerValid(url: str) -> bool:
+    """
+    Returns True if URL is a valid SimpleX SMP server URL,
+    False otherwise
+    """
+    return RecognizeSimplexType(url) == 'smp'
+
+def IsClearnetLinkValid(url: str) -> bool:
+    """
+    Returns True if URL is a valid clearnet URL,
+    False otherwise
+    """
+    return bool(conf.CLEARNET_URL_PATTERN.match(url))
+
+def IsOnionLinkValid(url: str) -> bool:
+    """
+    Returns True if URL is a valid onion URL,
+    False otherwise
+    """
+    return bool(conf.ONION_URL_PATTERN.match(url))
+
+def RecognizeURLType(url: str) -> str:
+    """
+    Recognizes the URL type; can return:
+    - chatroom - SimpleX chatroom
+    - xftp - XFTP SimpleX server
+    - smp - SMP SimpleX server
+    - onion - onion URL
+    - clearnet - valid clearnet url
+    - invalid - none of the above (probably invalid)
+    """
+    # order is important here
+    # (e.g. a SimpleX chatroom link is also a valid clearnet link)
+    if IsSimplexChatroomValid(url):
+        return 'chatroom'
+    if IsXFTPServerValid(url):
+        return 'xftp'
+    if IsSMPServerValid(url):
+        return 'smp'
+    if IsOnionLinkValid(url):
+        return 'onion'
+    if IsClearnetLinkValid(url):
+        return 'clearnet'
+    return 'invalid'
+
+def IsURLValid(url: str) -> bool:
+    """
+    Checks if the given URL is valid (i.e. RecognizeURLType recognizes it)
+    """
+    return RecognizeURLType(url) != 'invalid'
+
+
 def CheckUrl(url):
-    """
-    Checks if URL is actually reachable via Tor
-    """
-    proxies = {
-        'http': 'socks5h://127.0.0.1:9050',
-        'https': 'socks5h://127.0.0.1:9050'
-    }
-    try:
-        status = requests.get(url,proxies=proxies, timeout=5).status_code
-        if status == 200:
-            return True
-        else:
-            return False
-    except requests.ConnectionError as e:
-        return False
-    except requests.exceptions.ReadTimeout as e:
-        return False
+    """
+    Checks if URL is actually reachable via Tor
+    """
+    proxies = {
+        'http': 'socks5h://127.0.0.1:9050',
+        'https': 'socks5h://127.0.0.1:9050'
+    }
+    try:
+        status = requests.get(url, proxies=proxies, timeout=5).status_code
+        return status == 200
+    except requests.ConnectionError:
+        return False
+    except requests.exceptions.ReadTimeout:
+        return False
 
 ###TODO: should replace checkUrl
 # checks if all the webring participants are reachable
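`CheckUrl` above probes a hidden service through the local Tor SOCKS proxy. The same call pattern works standalone (requires `requests[socks]` and a Tor daemon listening on port 9050; the onion address below is a placeholder):

```python
import requests

proxies = {'http': 'socks5h://127.0.0.1:9050', 'https': 'socks5h://127.0.0.1:9050'}
try:
    reachable = requests.get("http://example.onion", proxies=proxies,
                             timeout=5).status_code == 200
except (requests.ConnectionError, requests.exceptions.ReadTimeout):
    reachable = False
print(reachable)
```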
@@ -83,317 +159,110 @@ def is_participant_reachable(instance):
 #### PROTECTIONS AGAINST MALICIOUS CSV INPUTS ####
 
 def IsBannerValid(path: str) -> bool:
-    """
-    Checks if the banner.png file has the correct dimensions (240x60)
-    """
-    try:
-        im = Image.open(path)
-    except Exception as e:
-        print("ERROR, EXCEPTION")
-        return False
-    width, height = im.size
-    if width != 240 or height != 60:
-        print("INVALID BANNER DIMENSIONS, HEIGHT=",height," WIDTH=",width)
-        return False
-    filesizeMB=os.path.getsize(path)/1024/1024
-    if filesizeMB > 5:
-        print("Banner filesize too large (>5Mb): ",os.path.getsize(path)/1024/1024,"MB")
-        return False
-    return True
-
-def IsOnionValid(url: str)-> bool:
     """
-    Checks if the domain(param) is a valid onion domain and return True else False.
+    Checks if the banner.png file has the correct dimensions (240x60)
     """
     try:
-        pattern = re.compile("^[A-Za-z0-9.]+(.onion)?$")
-        url = url.strip().removesuffix('/')
-        if url.startswith('http://'):
-            domain = url.split('/')[2]
-            if pattern.fullmatch(domain) is not None:
-                if len(domain.split('.')) > 3:
-                    return False
-                else:
-                    if len(domain) < 62:
-                        return False
-                    return True
-            elif pattern.fullmatch(domain) is None:
-                return False
-            else:
-                return False
-        else:
-            #TODO : edit the url to make sure it has http:// at the beginning, in case if it's missing? (problem is that it only returns true or false)
-            if pattern.fullmatch(url) is not None:
-                if len(url.split('.')) > 3:
-                    return False
-                else:
-                    if len(url) < 62:
-                        return False
-                    return True
-            elif pattern.fullmatch(url) is None:
-                return False
-            else:
-                return False
-    except Exception as e:
+        im = Image.open(path)
+    except Exception:
+        print("ERROR, EXCEPTION")
+        return False
+    width, height = im.size
+    if width != 240 or height != 60:
+        print("INVALID BANNER DIMENSIONS, HEIGHT=", height, " WIDTH=", width)
+        return False
+    filesizeMB = os.path.getsize(path)/1024/1024
+    if filesizeMB > 5:
+        print("Banner filesize too large (>5Mb): ",os.path.getsize(path)/1024/1024,"MB")
         return False
-
-def IsSimpleXChatroomValid(url: str) -> bool:
-    """Validate the SimpleX chatroom URL."""
-    REQUIRED_SUBSTRING = "#/?v=2-7&smp=smp%3A%2F"
-
-    # Step 1: Check if it starts with http://, https://, or simplex:/
-    if url.startswith(('http://', 'https://', 'simplex:/')):
-        # Step 1.5: If http:// or https://, check for valid clearnet or onion domain
-        if url.startswith(('http://', 'https://')) and not IsUrlValid(url):
-            return False
-        elif not url.startswith('simplex:/'):
-            return False  # Must start with one of the valid protocols
-
-    # Step 2: Check for the presence of the required substring
-    if REQUIRED_SUBSTRING not in url:
-        return False  # Required substring not found
-
-    # Step 3: Extract the part after "smp=smp%3A%2F"
-    smp_start = url.find("smp=smp%3A%2F")
-    if smp_start == -1:
-        return False  # Required substring not found
-
-    smp_start += len("smp=smp%3A%2F")
-    smp_end = url.find("&", smp_start)
-    if smp_end == -1:
-        smp_end = len(url)  # Take until the end if no "&" is found
-
-    smp_value = urllib.parse.unquote(url[smp_start:smp_end])  # Decode the URL-encoded string
-
-    # Step 3.5: Check if the smp_value contains a valid hostname
-    if '@' not in smp_value:
-        return False  # Must contain '@' to separate fingerprint and hostname
-
-    fingerprint, hostname = smp_value.split('@', 1)
-    if not IsUrlValid(hostname):
-        return False  # Invalid hostname
-
-    # Step 4: Check for the presence of "%2F" in the original URL
-    if "%2F" not in url:
-        return False  # Required substring not found
-
-    # If all checks pass, return True
     return True
 
-def IsUrlValid(url:str)->bool:
-    """
-    Check if url is valid both dark net end clearnet.
-    """
-    pattern = re.compile(r"^[A-Za-z0-9:/._%-=#?&@]+$")
-    onion_pattern = re.compile(r"^(\w+:)?(?://)?(\w+\.)?[a-z2-7]{56}\.onion")
-    url = str(url)
-    if len(url) < 4:
-        return False
-    if onion_pattern.match(url) is not None:
-        return IsOnionValid(url)
-    else:
-        if not url.__contains__('.'):
-            return False
-        if url.__contains__(';'):
-            return False #required otherwise lantern thinks there are extra columns
-        if pattern.fullmatch(url) is None:
-            return False
-    return True
-
-def IsStatusValid(status: str)-> bool:
-    """
-    Checks if status contains only ['YES','NO']. Verbose only if False is returned
-    """
-    pattern = ['YES','NO','✔️','❌','']
-    #pattern = ['YES','NO']
-    status = str(status)
-    status.strip()
-    if (status not in pattern):
-        return False
-
-    return True
-
-def IsScoreValid(score:str)->bool:
-    """
-    Check the Score is only "^[0-9.,]+$" with 8 max chars.
-    """
-    pattern = re.compile("^[0-9.,]+$")
-    score = str(score)
-    score.strip()
-    if score in ['','nan']:
-        return True
-    if pattern.fullmatch(score) is None:
-        return False
-    elif len(score) > 8:
-        return False
-    return True
-
-def IsDescriptionValid(desc:str)->bool:
-    """
-    Check the categories are only [a-zA-Z0-9.' ] with 256 max chars.
-    """
-    if desc == "":
-        return True
-    pattern = re.compile("^[A-Za-z0-9-.,' \"\(\)\/]+$")
-    desc = str(desc)
-    desc.strip()
-    if pattern.fullmatch(desc) is None:
-        return False
-    if desc == "DEFAULT":
-        return False
-    elif len(desc) > 256:
-        return False
-    return True
-
-def IsCategoryValid(categories: list)-> bool:
-    """
-    Check the categories are only [a-zA-Z0-9 ] with 64 max chars.
-    """
-    pattern = re.compile("^[A-Za-z0-9 ]+$")
-    for category in categories:
-        category.strip()
-        if pattern.fullmatch(category) is None:
-            return False
-        elif len(category) > 64:
-            return False
-        else:
-            return True
-
-def IsSimpleXServerValid(url: str) -> bool:
-    pattern = re.compile('[0-9A-Za-z-_]*')
-    url = url.strip()
-    try:
-
-        if url.startswith(('smp://', 'xftp://')):
-            # Remove the protocol part
-            proless = url.split('//', 1)[-1]
-            # Split the fingerprint and hostname
-            parts = proless.split('@')
-            if len(parts) != 2:
-                return False  # Must have exactly one '@' character
-
-            fingerprint = parts[0]
-            hostname = parts[1].split(',')[0]  # Get the hostname before any comma
-
-            # Check fingerprint length and pattern
-            if len(fingerprint) == 44 and pattern.match(fingerprint):
-                # Validate the hostname
-                result = IsSimpleXUrlValid(hostname)
-                if result:
-                    # Check for an optional comma and a valid onion domain
-                    if ',' in proless:
-                        onion_part = proless.split(',')[1].strip()
-                        if not hostname_pattern.match(onion_part):
-                            return False
-                    return True
-        return False
-    except Exception as e:
-        print(e)
-        # Any error will be a false
-        return False
-
-def IsNameValid(name: str)->bool:
-    """
-    Check the parameter name only contains [a-zA-Z0-9 ] and is 64 chars long.
-    """
-    try:
-        name = str(name)
-    except Exception as e:
-        return False
-    pattern = re.compile("^[A-Za-z0-9 ]+$")
-    name = name.strip()
-    if (pattern.fullmatch(name) is None):
-        return False
-    elif len(name) > 64:
-        return False
-    return True
-
-def print_colors(s:str=' ', bold:bool=False, is_error:bool = False, default:bool=False):
-    """
-    Helper function to print with colors
-    """
-    if is_error:
-        print(f"{RED}{s}{RESET}")
-    elif bold:
-        print(f"{BOLD_PURPLE}{s}{RESET}")
-    elif is_error and bold:
-        print(f"{BOLD_RED}{s}{RESET}")
-    elif default:
-        print(f'{s}')
-    else:
-        print(f"{PURPLE}{s}{RESET}")
-
-def IsSimpleXOnionValid(url: str)-> bool:
+def IsStatusValid(status: str) -> bool:
     """
-    Checks if the domain(param) is a valid onion domain and return True else False.
+    Checks if status is one of 'YES', 'NO', or empty.
     """
-    try:
-        pattern = re.compile(r"^[A-Za-z0-9:/._%-=#?&@]+(.onion)$")
-        url_pattern = re.compile(r"^(\w+:)?(?://)?(\w+\.)?[a-z2-7]{56}\.onion")
-        url = url.strip().removesuffix('/')
-        if url.startswith('http://'):
-            domain = url.split('/')[2]
-            if pattern.fullmatch(domain) is not None:
-                if len(domain.split('.')) > 3:
-                    return False
-                else:
-                    if len(domain) < 62:
-                        return False
-                    return True
-            elif pattern.fullmatch(domain) is None:
-                return False
-            else:
-                return False
-        else:
-            #TODO : edit the url to make sure it has http:// at the beginning, in case if it's missing? (problem is that it only returns true or false)
-            if url_pattern.match(url) is not None:
-                if len(url.split('.')) > 3:
-                    return False
-                else:
-                    if len(url) < 62:
-                        return False
-                    return True
-            elif url_pattern.match(url) is None:
-                return False
-            else:
-                return False
-    except Exception as e:
+    pattern = ['YES','NO','']
+    status = str(status).strip()
+    if status not in pattern:
         return False
-
-def IsSimpleXUrlValid(url:str)->bool:
-    """
-    Check if url is valid both dark net end clearnet.
-    """
-    pattern = re.compile(r"^[A-Za-z0-9:/._%-=#?&@]+$")
-    onion_pattern = re.compile(r"^(\w+:)?(?://)?(\w+\.)?[a-z2-7]{56}\.onion")
-    url = str(url)
-    if len(url) < 4:
-        return False
-    if onion_pattern.match(url) is not None:
-        return IsSimpleXOnionValid(url)
-    else:
-        if not url.__contains__('.'):
-            return False
-        if pattern.fullmatch(url) is None:
-            return False
-    return True
+    return True
 
-def send_server_checks(url:str) -> ():
-    """
-    Sends requests to sxc websocket and retuns
-    response, response type and testFailure or None.
-    """
-    with connect(f"ws://localhost:3030") as websocket:
-        query = f"/_server test 1 {url}"
-        command = {
-            'corrId': f"id{random.randint(0,999999)}",
-            'cmd': query,
-        }
-        websocket.send(json.dumps(command))
-        message = websocket.recv()
-        response = json.loads(message)
-        resp_type = response["resp"]["type"]
-        failed_response = response['resp'].get('testFailure')
+
+def IsScoreValid(score: str) -> bool:
+    """
+    Check the Score is only "^[0-9.,]+$" with 8 chars max.
+    """
+    pattern = re.compile("^[0-9.,]+$")
+    score = str(score)
+    score = score.strip()
+    if score in ['','nan']:
+        return True
+    if pattern.fullmatch(score) is None:
+        return False
+    if len(score) > 8:
+        return False
+    return True
+
+
+def IsDescriptionValid(desc: str) -> bool:
+    """
+    Check the description contains only [A-Za-z0-9-.,'"()/ ] with 256 chars max.
+    """
+    if desc == "":
+        return True
+    pattern = re.compile(r"^[A-Za-z0-9-.,' \"\(\)\/]+$")
+    desc = str(desc)
+    desc = desc.strip()
+    if pattern.fullmatch(desc) is None:
+        return False
+    if desc == "DEFAULT":
+        return False
+    elif len(desc) > 256:
+        return False
+    return True
+
+
+def IsCategoryValid(categories: list[str]) -> bool:
+    """
+    Check the categories are only [a-zA-Z0-9 ] with 64 chars max each.
+    """
+    pattern = re.compile("^[A-Za-z0-9 ]+$")
+    for category in categories:
+        category = category.strip()
+        if pattern.fullmatch(category) is None:
+            return False
+        elif len(category) > 64:
+            return False
+    return True
+
+
+def IsNameValid(name: str) -> bool:
+    """
+    Check the parameter name only contains [a-zA-Z0-9] and is at most 64 chars long.
+    """
+    try:
+        return bool(conf.VALID_NAME_PATTERN.fullmatch(name.strip()))
+    except Exception:
+        return False
+
+
+def send_server_checks(url: str) -> tuple:
+    """
+    Sends requests to the SimpleX websocket and returns
+    the response, the response type, and testFailure (or None).
+ """ + with connect(f"ws://localhost:3030") as websocket: + query = f"/_server test 1 {url}" + command = { + 'corrId': f"id{random.randint(0,999999)}", + 'cmd': query, + } + websocket.send(json.dumps(command)) + message = websocket.recv() + response = json.loads(message) + resp_type = response["resp"]["type"] + failed_response = response['resp'].get('testFailure') return (response, resp_type, failed_response) @@ -676,3 +545,19 @@ def get_local_webring_participants(): except Exception: print_colors(f'[-] failed reading webring participants file',is_error=True ) return pd.DataFrame() + + +def print_colors(s:str=' ', bold:bool=False, is_error:bool = False, default:bool=False): + """ + Helper function to print with colors + """ + if is_error: + print(f"{RED}{s}{RESET}") + elif bold: + print(f"{BOLD_PURPLE}{s}{RESET}") + elif is_error and bold: + print(f"{BOLD_RED}{s}{RESET}") + elif default: + print(f'{s}') + else: + print(f"{PURPLE}{s}{RESET}") \ No newline at end of file