Update main.py
This commit is contained in:
parent
8d355a3659
commit
528ea0848f
1 changed files with 113 additions and 41 deletions
154
main.py
154
main.py
|
@ -1,17 +1,19 @@
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
"""
|
||||||
# you may not use this file except in compliance with the License.
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# You may obtain a copy of the License at
|
you may not use this file except in compliance with the License.
|
||||||
#
|
You may obtain a copy of the License at
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
# Author: nloginov
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
# Script Name: Discord Favorite Gif Downloader
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
|
||||||
|
Author: nloginov
|
||||||
|
Script Name: Discord Favorite Gif Downloader
|
||||||
|
"""
|
||||||
|
|
||||||
import base64
|
import base64
|
||||||
import re
|
import re
|
||||||
|
@ -21,15 +23,19 @@ import os
|
||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
|
import json
|
||||||
|
from collections import defaultdict
|
||||||
|
import hashlib
|
||||||
|
|
||||||
# Set up logging
|
# Set up logging
|
||||||
logging.basicConfig(level=logging.INFO, format='%(message)s')
|
logging.basicConfig(level=logging.INFO, format='%(message)s')
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
|
|
||||||
# Global counters
|
# Global counters and error tracking
|
||||||
total_urls = 0
|
total_urls = 0
|
||||||
successful_downloads = 0
|
successful_downloads = 0
|
||||||
failed_downloads = 0
|
failed_downloads = 0
|
||||||
|
error_summary = defaultdict(list)
|
||||||
|
|
||||||
def ensure_directory(directory):
|
def ensure_directory(directory):
|
||||||
if not os.path.exists(directory):
|
if not os.path.exists(directory):
|
||||||
|
@ -76,18 +82,48 @@ def get_tenor_gif_url(tenor_url):
|
||||||
pass # Silently handle the error
|
pass # Silently handle the error
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
CONTENT_TYPES = {
|
||||||
|
'image/gif': ('.gif', 'gif'),
|
||||||
|
'video/mp4': ('.mp4', 'mp4'),
|
||||||
|
'image/png': ('.png', 'img'),
|
||||||
|
'image/jpeg': ('.jpg', 'img'),
|
||||||
|
'video/webm': ('.webm', 'webm'),
|
||||||
|
'image/webp': ('.webp', 'img')
|
||||||
|
}
|
||||||
|
|
||||||
|
SUPPORTED_EXTENSIONS = tuple(ext for ext, _ in CONTENT_TYPES.values())
|
||||||
|
|
||||||
|
def get_extension_and_subfolder(content_type, direct_url):
|
||||||
|
for mime, (ext, subfolder) in CONTENT_TYPES.items():
|
||||||
|
if mime in content_type or direct_url.lower().endswith(ext):
|
||||||
|
return ext, subfolder
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
def safe_filename(filename, max_length=200):
|
||||||
|
# Remove invalid characters
|
||||||
|
filename = re.sub(r'[<>:"/\\|?*]', '', filename)
|
||||||
|
|
||||||
|
# Truncate if too long, but keep the extension
|
||||||
|
name, ext = os.path.splitext(filename)
|
||||||
|
if len(name) > max_length:
|
||||||
|
# Use a hash of the full name to ensure uniqueness
|
||||||
|
name_hash = hashlib.md5(name.encode()).hexdigest()[:8]
|
||||||
|
name = name[:max_length-9] + '_' + name_hash
|
||||||
|
|
||||||
|
return name + ext
|
||||||
|
|
||||||
def download_media(url):
|
def download_media(url):
|
||||||
global successful_downloads, failed_downloads
|
global successful_downloads, failed_downloads
|
||||||
try:
|
try:
|
||||||
if url.lower().endswith(('.gif', '.mp4', '.png')):
|
if url.lower().endswith(SUPPORTED_EXTENSIONS):
|
||||||
direct_url = url
|
direct_url = url
|
||||||
elif 'tenor.com' in url:
|
elif 'tenor.com' in url:
|
||||||
gif_url = get_tenor_gif_url(url)
|
gif_url = get_tenor_gif_url(url)
|
||||||
if gif_url:
|
if gif_url:
|
||||||
direct_url = gif_url
|
direct_url = gif_url
|
||||||
else:
|
else:
|
||||||
logger.debug(f"Skipped Tenor URL: {url}")
|
|
||||||
failed_downloads += 1
|
failed_downloads += 1
|
||||||
|
error_summary["Tenor URL skipped"].append(url)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
direct_url = url
|
direct_url = url
|
||||||
|
@ -97,19 +133,11 @@ def download_media(url):
|
||||||
|
|
||||||
content_type = response.headers.get('Content-Type', '').lower()
|
content_type = response.headers.get('Content-Type', '').lower()
|
||||||
|
|
||||||
if 'image/gif' in content_type or direct_url.lower().endswith('.gif'):
|
extension, subfolder = get_extension_and_subfolder(content_type, direct_url)
|
||||||
extension = '.gif'
|
if not extension:
|
||||||
subfolder = 'gif'
|
failed_downloads += 1
|
||||||
elif 'video/mp4' in content_type or direct_url.lower().endswith('.mp4'):
|
error_summary["Unsupported content type"].append(f"{content_type} - {direct_url}")
|
||||||
extension = '.mp4'
|
return
|
||||||
subfolder = 'mp4'
|
|
||||||
elif 'image/png' in content_type or direct_url.lower().endswith('.png'):
|
|
||||||
extension = '.png'
|
|
||||||
subfolder = 'gif'
|
|
||||||
else:
|
|
||||||
logger.debug(f"Skipped unsupported content type: {content_type} for URL: {direct_url}")
|
|
||||||
failed_downloads += 1
|
|
||||||
return
|
|
||||||
|
|
||||||
parsed_url = urlparse(unquote(direct_url))
|
parsed_url = urlparse(unquote(direct_url))
|
||||||
filename = os.path.basename(parsed_url.path)
|
filename = os.path.basename(parsed_url.path)
|
||||||
|
@ -119,9 +147,7 @@ def download_media(url):
|
||||||
path_parts = parsed_url.path.rstrip('/').split('/')
|
path_parts = parsed_url.path.rstrip('/').split('/')
|
||||||
filename = path_parts[-1] if path_parts else 'unnamed'
|
filename = path_parts[-1] if path_parts else 'unnamed'
|
||||||
|
|
||||||
filename = re.sub(r'[^\w\-_\. ]', '_', filename)
|
filename = safe_filename(filename + extension)
|
||||||
filename = filename.strip() or 'unnamed'
|
|
||||||
filename += extension
|
|
||||||
|
|
||||||
download_dir = os.path.join('downloaded', subfolder)
|
download_dir = os.path.join('downloaded', subfolder)
|
||||||
ensure_directory(download_dir)
|
ensure_directory(download_dir)
|
||||||
|
@ -140,23 +166,52 @@ def download_media(url):
|
||||||
progress = (successful_downloads + failed_downloads) / total_urls * 100
|
progress = (successful_downloads + failed_downloads) / total_urls * 100
|
||||||
logger.info(f"Downloaded: {filename} ({progress:.1f}% complete)")
|
logger.info(f"Downloaded: {filename} ({progress:.1f}% complete)")
|
||||||
except requests.exceptions.RequestException as e:
|
except requests.exceptions.RequestException as e:
|
||||||
if isinstance(e, requests.exceptions.HTTPError) and e.response.status_code == 404:
|
|
||||||
logger.debug(f"404 Not Found: {url}")
|
|
||||||
else:
|
|
||||||
logger.warning(f"Failed to download: {url}")
|
|
||||||
failed_downloads += 1
|
failed_downloads += 1
|
||||||
|
if isinstance(e, requests.exceptions.HTTPError):
|
||||||
|
error_summary[f"HTTP {e.response.status_code}"].append(url)
|
||||||
|
else:
|
||||||
|
error_summary["Other errors"].append(f"{url} - {str(e)}")
|
||||||
|
|
||||||
def main():
|
|
||||||
global total_urls
|
def read_input_file(file_path):
|
||||||
with open('encoded_file.txt', 'r', encoding='utf-8') as file:
|
with open(file_path, 'r', encoding='utf-8') as file:
|
||||||
content = file.read()
|
content = file.read()
|
||||||
|
|
||||||
|
if file_path.lower().endswith('.json'):
|
||||||
|
try:
|
||||||
|
json_data = json.loads(content)
|
||||||
|
if 'settings' in json_data:
|
||||||
|
content = json_data['settings']
|
||||||
|
else:
|
||||||
|
logger.warning("JSON file does not contain 'settings' key. Using raw content.")
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logger.warning("Invalid JSON format. Using raw content.")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
decoded_content = base64.b64decode(content).decode('utf-8', errors='ignore')
|
decoded_content = base64.b64decode(content).decode('utf-8', errors='ignore')
|
||||||
except (base64.binascii.Error, UnicodeDecodeError):
|
except (base64.binascii.Error, UnicodeDecodeError):
|
||||||
logger.warning("Content is not valid base64 or couldn't be decoded. Using raw content.")
|
logger.warning("Content is not valid base64 or couldn't be decoded. Using raw content.")
|
||||||
decoded_content = content
|
decoded_content = content
|
||||||
|
|
||||||
|
return decoded_content
|
||||||
|
|
||||||
|
def get_input_file():
|
||||||
|
for ext in ['txt', 'json']:
|
||||||
|
filename = f'data.{ext}'
|
||||||
|
if os.path.exists(filename):
|
||||||
|
return filename
|
||||||
|
logger.error("No valid input file found. Please ensure 'data.txt' or 'data.json' exists.\nNote: If your filename is 'data.txt', only raw data from 'settings' key must be inside of it.")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def main():
|
||||||
|
global total_urls
|
||||||
|
|
||||||
|
input_file = get_input_file()
|
||||||
|
if not input_file:
|
||||||
|
return
|
||||||
|
|
||||||
|
decoded_content = read_input_file(input_file)
|
||||||
|
|
||||||
urls = extract_and_fix_urls(decoded_content)
|
urls = extract_and_fix_urls(decoded_content)
|
||||||
total_urls = len(urls)
|
total_urls = len(urls)
|
||||||
|
|
||||||
|
@ -170,9 +225,26 @@ def main():
|
||||||
logger.info(f"Failed downloads: {failed_downloads}")
|
logger.info(f"Failed downloads: {failed_downloads}")
|
||||||
logger.info(f"Success rate: {successful_downloads/total_urls*100:.1f}%")
|
logger.info(f"Success rate: {successful_downloads/total_urls*100:.1f}%")
|
||||||
|
|
||||||
|
# Print error summary
|
||||||
|
if error_summary:
|
||||||
|
logger.info("\n--- Error Summary ---")
|
||||||
|
for error_type, urls in error_summary.items():
|
||||||
|
logger.info(f"{error_type}: {len(urls)} occurences")
|
||||||
|
if error_type == "HTTP 404":
|
||||||
|
logger.info("Sample URLs (max 5):")
|
||||||
|
for url in urls[:5]:
|
||||||
|
logger.info(f" - {url}")
|
||||||
|
elif len(urls) <= 5:
|
||||||
|
for url in urls:
|
||||||
|
logger.info(f" - {url}")
|
||||||
|
else:
|
||||||
|
logger.info(f" (Showing first 5 of {len(urls)} errors)")
|
||||||
|
for url in urls[:5]:
|
||||||
|
logger.info(f" - {url}")
|
||||||
|
|
||||||
# Pause for 10 seconds
|
# Pause for 10 seconds
|
||||||
logger.info("\nScript finished. Closing in 10 seconds...")
|
logger.info("\nScript finished. Exiting in 10 seconds...")
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|
Loading…
Reference in a new issue