seawolf2357's picture
Update app.py
f53f3e8 verified
raw
history blame
9.97 kB
import discord
import logging
import os
import re
import asyncio
import subprocess
import aiohttp
import time
from huggingface_hub import InferenceClient
from googleapiclient.discovery import build
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
from youtube_transcript_api.formatters import TextFormatter
from dotenv import load_dotenv
from pytube import YouTube
import whisper
import torch
from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq
import librosa
# Load environment variables (from a .env file when present).
load_dotenv()
# Logging configuration: DEBUG level, emitted through a stream handler.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s:%(levelname)s:%(name)s:%(message)s', handlers=[logging.StreamHandler()])
# Discord gateway intents: message content, messages and guild events.
intents = discord.Intents.default()
intents.message_content = True
intents.messages = True
intents.guilds = True
intents.guild_messages = True
# Inference API clients. hf_client is the chat model used by generate_replies.
hf_client = InferenceClient("CohereForAI/c4ai-command-r-plus-08-2024", token=os.getenv("HF_TOKEN"))
# NOTE(review): whisper_client is not referenced anywhere in the visible source.
whisper_client = InferenceClient("openai/whisper-large-v3", token=os.getenv("HF_TOKEN"))
# YouTube Data API service used by get_video_comments.
API_KEY = os.getenv("YOUTUBE_API_KEY")
youtube_service = build('youtube', 'v3', developerKey=API_KEY)
# ID of the single Discord channel the bot listens to.
# NOTE(review): os.getenv returns None when DISCORD_CHANNEL_ID is unset, and
# int(None) raises TypeError at import time.
SPECIFIC_CHANNEL_ID = int(os.getenv("DISCORD_CHANNEL_ID"))
# Number of retries when sending fails.
# NOTE(review): MAX_RETRIES is not referenced anywhere in the visible source.
MAX_RETRIES = 3
class MyClient(discord.Client):
    """Discord client that answers YouTube links with AI-generated comment replies.

    A message posted in the configured channel (or one of its threads) is
    scanned for a YouTube URL. The video's transcript and comments are
    fetched, a reply is generated for every comment, and the results are
    posted through create_thread_and_send_replies.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # True while one video is being handled; messages arriving meanwhile
        # are ignored.
        self.is_processing = False
        # Shared aiohttp session, created on the first on_ready.
        self.session = None
        # Handle of the web.py child process, kept so it is not spawned twice.
        self.web_process = None
        # Ensures the usage hint is posted only once per process.
        self.has_greeted = False

    async def on_ready(self):
        """Start the companion web server, open the HTTP session and greet.

        discord.py may dispatch on_ready more than once (for example after a
        reconnect), so every startup step here is guarded to run only when it
        has not already been done.
        """
        logging.info(f'{self.user}둜 λ‘œκ·ΈμΈλ˜μ—ˆμŠ΅λ‹ˆλ‹€!')

        # Launch web.py unless a previously started instance is still alive.
        if self.web_process is None or self.web_process.poll() is not None:
            self.web_process = subprocess.Popen(["python", "web.py"])
            logging.info("Web.py μ„œλ²„κ°€ μ‹œμž‘λ˜μ—ˆμŠ΅λ‹ˆλ‹€.")

        # Re-creating the session on every on_ready would leak the old one.
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()

        # Post the usage hint once, when the configured channel is available.
        channel = self.get_channel(SPECIFIC_CHANNEL_ID)
        if channel and not self.has_greeted:
            await channel.send("유튜브 λΉ„λ””μ˜€ URL을 μž…λ ₯ν•˜λ©΄, μžλ§‰κ³Ό λŒ“κΈ€μ„ 기반으둜 닡글을 μž‘μ„±ν•©λ‹ˆλ‹€.")
            self.has_greeted = True

    async def on_message(self, message):
        """Handle one incoming message: extract the video, build and post replies."""
        if message.author == self.user:
            return
        if not self.is_message_in_specific_channel(message):
            return
        if self.is_processing:
            return

        self.is_processing = True
        try:
            video_id = extract_video_id(message.content)
            if not video_id:
                await message.channel.send("μœ νš¨ν•œ 유튜브 λΉ„λ””μ˜€ URL을 μ œκ³΅ν•΄ μ£Όμ„Έμš”.")
                return

            transcript, _language = await get_best_available_transcript(video_id)
            comments = await get_video_comments(video_id)
            if not comments:
                await message.channel.send("λŒ“κΈ€μ„ κ°€μ Έμ˜¬ 수 μ—†μŠ΅λ‹ˆλ‹€.")
                return

            if not transcript:
                # No published captions: fall back to a Whisper transcription.
                await message.channel.send("μžλ§‰μ„ κ°€μ Έμ˜¬ 수 μ—†μŠ΅λ‹ˆλ‹€. Whisper λͺ¨λΈμ„ μ‚¬μš©ν•˜μ—¬ μžλ§‰μ„ μƒμ„±ν•©λ‹ˆλ‹€.")
                transcript = await generate_whisper_transcript(video_id)
                if not transcript:
                    # Last resort: answer from the comments alone.
                    await message.channel.send("Whisper λͺ¨λΈλ‘œλ„ μžλ§‰μ„ 생성할 수 μ—†μŠ΅λ‹ˆλ‹€. λŒ“κΈ€λ§Œμ„ 기반으둜 닡변을 μƒμ„±ν•©λ‹ˆλ‹€.")
                    transcript = ""

            replies = await generate_replies(comments, transcript)
            await create_thread_and_send_replies(message, video_id, comments, replies, self.session)
        finally:
            # Always release the guard, even when a step above raised.
            self.is_processing = False

    def is_message_in_specific_channel(self, message):
        """Return True for messages in the configured channel or one of its threads."""
        return message.channel.id == SPECIFIC_CHANNEL_ID or (
            isinstance(message.channel, discord.Thread) and message.channel.parent_id == SPECIFIC_CHANNEL_ID
        )

    async def close(self):
        """Close the aiohttp session, then shut the Discord client down."""
        if self.session:
            await self.session.close()
        await super().close()
# Recognised URL shapes: watch?v=ID (also with other query parameters before
# v=), youtu.be/ID, embed/ID, v/ID, shorts/ID and live/ID, with an optional
# scheme and an optional www./m./music. host prefix.
_YOUTUBE_URL_PATTERN = re.compile(
    r'(?<![\w.-])'                                   # host must not be the tail of another name
    r'(?:https?://)?(?:www\.|m\.|music\.)?'
    r'(?:youtube|youtu|youtube-nocookie)\.(?:com|be)/'
    r'(?:watch\?v=|embed/|v/|shorts/|live/|\S+?[?&]v=)?'
    r'([A-Za-z0-9_-]{11})'                           # video IDs use only this alphabet
    r'(?![A-Za-z0-9_-])'                             # and are exactly 11 characters long
)


def extract_video_id(url):
    """Return the 11-character YouTube video ID found in *url*, or None.

    The URL may appear anywhere in the text, so a link embedded in a longer
    chat message is still recognised. Paths that are not videos, such as
    ``/channel/...`` or ``/playlist?...``, yield None.
    """
    match = _YOUTUBE_URL_PATTERN.search(url) if url else None
    video_id = match.group(1) if match else None
    logging.debug(f'μΆ”μΆœλœ λΉ„λ””μ˜€ ID: {video_id}')
    return video_id
async def get_best_available_transcript(video_id, max_retries=5, delay=10):
    """Fetch a Korean or English transcript for *video_id*.

    Each attempt tries the Korean transcript, then the English one, then a
    manually created Korean/English transcript from the transcript list.

    A definitive answer (no such transcript, or transcripts disabled) ends
    the search immediately, because repeating the same requests cannot change
    it. Only unexpected errors are retried, after waiting *delay* seconds.

    Args:
        video_id: YouTube video ID.
        max_retries: Maximum number of attempts.
        delay: Seconds to wait between attempts after an unexpected error.

    Returns:
        ``(transcript, language_code)`` on success, ``(None, None)`` otherwise.
    """
    # Set by fetch_transcript when a lookup failed for a reason other than
    # "this transcript does not exist"; reset at the start of every attempt.
    unexpected_failure = False

    async def fetch_transcript(language):
        nonlocal unexpected_failure
        try:
            transcript = await asyncio.to_thread(
                YouTubeTranscriptApi.get_transcript, video_id, languages=[language])
            return transcript, language
        except (NoTranscriptFound, TranscriptsDisabled):
            logging.warning(f'{language} μžλ§‰μ΄ μ œκ³΅λ˜μ§€ μ•ŠμŒ.')
            return None, None
        except Exception as e:
            unexpected_failure = True
            logging.warning(f'{language} μžλ§‰ κ°€μ Έμ˜€κΈ° 였λ₯˜: {e}')
            return None, None

    for attempt in range(max_retries):
        unexpected_failure = False

        # Korean first, then English.
        for language in ('ko', 'en'):
            transcript, found_language = await fetch_transcript(language)
            if transcript:
                return transcript, found_language

        try:
            # Look through the transcript list for a manually created one.
            transcripts = await asyncio.to_thread(YouTubeTranscriptApi.list_transcripts, video_id)
            manual_transcript = transcripts.find_manually_created_transcript(['ko', 'en'])
            transcript = await asyncio.to_thread(manual_transcript.fetch)
            return transcript, manual_transcript.language_code
        except (NoTranscriptFound, TranscriptsDisabled) as e:
            logging.warning(f'μˆ˜λ™ μžλ§‰μ„ 찾을 수 μ—†μŒ: {e}')
            if not unexpected_failure:
                # Every lookup gave a definitive "not available": stop here.
                return None, None
            error = e
        except Exception as e:
            error = e

        if attempt < max_retries - 1:
            logging.error(f'μžλ§‰ κ°€μ Έμ˜€κΈ° μ‹€νŒ¨ (μ‹œλ„ {attempt + 1}/{max_retries}): {error}')
            await asyncio.sleep(delay)
        else:
            logging.error(f'μ΅œμ’… μžλ§‰ κ°€μ Έμ˜€κΈ° μ‹€νŒ¨: {error}')
            return None, None
    return None, None
# The Whisper processor pads or truncates its input to one 30-second window,
# so longer audio has to be fed to the model window by window.
_WHISPER_WINDOW_SECONDS = 30


def _transcribe_video_blocking(video_id):
    """Download the audio of *video_id* and transcribe it with Whisper.

    Blocking helper meant to run in a worker thread. Returns the transcript
    text, or None when the video has no audio stream or nothing was
    recognised. The downloaded file is removed whether or not this succeeds.
    """
    audio_file = None
    try:
        # Download the audio track of the video.
        yt = YouTube(f'https://www.youtube.com/watch?v={video_id}')
        audio_stream = yt.streams.filter(only_audio=True).first()
        if audio_stream is None:
            logging.error(f'No audio stream available for video {video_id}')
            return None
        audio_file = audio_stream.download(output_path='temp', filename=f'{video_id}.mp3')

        # Load the audio resampled to 16 kHz.
        audio, sr = librosa.load(audio_file, sr=16000)

        # Load the Whisper processor and model.
        device = "cuda" if torch.cuda.is_available() else "cpu"
        processor = AutoProcessor.from_pretrained("openai/whisper-large-v3")
        model = AutoModelForSpeechSeq2Seq.from_pretrained("openai/whisper-large-v3").to(device)

        # Transcribe window by window so the whole recording is covered.
        window = _WHISPER_WINDOW_SECONDS * sr
        pieces = []
        for start in range(0, len(audio), window):
            segment = audio[start:start + window]
            input_features = processor(
                segment, sampling_rate=sr, return_tensors="pt").input_features.to(device)
            predicted_ids = model.generate(input_features)
            text = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0].strip()
            if text:
                pieces.append(text)
        return " ".join(pieces) or None
    finally:
        # Remove the temporary download even when a step above failed.
        if audio_file and os.path.exists(audio_file):
            os.remove(audio_file)


async def generate_whisper_transcript(video_id):
    """Transcribe the audio of *video_id* with Whisper.

    The download and the model inference run in a worker thread so the
    Discord event loop stays responsive while they are in progress.

    Returns:
        The transcript text, or None when transcription failed.
    """
    try:
        return await asyncio.to_thread(_transcribe_video_blocking, video_id)
    except Exception as e:
        logging.error(f'Whisper μžλ§‰ 생성 μ‹€νŒ¨: {e}')
        return None
async def get_video_comments(video_id):
    """Fetch up to 100 top-level comments of *video_id*.

    The YouTube Data API call is blocking, so it is executed in a worker
    thread. When the request fails the error is logged and an empty list is
    returned, which the caller already reports as "no comments".

    Returns:
        A list of ``(comment_text, comment_id)`` tuples.
    """
    request = youtube_service.commentThreads().list(
        part='snippet',
        videoId=video_id,
        maxResults=100,  # fetch at most 100 comments
    )
    try:
        response = await asyncio.to_thread(request.execute)
    except Exception as e:
        logging.error(f'Failed to fetch comments for video {video_id}: {e}')
        return []

    comments = []
    for item in response.get('items', []):
        top_level = item['snippet']['topLevelComment']
        comments.append((top_level['snippet']['textOriginal'], top_level['id']))
    logging.debug(f'κ°€μ Έμ˜¨ λŒ“κΈ€: {comments}')
    return comments
def _transcript_to_text(transcript):
    """Flatten a transcript into plain text.

    Accepts a plain string, a list of caption entries (dicts with a 'text'
    key, or objects with a ``text`` attribute), or an empty value.
    """
    if not transcript:
        return ""
    if isinstance(transcript, str):
        return transcript
    parts = []
    for entry in transcript:
        text = entry.get('text') if isinstance(entry, dict) else getattr(entry, 'text', None)
        if text:
            parts.append(str(text))
    return " ".join(parts)


async def generate_replies(comments, transcript):
    """Generate one reply per comment using the chat model.

    Args:
        comments: Iterable of ``(comment_text, comment_id)`` tuples.
        transcript: Video transcript, as text or as a list of caption
            entries; it is flattened to text before entering the prompt.

    Returns:
        A list of reply strings in the same order and of the same length as
        *comments*. A comment whose generation fails gets a fallback reply,
        so the list stays aligned with *comments*.
    """
    fallback_reply = "닡글을 생성할 수 μ—†μŠ΅λ‹ˆλ‹€."
    transcript_text = _transcript_to_text(transcript)
    # The system prompt does not depend on the comment, so build it once.
    system_message = {"role": "system", "content": f"""λ„ˆμ˜ 이름은 OpenFreeAI이닀. λ‹΅κΈ€ 생성후 κ°€μž₯ λ§ˆμ§€λ§‰μ— λ„ˆμ˜ 이름을 밝히고 κ³΅μ†ν•˜κ²Œ μΈμ‚¬ν•˜λΌ. λΉ„λ””μ˜€ μžλ§‰: {transcript_text}"""}

    replies = []
    for comment, _ in comments:
        messages = [system_message, {"role": "user", "content": comment}]
        reply = fallback_reply
        try:
            # chat_completion is blocking; run it off the event loop.
            response = await asyncio.to_thread(
                hf_client.chat_completion,
                messages, max_tokens=250, temperature=0.7, top_p=0.85)
            if response.choices and response.choices[0].message:
                content = response.choices[0].message['content']
                if content and content.strip():
                    reply = content.strip()
        except Exception as e:
            # One failed comment must not discard the replies already built.
            logging.error(f'Reply generation failed for a comment: {e}')
        replies.append(reply)
    logging.debug(f'μƒμ„±λœ λ‹΅κΈ€: {replies}')
    return replies
async def create_thread_and_send_replies(message, video_id, comments, replies, session):
    """Post every comment/reply pair as an embed in a thread.

    When *message* was posted in a regular channel, a new thread is created
    on it. When it was posted inside an existing thread, which has no
    ``create_thread``, the replies are sent to that thread instead.

    Args:
        message: The Discord message that triggered the processing.
        video_id: YouTube video ID (not used here).
        comments: List of ``(comment_text, comment_id)`` tuples.
        replies: Reply strings, aligned with *comments*.
        session: aiohttp session (not used here).
    """
    # Discord rejects embed descriptions longer than 4096 characters.
    max_description_length = 4096

    if isinstance(message.channel, discord.Thread):
        thread = message.channel
    else:
        thread = await message.channel.create_thread(name=f"{message.author.name}의 λŒ“κΈ€ λ‹΅κΈ€", message=message)

    for (comment, _comment_id), reply in zip(comments, replies):
        description = f"**λŒ“κΈ€**: {comment}\n**λ‹΅κΈ€**: {reply}"
        if len(description) > max_description_length:
            description = description[:max_description_length - 3] + "..."
        await thread.send(embed=discord.Embed(description=description))
if __name__ == "__main__":
    # Fail fast with a clear message when the bot token is not configured,
    # instead of surfacing an opaque login error from discord.py.
    discord_token = os.getenv('DISCORD_TOKEN')
    if not discord_token:
        raise SystemExit("DISCORD_TOKEN environment variable is not set.")
    discord_client = MyClient(intents=intents)
    discord_client.run(discord_token)