import urllib.parse

try:
    from websockets.sync.client import connect
except ImportError:
    # Optional at import time so the pure validators below stay importable
    # without the websocket client; send_server_checks reports its absence.
    connect = None

# ANSI escape sequences used by print_colors
PURPLE = '\033[35;40m'
BOLD_PURPLE = '\033[35;40;1m'
RED = '\033[31;40m'
BOLD_RED = '\033[31;40;1m'
RESET = '\033[m'

# name should contain only up to 64 alphanumeric characters
# NOTE(review): an earlier revision of IsNameValid also accepted spaces;
# confirm that rejecting them is intended.
VALID_NAME_PATTERN = re.compile(r"^[A-Za-z0-9]{1,64}$")

# pattern for regular urls
# TODO: this is very simplified pattern
# The previous character class contained `%-=`, which the regex engine reads
# as the range 0x25-0x3D. The accepted characters are spelled out here so the
# set is visible; '<' and ';' (both inside that accidental range) are left
# out - ';' would be read as a column separator in the CSV files.
URL_PATTERN = re.compile(r"^[A-Za-z0-9:\/\._%&'()*+,\-=#?@]+$")

# pattern for onion urls (56 bytes of base32 alphabet + .onion)
ONION_URL_PATTERN = re.compile(r"^(\w+:)?(?:\/\/)?(\w+\.)?[a-z2-7]{56}\.onion")

# private patterns / constants shared by the validators below
_SCORE_PATTERN = re.compile(r"^[0-9.,]+$")
_DESCRIPTION_PATTERN = re.compile(r"^[A-Za-z0-9\-.,' \"()/]+$")
_CATEGORY_PATTERN = re.compile(r"^[A-Za-z0-9 ]+$")
# presumably base64url with '=' padding, 44 characters in total - TODO confirm
# against the SimpleX server address format
_FINGERPRINT_PATTERN = re.compile(r"[0-9A-Za-z_=-]{44}")
_VALID_STATUSES = ('YES', 'NO', '✔️', '❌', '')


def print_colors(s: str = ' ', bold: bool = False, is_error: bool = False,
                 default: bool = False) -> None:
    """
    Helper function to print with colors.

    is_error + bold -> bold red, is_error -> red, bold -> bold purple,
    default -> no color at all, otherwise purple.
    """
    # The combined case has to be tested first; checking `is_error` alone
    # before it made the bold red branch unreachable.
    if is_error and bold:
        color = BOLD_RED
    elif is_error:
        color = RED
    elif bold:
        color = BOLD_PURPLE
    elif default:
        print(f'{s}')
        return
    else:
        color = PURPLE
    print(f"{color}{s}{RESET}")


#### Checking Functions to validate that links are legit ####

def CheckUrl(url):
    """
    Checks if URL is actually reachable via Tor.

    The request goes through the local SOCKS proxy on 127.0.0.1:9050 with a
    5 second timeout. Returns True only for an HTTP 200 answer; connection
    errors and read timeouts count as "not reachable".
    """
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    try:
        status = requests.get(url, proxies=proxies, timeout=5).status_code
    except (requests.ConnectionError, requests.exceptions.ReadTimeout):
        return False
    return status == 200


#### PROTECTIONS AGAINST MALICIOUS CSV INPUTS ####

def IsBannerValid(path: str) -> bool:
    """
    Checks if the banner file has the correct dimensions (240x60) and is
    not larger than 5 MB. Prints the reason when the banner is rejected.
    """
    try:
        # context manager so the underlying file handle is always released
        with Image.open(path) as im:
            width, height = im.size
    except Exception:
        # the file is untrusted input: anything that cannot be opened as an
        # image is simply an invalid banner
        print("ERROR, EXCEPTION")
        return False
    if width != 240 or height != 60:
        print("INVALID BANNER DIMENSIONS, HEIGHT=", height, " WIDTH=", width)
        return False
    filesizeMB = os.path.getsize(path) / 1024 / 1024
    if filesizeMB > 5:
        print("Banner filesize too large (>5Mb): ", filesizeMB, "MB")
        return False
    return True


def IsOnionValid(url: str) -> bool:
    """
    Checks if the domain of `url` is a valid v3 onion domain.

    Accepts the address with or without an http:// or https:// prefix and
    with an optional path. Always returns a bool; anything that is not a
    string is invalid.
    """
    if not isinstance(url, str):
        return False

    # normalise first, otherwise leading whitespace hides the protocol
    url = url.strip()

    # make sure the protocol is there so the domain is always the 3rd chunk
    if not url.startswith(('http://', 'https://')):
        url = 'http://' + url

    domain = url.split('/')[2]

    # A full match already guarantees the 56 character base32 label, the
    # ".onion" suffix and at most one subdomain label, so no separate length
    # or part-count check is needed (an exact-length check would wrongly
    # reject the subdomain the pattern allows).
    return ONION_URL_PATTERN.fullmatch(domain) is not None


# NOTE(review): IsSimpleXChatroomValid(url) belongs here in the module, but
# only fragments of it are visible in the reviewed patch, so it is not
# reproduced. Its two visible guards read
#     RecognizeUrlOnionClear(...) != 'invalid'  ->  return False
# which rejects exactly the valid URLs/hostnames; they should compare with
# == 'invalid' (the earlier code used `not IsUrlValid(...)`).


def RecognizeUrlOnionClear(url: str) -> str:
    """
    Recognize if the URL is invalid, onion or clearnet.

    Returns one of the strings 'onion', 'clearnet' or 'invalid'.
    """
    # CSV cells may arrive as non-strings (e.g. NaN), so coerce first
    url = str(url)

    # early terminate preconditions
    if len(url) < 4 or (';' in url) or ('.' not in url):
        return 'invalid'

    # check if possibly onion url, here just preliminary check
    # IsOnionValid checks it against regex expression
    if '.onion' in url:
        if IsOnionValid(url):
            return 'onion'
        # shaped like an onion address but rejected by IsOnionValid:
        # do not let it pass as a clearnet url
        if ONION_URL_PATTERN.match(url):
            return 'invalid'

    if URL_PATTERN.fullmatch(url):
        return 'clearnet'

    return 'invalid'


def RecognizeUrlFull(url: str) -> str:
    """
    Recognize if URL is smp, xftp, simplex groupchat, onion, clearnet or just invalid
    Depends on RecognizeUrlOnionClear

    Returns one of 'chatroom', 'onion', 'clearnet', 'xftp', 'smp', 'invalid'.
    """
    if IsSimpleXChatroomValid(url):
        return 'chatroom'
    if url.startswith(('http://', 'https://')):
        return RecognizeUrlOnionClear(url)
    for scheme in ('xftp', 'smp'):
        if url.startswith(f'{scheme}://') and IsSimpleXServerValid(url):
            return scheme
    return 'invalid'


def IsStatusValid(status: str) -> bool:
    """
    Checks if status (after trimming) is one of 'YES', 'NO', '✔️', '❌' or empty.
    """
    # coerce like the other validators so a non-string cell does not crash
    return str(status).strip() in _VALID_STATUSES


def IsScoreValid(score: str) -> bool:
    """
    Check the Score is only "^[0-9.,]+$" with 8 max chars.

    An empty value and 'nan' are accepted as "no score".
    """
    # str.strip returns a new string, the result has to be kept
    score = str(score).strip()
    if score in ['', 'nan']:
        return True
    if _SCORE_PATTERN.fullmatch(score) is None:
        return False
    return len(score) <= 8


def IsDescriptionValid(desc: str) -> bool:
    """
    Check the description only uses [A-Za-z0-9-.,' "()/] with 256 max chars.

    An empty description is valid, the placeholder "DEFAULT" is not.
    """
    # normalise before any check so surrounding whitespace cannot change
    # the outcome of the empty / "DEFAULT" / length tests
    desc = str(desc).strip()
    if desc == "":
        return True
    if _DESCRIPTION_PATTERN.fullmatch(desc) is None:
        return False
    if desc == "DEFAULT":
        return False
    return len(desc) <= 256


def IsCategoryValid(categories: list[str]) -> bool:
    """
    Check the categories are only [a-zA-Z0-9 ] with 64 max chars.

    Every entry is checked (previously only the first one was). An empty
    list is not valid.
    """
    if not categories:
        return False
    for category in categories:
        category = category.strip()
        if _CATEGORY_PATTERN.fullmatch(category) is None:
            return False
        if len(category) > 64:
            return False
    return True


def IsSimpleXServerValid(url: str) -> bool:
    """
    Validate a SimpleX server address of the form
    smp://<fingerprint>@<hostname>[,<onion hostname>] (or xftp://...).
    """
    url = url.strip()
    if not url.startswith(('smp://', 'xftp://')):
        return False

    # Remove the protocol part
    proless = url.split('//', 1)[-1]
    # Split the fingerprint and hostname
    parts = proless.split('@')
    if len(parts) != 2:
        return False  # Must have exactly one '@' character

    fingerprint = parts[0]
    hostname = parts[1].split(',')[0]  # Get the hostname before any comma

    # Check fingerprint length and alphabet; fullmatch is required because
    # a `*` pattern used with match() accepts every string
    if _FINGERPRINT_PATTERN.fullmatch(fingerprint) is None:
        return False

    # Validate the hostname
    if RecognizeUrlOnionClear(hostname) == 'invalid':
        return False

    # Check for an optional comma and a valid onion domain
    if ',' in proless:
        onion_part = proless.split(',')[1].strip()
        if RecognizeUrlOnionClear(onion_part) == 'invalid':
            return False
    return True


def IsNameValid(name: str) -> bool:
    """
    Check the parameter name only contains [a-zA-Z0-9] and is at most
    64 chars long (surrounding whitespace is ignored).
    """
    if not isinstance(name, str):
        return False
    return VALID_NAME_PATTERN.fullmatch(name.strip()) is not None


def send_server_checks(url: str) -> tuple[dict, str, object]:
    """
    Sends requests to sxc websocket and returns
    response, response type and testFailure or None.

    Raises RuntimeError when the websockets package is not installed.
    """
    if connect is None:
        raise RuntimeError("websockets package is required for send_server_checks")

    with connect("ws://localhost:3030") as websocket:
        query = f"/_server test 1 {url}"
        command = {
            'corrId': f"id{random.randint(0,999999)}",
            'cmd': query,
        }
        websocket.send(json.dumps(command))
        message = websocket.recv()
        response = json.loads(message)
        resp_type = response["resp"]["type"]
        failed_response = response['resp'].get('testFailure')

    return (response, resp_type, failed_response)