fixes in async webdriver

pull/963/head
Drugsosos 2 years ago
parent 3d88f2bbe9
commit 1931b5e8e5
No known key found for this signature in database
GPG Key ID: 8E35176FE617E28D

@ -5,21 +5,19 @@ from typing import TypeVar, Optional, Callable, Union
_function = TypeVar("_function", bound=Callable[..., object]) _function = TypeVar("_function", bound=Callable[..., object])
_exceptions = TypeVar("_exceptions", bound=Optional[Union[type, tuple, list]]) _exceptions = TypeVar("_exceptions", bound=Optional[Union[type, tuple, list]])
default_exception = None
@attrs @attrs
class ExceptionDecorator: class ExceptionDecorator:
""" """
Decorator factory for catching exceptions and writing logs Decorator for catching exceptions and writing logs
""" """
exception: Optional[_exceptions] = attrib(default=None) exception: Optional[_exceptions] = attrib(default=None)
_default_exception: Optional[_exceptions] = attrib(
kw_only=True,
default=None
)
def __attrs_post_init__(self): def __attrs_post_init__(self):
if not self.exception: if not self.exception:
self.exception = self._default_exception self.exception = default_exception
def __call__( def __call__(
self, self,
@ -46,25 +44,24 @@ class ExceptionDecorator:
return wrapper return wrapper
@classmethod
def catch_exception(
cls,
func: Optional[_function],
exception: Optional[_exceptions] = None,
) -> Union[object, _function]:
"""
Decorator for catching exceptions and writing logs
Args: def catch_exception(
func: Function to be decorated func: Optional[_function],
exception: Expected exception(s) exception: Optional[_exceptions] = None,
Returns: ) -> Union[object, _function]:
Decorated function """
""" Decorator for catching exceptions and writing logs
exceptor = cls(exception)
if func: Args:
exceptor = exceptor(func) func: Function to be decorated
return exceptor exception: Expected exception(s)
Returns:
Decorated function
"""
exceptor = ExceptionDecorator(exception)
if func:
exceptor = exceptor(func)
return exceptor
# Lots of tabs - lots of memory # Lots of tabs - lots of memory

@ -1,21 +1,21 @@
from asyncio import as_completed from asyncio import as_completed
from pathlib import Path
from typing import Dict, Optional
import translators as ts
from attr import attrs, attrib
from attr.validators import instance_of
from playwright.async_api import Browser, Playwright, Page, BrowserContext, ElementHandle
from playwright.async_api import async_playwright, TimeoutError from playwright.async_api import async_playwright, TimeoutError
from playwright.async_api import Browser, Playwright, Page, BrowserContext, Locator from rich.progress import track
from pathlib import Path
from utils import settings from utils import settings
from utils.console import print_step, print_substep from utils.console import print_step, print_substep
import translators as ts
from rich.progress import track
from attr import attrs, attrib import webdriver.common as common
from attr.validators import instance_of
from typing import Dict, Optional
from webdriver.common import ExceptionDecorator, chunks
catch_exception = ExceptionDecorator(default_exception=TimeoutError).catch_exception common.default_exception = TimeoutError
@attrs @attrs
@ -65,15 +65,19 @@ class Flaky:
""" """
@staticmethod @staticmethod
@catch_exception @common.catch_exception
def find_element( async def find_element(
query: str, selector: str,
page_instance: Page, page_instance: Page,
options: Optional[dict] = None, options: Optional[dict] = None,
) -> Locator: ) -> ElementHandle:
return page_instance.locator(query, **options) if options else page_instance.locator(query) return (
await page_instance.wait_for_selector(selector, **options)
if options
else await page_instance.wait_for_selector(selector)
)
@catch_exception @common.catch_exception
async def click( async def click(
self, self,
page_instance: Optional[Page] = None, page_instance: Optional[Page] = None,
@ -81,19 +85,19 @@ class Flaky:
options: Optional[dict] = None, options: Optional[dict] = None,
*, *,
find_options: Optional[dict] = None, find_options: Optional[dict] = None,
element: Optional[Locator] = None, element: Optional[ElementHandle] = None,
) -> None: ) -> None:
if element: if element:
await element.click(**options) if options else element.click() await element.click(**options) if options else await element.click()
else: else:
results = ( results = (
self.find_element(query, page_instance, **find_options) await self.find_element(query, page_instance, **find_options)
if find_options if find_options
else self.find_element(query, page_instance) else await self.find_element(query, page_instance)
) )
await results.click(**options) if options else await results.click() await results.click(**options) if options else await results.click()
@catch_exception @common.catch_exception
async def screenshot( async def screenshot(
self, self,
page_instance: Optional[Page] = None, page_instance: Optional[Page] = None,
@ -101,15 +105,15 @@ class Flaky:
options: Optional[dict] = None, options: Optional[dict] = None,
*, *,
find_options: Optional[dict] = None, find_options: Optional[dict] = None,
element: Optional[Locator] = None, element: Optional[ElementHandle] = None,
) -> None: ) -> None:
if element: if element:
await element.screenshot(**options) if options else await element.screenshot() await element.screenshot(**options) if options else await element.screenshot()
else: else:
results = ( results = (
self.find_element(query, page_instance, **find_options) await self.find_element(query, page_instance, **find_options)
if find_options if find_options
else self.find_element(query, page_instance) else await self.find_element(query, page_instance)
) )
await results.screenshot(**options) if options else await results.screenshot() await results.screenshot(**options) if options else await results.screenshot()
@ -135,7 +139,7 @@ class RedditScreenshot(Flaky, Browser):
): ):
self.post_lang: Optional[bool] = settings.config["reddit"]["thread"]["post_lang"] self.post_lang: Optional[bool] = settings.config["reddit"]["thread"]["post_lang"]
async def __dark_theme( async def __dark_theme( # TODO isn't working
self, self,
page_instance: Page, page_instance: Page,
) -> None: ) -> None:
@ -148,24 +152,24 @@ class RedditScreenshot(Flaky, Browser):
await self.click( await self.click(
page_instance, page_instance,
"header-user-dropdown", ".header-user-dropdown",
) )
# It's normal not to find it, sometimes there is none :shrug: # It's normal not to find it, sometimes there is none :shrug:
await self.click( await self.click(
page_instance, page_instance,
":nth-match(button) >> 'Settings'", "button >> span:has-text('Settings')",
) )
await self.click( await self.click(
page_instance, page_instance,
":nth-match(button) >> 'Dark Mode'", "button >> span:has-text('Dark Mode')",
) )
# Closes settings # Closes settings
await self.click( await self.click(
page_instance, page_instance,
"header-user-dropdown" ".header-user-dropdown"
) )
async def __close_nsfw( async def __close_nsfw(
@ -225,7 +229,7 @@ class RedditScreenshot(Flaky, Browser):
await self.screenshot( await self.screenshot(
comment_page, comment_page,
f"id=t1_{comment_obj['comment_id']}", f"[data-testid='post-container']",
{"path": f"assets/temp/png/comment_{filename_idx}.png"}, {"path": f"assets/temp/png/comment_{filename_idx}.png"},
) )
@ -255,7 +259,7 @@ class RedditScreenshot(Flaky, Browser):
await self.screenshot( await self.screenshot(
main_page, main_page,
'[data-click-id="text"]', '[data-test-id="post-content"] > [data-click-id="text"]',
{"path": "assets/temp/png/story_content.png"}, {"path": "assets/temp/png/story_content.png"},
) )
@ -322,7 +326,7 @@ class RedditScreenshot(Flaky, Browser):
) )
for idx, chunked_tasks in enumerate( for idx, chunked_tasks in enumerate(
[chunk for chunk in chunks(async_tasks_primary, 10)], [chunk for chunk in common.chunks(async_tasks_primary, 10)],
start=1, start=1,
): ):
chunk_list = async_tasks_primary.__len__() // 10 + (1 if async_tasks_primary.__len__() % 10 != 0 else 0) chunk_list = async_tasks_primary.__len__() // 10 + (1 if async_tasks_primary.__len__() % 10 != 0 else 0)

@ -16,9 +16,10 @@ from attr import attrs, attrib
from attr.validators import instance_of from attr.validators import instance_of
from typing import Optional from typing import Optional
from webdriver.common import ExceptionDecorator, chunks import webdriver.common as common
catch_exception = ExceptionDecorator(default_exception=BrowserTimeoutError).catch_exception
common.default_exception = BrowserTimeoutError
@attrs @attrs
@ -60,7 +61,7 @@ class Browser:
class Wait: class Wait:
@staticmethod @staticmethod
@catch_exception @common.catch_exception
async def find_xpath( async def find_xpath(
page_instance: PageCls, page_instance: PageCls,
xpath: Optional[str] = None, xpath: Optional[str] = None,
@ -93,7 +94,7 @@ class Wait:
el = await page_instance.waitForXPath(xpath) el = await page_instance.waitForXPath(xpath)
return el return el
@catch_exception @common.catch_exception
async def click( async def click(
self, self,
page_instance: Optional[PageCls] = None, page_instance: Optional[PageCls] = None,
@ -120,7 +121,7 @@ class Wait:
else: else:
await el.click() await el.click()
@catch_exception @common.catch_exception
async def screenshot( async def screenshot(
self, self,
page_instance: Optional[PageCls] = None, page_instance: Optional[PageCls] = None,
@ -297,7 +298,7 @@ class RedditScreenshot(Browser, Wait):
await self.screenshot( await self.screenshot(
main_page, main_page,
"//*[@data-click-id='text']", "//div[@data-click-id='post-container']/child::div[@data-click-id='text']",
{"path": "assets/temp/png/story_content.png"}, {"path": "assets/temp/png/story_content.png"},
) )
@ -358,13 +359,13 @@ class RedditScreenshot(Browser, Wait):
async_tasks_primary.append( async_tasks_primary.append(
self.screenshot( self.screenshot(
reddit_main, reddit_main,
f'//*[contains(@id, \'t3_{self.reddit_object["thread_id"]}\')]', f'//*[@data-testid="post-container"]',
{"path": "assets/temp/png/title.png"}, {"path": "assets/temp/png/title.png"},
) )
) )
for idx, chunked_tasks in enumerate( for idx, chunked_tasks in enumerate(
[chunk for chunk in chunks(async_tasks_primary, 10)], [chunk for chunk in common.chunks(async_tasks_primary, 10)],
start=1, start=1,
): ):
chunk_list = async_tasks_primary.__len__() // 10 + (1 if async_tasks_primary.__len__() % 10 != 0 else 0) chunk_list = async_tasks_primary.__len__() // 10 + (1 if async_tasks_primary.__len__() % 10 != 0 else 0)

Loading…
Cancel
Save