automatically video upload on youtube

pull/1568/head
Muhammad Naeem Tariq 2 years ago
parent f2d89a10dd
commit cb0b07b223

5
.gitignore vendored

@ -242,6 +242,7 @@ reddit-bot-351418-5560ebc49cac.json
*.pyc *.pyc
video_creation/data/videos.json video_creation/data/videos.json
video_creation/data/envvars.txt video_creation/data/envvars.txt
main.py-oauth2.json
client_secrets.json
config.toml config.toml
video_creation/data/videos.json

@ -10,6 +10,7 @@ from prawcore import ResponseException
from utils.console import print_substep from utils.console import print_substep
from reddit.subreddit import get_subreddit_threads from reddit.subreddit import get_subreddit_threads
from utils import settings from utils import settings
from upload import youtube
from utils.cleanup import cleanup from utils.cleanup import cleanup
from utils.console import print_markdown, print_step from utils.console import print_markdown, print_step
from utils.id import id from utils.id import id
@ -54,6 +55,8 @@ def main(POST_ID=None) -> None:
download_background(bg_config) download_background(bg_config)
chop_background_video(bg_config, length, reddit_object) chop_background_video(bg_config, length, reddit_object)
make_final_video(number_of_comments, length, reddit_object, bg_config) make_final_video(number_of_comments, length, reddit_object, bg_config)
youtube.init()
def run_many(times) -> None: def run_many(times) -> None:
@ -62,7 +65,7 @@ def run_many(times) -> None:
f'on the {x}{("th", "st", "nd", "rd", "th", "th", "th", "th", "th", "th")[x % 10]} iteration of {times}' f'on the {x}{("th", "st", "nd", "rd", "th", "th", "th", "th", "th", "th")[x % 10]} iteration of {times}'
) # correct 1st 2nd 3rd 4th 5th.... ) # correct 1st 2nd 3rd 4th 5th....
main() main()
Popen("cls" if name == "nt" else "clear", shell=True).wait() # Popen("cls" if name == "nt" else "clear", shell=True).wait()
def shutdown(): def shutdown():
@ -87,6 +90,14 @@ if __name__ == "__main__":
f"{directory}/utils/.config.template.toml", "config.toml" f"{directory}/utils/.config.template.toml", "config.toml"
) )
config is False and exit() config is False and exit()
settings.saveYoutubeConfig("config.toml")
# if upload argument exists, upload to youtube
if len(sys.argv) > 1 and sys.argv[1] == "upload" and sys.argv[2]=="youtube":
del sys.argv[2]
del sys.argv[1]
youtube.init()
sys.exit()
if ( if (
not settings.config["settings"]["tts"]["tiktok_sessionid"] not settings.config["settings"]["tts"]["tiktok_sessionid"]
or settings.config["settings"]["tts"]["tiktok_sessionid"] == "" or settings.config["settings"]["tts"]["tiktok_sessionid"] == ""

@ -17,6 +17,8 @@ Flask==2.2.2
clean-text==0.6.0 clean-text==0.6.0
unidecode==1.3.2 unidecode==1.3.2
spacy==3.4.1 spacy==3.4.1
torch==1.12.1 torch==2.0.0
transformers==4.25.1 transformers==4.25.1
ffmpeg-python==0.2.0 ffmpeg-python==0.2.0
google-api-python-client
httplib2

@ -0,0 +1,254 @@
import httplib2
import os
import random
import sys
import time
import datetime
import json
import toml
from apiclient.discovery import build
from apiclient.errors import HttpError
from apiclient.http import MediaFileUpload
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import argparser, run_flow
from datetime import timedelta
from datetime import datetime
# Explicitly tell the underlying HTTP transport library not to retry, since
# we are handling retry logic ourselves.
httplib2.RETRIES = 1
# Maximum number of times to retry before giving up.
MAX_RETRIES = 10
# Always retry when these exceptions are raised.
RETRIABLE_EXCEPTIONS = (
httplib2.HttpLib2Error,
IOError,
)
# Always retry when an apiclient.errors.HttpError with one of these status
# codes is raised.
RETRIABLE_STATUS_CODES = [500, 502, 503, 504]
# The CLIENT_SECRETS_FILE variable specifies the name of a file that contains
# the OAuth 2.0 information for this application, including its client_id and
# client_secret. You can acquire an OAuth 2.0 client ID and client secret from
# the Google API Console at
# https://console.developers.google.com/.
# Please ensure that you have enabled the YouTube Data API for your project.
# For more information about using OAuth2 to access the YouTube Data API, see:
# https://developers.google.com/youtube/v3/guides/authentication
# For more information about the client_secrets.json file format, see:
# https://developers.google.com/api-client-library/python/guide/aaa_client_secrets
CLIENT_SECRETS_FILE = "client_secrets.json"
# This OAuth 2.0 access scope allows an application to upload files to the
# authenticated user's YouTube channel, but doesn't allow other types of access.
YOUTUBE_UPLOAD_SCOPE = "https://www.googleapis.com/auth/youtube.upload"
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"
# This variable defines a message to display if the CLIENT_SECRETS_FILE is
# missing.
MISSING_CLIENT_SECRETS_MESSAGE = """
WARNING: Please configure OAuth 2.0
To make this sample run you will need to populate the client_secrets.json file
found at:
%s
with information from the API Console
https://console.developers.google.com/
For more information about the client_secrets.json file format, please visit:
https://developers.google.com/api-client-library/python/guide/aaa_client_secrets
""" % os.path.abspath(
os.path.join(os.path.dirname(__file__), CLIENT_SECRETS_FILE)
)
VALID_PRIVACY_STATUSES = ("public", "private", "unlisted")
def get_authenticated_service(args):
# print(CLIENT_SECRETS_FILE)
flow = flow_from_clientsecrets(
CLIENT_SECRETS_FILE,
scope=YOUTUBE_UPLOAD_SCOPE,
message=MISSING_CLIENT_SECRETS_MESSAGE,
)
storage = Storage("%s-oauth2.json" % sys.argv[0])
credentials = storage.get()
if credentials is None or credentials.invalid:
credentials = run_flow(flow, storage, args)
return build(
YOUTUBE_API_SERVICE_NAME,
YOUTUBE_API_VERSION,
http=credentials.authorize(httplib2.Http()),
)
def initialize_upload(options, next_upload_time, i):
args = argparser.parse_args()
youtube = get_authenticated_service(args)
tags = None
if options["keywords"]:
tags = options["keywords"].split(",")
body = dict(
snippet=dict(
title=options["title"],
description=options["description"],
tags=tags,
categoryId=options["category"],
),
status=dict(
privacyStatus=options["privacyStatus"],
publishAt=options[
"publishTime"
], # might need to be publishedAt, implement this
),
)
# Call the API's videos.insert method to create and upload the video.
insert_request = youtube.videos().insert(
part=",".join(body.keys()),
body=body,
media_body=MediaFileUpload(options["file"], chunksize=-1, resumable=True),
)
resumable_upload(insert_request, next_upload_time, i, options["title"])
# This method implements an exponential backoff strategy to resume a
# failed upload.
def resumable_upload(insert_request, next_upload_time, i, title):
response = None
error = None
retry = 0
while response is None:
try:
print("Uploading video " + str(i) +" : " + title)
status, response = insert_request.next_chunk()
if response is not None:
if "id" in response:
print(
"Video id {0} was successfully uploaded and will be published at {1}".format(
response["id"], next_upload_time
)
)
## upload success
updateUploadedStatus(i, next_upload_time)
else:
exit("The upload failed with an unexpected response: %s" % response)
except HttpError as e:
if e.resp.status in RETRIABLE_STATUS_CODES:
error = "A retriable HTTP error %d occurred:\n%s" % (
e.resp.status,
e.content,
)
else:
raise
except RETRIABLE_EXCEPTIONS as e:
error = "A retriable error occurred: %s" % e
if error is not None:
print(error)
retry += 1
if retry > MAX_RETRIES:
exit("No longer attempting to retry.")
max_sleep = 2**retry
sleep_seconds = random.random() * max_sleep
print("Sleeping %f seconds and then retrying...") % sleep_seconds
time.sleep(sleep_seconds)
def updateUploadedStatus(currIndex, time):
# opens the json containing video_creation data (used for logic and upload data)
f = open(videos_json_file)
# loads videos.json into a dictionary
video_data = json.load(f)
video_data[currIndex]["uploaded_at"] = time
video_data[currIndex]["uploaded"] = True
# serialises the dictionary to json
json_obj = json.dumps(video_data, indent=4, default=str)
# writes the file
with open(videos_json_file, "w") as outfile:
outfile.write(json_obj)
def upload_youtube(video, prev_video, i):
next_upload = datetime.strptime(prev_video.get("uploaded_at", datetime.now().strftime(format_data)), format_data) + timedelta(
hours=4
)
file_name = "results/" + video["subreddit"] + "/" + video["filename"]
title = video["reddit_title"]+ " : " + "r/" + video["subreddit"]
description = (
"r/"
+ video["subreddit"]
+ " | "
+ video["reddit_title"]
+ "?"
+ " 🔔 Hit the bell next to Subscribe so you never miss a video! ❤️ Like and Comment 🧍 Subscribe if you are new on the channel!"
)
if len(title) >= 99:
title = title[0:96] + "..."
if len(title) <= 99:
title = title + "?"
options = {
"file": file_name,
"title": title,
"description": description,
"category": "22",
"keywords": "reddit,shorts,askReddit",
"privacyStatus": "private",
"publishTime": next_upload.isoformat(),
}
if not os.path.exists(options["file"]):
# print(options["file"])
exit(
"could not find the specified file --> {0} / {1}".format(
options["file"].split("/")[0], options["file"].split("/")[1]
)
)
initialize_upload(options, next_upload, i)
videos_json_file = "video_creation\\data\\videos.json"
next_upload = datetime.now()
format_data = "%Y-%m-%d %H:%M:%S"
def init():
if not(os.path.exists(CLIENT_SECRETS_FILE)):
print("video not automatically uploaded on youtube, please add youtube configuration in toml file if you want to upload videos automatically")
return
# opens the json containing video_creation data (used for logic and upload data)
f = open(videos_json_file)
# loads videos.json into a dictionary
video_data = json.load(f)
options = {}
uploaded = 0
for i, video in enumerate(video_data):
if video.get("uploaded", False) == False:
if uploaded < 6:
prev_video = list(video_data)[i - 1]
try:
upload_youtube(video, prev_video, i)
video["uploaded_at"] = next_upload.strftime(format_data)
uploaded += 1
except HttpError as e:
print("An HTTP error %d occurred:\n%s" % (e.resp.status, e.content))

@ -5,6 +5,10 @@ username = { optional = false, nmin = 3, nmax = 20, explanation = "The username
password = { optional = false, nmin = 8, explanation = "The password of your reddit account", example = "fFAGRNJru1FTz70BzhT3Zg", oob_error = "Password too short" } password = { optional = false, nmin = 8, explanation = "The password of your reddit account", example = "fFAGRNJru1FTz70BzhT3Zg", oob_error = "Password too short" }
2fa = { optional = true, type = "bool", options = [true, false, ], default = false, explanation = "Whether you have Reddit 2FA enabled, Valid options are True and False", example = true } 2fa = { optional = true, type = "bool", options = [true, false, ], default = false, explanation = "Whether you have Reddit 2FA enabled, Valid options are True and False", example = true }
[youtube.creds]
client_id = { optional = true, nmin = 12, nmax = 72, explanation = "The ID of your Youtube app of your app", example = "Youtube: 990756476661-jidf9qpp4m5r43ki5fk8fk706qc5mdfu.apps.googleusercontent.com", regex = "^[-a-zA-Z0-9._~+/]+=*$", input_error = "The client ID can only contain printable characters.", oob_error = "The ID should be over 12 and under 30 characters, double check your input." }
client_secret = { optional = true, nmin = 20, nmax = 40, explanation = "The SECRET of your Youtube app", example = "Youtube: fFAGRNJru1FTz70BzgthT3Zg", regex = "^[-a-zA-Z0-9._~+/]+=*$", input_error = "The client ID can only contain printable characters.", oob_error = "The secret should be over 20 and under 40 characters, double check your input." }
[reddit.thread] [reddit.thread]
random = { optional = true, options = [true, false, ], default = false, type = "bool", explanation = "If set to no, it will ask you a thread link to extract the thread, if yes it will randomize it. Default: 'False'", example = "True" } random = { optional = true, options = [true, false, ], default = false, type = "bool", explanation = "If set to no, it will ask you a thread link to extract the thread, if yes it will randomize it. Default: 'False'", example = "True" }
@ -50,4 +54,5 @@ tiktok_sessionid = { optional = true, example = "c76bcc3a7625abcc27b508c7db457ff
python_voice = { optional = false, default = "1", example = "1", explanation = "The index of the system tts voices (can be downloaded externally, run ptt.py to find value, start from zero)" } python_voice = { optional = false, default = "1", example = "1", explanation = "The index of the system tts voices (can be downloaded externally, run ptt.py to find value, start from zero)" }
py_voice_num = { optional = false, default = "2", example = "2", explanation = "The number of system voices (2 are pre-installed in Windows)" } py_voice_num = { optional = false, default = "2", example = "2", explanation = "The number of system voices (2 are pre-installed in Windows)" }
silence_duration = { optional = true, example = "0.1", explanation = "Time in seconds between TTS comments", default = 0.3, type = "float" } silence_duration = { optional = true, example = "0.1", explanation = "Time in seconds between TTS comments", default = 0.3, type = "float" }
no_emojis = { optional = false, type = "bool", default = false, example = false, options = [true, false,], explanation = "Whether to remove emojis from the comments" } no_emojis = { optional = false, type = "bool", default = false, example = false, options = [true, false,], explanation = "Whether to remove emojis from the comments" }

@ -48,6 +48,7 @@ def handle_input(
default=NotImplemented, default=NotImplemented,
optional=False, optional=False,
): ):
console.print("[green bold]" + extra_info, no_wrap=True)
if optional: if optional:
console.print( console.print(
message message
@ -67,7 +68,6 @@ def handle_input(
return default return default
if options is None: if options is None:
match = re.compile(match) match = re.compile(match)
console.print("[green bold]" + extra_info, no_wrap=True)
while True: while True:
console.print(message, end="") console.print(message, end="")
user_input = input("").strip() user_input = input("").strip()

@ -1,7 +1,7 @@
import re import re
from typing import Tuple, Dict from typing import Tuple, Dict
from pathlib import Path from pathlib import Path
import toml import toml, json, os
from rich.console import Console from rich.console import Console
from utils.console import handle_input from utils.console import handle_input
@ -178,12 +178,42 @@ Creating it now."""
If you see any prompts, that means that you have unset/incorrectly set variables, please input the correct values.\ If you see any prompts, that means that you have unset/incorrectly set variables, please input the correct values.\
""" """
) )
crawl(template, check_vars) crawl(template, check_vars)
with open(config_file, "w") as f: with open(config_file, "w") as f:
toml.dump(config, f) toml.dump(config, f)
return config return config
def saveYoutubeConfig(config_file):
if os.path.exists('client_secrets.json'):
return
console.print(f"[blue]Saving YouTube API configuration...")
global config
# Load the configuration file
config = toml.load(config_file)
# Get the YouTube API credentials
client_id = config['youtube']['creds']['client_id']
client_secret = config['youtube']['creds']['client_secret']
if(client_id == "" or client_secret == ""):
return
# Create the client_secrets dictionary
client_secrets = {
"installed": {
"client_id": client_id,
"client_secret": client_secret,
"redirect_uris": ["urn:ietf:wg:oauth:2.0:oob", "http://localhost"],
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://accounts.google.com/o/oauth2/token",
}
}
# Save the client_secrets dictionary to the client_secrets.json file
with open('client_secrets.json', 'w') as f:
json.dump(client_secrets, f)
if __name__ == "__main__": if __name__ == "__main__":
directory = Path().absolute() directory = Path().absolute()
check_toml(f"{directory}/utils/.config.template.toml", "config.toml") check_toml(f"{directory}/utils/.config.template.toml", "config.toml")

Loading…
Cancel
Save