diff --git a/webdriver/common.py b/webdriver/common.py index c1c9f35..5e544f8 100644 --- a/webdriver/common.py +++ b/webdriver/common.py @@ -5,21 +5,19 @@ from typing import TypeVar, Optional, Callable, Union _function = TypeVar("_function", bound=Callable[..., object]) _exceptions = TypeVar("_exceptions", bound=Optional[Union[type, tuple, list]]) +default_exception = None + @attrs class ExceptionDecorator: """ - Decorator factory for catching exceptions and writing logs + Decorator for catching exceptions and writing logs """ exception: Optional[_exceptions] = attrib(default=None) - _default_exception: Optional[_exceptions] = attrib( - kw_only=True, - default=None - ) def __attrs_post_init__(self): if not self.exception: - self.exception = self._default_exception + self.exception = default_exception def __call__( self, @@ -46,25 +44,24 @@ class ExceptionDecorator: return wrapper - @classmethod - def catch_exception( - cls, - func: Optional[_function], - exception: Optional[_exceptions] = None, - ) -> Union[object, _function]: - """ - Decorator for catching exceptions and writing logs - Args: - func: Function to be decorated - exception: Expected exception(s) - Returns: - Decorated function - """ - exceptor = cls(exception) - if func: - exceptor = exceptor(func) - return exceptor +def catch_exception( + func: Optional[_function], + exception: Optional[_exceptions] = None, +) -> Union[object, _function]: + """ + Decorator for catching exceptions and writing logs + + Args: + func: Function to be decorated + exception: Expected exception(s) + Returns: + Decorated function + """ + exceptor = ExceptionDecorator(exception) + if func: + exceptor = exceptor(func) + return exceptor # Lots of tabs - lots of memory diff --git a/webdriver/playwright.py b/webdriver/playwright.py index 87716a0..f7145bb 100644 --- a/webdriver/playwright.py +++ b/webdriver/playwright.py @@ -1,21 +1,21 @@ from asyncio import as_completed +from pathlib import Path +from typing import Dict, Optional +import translators as ts +from attr import attrs, attrib +from attr.validators import instance_of +from playwright.async_api import Browser, Playwright, Page, BrowserContext, ElementHandle from playwright.async_api import async_playwright, TimeoutError -from playwright.async_api import Browser, Playwright, Page, BrowserContext, Locator +from rich.progress import track -from pathlib import Path from utils import settings from utils.console import print_step, print_substep -import translators as ts -from rich.progress import track -from attr import attrs, attrib -from attr.validators import instance_of -from typing import Dict, Optional +import webdriver.common as common -from webdriver.common import ExceptionDecorator, chunks -catch_exception = ExceptionDecorator(default_exception=TimeoutError).catch_exception +common.default_exception = TimeoutError @attrs @@ -65,15 +65,19 @@ class Flaky: """ @staticmethod - @catch_exception - def find_element( - query: str, + @common.catch_exception + async def find_element( + selector: str, page_instance: Page, options: Optional[dict] = None, - ) -> Locator: - return page_instance.locator(query, **options) if options else page_instance.locator(query) + ) -> ElementHandle: + return ( + await page_instance.wait_for_selector(selector, **options) + if options + else await page_instance.wait_for_selector(selector) + ) - @catch_exception + @common.catch_exception async def click( self, page_instance: Optional[Page] = None, @@ -81,19 +85,19 @@ class Flaky: options: Optional[dict] = None, *, find_options: Optional[dict] = None, - element: Optional[Locator] = None, + element: Optional[ElementHandle] = None, ) -> None: if element: - await element.click(**options) if options else element.click() + await element.click(**options) if options else await element.click() else: results = ( - self.find_element(query, page_instance, **find_options) + await self.find_element(query, page_instance, **find_options) if find_options - else self.find_element(query, page_instance) + else await self.find_element(query, page_instance) ) await results.click(**options) if options else await results.click() - @catch_exception + @common.catch_exception async def screenshot( self, page_instance: Optional[Page] = None, @@ -101,15 +105,15 @@ class Flaky: options: Optional[dict] = None, *, find_options: Optional[dict] = None, - element: Optional[Locator] = None, + element: Optional[ElementHandle] = None, ) -> None: if element: await element.screenshot(**options) if options else await element.screenshot() else: results = ( - self.find_element(query, page_instance, **find_options) + await self.find_element(query, page_instance, **find_options) if find_options - else self.find_element(query, page_instance) + else await self.find_element(query, page_instance) ) await results.screenshot(**options) if options else await results.screenshot() @@ -135,7 +139,7 @@ class RedditScreenshot(Flaky, Browser): ): self.post_lang: Optional[bool] = settings.config["reddit"]["thread"]["post_lang"] - async def __dark_theme( + async def __dark_theme( # TODO isn't working self, page_instance: Page, ) -> None: @@ -148,24 +152,24 @@ class RedditScreenshot(Flaky, Browser): await self.click( page_instance, - "header-user-dropdown", + ".header-user-dropdown", ) # It's normal not to find it, sometimes there is none :shrug: await self.click( page_instance, - ":nth-match(button) >> 'Settings'", + "button >> span:has-text('Settings')", ) await self.click( page_instance, - ":nth-match(button) >> 'Dark Mode'", + "button >> span:has-text('Dark Mode')", ) # Closes settings await self.click( page_instance, - "header-user-dropdown" + ".header-user-dropdown" ) async def __close_nsfw( @@ -225,7 +229,7 @@ class RedditScreenshot(Flaky, Browser): await self.screenshot( comment_page, - f"id=t1_{comment_obj['comment_id']}", + f"[data-testid='post-container']", {"path": f"assets/temp/png/comment_{filename_idx}.png"}, ) @@ -255,7 +259,7 @@ class RedditScreenshot(Flaky, Browser): await self.screenshot( main_page, - '[data-click-id="text"]', + '[data-test-id="post-content"] > [data-click-id="text"]', {"path": "assets/temp/png/story_content.png"}, ) @@ -322,7 +326,7 @@ class RedditScreenshot(Flaky, Browser): ) for idx, chunked_tasks in enumerate( - [chunk for chunk in chunks(async_tasks_primary, 10)], + [chunk for chunk in common.chunks(async_tasks_primary, 10)], start=1, ): chunk_list = async_tasks_primary.__len__() // 10 + (1 if async_tasks_primary.__len__() % 10 != 0 else 0) diff --git a/webdriver/pyppeteer.py b/webdriver/pyppeteer.py index 9c630f6..b85a53d 100644 --- a/webdriver/pyppeteer.py +++ b/webdriver/pyppeteer.py @@ -16,9 +16,10 @@ from attr import attrs, attrib from attr.validators import instance_of from typing import Optional -from webdriver.common import ExceptionDecorator, chunks +import webdriver.common as common -catch_exception = ExceptionDecorator(default_exception=BrowserTimeoutError).catch_exception + +common.default_exception = BrowserTimeoutError @attrs @@ -60,7 +61,7 @@ class Browser: class Wait: @staticmethod - @catch_exception + @common.catch_exception async def find_xpath( page_instance: PageCls, xpath: Optional[str] = None, @@ -93,7 +94,7 @@ class Wait: el = await page_instance.waitForXPath(xpath) return el - @catch_exception + @common.catch_exception async def click( self, page_instance: Optional[PageCls] = None, @@ -120,7 +121,7 @@ class Wait: else: await el.click() - @catch_exception + @common.catch_exception async def screenshot( self, page_instance: Optional[PageCls] = None, @@ -297,7 +298,7 @@ class RedditScreenshot(Browser, Wait): await self.screenshot( main_page, - "//*[@data-click-id='text']", + "//div[@data-click-id='post-container']/child::div[@data-click-id='text']", {"path": "assets/temp/png/story_content.png"}, ) @@ -358,13 +359,13 @@ class RedditScreenshot(Browser, Wait): async_tasks_primary.append( self.screenshot( reddit_main, - f'//*[contains(@id, \'t3_{self.reddit_object["thread_id"]}\')]', + f'//*[@data-testid="post-container"]', {"path": "assets/temp/png/title.png"}, ) ) for idx, chunked_tasks in enumerate( - [chunk for chunk in chunks(async_tasks_primary, 10)], + [chunk for chunk in common.chunks(async_tasks_primary, 10)], start=1, ): chunk_list = async_tasks_primary.__len__() // 10 + (1 if async_tasks_primary.__len__() % 10 != 0 else 0)