import os
import gradio as gr
from gradio import ChatMessage
from typing import Iterator, List, Dict, Tuple, Any
import google.generativeai as genai
from huggingface_hub import HfApi
import requests
import re
import traceback

# Hugging Face API token (used for Space analysis)
HF_TOKEN = os.getenv("HF_TOKEN")
hf_api = HfApi(token=HF_TOKEN)

# Gemini 2.0 Flash Thinking model API key and client (used as the LLM)
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel("gemini-2.0-flash-thinking-exp-01-21")
def get_headers():
    if not HF_TOKEN:
        raise ValueError("Hugging Face token not found in environment variables")
    return {"Authorization": f"Bearer {HF_TOKEN}"}
def get_file_content(space_id: str, file_path: str) -> str:
    file_url = f"https://huggingface.co./spaces/{space_id}/raw/main/{file_path}"
    try:
        response = requests.get(file_url, headers=get_headers())
        if response.status_code == 200:
            return response.text
        else:
            return f"File not found or inaccessible: {file_path}"
    except requests.RequestException:
        return f"Error fetching content for file: {file_path}"
def get_space_structure(space_id: str) -> Dict:
    try:
        files = hf_api.list_repo_files(repo_id=space_id, repo_type="space")
        tree = {"type": "directory", "path": "", "name": space_id, "children": []}
        for file in files:
            path_parts = file.split('/')
            current = tree
            for i, part in enumerate(path_parts):
                if i == len(path_parts) - 1:  # last path component is the file itself
                    current["children"].append({"type": "file", "path": file, "name": part})
                else:
                    found = False
                    for child in current["children"]:
                        if child["type"] == "directory" and child["name"] == part:
                            current = child
                            found = True
                            break
                    if not found:
                        new_dir = {"type": "directory", "path": '/'.join(path_parts[:i + 1]), "name": part, "children": []}
                        current["children"].append(new_dir)
                        current = new_dir
        return tree
    except Exception as e:
        print(f"Error in get_space_structure: {str(e)}")
        return {"error": f"API request error: {str(e)}"}
def format_tree_structure(tree_data: Dict, indent: str = "") -> str:
    if "error" in tree_data:
        return tree_data["error"]
    formatted = f"{indent}{'📁' if tree_data.get('type') == 'directory' else '📄'} {tree_data.get('name', 'Unknown')}\n"
    if tree_data.get("type") == "directory":
        for child in sorted(tree_data.get("children", []), key=lambda x: (x.get("type", "") != "directory", x.get("name", ""))):
            formatted += format_tree_structure(child, indent + "  ")
    return formatted
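# Sketch of the rendered text for the tree above (directories sort before files;
# the folder/file icons are an assumption recovered from the garbled source):
# 📁 <space_id>
#   📁 assets
#     📄 logo.png
#   📄 app.py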
def adjust_lines_for_code(code_content: str, min_lines: int = 10, max_lines: int = 100) -> int:
    num_lines = len(code_content.split('\n'))
    return min(max(num_lines, min_lines), max_lines)
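# Examples of the clamping behaviour:
#   adjust_lines_for_code("a\nb")                 -> 10   (below the minimum)
#   adjust_lines_for_code("\n".join(["x"] * 50))  -> 50
#   adjust_lines_for_code("\n".join(["x"] * 500)) -> 100  (capped at the maximum)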
def analyze_space(url: str, progress=gr.Progress()):
    try:
        space_id = url.split('spaces/')[-1]
        if not re.match(r'^[\w.-]+/[\w.-]+$', space_id):
            raise ValueError(f"Invalid Space ID format: {space_id}")
        progress(0.1, desc="Analyzing file structure...")
        tree_structure = get_space_structure(space_id)
        if "error" in tree_structure:
            raise ValueError(tree_structure["error"])
        tree_view = format_tree_structure(tree_structure)
        progress(0.3, desc="Fetching app.py content...")
        app_content = get_file_content(space_id, "app.py")
        progress(0.5, desc="Summarizing code...")
        summary = summarize_code(app_content)
        progress(0.7, desc="Analyzing code...")
        analysis = analyze_code(app_content)
        progress(0.9, desc="Generating usage explanation...")
        usage = explain_usage(app_content)
        lines_for_app_py = adjust_lines_for_code(app_content)
        progress(1.0, desc="Done")
        return app_content, tree_view, tree_structure, space_id, summary, analysis, usage, lines_for_app_py
    except Exception as e:
        print(f"Error in analyze_space: {str(e)}")
        print(traceback.format_exc())
        return f"An error occurred: {str(e)}", "", None, "", "", "", "", 10
# --------------------------------------------------
# Gemini 2.0 Flash Thinking model (LLM) helpers
# --------------------------------------------------
def format_chat_history(messages: List[ChatMessage]) -> List[Dict]:
    """
    Convert a list of ChatMessages into the format the Gemini model expects.
    Messages carrying "Thinking" metadata are skipped.
    """
    formatted = []
    for m in messages:
        if hasattr(m, "metadata") and m.metadata:  # skip "Thinking" messages
            continue
        role = "assistant" if m.role == "assistant" else "user"
        formatted.append({"role": role, "parts": [m.content or ""]})
    return formatted
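# Sketch of the conversion (hypothetical messages): an assistant entry that carries
# metadata (the "Thinking" bubble) is dropped, the rest become Gemini-style dicts:
#   [ChatMessage(role="user", content="Hi"),
#    ChatMessage(role="assistant", content="...", metadata={"title": "Thinking"}),
#    ChatMessage(role="assistant", content="Hello!")]
# -> [{"role": "user", "parts": ["Hi"]},
#     {"role": "assistant", "parts": ["Hello!"]}]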
def gemini_chat_completion(system_message: str, user_message: str, max_tokens: int = 200, temperature: float = 0.7) -> str:
    # Seed the chat history with the system prompt only; the user message is sent
    # below, so it is not duplicated in the history.
    init_msgs = [ChatMessage(role="system", content=system_message)]
    chat_history = format_chat_history(init_msgs)
    chat = model.start_chat(history=chat_history)
    final = ""
    try:
        for chunk in chat.send_message(
            user_message,
            stream=True,
            generation_config={"max_output_tokens": max_tokens, "temperature": temperature},
        ):
            parts = chunk.candidates[0].content.parts
            if len(parts) == 2:
                # Two parts: the first is the model's "thinking", the second the answer.
                final += parts[1].text
            else:
                final += parts[0].text
        return final.strip()
    except Exception as e:
        return f"Error during LLM call: {str(e)}"
def summarize_code(app_content: str):
    system_msg = "You are an AI assistant that analyzes and summarizes Python code. Summarize the given code concisely, in three lines or fewer."
    user_msg = f"Summarize the following Python code in three lines or fewer:\n\n{app_content}"
    try:
        return gemini_chat_completion(system_msg, user_msg, max_tokens=200, temperature=0.7)
    except Exception as e:
        return f"Error while generating the summary: {str(e)}"
def analyze_code(app_content: str):
    system_msg = (
        "You are a deep thinking AI. You may use extremely long chains of thought to deeply consider the problem "
        "and deliberate with yourself via systematic reasoning processes to help come to a correct solution prior to answering. "
        "You should enclose your thoughts and internal monologue inside tags, and then provide your solution or response to the problem. "
        "You are an AI assistant that analyzes Python code. Analyze the given code and, from the standpoint of the service's usefulness and applications, explain the following items:\n"
        "A. Background and motivation\n"
        "B. Functional usefulness and value\n"
        "C. Distinctive strengths\n"
        "D. Target audience and use cases\n"
        "E. Expected benefits\n"
        "Compare it with existing and similar projects in your analysis. Respond in Markdown format."
    )
    user_msg = f"Analyze the following Python code:\n\n{app_content}"
    try:
        return gemini_chat_completion(system_msg, user_msg, max_tokens=1000, temperature=0.7)
    except Exception as e:
        return f"Error while generating the analysis: {str(e)}"
def explain_usage(app_content: str):
    system_msg = (
        "You are a deep thinking AI. You may use extremely long chains of thought to deeply consider the problem "
        "and deliberate with yourself via systematic reasoning processes to help come to a correct solution prior to answering. "
        "You should enclose your thoughts and internal monologue inside tags, and then provide your solution or response to the problem. "
        "You are an AI assistant that analyzes Python code and explains how to use it. Based on the given code, describe the usage in detail, as if the reader were looking at the screen. Respond in Markdown format."
    )
    user_msg = f"Explain how to use the following Python code:\n\n{app_content}"
    try:
        return gemini_chat_completion(system_msg, user_msg, max_tokens=800, temperature=0.7)
    except Exception as e:
        return f"Error while generating the usage explanation: {str(e)}"
def stream_gemini_response(user_message: str, conversation_state: List[ChatMessage]) -> Iterator[List[ChatMessage]]:
    """
    Streaming request to Gemini.
    If user_message is empty, append a short guidance message as the assistant, yield it, and stop.
    """
    if not user_message.strip():
        # Handle empty input: show a guidance message
        conversation_state.append(
            ChatMessage(
                role="assistant",
                content="No input received. Please type a question!"
            )
        )
        yield conversation_state
        return
    print(f"\n=== New Request ===\nUser message: {user_message}")
    chat_history = format_chat_history(conversation_state)
    chat = model.start_chat(history=chat_history)
    response = chat.send_message(user_message, stream=True)
    thought_buffer = ""
    response_buffer = ""
    thinking_complete = False
    conversation_state.append(
        ChatMessage(
            role="assistant",
            content="",
            metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"}
        )
    )
    try:
        for chunk in response:
            parts = chunk.candidates[0].content.parts
            current_chunk = parts[0].text
            if len(parts) == 2 and not thinking_complete:
                # Chunk carries the end of the thinking plus the start of the answer.
                thought_buffer += current_chunk
                print(f"\n=== Complete Thought ===\n{thought_buffer}")
                conversation_state[-1] = ChatMessage(
                    role="assistant",
                    content=thought_buffer,
                    metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"}
                )
                yield conversation_state
                response_buffer = parts[1].text
                print(f"\n=== Starting Response ===\n{response_buffer}")
                conversation_state.append(
                    ChatMessage(role="assistant", content=response_buffer)
                )
                thinking_complete = True
            elif thinking_complete:
                response_buffer += current_chunk
                print(f"\n=== Response Chunk ===\n{current_chunk}")
                conversation_state[-1] = ChatMessage(
                    role="assistant",
                    content=response_buffer
                )
            else:
                thought_buffer += current_chunk
                print(f"\n=== Thinking Chunk ===\n{current_chunk}")
                conversation_state[-1] = ChatMessage(
                    role="assistant",
                    content=thought_buffer,
                    metadata={"title": "⚙️ Thinking: *The thoughts produced by the model are experimental"}
                )
            yield conversation_state
        print(f"\n=== Final Response ===\n{response_buffer}")
    except Exception as e:
        print(f"\n=== Error ===\n{str(e)}")
        conversation_state.append(
            ChatMessage(
                role="assistant",
                content=f"I apologize, but I encountered an error: {str(e)}"
            )
        )
        yield conversation_state
def convert_for_messages_format(messages: List[ChatMessage]) -> List[Dict[str, str]]:
    """
    Convert a list of ChatMessages into [{"role": "assistant"/"user", "content": "..."}].
    """
    output = []
    for msg in messages:
        output.append({"role": msg.role, "content": msg.content})
    return output
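# Sketch (hypothetical messages): the result matches gr.Chatbot(type="messages"), e.g.
#   [{"role": "user", "content": "Hi"}, {"role": "assistant", "content": "Hello!"}]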
def user_submit_message(msg: str, conversation_state: List[ChatMessage]):
    conversation_state.append(ChatMessage(role="user", content=msg))
    return "", conversation_state
def respond_wrapper(message: str, conversation_state: List[ChatMessage], max_tokens, temperature, top_p):
    # Find the most recent user message (the textbox is already cleared at this point).
    last_user_message = ""
    for msg in reversed(conversation_state):
        if msg.role == "user":
            last_user_message = msg.content
            break
    # Stream the response for that message.
    for updated_messages in stream_gemini_response(last_user_message, conversation_state):
        yield "", convert_for_messages_format(updated_messages)
def create_ui():
    try:
        css = """
        footer {visibility: hidden;}
        """
        with gr.Blocks(css=css) as demo:
            gr.Markdown("# MOUSE: Space Research Thinking")
            with gr.Tabs():
                with gr.TabItem("Analysis"):
                    with gr.Row():
                        with gr.Column():
                            url_input = gr.Textbox(label="HuggingFace Space URL")
                            analyze_button = gr.Button("Analyze")
                            summary_output = gr.Markdown(label="Summary")
                            analysis_output = gr.Markdown(label="Analysis")
                            usage_output = gr.Markdown(label="Usage")
                            tree_view_output = gr.Textbox(label="File Structure", lines=20)
                        with gr.Column():
                            code_tabs = gr.Tabs()
                            with code_tabs:
                                with gr.TabItem("app.py"):
                                    app_py_content = gr.Code(
                                        language="python",
                                        label="app.py",
                                        lines=50
                                    )
                                with gr.TabItem("requirements.txt"):
                                    requirements_content = gr.Textbox(
                                        label="requirements.txt",
                                        lines=50
                                    )
                with gr.TabItem("AI Code Chat"):
                    gr.Markdown("## Enter an example or paste source code, then ask a question")
                    # Chatbot: type="messages"
                    chatbot = gr.Chatbot(
                        label="Conversation",
                        height=400,
                        type="messages"
                    )
                    msg = gr.Textbox(
                        label="Message",
                        placeholder="Type a message..."
                    )
                    max_tokens = gr.Slider(
                        minimum=1, maximum=8000,
                        value=4000, label="Max Tokens",
                        visible=False
                    )
                    temperature = gr.Slider(
                        minimum=0, maximum=1,
                        value=0.7, label="Temperature",
                        visible=False
                    )
                    top_p = gr.Slider(
                        minimum=0, maximum=1,
                        value=0.9, label="Top P",
                        visible=False
                    )
                    examples = [
                        ["Explain the detailed usage in at least 4000 tokens"],
                        ["Write 20 FAQ entries in at least 4000 tokens"],
                        ["Explain the technical differentiators and strengths in at least 4000 tokens"],
                        ["Write innovative ideas suitable for a patent application in at least 4000 tokens"],
                        ["Write it in the style of an academic paper, in at least 4000 tokens"],
                        ["Continue your previous answer"]
                    ]
                    gr.Examples(examples, inputs=msg)
                    conversation_state = gr.State([])
                    msg.submit(
                        user_submit_message,
                        inputs=[msg, conversation_state],
                        outputs=[msg, conversation_state],
                        queue=False
                    ).then(
                        respond_wrapper,
                        inputs=[msg, conversation_state, max_tokens, temperature, top_p],
                        outputs=[msg, chatbot],
                    )
                with gr.TabItem("Recommended Best"):
                    gr.Markdown(
                        "Discover recommended HuggingFace Spaces [here](https://huggingface.co./spaces/openfree/Korean-Leaderboard)."
                    )
            # Analysis tab wiring
            space_id_state = gr.State()
            tree_structure_state = gr.State()
            app_py_content_lines = gr.State()
            analyze_button.click(
                analyze_space,
                inputs=[url_input],
                outputs=[
                    app_py_content,
                    tree_view_output,
                    tree_structure_state,
                    space_id_state,
                    summary_output,
                    analysis_output,
                    usage_output,
                    app_py_content_lines
                ]
            ).then(
                lambda space_id: get_file_content(space_id, "requirements.txt"),
                inputs=[space_id_state],
                outputs=[requirements_content]
            ).then(
                lambda lines: gr.update(lines=lines),
                inputs=[app_py_content_lines],
                outputs=[app_py_content]
            )
        return demo
    except Exception as e:
        print(f"Error in create_ui: {str(e)}")
        print(traceback.format_exc())
        raise
if __name__ == "__main__":
    try:
        print("Starting HuggingFace Space Analyzer...")
        demo = create_ui()
        print("UI created successfully.")
        print("Configuring Gradio queue...")
        demo.queue()
        print("Gradio queue configured.")
        print("Launching Gradio app...")
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=False,
            debug=True,
            show_api=False
        )
        print("Gradio app launched successfully.")
    except Exception as e:
        print(f"Error in main: {str(e)}")
        print("Detailed error information:")
        print(traceback.format_exc())
        raise