Add files via upload

pull/2360/head
Tarushv Kosgi 2 months ago committed by GitHub
parent 64bf647de9
commit 8f61ef70be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -155,6 +155,36 @@
</div>
</div>
</div>
<div class="row mb-2">
<label class="col-4">Story Mode</label>
<div class="col-8">
<div class="form-check form-switch">
<input name="storymode" class="form-check-input" type="checkbox" value="True"
data-toggle="tooltip" data-original-title='Only read out title and post content, great for subreddits with stories'>
</div>
</div>
</div>
<div class="row mb-2">
<label class="col-4">Hybrid Mode</label>
<div class="col-8">
<div class="form-check form-switch">
<input name="hybrid_mode" class="form-check-input" type="checkbox" value="True"
data-toggle="tooltip" data-original-title='Combines story mode and comment mode - includes both post text AND top comments'>
</div>
<span class="form-text text-muted">Reads both the post content and top comments in the same video.</span>
</div>
</div>
<div class="row mb-2">
<label for="hybrid_comments_count" class="col-4">Hybrid Comments Count</label>
<div class="col-8">
<div class="input-group">
<input name="hybrid_comments_count" type="range" class="form-range" min="1" max="20" step="1"
value="{{ data.hybrid_comments_count or 5 }}" data-toggle="tooltip"
data-original-title="{{ data.hybrid_comments_count or 5 }}">
</div>
<span class="form-text text-muted">Number of top comments to include in hybrid mode.</span>
</div>
</div>
<div class="row mb-2">
<label for="theme" class="col-4">Reddit Theme</label>
<div class="col-8">

@ -75,7 +75,36 @@ class TTSEngine:
# processed_text = ##self.reddit_object["thread_post"] != ""
idx = 0
if settings.config["settings"]["storymode"]:
# Handle hybrid mode - process both post content and comments
if settings.config["settings"].get("hybrid_mode", False):
# First process the post content
if settings.config["settings"]["storymodemethod"] == 0:
if len(self.reddit_object["thread_post"]) > self.tts_module.max_chars:
self.split_post(self.reddit_object["thread_post"], "postaudio")
else:
self.call_tts("postaudio", process_text(self.reddit_object["thread_post"]))
elif settings.config["settings"]["storymodemethod"] == 1:
for idx, text in track(enumerate(self.reddit_object["thread_post"])):
self.call_tts(f"postaudio-{idx}", process_text(text))
# Then process the comments
comment_start_idx = idx + 1 if settings.config["settings"]["storymodemethod"] == 1 else 1
for comment_idx, comment in track(enumerate(self.reddit_object["comments"], start=comment_start_idx), "Processing comments..."):
# Stop creating mp3 files if the length is greater than max length
if self.length > self.max_length and comment_idx > comment_start_idx:
self.length -= self.last_clip_length
comment_idx -= 1
break
if (
len(comment["comment_body"]) > self.tts_module.max_chars
): # Split the comment if it is too long
self.split_post(comment["comment_body"], f"comment-{comment_idx}")
else: # If the comment is not too long, just call the tts engine
self.call_tts(f"comment-{comment_idx}", process_text(comment["comment_body"]))
idx = comment_start_idx + len(self.reddit_object["comments"]) - 1
elif settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 0:
if len(self.reddit_object["thread_post"]) > self.tts_module.max_chars:
self.split_post(self.reddit_object["thread_post"], "postaudio")

@ -32,6 +32,7 @@ def get_subreddit_threads(POST_ID: str):
username = settings.config["reddit"]["creds"]["username"]
if str(username).casefold().startswith("u/"):
username = username[2:]
reddit = None
try:
reddit = praw.Reddit(
client_id=settings.config["reddit"]["creds"]["client_id"],
@ -44,8 +45,14 @@ def get_subreddit_threads(POST_ID: str):
except ResponseException as e:
if e.response.status_code == 401:
print("Invalid credentials - please check them in config.toml")
except:
print("Something went wrong...")
exit()
except Exception as e:
print(f"Something went wrong: {e}")
exit()
if not reddit:
print("Failed to initialize Reddit instance. Exiting.")
exit()
# Ask user for subreddit input
print_step("Getting subreddit threads...")
@ -69,37 +76,81 @@ def get_subreddit_threads(POST_ID: str):
subreddit_choice = subreddit_choice[2:]
subreddit = reddit.subreddit(subreddit_choice)
if POST_ID: # would only be called if there are multiple queued posts
submission = reddit.submission(id=POST_ID)
elif (
settings.config["reddit"]["thread"]["post_id"]
and len(str(settings.config["reddit"]["thread"]["post_id"]).split("+")) == 1
):
submission = reddit.submission(id=settings.config["reddit"]["thread"]["post_id"])
elif settings.config["ai"]["ai_similarity_enabled"]: # ai sorting based on comparison
threads = subreddit.hot(limit=50)
keywords = settings.config["ai"]["ai_similarity_keywords"].split(",")
keywords = [keyword.strip() for keyword in keywords]
# Reformat the keywords for printing
keywords_print = ", ".join(keywords)
print(f"Sorting threads by similarity to the given keywords: {keywords_print}")
threads, similarity_scores = sort_by_similarity(threads, keywords)
submission, similarity_score = get_subreddit_undone(
threads, subreddit, similarity_scores=similarity_scores
)
else:
threads = subreddit.hot(limit=25)
submission = get_subreddit_undone(threads, subreddit)
while True:
submission_obj = None
if submission is None:
return get_subreddit_threads(POST_ID) # submission already done. rerun
# Get a submission
if POST_ID: # would only be called if there are multiple queued posts
submission_obj = reddit.submission(id=POST_ID)
elif not submission.num_comments and settings.config["settings"]["storymode"] == "false":
print_substep("No comments found. Skipping.")
exit()
elif (
settings.config["reddit"]["thread"]["post_id"]
and len(str(settings.config["reddit"]["thread"]["post_id"]).split("+")) == 1
):
submission_obj = reddit.submission(id=settings.config["reddit"]["thread"]["post_id"])
elif settings.config["ai"]["ai_similarity_enabled"]: # ai sorting based on comparison
threads = subreddit.hot(limit=50)
keywords = settings.config["ai"]["ai_similarity_keywords"].split(",")
keywords = [keyword.strip() for keyword in keywords]
# Reformat the keywords for printing
keywords_print = ", ".join(keywords)
print(f"Sorting threads by similarity to the given keywords: {keywords_print}")
threads, similarity_scores = sort_by_similarity(threads, keywords)
submission_obj, similarity_score = get_subreddit_undone(
threads, subreddit, similarity_scores=similarity_scores
)
else:
threads = subreddit.hot(limit=25)
submission_obj = get_subreddit_undone(threads, subreddit)
if submission_obj is None:
print_substep("No more suitable posts found in this subreddit.")
print_substep("This might be because:")
print_substep("- All posts have been processed already")
print_substep("- No posts have enough text for hybrid mode")
print_substep("- No posts meet the minimum comment requirements")
print_substep("Consider:")
print_substep("- Clearing the video history: Delete video_creation/data/videos.json")
print_substep("- Using a different subreddit")
print_substep("- Lowering hybrid_comments_count in config.toml")
return None
if isinstance(submission_obj, tuple):
submission, similarity_score = submission_obj
else:
submission = submission_obj
if submission is None:
continue
# `check_done` now returns None if the post is done, or the submission object if not.
if not check_done(submission):
continue
if not submission.selftext and (
settings.config["settings"]["storymode"] or settings.config["settings"].get("hybrid_mode", False)
):
print_substep("Post has no text, which is required for story mode or hybrid mode. Skipping post.", style="bold red")
check_done(submission, mark_as_done=True)
continue
submission = check_done(submission) # double-checking
if (
submission.num_comments == 0
and not settings.config["settings"]["storymode"]
and not settings.config["settings"].get("hybrid_mode", False)
):
print_substep("Post has no comments. Skipping.")
check_done(submission, mark_as_done=True)
continue
max_comments = settings.config["settings"].get("hybrid_comments_count", 3)
if settings.config["settings"].get("hybrid_mode", False) and submission.num_comments < max_comments:
print_substep(f"Post has less than {max_comments} comments, which is the minimum required for hybrid mode. Skipping post.")
check_done(submission, mark_as_done=True)
continue
# If we've reached this point, the submission is valid.
break
upvotes = submission.score
ratio = submission.upvote_ratio * 100
@ -122,7 +173,69 @@ def get_subreddit_threads(POST_ID: str):
content["thread_id"] = submission.id
content["is_nsfw"] = submission.over_18
content["comments"] = []
if settings.config["settings"]["storymode"]:
# Handle hybrid mode - includes both post text and comments
if settings.config["settings"].get("hybrid_mode", False):
max_comments = settings.config["settings"].get("hybrid_comments_count", 1)
# Add post content for hybrid mode - prefer selftext, fallback to OP comment
post_content = submission.selftext
if not post_content or len(post_content.strip()) < 30:
# Try to find OP comment as content
try:
for comment in submission.comments.list()[:5]:
if (hasattr(comment, 'author') and comment.author and
str(comment.author) == str(submission.author) and
len(comment.body.strip()) >= 30):
post_content = comment.body
print_substep("Using OP comment as post content")
break
except:
pass
if settings.config["settings"]["storymodemethod"] == 1:
content["thread_post"] = posttextparser(post_content) if post_content else submission.title
else:
content["thread_post"] = post_content if post_content else submission.title
# Also collect top comments for hybrid mode
comment_count = 0
for top_level_comment in submission.comments:
if comment_count >= max_comments:
break
if isinstance(top_level_comment, MoreComments):
continue
if top_level_comment.body in ["[removed]", "[deleted]"]:
continue
if not top_level_comment.stickied:
sanitised = sanitize_text(top_level_comment.body)
if not sanitised or sanitised == " ":
continue
if len(top_level_comment.body) <= int(
settings.config["reddit"]["thread"]["max_comment_length"]
):
if len(top_level_comment.body) >= int(
settings.config["reddit"]["thread"]["min_comment_length"]
):
if (
top_level_comment.author is not None
and sanitize_text(top_level_comment.body) is not None
):
content["comments"].append(
{
"comment_body": top_level_comment.body,
"comment_url": top_level_comment.permalink,
"comment_id": top_level_comment.id,
}
)
comment_count += 1
elif settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 1:
content["thread_post"] = posttextparser(submission.selftext)
else:

@ -0,0 +1,74 @@
#!/usr/bin/env python
"""
Test script to verify hybrid mode functionality
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from utils import settings
def test_hybrid_mode_config():
"""Test if hybrid mode is properly configured"""
try:
# Load the configuration
config = settings.check_toml("utils/.config.template.toml", "config.toml")
# Check if hybrid mode is enabled
hybrid_mode = config["settings"].get("hybrid_mode", False)
hybrid_comments_count = config["settings"].get("hybrid_comments_count", 5)
print("=== Hybrid Mode Configuration Test ===")
print(f"Hybrid mode enabled: {hybrid_mode}")
print(f"Hybrid comments count: {hybrid_comments_count}")
if hybrid_mode:
print("✅ Hybrid mode is ENABLED and configured!")
print("This mode will include both the post text AND the top comments in the video.")
print(f"It will include up to {hybrid_comments_count} top comments.")
else:
print("❌ Hybrid mode is DISABLED")
# Show other relevant settings
storymode = config["settings"].get("storymode", False)
storymodemethod = config["settings"].get("storymodemethod", 1)
print(f"\nOther relevant settings:")
print(f"Story mode: {storymode}")
print(f"Story mode method: {storymodemethod}")
return hybrid_mode
except Exception as e:
print(f"Error testing hybrid mode: {e}")
return False
def test_hybrid_mode_features():
"""Test the features available in hybrid mode"""
print("\n=== Hybrid Mode Features ===")
print("When hybrid mode is enabled, the bot will:")
print("1. ✅ Read the post title (like all modes)")
print("2. ✅ Read the post content/text (from story mode)")
print("3. ✅ Read the top comments (from comment mode)")
print("4. ✅ Generate screenshots for both post and comments")
print("5. ✅ Create a video with post text followed by comments")
print("\n=== Configuration Options ===")
print("- hybrid_mode: Enable/disable hybrid mode")
print("- hybrid_comments_count: Number of top comments to include (1-20)")
print("- storymodemethod: How to display post content (0=single image, 1=fancy)")
if __name__ == "__main__":
hybrid_enabled = test_hybrid_mode_config()
test_hybrid_mode_features()
if hybrid_enabled:
print("\n🎉 SUCCESS: Hybrid mode is ready to use!")
print("You can now run 'python main.py' to create videos with both post text and comments.")
else:
print("\n⚠️ Hybrid mode is not enabled. To enable it:")
print("1. Edit config.toml")
print("2. Set hybrid_mode = true")
print("3. Set hybrid_comments_count = 5 (or your preferred number)")
print("4. Set storymode = false (hybrid mode replaces story mode)")

@ -1,4 +1,5 @@
import json
import time
from os.path import exists
from utils import settings
@ -29,9 +30,15 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similari
json.dump([], f)
with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw:
done_videos = json.load(done_vids_raw)
print_substep("Checking submissions...")
suitable_count = 0
checked_count = 0
for i, submission in enumerate(submissions):
checked_count += 1
if already_done(done_videos, submission):
continue
suitable_count += 1
if submission.over_18:
try:
if not settings.config["settings"]["allow_nsfw"]:
@ -42,47 +49,89 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similari
if submission.stickied:
print_substep("This post was pinned by moderators. Skipping...")
continue
if (
submission.num_comments <= int(settings.config["reddit"]["thread"]["min_comments"])
and not settings.config["settings"]["storymode"]
):
print_substep(
f'This post has under the specified minimum of comments ({settings.config["reddit"]["thread"]["min_comments"]}). Skipping...'
)
continue
if settings.config["settings"]["storymode"]:
if not submission.selftext:
print_substep("You are trying to use story mode on post with no post text")
# Handle comment count requirements differently for different modes
if settings.config["settings"].get("hybrid_mode", False):
# For hybrid mode, use hybrid_comments_count but be more lenient
min_comments_required = settings.config["settings"].get("hybrid_comments_count", 1)
if submission.num_comments < min_comments_required:
print_substep(f'Post has less than {min_comments_required} comments required for hybrid mode. Skipping...')
continue
elif not settings.config["settings"]["storymode"]:
# For regular comment mode, use min_comments
min_comments_required = int(settings.config["reddit"]["thread"]["min_comments"])
if submission.num_comments < min_comments_required:
print_substep(f'This post has under the specified minimum of comments ({min_comments_required}). Skipping...')
continue
# Story mode doesn't need comments, so no check needed
if settings.config["settings"]["storymode"] or settings.config["settings"].get("hybrid_mode", False):
# Check if there's text content - either in selftext or as a comment from OP
has_text_content = bool(submission.selftext and len(submission.selftext.strip()) >= 30)
if not has_text_content:
# For non-self posts, check if OP made a comment explaining the post
if not submission.is_self and submission.num_comments > 0:
try:
# Look for a comment from the original poster
for comment in submission.comments.list()[:5]: # Check first 5 comments
if hasattr(comment, 'author') and comment.author and str(comment.author) == str(submission.author):
if len(comment.body.strip()) >= 30:
print_substep(f"Found OP comment with content: '{submission.title[:50]}...'")
has_text_content = True
break
except:
pass # Skip if we can't access comments
if not has_text_content:
print_substep(f"Skipping post '{submission.title[:50]}...' - no sufficient text content")
# Mark posts without text as done so they don't get picked up again
with open("./video_creation/data/videos.json", "r+", encoding="utf-8") as raw_vids:
done_videos = json.load(raw_vids)
payload = {
"subreddit": str(submission.subreddit),
"id": str(submission.id),
"time": str(int(time.time())),
"background_credit": "SKIPPED_NO_TEXT",
"reddit_title": submission.title,
"filename": "SKIPPED_NO_TEXT",
}
done_videos.append(payload)
raw_vids.seek(0)
json.dump(done_videos, raw_vids, ensure_ascii=False, indent=4)
continue
else:
# Check for the length of the post text
if len(submission.selftext) > (
# Check for the length of the post text (if it's selftext)
if submission.selftext and len(submission.selftext) > (
settings.config["settings"]["storymode_max_length"] or 2000
):
print_substep(
f"Post is too long ({len(submission.selftext)}), try with a different post. ({settings.config['settings']['storymode_max_length']} character limit)"
)
continue
elif len(submission.selftext) < 30:
continue
if settings.config["settings"]["storymode"] and not submission.is_self:
continue
# If we've reached this point, the post passed all filters!
print_substep(f"Found suitable post: '{submission.title[:50]}...' with {submission.num_comments} comments")
if similarity_scores is not None:
return submission, similarity_scores[i].item()
return submission
print("all submissions have been done going by top submission order")
# No suitable submissions found in current filter
print_substep(f"Checked {checked_count} posts, found {suitable_count} new posts, but none were suitable for hybrid mode.")
VALID_TIME_FILTERS = [
"day",
"hour",
"hour",
"month",
"week",
"year",
"all",
] # set doesn't have __getitem__
]
index = times_checked + 1
if index == len(VALID_TIME_FILTERS):
print("All submissions have been done.")
if index >= len(VALID_TIME_FILTERS):
print("All submissions have been processed. No suitable posts found.")
return None
# Try next time filter
print_substep(f"Trying {VALID_TIME_FILTERS[index]} time filter...")
return get_subreddit_undone(
subreddit.top(
time_filter=VALID_TIME_FILTERS[index],
@ -90,7 +139,7 @@ def get_subreddit_undone(submissions: list, subreddit, times_checked=0, similari
),
subreddit,
times_checked=index,
) # all the videos in hot have already been done
)
def already_done(done_videos: list, submission) -> bool:

@ -1,3 +1,5 @@
from typing import Optional
import json
import time
@ -8,28 +10,44 @@ from utils.console import print_step
def check_done(
redditobj: Submission,
) -> Submission:
redditobj: Submission, mark_as_done: bool = False
) -> Optional[Submission]:
# don't set this to be run anyplace that isn't subreddit.py bc of inspect stack
"""Checks if the chosen post has already been generated
Args:
redditobj (Submission): Reddit object gotten from reddit/subreddit.py
mark_as_done (bool): If true, the post will be marked as done and skipped
Returns:
Submission|None: Reddit object in args
"""
with open("./video_creation/data/videos.json", "r", encoding="utf-8") as done_vids_raw:
done_videos = json.load(done_vids_raw)
for video in done_videos:
if video["id"] == str(redditobj):
if settings.config["reddit"]["thread"]["post_id"]:
print_step(
"You already have done this video but since it was declared specifically in the config file the program will continue"
)
return redditobj
print_step("Getting new post as the current one has already been done")
with open("./video_creation/data/videos.json", "r+", encoding="utf-8") as raw_vids:
done_videos = json.load(raw_vids)
for video in done_videos:
if video["id"] == str(redditobj.id):
if settings.config["reddit"]["thread"]["post_id"]:
print_step(
"You already have done this video but since it was declared specifically in the config file the program will continue"
)
return redditobj
print_step("Getting new post as the current one has already been done")
return None
if mark_as_done:
payload = {
"subreddit": str(redditobj.subreddit),
"id": str(redditobj.id),
"time": str(int(time.time())),
"background_credit": "SKIPPED",
"reddit_title": redditobj.title,
"filename": "SKIPPED",
}
done_videos.append(payload)
raw_vids.seek(0)
json.dump(done_videos, raw_vids, ensure_ascii=False, indent=4)
return None
return redditobj

File diff suppressed because it is too large Load Diff

@ -24,6 +24,9 @@ from utils.videos import save_data
console = Console()
def sanitize_filename(title):
# Remove invalid Windows filename characters and trailing whitespace
return re.sub(r'[\\/:*?"<>|]', '', title).strip()
class ProgressFfmpeg(threading.Thread):
def __init__(self, vid_duration_seconds, progress_update_callback):
@ -65,7 +68,6 @@ class ProgressFfmpeg(threading.Thread):
def __exit__(self, *args, **kwargs):
self.stop()
def name_normalize(name: str) -> str:
name = re.sub(r'[?\\"%*:|<>]', "", name)
name = re.sub(r"( [w,W]\s?\/\s?[o,O,0])", r" without", name)
@ -82,7 +84,6 @@ def name_normalize(name: str) -> str:
else:
return name
def prepare_background(reddit_id: str, W: int, H: int) -> str:
output_path = f"assets/temp/{reddit_id}/background_noaudio.mp4"
output = (
@ -107,7 +108,6 @@ def prepare_background(reddit_id: str, W: int, H: int) -> str:
exit(1)
return output_path
def create_fancy_thumbnail(image, text, text_color, padding, wrap=35):
print_step(f"Creating fancy thumbnail for: {text}")
font_title_size = 47
@ -164,7 +164,6 @@ def create_fancy_thumbnail(image, text, text_color, padding, wrap=35):
return image
def merge_background_audio(audio: ffmpeg, reddit_id: str):
"""Gather an audio and merge with assets/backgrounds/background.mp3
Args:
@ -184,7 +183,6 @@ def merge_background_audio(audio: ffmpeg, reddit_id: str):
merged_audio = ffmpeg.filter([audio, bg_audio], "amix", duration="longest")
return merged_audio # Return merged audio
def make_final_video(
number_of_clips: int,
length: int,
@ -217,12 +215,31 @@ def make_final_video(
# Gather all audio clips
audio_clips = list()
if number_of_clips == 0 and settings.config["settings"]["storymode"] == "false":
if number_of_clips == 0 and settings.config["settings"]["storymode"] == "false" and not settings.config["settings"].get("hybrid_mode", False):
print(
"No audio clips to gather. Please use a different TTS or post."
) # This is to fix the TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
exit()
if settings.config["settings"]["storymode"]:
# Handle hybrid mode - includes both post audio and comment audio
if settings.config["settings"].get("hybrid_mode", False):
audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")]
# Add post audio clips
if settings.config["settings"]["storymodemethod"] == 0:
audio_clips.append(ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3"))
elif settings.config["settings"]["storymodemethod"] == 1:
post_audio_count = len([f for f in os.listdir(f"assets/temp/{reddit_id}/mp3") if f.startswith("postaudio-")])
for i in range(post_audio_count):
audio_clips.append(ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3"))
# Add comment audio clips
comment_audio_files = [f for f in os.listdir(f"assets/temp/{reddit_id}/mp3") if f.startswith("comment-")]
comment_audio_files.sort(key=lambda x: int(x.split('-')[1].split('.')[0])) # Sort by comment number
for comment_file in comment_audio_files:
audio_clips.append(ffmpeg.input(f"assets/temp/{reddit_id}/mp3/{comment_file}"))
elif settings.config["settings"]["storymode"]:
if settings.config["settings"]["storymodemethod"] == 0:
audio_clips = [ffmpeg.input(f"assets/temp/{reddit_id}/mp3/title.mp3")]
audio_clips.insert(1, ffmpeg.input(f"assets/temp/{reddit_id}/mp3/postaudio.mp3"))
@ -285,7 +302,111 @@ def make_final_video(
)
current_time = 0
if settings.config["settings"]["storymode"]:
# Handle hybrid mode - combines story mode and comment mode visuals
if settings.config["settings"].get("hybrid_mode", False):
# Calculate durations for all audio clips (title + post + comments)
audio_clips_durations = []
# Title duration
audio_clips_durations.append(
float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/title.mp3")["format"]["duration"])
)
# Post audio durations
if settings.config["settings"]["storymodemethod"] == 0:
audio_clips_durations.append(
float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio.mp3")["format"]["duration"])
)
elif settings.config["settings"]["storymodemethod"] == 1:
post_audio_count = len([f for f in os.listdir(f"assets/temp/{reddit_id}/mp3") if f.startswith("postaudio-")])
for i in range(post_audio_count):
audio_clips_durations.append(
float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")["format"]["duration"])
)
# Comment audio durations
comment_audio_files = [f for f in os.listdir(f"assets/temp/{reddit_id}/mp3") if f.startswith("comment-")]
comment_audio_files.sort(key=lambda x: int(x.split('-')[1].split('.')[0]))
for comment_file in comment_audio_files:
audio_clips_durations.append(
float(ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/{comment_file}")["format"]["duration"])
)
# Handle visuals for hybrid mode
if settings.config["settings"]["storymodemethod"] == 0:
# Overlay title first (during title audio)
background_clip = background_clip.overlay(
image_clips[0],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[0]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[0]
# Single image for post content
image_clips.append(
ffmpeg.input(f"assets/temp/{reddit_id}/png/story_content.png").filter(
"scale", screenshot_width, -1
)
)
# Overlay post content (after title finishes)
background_clip = background_clip.overlay(
image_clips[1],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[1]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[1]
elif settings.config["settings"]["storymodemethod"] == 1:
# Overlay title first (during title audio)
background_clip = background_clip.overlay(
image_clips[0],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[0]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[0]
# Multiple images for post content
clip_index = 1 # Start after title
post_audio_count = len([f for f in os.listdir(f"assets/temp/{reddit_id}/mp3") if f.startswith("postaudio-")])
for i in range(post_audio_count):
image_clips.append(
ffmpeg.input(f"assets/temp/{reddit_id}/png/img{i}.png")["v"].filter(
"scale", screenshot_width, -1
)
)
background_clip = background_clip.overlay(
image_clips[clip_index],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[clip_index]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[clip_index]
clip_index += 1
# Add comment images
comment_count = len([f for f in os.listdir(f"assets/temp/{reddit_id}/mp3") if f.startswith("comment-")])
for i in range(comment_count):
comment_img_path = f"assets/temp/{reddit_id}/png/comment_{i + 1}.png"
if exists(comment_img_path):
image_clips.append(
ffmpeg.input(comment_img_path)["v"].filter(
"scale", screenshot_width, -1
)
)
background_clip = background_clip.overlay(
image_clips[-1],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[len(audio_clips_durations) - comment_count + i]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[len(audio_clips_durations) - comment_count + i]
elif settings.config["settings"]["storymode"]:
audio_clips_durations = [
float(
ffmpeg.probe(f"assets/temp/{reddit_id}/mp3/postaudio-{i}.mp3")["format"]["duration"]
@ -303,6 +424,7 @@ def make_final_video(
"scale", screenshot_width, -1
),
)
# Overlay title first
background_clip = background_clip.overlay(
image_clips[0],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[0]})",
@ -310,6 +432,14 @@ def make_final_video(
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[0]
# Then overlay story content
background_clip = background_clip.overlay(
image_clips[1],
enable=f"between(t,{current_time},{current_time + audio_clips_durations[1]})",
x="(main_w-overlay_w)/2",
y="(main_h-overlay_h)/2",
)
current_time += audio_clips_durations[1]
elif settings.config["settings"]["storymodemethod"] == 1:
for i in track(range(0, number_of_clips + 1), "Collecting the image files..."):
image_clips.append(
@ -347,7 +477,7 @@ def make_final_video(
idx = re.sub(r"[^\w\s-]", "", reddit_obj["thread_id"])
title_thumb = reddit_obj["thread_title"]
filename = f"{name_normalize(title)[:251]}"
filename = sanitize_filename(name_normalize(title)[:251])
subreddit = settings.config["reddit"]["thread"]["subreddit"]
if not exists(f"./results/{subreddit}"):
@ -479,4 +609,4 @@ def make_final_video(
print_step("Removing temporary files 🗑")
cleanups = cleanup(reddit_id)
print_substep(f"Removed {cleanups} temporary files 🗑")
print_step("Done! 🎉 The video is in the results folder 📁")
print_step("Done! 🎉 The video is in the results folder 📁")

@ -28,6 +28,7 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
H: Final[int] = int(settings.config["settings"]["resolution_h"])
lang: Final[str] = settings.config["reddit"]["thread"]["post_lang"]
storymode: Final[bool] = settings.config["settings"]["storymode"]
hybrid_mode: Final[bool] = settings.config["settings"].get("hybrid_mode", False)
print_step("Downloading screenshots of reddit posts...")
reddit_id = re.sub(r"[^\w\s-]", "", reddit_object["thread_id"])
@ -41,7 +42,7 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
txtcolor = (240, 240, 240)
transparent = False
elif settings.config["settings"]["theme"] == "transparent":
if storymode:
if storymode or hybrid_mode:
# Transparent theme
bgcolor = (0, 0, 0, 0)
txtcolor = (255, 255, 255)
@ -59,7 +60,17 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
txtcolor = (0, 0, 0)
transparent = False
if storymode and settings.config["settings"]["storymodemethod"] == 1:
# Handle hybrid mode - generate images for post content and then take screenshots for comments
if hybrid_mode and settings.config["settings"]["storymodemethod"] == 1:
print_substep("Generating images for post content...")
imagemaker(
theme=bgcolor,
reddit_obj=reddit_object,
txtclr=txtcolor,
transparent=transparent,
)
# Continue to take screenshots for comments below
elif storymode and settings.config["settings"]["storymodemethod"] == 1:
# for idx,item in enumerate(reddit_object["thread_post"]):
print_substep("Generating images...")
return imagemaker(
@ -96,6 +107,7 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
# Login to Reddit
print_substep("Logging in to Reddit...")
page = context.new_page()
page.set_default_timeout(60000) # Set default timeout globally to 60 seconds
page.goto("https://www.reddit.com/login", timeout=0)
page.set_viewport_size(ViewportSize(width=1920, height=1080))
page.wait_for_load_state()
@ -177,12 +189,12 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
# zoom the body of the page
page.evaluate("document.body.style.zoom=" + str(zoom))
# as zooming the body doesn't change the properties of the divs, we need to adjust for the zoom
location = page.locator('[data-test-id="post-content"]').bounding_box()
location = page.locator('shreddit-post').bounding_box()
for i in location:
location[i] = float("{:.2f}".format(location[i] * zoom))
page.screenshot(clip=location, path=postcontentpath)
else:
page.locator('[data-test-id="post-content"]').screenshot(path=postcontentpath)
page.locator('shreddit-post').screenshot(path=postcontentpath)
except Exception as e:
print_substep("Something went wrong!", style="red")
resp = input(
@ -202,15 +214,22 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
raise e
if storymode:
if storymode and not hybrid_mode:
page.locator('[data-click-id="text"]').first.screenshot(
path=f"assets/temp/{reddit_id}/png/story_content.png"
)
else:
elif hybrid_mode and settings.config["settings"]["storymodemethod"] == 0:
# For hybrid mode with single image, take screenshot of story content
page.locator('[data-click-id="text"]').first.screenshot(
path=f"assets/temp/{reddit_id}/png/story_content.png"
)
# For hybrid mode or regular comment mode, take screenshots of comments
if not storymode or hybrid_mode:
for idx, comment in enumerate(
track(
reddit_object["comments"][:screenshot_num],
"Downloading screenshots...",
"Downloading comment screenshots...",
)
):
# Stop if we have reached the screenshot_num
@ -222,8 +241,6 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
page.goto(f"https://new.reddit.com/{comment['comment_url']}")
# translate code
if settings.config["reddit"]["thread"]["post_lang"]:
comment_tl = translators.translate_text(
comment["comment_body"],
@ -231,28 +248,73 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
to_language=settings.config["reddit"]["thread"]["post_lang"],
)
page.evaluate(
'([tl_content, tl_id]) => document.querySelector(`#t1_${tl_id} > div:nth-child(2) > div > div[data-testid="comment"] > div`).textContent = tl_content',
'([tl_content, tl_id]) => document.querySelector(`shreddit-comment[thingid="t1_${tl_id}"] > div:nth-child(2) > div > div[data-testid="comment"] > div`).textContent = tl_content',
[comment_tl, comment["comment_id"]],
)
try:
target = f'shreddit-comment[thingid="t1_{comment["comment_id"]}"] div#t1_{comment["comment_id"]}-comment-rtjson-content'
visible = page.locator(target).is_visible()
if not visible:
class ElementVisible(Exception):pass
try:
for _ in range(30):
page.evaluate("""
(target) => {
const element = document.querySelector(target);
if (element) {
element.style.display = 'block'; // 'inline'
element.style.visibility = 'visible';
}
}
""", target)
page.wait_for_timeout(1000)
visible = page.locator(target).is_visible()
if visible:
raise ElementVisible
target = f'shreddit-comment[thingid="t1_{comment["comment_id"]}"] div#t1_{comment["comment_id"]}-comment-rtjson-content div#-post-rtjson-content'
visible = page.locator(target).is_visible()
if not visible:
for _ in range(30):
page.evaluate("""
(target) => {
const element = document.querySelector(target);
if (element) {
element.style.display = 'block'; // 'inline'
element.style.visibility = 'visible';
}
}
""", target)
page.wait_for_timeout(1000)
visible = page.locator(target).is_visible()
if visible:
raise ElementVisible
target = f'shreddit-comment[thingid="t1_{comment["comment_id"]}"]'
except ElementVisible:
pass
if settings.config["settings"]["zoom"] != 1:
# store zoom settings
zoom = settings.config["settings"]["zoom"]
# zoom the body of the page
page.evaluate("document.body.style.zoom=" + str(zoom))
# scroll comment into view
page.locator(f"#t1_{comment['comment_id']}").scroll_into_view_if_needed()
# as zooming the body doesn't change the properties of the divs, we need to adjust for the zoom
location = page.locator(f"#t1_{comment['comment_id']}").bounding_box()
page.locator(target).scroll_into_view_if_needed()
# Adjust for the zoom and get bounding box of the target element
location = page.locator(target).bounding_box()
for i in location:
location[i] = float("{:.2f}".format(location[i] * zoom))
page.screenshot(
clip=location,
path=f"assets/temp/{reddit_id}/png/comment_{idx}.png",
path=f"assets/temp/{reddit_id}/png/comment_{idx + 1}.png",
timeout=60000 # Increased timeout to 60 seconds
)
else:
page.locator(f"#t1_{comment['comment_id']}").screenshot(
path=f"assets/temp/{reddit_id}/png/comment_{idx}.png"
page.locator(target).screenshot(
path=f"assets/temp/{reddit_id}/png/comment_{idx + 1}.png",
timeout=60000 # Increased timeout to 60 seconds
)
except TimeoutError:
del reddit_object["comments"]
@ -263,4 +325,4 @@ def get_screenshots_of_reddit_posts(reddit_object: dict, screenshot_num: int):
# close browser instance when we are done using it
browser.close()
print_substep("Screenshots downloaded Successfully.", style="bold green")
print_substep("Screenshots downloaded Successfully.", style="bold green")
Loading…
Cancel
Save