You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
RedditVideoMakerBot/video_creation/screenshot_downloader.py

390 lines
12 KiB

import logging
from asyncio import as_completed
from functools import wraps
from pathlib import Path
from typing import Dict
from typing import TypeVar, Optional, Callable, Union

import translators as ts
from attr import attrs, attrib
from attr.validators import instance_of, optional
from pyppeteer import launch
from pyppeteer.browser import Browser as BrowserCls
from pyppeteer.element_handle import ElementHandle as ElementHandleCls
from pyppeteer.errors import TimeoutError as BrowserTimeoutError
from pyppeteer.page import Page as PageCls
from rich.progress import track

from utils import settings
from utils.console import print_step, print_substep
# Type variable for any callable; used below to annotate the function that is
# being decorated.
_function = TypeVar("_function", bound=Callable[..., object])
# Type variable for the "expected exception" argument: a single class, a
# tuple/list (presumably of exception classes), or None.
_exceptions = TypeVar("_exceptions", bound=Optional[Union[type, tuple, list]])
@attrs
class ExceptionDecorator:
    """
    Decorator factory for catching exceptions and writing logs

    An instance wraps a coroutine function. Any ``Exception`` raised while the
    wrapped coroutine is awaited is swallowed and the wrapper returns ``None``.
    Exceptions that are not instances of the expected type(s) are additionally
    written to ``.webdriver.log``.

    Args:
        exception: Expected exception class, or a tuple/list of classes. When
            empty, the pyppeteer timeout error is used.
    """

    exception: Optional[_exceptions] = attrib(default=None)
    __default_exception: _exceptions = attrib(default=BrowserTimeoutError)

    def __attrs_post_init__(self):
        # Fall back to the default expected exception when none was supplied
        if not self.exception:
            self.exception = self.__default_exception

    def __call__(
            self,
            func: _function,
    ):
        """
        Wraps the coroutine function

        Args:
            func: Coroutine function to be decorated

        Returns:
            Coroutine function that never propagates ``Exception`` subclasses
        """

        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as caught_exception:
                logger = logging.getLogger("webdriver_log")
                logger.setLevel(logging.ERROR)
                # The named logger is shared process-wide: attach the file handler
                # only once, otherwise every caught exception adds another handler
                # (duplicated log lines and leaked file descriptors).
                if not logger.handlers:
                    logger.addHandler(
                        logging.FileHandler(".webdriver.log", mode="a+", encoding="utf-8")
                    )

                # isinstance() needs a class or a tuple of classes, never a list
                if isinstance(self.exception, type):
                    expected = self.exception
                else:
                    expected = tuple(self.exception)

                if not isinstance(caught_exception, expected):
                    logger.error(f"unexpected error - {caught_exception}")
                # Best effort by design: the exception is swallowed, None is returned
                return None

        return wrapper
def catch_exception(
        func: Optional[_function] = None,
        exception: Optional[_exceptions] = None,
) -> Union[ExceptionDecorator, _function]:
    """
    Decorator for catching exceptions and writing logs

    Can be used bare (``@catch_exception``) or as a factory
    (``@catch_exception(exception=SomeError)``).

    Args:
        func: Function to be decorated; omitted when used as a factory
        exception: Expected exception(s)

    Returns:
        Decorated function, or an ``ExceptionDecorator`` instance (itself a
        decorator) when ``func`` is not given
    """
    exceptor = ExceptionDecorator(exception)
    if func is not None:
        return exceptor(func)
    return exceptor
@attrs
class Browser:
    """
    Holds a Pyppeteer browser and the options it is launched with

    Args:
        default_Viewport (dict):Pyppeteer Browser default_Viewport options
        browser (BrowserCls): Pyppeteer Browser instance
    """

    # Built by a factory so that each instance gets its own dict; a plain dict
    # default would be one object shared (and mutable) across all instances.
    default_Viewport: dict = attrib(
        validator=instance_of(dict),
        factory=lambda: {
            "defaultViewport": {
                "width": 500,
                "height": 1200,
            },
        },
        kw_only=True,
    )
    browser: Optional[BrowserCls] = attrib(
        validator=optional(instance_of(BrowserCls)),
        default=None,
        kw_only=True,
    )

    async def get_browser(
            self,
    ) -> None:
        """
        Creates Pyppeteer browser
        """
        self.browser = await launch(self.default_Viewport)

    async def close_browser(
            self,
    ) -> None:
        """
        Closes Pyppeteer browser

        Does nothing when no browser has been launched.
        """
        if self.browser is not None:
            await self.browser.close()
class Wait:
    """
    Helpers that wait for an element by xpath and then act on it

    Every helper is wrapped with ``catch_exception``, so exceptions raised
    inside them are swallowed by that decorator and the call yields ``None``.
    """

    @staticmethod
    @catch_exception
    async def find_xpath(
            page_instance: PageCls,
            xpath: Optional[str] = None,
            options: Optional[dict] = None,
    ) -> 'ElementHandleCls':
        """
        Explicitly finds element on the page

        Args:
            page_instance: Pyppeteer page instance
            xpath: xpath query
            options: Pyppeteer waitForXPath parameters

        Available options are:

        * ``visible`` (bool): wait for element to be present in DOM and to be
          visible, i.e. to not have ``display: none`` or ``visibility: hidden``
          CSS properties. Defaults to ``False``.
        * ``hidden`` (bool): wait for element to not be found in the DOM or to
          be hidden, i.e. have ``display: none`` or ``visibility: hidden`` CSS
          properties. Defaults to ``False``.
        * ``timeout`` (int|float): maximum time to wait for in milliseconds.
          Defaults to 30000 (30 seconds). Pass ``0`` to disable timeout.

        Returns:
            Pyppeteer element instance
        """
        # Forward the options only when some were actually supplied
        if not options:
            return await page_instance.waitForXPath(xpath)
        return await page_instance.waitForXPath(xpath, options=options)

    @catch_exception
    async def click(
            self,
            page_instance: Optional[PageCls] = None,
            xpath: Optional[str] = None,
            find_options: Optional[dict] = None,
            options: Optional[dict] = None,
            el: Optional[ElementHandleCls] = None,
    ) -> None:
        """
        Clicks on the element

        Args:
            page_instance: Pyppeteer page instance
            xpath: xpath query
            find_options: Pyppeteer waitForXPath parameters
            options: Pyppeteer click parameters
            el: Pyppeteer element instance
        """
        # Use the element handed in, otherwise look it up by xpath
        target = el or await self.find_xpath(page_instance, xpath, find_options)
        click_args = (options,) if options else ()
        await target.click(*click_args)

    @catch_exception
    async def screenshot(
            self,
            page_instance: Optional[PageCls] = None,
            xpath: Optional[str] = None,
            options: Optional[dict] = None,
            find_options: Optional[dict] = None,
            el: Optional[ElementHandleCls] = None,
    ) -> None:
        """
        Makes a screenshot of the element

        Args:
            page_instance: Pyppeteer page instance
            xpath: xpath query
            options: Pyppeteer screenshot parameters
            find_options: Pyppeteer waitForXPath parameters
            el: Pyppeteer element instance
        """
        # Use the element handed in, otherwise look it up by xpath
        target = el or await self.find_xpath(page_instance, xpath, find_options)
        shot_args = (options,) if options else ()
        await target.screenshot(*shot_args)
@attrs(auto_attribs=True)
class RedditScreenshot(Browser, Wait):
    """
    Takes screenshots of a reddit thread title and of selected comments

    Args:
        reddit_object (Dict): Reddit object received from reddit/subreddit.py
        screenshot_idx (list): List with indexes of voiced comments
    """
    reddit_object: dict
    screenshot_idx: list

    async def __dark_theme(
            self,
            page_instance: PageCls,
    ) -> None:
        """
        Enables dark theme in Reddit

        Args:
            page_instance: Pyppeteer page instance with reddit page opened
        """
        user_dropdown = "//*[contains(@class, 'header-user-dropdown')]"
        click_sequence = (
            # Opens the user dropdown
            user_dropdown,
            # It's normal not to find it, sometimes there is none :shrug:
            "//*[contains(text(), 'Settings')]/ancestor::button[1]",
            "//*[contains(text(), 'Dark Mode')]/ancestor::button[1]",
            # Closes settings
            user_dropdown,
        )
        for xpath in click_sequence:
            await self.click(
                page_instance,
                xpath,
                {"timeout": 5000},
            )

    async def __close_nsfw(
            self,
            page_instance: PageCls
    ) -> None:
        """
        Confirms the NSFW prompts so that the post content becomes visible

        Args:
            page_instance: Pyppeteer page instance with reddit page opened
        """
        from asyncio import ensure_future

        print_substep("Post is NSFW. You are spicy...")
        # To await indirectly reload
        navigation = ensure_future(page_instance.waitForNavigation())

        # Triggers indirectly reload
        await self.click(
            page_instance,
            '//button[text()="Yes"]',
            {"timeout": 5000},
        )

        # Await reload
        await navigation

        # Goes through self.click like every other click in this class, so a
        # missing button does not end in an AttributeError on None
        await self.click(
            page_instance,
            '//button[text()="Click to see nsfw"]',
            {"timeout": 5000},
        )

    async def __collect_comment(
            self,
            comment_obj: dict,
            filename_idx: int,
    ) -> None:
        """
        Makes a screenshot of the comment

        Args:
            comment_obj: prew comment object
            filename_idx: index for the filename
        """
        comment_page = await self.browser.newPage()
        try:
            await comment_page.goto(f'https://reddit.com{comment_obj["comment_url"]}')

            # Translates submission' comment
            if settings.config["reddit"]["thread"]["post_lang"]:
                comment_tl = ts.google(
                    comment_obj["comment_body"],
                    to_language=settings.config["reddit"]["thread"]["post_lang"],
                )
                # The translation and the comment id are passed as evaluate
                # arguments: splicing the text into the JavaScript source breaks
                # on any quote and would execute untrusted comment text.
                await comment_page.evaluate(
                    '([tl_content, tl_id]) => document.querySelector('
                    '`#t1_${tl_id} > div:nth-child(2) > div > div[data-testid="comment"] > div`'
                    ').textContent = tl_content',
                    [comment_tl, comment_obj["comment_id"]],
                )

            await self.screenshot(
                comment_page,
                f'//*[contains(@id, \'t1_{comment_obj["comment_id"]}\')]',
                {"path": f"assets/temp/png/comment_{filename_idx}.png"},
            )
        finally:
            # Every comment gets its own tab; close it so tabs do not pile up
            await comment_page.close()

    async def download(
            self,
    ):
        """
        Downloads screenshots of reddit posts as seen on the web. Downloads to assets/temp/png
        """
        print_step("Downloading screenshots of reddit posts...")

        print_substep("Launching Headless Browser...")
        await self.get_browser()

        try:
            # ! Make sure the reddit screenshots folder exists
            Path("assets/temp/png").mkdir(parents=True, exist_ok=True)

            # Get the thread screenshot
            reddit_main = await self.browser.newPage()
            await reddit_main.goto(self.reddit_object["thread_url"])

            if settings.config["settings"]["theme"] == "dark":
                await self.__dark_theme(reddit_main)

            if self.reddit_object["is_nsfw"]:
                # This means the post is NSFW and requires to click the proceed button.
                await self.__close_nsfw(reddit_main)

            # Translates submission title
            if settings.config["reddit"]["thread"]["post_lang"]:
                print_substep("Translating post...")
                texts_in_tl = ts.google(
                    self.reddit_object["thread_title"],
                    to_language=settings.config["reddit"]["thread"]["post_lang"],
                )

                await reddit_main.evaluate(
                    "tl_content => document.querySelector('[data-test-id=\"post-content\"] > div:nth-child(3) > div > "
                    "div').textContent = tl_content",
                    texts_in_tl,
                )
            else:
                print_substep("Skipping translation...")

            async_tasks_primary = [
                self.__collect_comment(self.reddit_object["comments"][idx], idx)
                for idx in self.screenshot_idx
            ]
            async_tasks_primary.append(
                self.screenshot(
                    reddit_main,
                    f'//*[contains(@id, \'t3_{self.reddit_object["thread_id"]}\')]',
                    {"path": "assets/temp/png/title.png"},
                )
            )

            # Lots of tabs - lots of memory
            # chunk needed to minimize memory required
            chunk_size = 10
            task_chunks = [
                async_tasks_primary[start:start + chunk_size]
                for start in range(0, len(async_tasks_primary), chunk_size)
            ]
            chunk_total = len(task_chunks)

            for idx, chunked_tasks in enumerate(task_chunks, start=1):
                for task in track(
                        as_completed(chunked_tasks),
                        description=f"Downloading comments: Chunk {idx}/{chunk_total}",
                        total=len(chunked_tasks),
                ):
                    await task

            print_substep("Comments downloaded Successfully.", style="bold green")
        finally:
            # Shut the headless browser down even when a step above failed
            await self.close_browser()