"""
A module providing the 'open' command as an alternative to the Selenium-based command.
This command uses the requests library for HTTP requests and BeautifulSoup for HTML parsing.
"""
from pathlib import Path
from io import BytesIO
import os
import logging
from typing import Optional, Literal
from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup, FeatureNotFound, Tag
import jsbeautifier
import textwrap
import shutil
from ascii_magic import AsciiArt
from asrch.modules.logging_formatter import ColorFormatter
from asrch_ascli.modules._alert import Alert
from asrch_ascli.modules._box import Box
from asrch_ascli.modules._output import Output
from asrch_ascli.modules._split import print_split_terminal
from asrch_ascli.utils.constants import Colors
from asrch.modules._formatting import Bar
from asrch_ascli.modules._highlighter import clean_and_format_urls as cln
terminal_width = shutil.get_terminal_size().columns
history: list[str] = [] # session history for browse mode
tab_his: list[str] = []
tabs_string: str = ""
tabs: list[str] = [] # tabs for browse mode sessions
form = Bar()
colors = Colors()
c_log = logging.getLogger(__name__)
sh = logging.StreamHandler()
search_string = "Results"  # Temporary default title for the results box
c_form = ColorFormatter("%(asctime)s|%(levelname)8s|%(message)s")
sh.setFormatter(c_form)
c_log.addHandler(sh)
config_path = Path(__file__).parent.parent.parent.parent.parent
path_with_config = config_path / '.config'
commands: list[str] = ["q", "exit", "<", "nt", "new_tab", "t", "sh", "show_history", "h", ":cache"]
etag_ = "9d6f0b71c083769a83e9d462bf0fff18b9f9b97dcc90946ab3e48ab62af3c0ca541d34cfd2b8e233d3e681ea39da15985ed68fdbbf61d55c1e35c67e6f765a"
index_ = "n9b74c9897bac770ffc029102a200c5de7b5c6b4e715d0ab929c4a1c1f9f2b22d9eec09289c52ed0b2b38b4f3c7d5e2bd5d2a7cb63b223ce550501edbe2202"
def get_cache_file_path(config_path, tab_name): # pragma: no cover
    """Return the path of the cache file for the given tab name."""
    return os.path.join(config_path, '.cache', f'{tab_name}.txt')
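# Usage sketch (illustrative only, not executed): elsewhere in this module tab names
# are derived from URLs by stripping "https://" and replacing "/" with "_", so a
# hypothetical tab for "https://example.com/docs" would be cached at:
#   get_cache_file_path(path_with_config, "example.com_docs")
#   # -> <path_with_config>/.cache/example.com_docs.txt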
def highlight_elements(soup: BeautifulSoup, base_url: str) -> BeautifulSoup: # pragma: no cover
"""
Highlight all elements in BeautifulSoup object
:param soup: The soup object to parse.
:type: BeautifulSoup
:return: soup
:rtype: BeautifulSoup
"""
for tag in soup.find_all(True):
if tag.string is not None and tag.name in ['div', 'hr']:
tag.string = Box.create_box(None, tag.string, None, padding=1, style="h1u")
separator_line = soup.new_tag('div')
separator_line.string = "------------------------------------------------"
try:
tag.insert_before(separator_line)
tag.insert_after(separator_line)
except ValueError:
continue
for tag in soup.find_all("a"):
if "href" in tag.attrs and tag["href"] is not None:
tag["href"] = " * " + tag["href"]
else:
tag["href"] = " * "
return soup
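# Minimal sketch of how highlight_elements is applied (mirrors its use in get_page);
# the URL here is hypothetical:
#   resp = requests.get("https://example.com", headers={"User-Agent": "Mozilla/5.0"})
#   soup = BeautifulSoup(resp.content, "html.parser")
#   highlight_elements(soup, "https://example.com")  # mutates the soup in place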
def get_index(url: str, mode: str = "") -> list[str]: # pragma: no cover
"""Fetches and processes URLs from a web page.
:param url: The URL of the web page to fetch and process.
:type url: str
:param mode: Optional mode to determine the format of the output list. If `"url_list"`,
returns a list of URLs. Otherwise, returns a list with indexed URLs.
:type mode: str, default is `""`
:return: A list of URLs, either in indexed format or as a plain list depending on the mode.
:rtype: list[str]
:raises ValueError: If the provided URL is empty.
:raises requests.exceptions.RequestException: If there is an error with the HTTP request.
:raises FeatureNotFound: If BeautifulSoup cannot parse the HTML content.
"""
headers = {"User-Agent": "Mozilla/5.0"}
proxies = {}
if not url:
raise ValueError("URL cannot be empty")
try:
response = requests.get(url, proxies=proxies, headers=headers)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"Error occurred while fetching {url}: {str(e)}")
return []
try:
soup = BeautifulSoup(response.content, "html.parser")
except FeatureNotFound:
return []
href_list = []
anchor_tags = soup.find_all("a")
for anchor_tag in anchor_tags:
if isinstance(anchor_tag, Tag):
            href = anchor_tag.get("href")
            if href:
                href_list.append(href)
                # Show the href in place of the anchor text so link targets are visible in the rendered page.
                if anchor_tag.string:
                    anchor_tag.string.replace_with(href)
base_url = url
url_list = []
for url_ in href_list:
modified_url = url_.replace("*", "").strip()
if modified_url.startswith("http"):
url_list.append(modified_url)
elif modified_url.startswith("/"):
full_url = urljoin(base_url, modified_url)
url_list.append(full_url)
    page_index = [f"({idx}){url_}" for idx, url_ in enumerate(url_list, start=1)]
if mode == "url_list": # this is absolutely fucking disgusting lets all ignore it :)
return url_list
else:
return page_index
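# Usage sketch (hypothetical URL) showing the two output modes of get_index:
#   get_index("https://example.com")              # ["(1)https://example.com/a", ...]
#   get_index("https://example.com", "url_list")  # ["https://example.com/a", ...]
# Relative hrefs beginning with "/" are resolved against the page URL via urljoin.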
def cache_results(get_url, config_path, tab_name, search_string, etag: str): # pragma: no cover
    """Serve a page from its cache file if one exists; otherwise fetch it, render it, and write a new cache file."""
    cfile_path = get_cache_file_path(config_path, tab_name)
c_log.debug(f"{cfile_path}")
if not os.path.exists(cfile_path):
try:
print(f"The file {cfile_path} does not exist in cache.")
with open(cfile_path, 'w') as f:
content = Box.create_box(
"Results",
f"{get_page(get_url)}\n{etag_}\n{etag}\n\{index_}",
f"{get_index(get_url)}",
padding=1,
style="main",
)
f.write(content)
Output.print(content)
except FileNotFoundError:
c_log.debug(f"The file {cfile_path} could not be created.")
return get_url
else:
print(f"The file {cfile_path} exists in cache.")
with open(cfile_path, 'r') as f:
Output.print(f.read())
print(Colors.NC)
return True
def handle_page_navigation_input(prompt: str, url_list: list[str], history: list[str], tabs: list[str], config_path: str, etag: str) -> str: # pragma: no cover
    """Run the interactive browse-mode prompt: numeric input navigates to an indexed URL; other inputs are handled per the module-level `commands` list."""
while True:
if (page_num := input(prompt)) not in commands:
print(Colors.NC)
try:
index = int(page_num) - 1
if 0 <= index < len(url_list):
current_url = url_list[index]
print(f"Navigating to {current_url}")
history.append(current_url)
prev_history = history[-2].replace("https://", "").replace("www.", "")[:17] if len(history) >= 2 else ""
history_formatted = ", ".join(f"({i + 1}) {elem}" for i, elem in enumerate(history))
formatted_string = (f"{Colors.PASTEL_PURPLE}{prev_history}... "
f"{Colors.PASTEL_PINK}[<----] "
f"{Colors.PASTEL_MINT}[---->] "
f"{Colors.PASTEL_CYAN}{current_url}\n"
f"{form.bar('.', '', t_color='', bg_color='')}")
print(Colors.NC)
print(formatted_string)
Alert.alert(2, "Loading...")
tab_name = current_url.replace("https://", "").replace("/", "_")
print(current_url)
if cache_results(current_url, config_path, tab_name, search_string, etag):
# compare requests ETAG with the one in the cache file
with open(f"{config_path}/.cache/{tab_name}.txt", 'r') as file:
for line in file:
if etag_ in line:
next_line = next(file)
etg = (f"{etag}".replace("┋", "",).replace('W//', "").replace('"',"").strip())
nxtl = (f"{next_line}".replace("┋", "",).replace('W//', "").replace('"',"").strip())
                                    c_log.debug(f"cached etag: {nxtl} | response etag: {etg}")
if etg == nxtl:
c_log.debug(f"{etag} == {next_line}")
print("Content loaded from cache.")
else:
c_log.debug("etag token found but actual etag wasnt.")
else:
print("Content fetched and cached.")
except IndexError:
print(f"Page number {page_num} is out of range. Please try again.")
except ValueError:
c_log.error(f"Invalid input '{page_num}'. Please enter a valid page number.")
if "http" in page_num:
c_log.info("tip: it looks like you tried to input a url, try the 'nt' command.")
elif page_num in ["q", 'exit']:
return "q"
elif page_num == "h":
print('\n'.join(f'{i + 1}. {item}' for i, item in enumerate(history)))
elif page_num == "nt":
current_url = input(f"{Colors.NC}{Colors.UNDERLINE}asrch{Colors.NC}{Colors.GREEN} >{Colors.NC} url: ")
tab_name = current_url.replace("https://", "").replace("/", "_")
history.append(current_url)
tabs.append(current_url)
            # Load the page and its index from the cache file when a cached copy exists;
            # the ETag stored in the cache is compared against the current response ETag.
            # Note: this cache handling is fragile; modify with care.
if cache_results(current_url, config_path, tab_name, search_string, etag):
with open(f"{config_path}/.cache/{tab_name}.txt", 'r') as file:
for line in file:
if etag_ in line:
next_line = next(file)
etg = (f"{etag}".replace("┋", "",).replace('W//', "").replace('"',"").strip())
nxtl = (f"{next_line}".replace("┋", "",).replace('W//', "").replace('"',"").strip())
                            c_log.debug(f"cached etag: {nxtl} | response etag: {etg}")
if etg == nxtl:
c_log.debug(f"{etag} == {next_line}")
print("Content loaded from cache.")
else:
c_log.debug("etag token found but actual etag wasnt.")
get_index(current_url)
print("Content loaded from cache.")
else:
#Output.print(f"{get_index(current_url, 'url_list')}")
url_list = get_index(current_url, 'url_list')
print("Content fetched and cached.")
        # Advanced settings / maintenance commands below.
elif page_num == ":cache":
for file_name in os.listdir(os.path.join(config_path, '.cache')):
if file_name.endswith('.txt'):
                    Output.print(os.path.join(config_path, '.cache', file_name))
def get_page(
url: str,
header: str = "",
proxy: Optional[dict[str, str]] = None,
log: bool = True,
*,
parser: Optional[str] = "html.parser",
browse: bool = False,
images: bool = False,
debug: bool = False,
) -> str:
"""Get the content of a web page.
:param url: The URL of the web page.
:type url: str
:param proxy: Proxy to be used for the request, defaults to None.
:type proxy: Optional[str], optional
:param log: Flag indicating whether to log the request, defaults to True.
:type log: bool, optional
:raises ValueError: If the URL is empty.
:raises Exception: If there are issues with the request.
:return: The content of the web page.
:rtype: str
"""
if not url:
raise ValueError("URL cannot be empty")
base_url = url
headers = {"User-Agent": "Mozilla/5.0"} if header else {}
try:
response = requests.get(url, proxies=proxy, headers=headers)
response.raise_for_status()
except requests.exceptions.RequestException as e:
c_log.error(f"Error occurred while fetching {url}: {str(e)}")
return ""
initial_headers = "\n".join(f"{header}: {value}" for header, value in response.headers.items())
etag = response.headers.get("ETag")
subsequent_headers = "\n".join(f"{header}: {value}" for header, value in response.headers.items())
if debug:
        Output.print(Box.create_box(
            "REQUEST INFO",
            f"""INITIAL REQUEST:\nURL: {url}\nHeaders:\n{initial_headers}\n
            SUBSEQUENT REQUEST:\nHeaders:\n{subsequent_headers}\nETag: {response.headers.get('ETag')}""",
            f"""DEBUG INFORMATION:\nResponse: {response.status_code}""",
            style="main",
        ))
try:
soup = BeautifulSoup(response.content, parser)
title_tags = soup.find_all("title")
highlight_elements(soup, base_url)
if images: # pragma: no cover
for img_tag in soup.find_all("img"):
if "src" in img_tag.attrs:
img_url = img_tag["src"]
if "/assets" not in img_url:
full_url = urljoin(base_url, img_url)
try:
response = requests.get(full_url)
response.raise_for_status()
print(f"{Colors.GREEN}Downloading image=> {full_url}")
img_data = BytesIO(response.content)
art = AsciiArt.from_image(img_data)
ascii_art_tag = soup.new_tag("pre")
                            ascii_art_tag.string = art.to_terminal(columns=100, monochrome=True)
os.system('clear')
img_tag.replace_with(ascii_art_tag)
except Exception as e:
print(f"Failed to process image {img_url}: {e}")
for title_tag in title_tags:
if isinstance(title_tag, Tag):
title_tag.string = title_tag.string
page_index = get_index(url)
url_list = get_index(url, "url_list")
        text_content = soup.text.strip().replace("\n\n\n", "")
if not browse:
return text_content
else:
formatted_string = ""
text_content = f"""{text_content.replace("None", "")}"""
Output.print(Box.create_box(
f"Results",
f"""{text_content}""",
f"\nINDEX\n{' '.join(page_index)}\nINDEX",
style="main",
padding=1,
))
Alert.alert(1, "Finished")
prompt = (
f"\n{Colors.PASTEL_PINK}\n"
f"[{Colors.MINT}{Colors.BOLD}q{Colors.NC}{Colors.PASTEL_PINK}]uit\n"
f"[{Colors.MINT}{Colors.BOLD}h{Colors.NC}{Colors.PASTEL_PINK}]istory ([<] back in history 1)\n"
f"[{Colors.MINT}{Colors.BOLD}nt{Colors.NC}{Colors.PASTEL_PINK}] create new tab\n"
f"[{Colors.MINT}{Colors.BOLD}t{Colors.NC}{Colors.PASTEL_PINK}] go to nth tab\n"
f"or Enter page number\n\n{Colors.NC}{Colors.UNDERLINE}asrch{Colors.NC}{Colors.GREEN} >{Colors.NC} "
)
initial_headers = "\n".join(f"{header}: {value}" for header, value in response.headers.items())
etag = response.headers.get("ETag")
subsequent_headers = "\n".join(f"{header}: {value}" for header, value in response.headers.items())
if debug:
Output.print(Box.create_box(
f"REQUEST INFO",
f"""INITIAL REQUEST:\nURL: {url}\nHeaders:\n{initial_headers}\n
SUBSEQUENT REQUEST:\nHeaders:\n{subsequent_headers}]\nETAG: {response.headers.get('ETag')}""",
f"""DEBUG INFORMATION:\nResponse:{response.status_code}""",
style="main",
))
return handle_page_navigation_input(prompt, url_list, history, tabs, config_path, response.headers.get('ETag'))
except FeatureNotFound:
        c_log.error('Could not find the requested parser\nquickfix: "pip install lxml"')
return ""
def get_html(
url: str,
header: str = "",
proxy: Optional[dict[str, str]] = None,
log: bool = True,
*,
parser: Optional[str] = "html.parser",
) -> BeautifulSoup:
"""Get the content of a web page.
:param url: The URL of the web page.
:type url: str
:param proxy: Proxy to be used for the request, defaults to None.
:type proxy: Optional[str], optional
:param log: Flag indicating whether to log the request, defaults to True.
:type log: bool, optional
:raises ValueError: If the URL is empty.
:raises Exception: If there are issues with the request.
:return: The content of the web page.
:rtype: str
"""
if not url:
raise ValueError("URL cannot be empty")
headers = {"User-Agent": "Mozilla/5.0"} if header else {}
try:
response = requests.get(url, proxies=proxy, headers=headers)
response.raise_for_status()
except requests.exceptions.RequestException as e:
raise Exception(f"Error occurred while fetching {url}: {str(e)}")
    try:
        soup = BeautifulSoup(response.content, parser)
    except FeatureNotFound:
        c_log.error('Could not find the requested parser\nquickfix: "pip install lxml"')
        raise
    return soup
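# Usage sketch (hypothetical URL): get_html returns the parsed BeautifulSoup object
# rather than text, so callers can query it directly.
#   soup = get_html("https://example.com", header="1")
#   links = [a.get("href") for a in soup.find_all("a")]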
def get_js(url: str, header: str = "", proxy: Optional[dict[str, str]] = None, log: bool = True, *, parser: Optional[str] = "html.parser") -> list[str] | str:
"""Get JavaScript sources from a web page."""
jsoptions = jsbeautifier.default_options()
if not url:
raise ValueError("URL cannot be empty")
headers = {"User-Agent": "Mozilla/5.0"} if header else {}
try:
response = requests.get(url, proxies=proxy, headers=headers)
response.raise_for_status()
except requests.exceptions.RequestException as e:
raise Exception(f"Error occurred while fetching {url}: {str(e)}")
try:
soup = BeautifulSoup(response.content, parser)
except FeatureNotFound:
raise Exception('Could not find BeautifulSoup parser')
script_tags = soup.find_all('script')
js_sources: list[str] = []
for script in script_tags:
src = script.get('src')
if src:
# Convert relative URLs to absolute URLs
full_url = urljoin(url, src)
js_sources.append(full_url)
output_source: list[str] = []
    for js_source in js_sources:
        try:
            output_source.append(get_page(js_source))
        except (ValueError, KeyError) as e:
            print("===========================================")
            c_log.error(e)
            print("===========================================")
            continue
return output_source
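# Usage sketch (hypothetical URL): get_js resolves each <script src=...> to an absolute
# URL and returns the fetched source of each script (via get_page).
#   for source in get_js("https://example.com", header="1"):
#       print(source[:200])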
def inspect(
url: str,
header: str = "",
proxy: Optional[dict[str, str]] = None,
log: bool = True,
*,
parser: Optional[str] = "html.parser",
browse: bool = False,
mode: Literal["js", "html"] = "html",
) -> None:
"""
Inspects a web page by making an HTTP request with optional custom headers and proxies.
:param url: The URL of the web page to inspect.
:type url: str
:param header: Optional custom headers to include in the HTTP request.
:type header: str, default is `''`
:param proxy: Optional dictionary of proxy settings to use for the HTTP request. If `None`, no proxies are used.
:type proxy: dict[str, str] | None, default is `None`
:param log: Whether to enable logging of the HTTP request and response. Default is `True`.
:type log: bool, default is `True`
:param parser: Optional HTML parser to use with BeautifulSoup. Default is `'html.parser'`. If `None`, no parser is specified.
:type parser: str | None, default is `'html.parser'`
:param browse: Whether to enable browsing mode, which might affect how the page content is processed. Default is `False`.
:type browse: bool, default is `False`
:param mode: Specifies the mode of operation. Can be either `'js'` for JavaScript processing or `'html'` for HTML parsing.
:type mode: Literal['js', 'html'], default is `'html'`
:return: This function does not return a value. It performs actions based on the provided parameters.
:rtype: None
:raises ValueError: If the `url` is empty or invalid.
:raises requests.exceptions.RequestException: If there is an issue with the HTTP request.
:raises FeatureNotFound: If the specified parser is not found or cannot parse the content.
:raises Exception: For other unexpected errors that may occur during the inspection process.
:note:
- Ensure that the `header` parameter is properly formatted as a valid header string.
- The `proxy` dictionary should be formatted with valid proxy settings.
- The `mode` parameter determines whether JavaScript or HTML parsing is used.
- The `browse` parameter might affect how the content is handled, depending on its implementation.
:seealso:
- `BeautifulSoup` documentation for parsing options: https://www.crummy.com/software/BeautifulSoup/bs4/doc/
- `requests` documentation for HTTP request details: https://docs.python-requests.org/en/latest/
"""
if not url:
raise ValueError("URL cannot be empty")
try:
page_content = get_page(
url=url, header=header, proxy=proxy, log=log, parser=parser
)
if mode == "html":
inspect_content = get_html(
url=url, header=header, proxy=proxy, log=log, parser=parser
)
if mode == "js":
inspect_content = get_js(
url=url, header=header, proxy=proxy, log=log, parser=parser
)
print_split_terminal(left_text=page_content, right_text=str(inspect_content))
except ValueError as e:
print(f"Error occurred during inspection: {str(e)}")
return None
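# Usage sketch (hypothetical URL): inspect prints the page text alongside either the
# parsed HTML or the collected JavaScript using print_split_terminal.
#   inspect("https://example.com", header="1", mode="html")
#   inspect("https://example.com", header="1", mode="js")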