SB-Test-v2 / utils.py
Severian's picture
Update utils.py
f2066e7 verified
raw
history blame contribute delete
No virus
22.5 kB
import re
from typing import Dict, List, Optional, Any
from pydantic import ValidationError
from data_models import IdeaForm, IDEA_STAGES
from config import API_TOKEN, STAGES
from huggingface_hub import InferenceClient
from duckduckgo_search import DDGS
from sqlalchemy import create_engine, delete, inspect, Column, Integer, String, Text
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.declarative import declarative_base
import sys
import sqlite3
from sqlalchemy.exc import OperationalError, PendingRollbackError
import logging
import os
# Database configuration
DATABASE_URL = os.environ.get('DATABASE_URL', 'sqlite:////tmp/innovative_ideas.db')
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
class InnovativeIdea(Base):
__tablename__ = "innovative_ideas"
id = Column(Integer, primary_key=True, index=True)
idea_name = Column(String, index=True)
idea_overview = Column(Text)
problem_solution = Column(Text)
target_customers = Column(Text)
market_value = Column(Text)
existing_solutions = Column(Text)
unique_value = Column(Text)
technical_challenges = Column(Text)
legal_barriers = Column(Text)
data_dependencies = Column(Text)
team_roles = Column(Text)
timeline = Column(Text)
additional_info = Column(Text)
def get_llm_response(context: str, model: str, max_tokens: int, api_key: Optional[str] = None, stream: bool = False):
client = InferenceClient(model=model, token=api_key or API_TOKEN)
messages = [
{"role": "system", "content": "You are a helpful AI assistant."},
{"role": "user", "content": context}
]
response = client.chat_completion(
messages=messages,
max_tokens=max_tokens,
temperature=0.7,
top_p=0.95,
stream=False,
)
full_response = response.choices[0].message.content
print(full_response) # Print the full response
return full_response
def extract_form_data(llm_response: str) -> Dict[str, Any]:
form_data = {}
# Define patterns for each field
patterns = {stage["field"]: rf'{stage["name"]}:?\s*(.*?)(?:\n\n|\Z)' for stage in STAGES}
# Extract data for each field
for field, pattern in patterns.items():
match = re.search(pattern, llm_response, re.DOTALL | re.IGNORECASE)
if match:
form_data[field] = match.group(1).strip()
# Special handling for team_roles (convert to list)
if 'team_roles' in form_data:
form_data['team_roles'] = [role.strip() for role in form_data['team_roles'].split(',')]
return form_data
def perform_web_search(query: str, max_results: int = 3) -> str:
with DDGS() as ddgs:
results = list(ddgs.text(query, max_results=max_results))
formatted_results = ""
for i, result in enumerate(results, 1):
formatted_results += f"{i}. {result['title']}\n {result['body']}\n URL: {result['href']}\n\n"
return formatted_results
def optimize_search_query(query: str, model: str) -> str:
client = InferenceClient(model=model, token=API_TOKEN)
prompt = f"Optimize the following search query to get better web search results:\n\n{query}\n\nOptimized query:"
response = client.text_generation(prompt, max_new_tokens=50, temperature=0.7)
return response.strip()
def refine_user_input(user_input: str, current_stage: str, model: str) -> str:
client = InferenceClient(model=model, token=API_TOKEN)
prompt = f"""
Refine and clarify the following user input for the '{current_stage}' stage of an innovative idea:
{user_input}
Please provide a clear, concise, and well-structured version of this input.
Ensure that the refined input is specific to the current stage and does not include information for other stages.
"""
messages = [
{"role": "system", "content": "You are a helpful AI assistant specializing in refining innovative ideas."},
{"role": "user", "content": prompt}
]
full_response = ""
for message in client.chat_completion(
messages=messages,
max_tokens=200,
temperature=0.7,
stream=True,
):
content = message.choices[0].delta.content
if content:
full_response += content
return full_response.strip()
def update_idea_form(form: IdeaForm, new_data: Dict) -> Optional[IdeaForm]:
"""
Updates the IdeaForm with new data and validates it.
"""
try:
updated_data = form.dict()
updated_data.update(new_data)
return IdeaForm(**updated_data)
except ValidationError as e:
print(f"Validation error: {e}")
return None
def update_checklist(idea_form: IdeaForm) -> str:
"""
Generates a checklist based on the current state of the idea form.
"""
checklist = []
for field, value in idea_form.dict().items():
if not value:
checklist.append(f"[ ] Complete the '{field.replace('_', ' ').title()}' section")
elif isinstance(value, str) and len(value) < 50:
checklist.append(f"[?] Consider expanding the '{field.replace('_', ' ').title()}' section")
else:
checklist.append(f"[x] '{field.replace('_', ' ').title()}' section completed")
return "\n".join(checklist)
def generate_follow_up_questions(idea_form: IdeaForm, current_stage: int) -> List[str]:
"""
Generates follow-up questions based on the current stage and form state.
"""
questions = []
current_stage_name = IDEA_STAGES[current_stage]
form_dict = idea_form.dict()
if current_stage_name == "Idea Name":
if not form_dict['idea_name']:
questions.append("What would be a catchy and memorable name for your idea?")
elif len(form_dict['idea_name']) > 50:
questions.append("Can you shorten your idea name to make it more concise?")
elif current_stage_name == "Idea Overview":
if not form_dict['idea_overview']:
questions.append("Can you provide a brief overview of your idea in simple terms?")
elif len(form_dict['idea_overview']) < 100:
questions.append("Can you elaborate more on how your idea works?")
elif current_stage_name == "Problem Solution":
if not form_dict['problem_solution']:
questions.append("What specific problem does your idea solve?")
if 'how' not in form_dict['problem_solution'].lower():
questions.append("How exactly does your idea solve the problem you've identified?")
elif current_stage_name == "Target Customers":
if not form_dict['target_customers']:
questions.append("Who are the main users or customers for your idea?")
elif len(form_dict['target_customers']) < 50:
questions.append("Can you be more specific about your target customers? Consider demographics, industries, or use cases.")
elif current_stage_name == "Market Value":
if not form_dict['market_value']:
questions.append("What industries or markets could benefit from your idea?")
if 'size' not in form_dict['market_value'].lower():
questions.append("Can you estimate the size of the market for your idea?")
elif current_stage_name == "Existing Solutions":
if not form_dict['existing_solutions']:
questions.append("Are there any existing products or services similar to your idea?")
elif 'different' not in form_dict['existing_solutions'].lower():
questions.append("How is your idea different from these existing solutions?")
elif current_stage_name == "Unique Value":
if not form_dict['unique_value']:
questions.append("What makes your idea unique or better than existing solutions?")
if 'why' not in form_dict['unique_value'].lower():
questions.append("Why would customers choose your solution over alternatives?")
elif current_stage_name == "Technical Challenges":
if not form_dict['technical_challenges']:
questions.append("What are the main technical challenges in developing your idea?")
if 'overcome' not in form_dict['technical_challenges'].lower():
questions.append("How might these technical challenges be overcome?")
elif current_stage_name == "Legal Barriers":
if not form_dict['legal_barriers']:
questions.append("Are there any legal or regulatory issues that could affect your idea?")
if 'address' not in form_dict['legal_barriers'].lower():
questions.append("How might you address these legal or regulatory issues?")
elif current_stage_name == "Data Dependencies":
if not form_dict['data_dependencies']:
questions.append("Does your idea rely on any specific data sources?")
if 'acquire' not in form_dict['data_dependencies'].lower():
questions.append("How will you acquire or generate the necessary data for your idea?")
elif current_stage_name == "Team Roles":
if not form_dict['team_roles']:
questions.append("What key roles or expertise are needed to develop and implement your idea?")
elif len(form_dict['team_roles']) < 3:
questions.append("Are there any additional roles or skills that would be beneficial for your team?")
elif current_stage_name == "Timeline":
if not form_dict['timeline']:
questions.append("Can you provide a rough timeline for developing your idea?")
if 'milestone' not in form_dict['timeline'].lower():
questions.append("What are the main milestones in your development timeline?")
elif current_stage_name == "Additional Info":
if not form_dict['additional_info']:
questions.append("Is there any additional information that's important for understanding or implementing your idea?")
elif len(form_dict['additional_info']) < 100:
questions.append("Are there any other aspects of your idea that you'd like to elaborate on?")
return questions
def update_idea_in_database(idea_id: int, form: IdeaForm, db: Session) -> bool:
try:
db_idea = db.query(InnovativeIdea).filter(InnovativeIdea.id == idea_id).first()
if db_idea:
for field, value in form.dict(exclude_unset=True).items():
if field == 'team_roles' and isinstance(value, list):
value = ','.join(value) # Convert list to comma-separated string
setattr(db_idea, field, value)
db.commit()
return True
except Exception as e:
db.rollback()
logging.error(f"Error updating idea in database: {str(e)}")
return False
def save_idea_to_database(form: IdeaForm, db: Session) -> Optional[int]:
try:
db_idea = InnovativeIdea(
idea_name=form.idea_name,
idea_overview=form.idea_overview,
problem_solution=form.problem_solution,
target_customers=form.target_customers,
market_value=form.market_value,
existing_solutions=form.existing_solutions,
unique_value=form.unique_value,
technical_challenges=form.technical_challenges,
legal_barriers=form.legal_barriers,
data_dependencies=form.data_dependencies,
team_roles=','.join(form.team_roles) if form.team_roles else '', # Convert list to string
timeline=form.timeline,
additional_info=form.additional_info
)
db.add(db_idea)
db.commit()
db.refresh(db_idea)
return db_idea.id
except Exception as e:
db.rollback()
logging.error(f"Error saving idea to database: {str(e)}")
return None
def load_idea_from_database(idea_id: int, db: Session) -> Optional[IdeaForm]:
db_idea = db.query(InnovativeIdea).filter(InnovativeIdea.id == idea_id).first()
if db_idea:
return IdeaForm(
idea_name=db_idea.idea_name,
idea_overview=db_idea.idea_overview,
problem_solution=db_idea.problem_solution,
target_customers=db_idea.target_customers,
market_value=db_idea.market_value,
existing_solutions=db_idea.existing_solutions,
unique_value=db_idea.unique_value,
technical_challenges=db_idea.technical_challenges,
legal_barriers=db_idea.legal_barriers,
data_dependencies=db_idea.data_dependencies,
team_roles=db_idea.team_roles.split(',') if db_idea.team_roles else [], # Convert string back to list
timeline=db_idea.timeline,
additional_info=db_idea.additional_info
)
return None
def extract_search_query(message: str) -> Optional[str]:
"""
Extract a search query from a user message if it starts with '@'.
"""
if message.startswith('@'):
return message[1:].strip()
return None
def create_tables():
Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def ensure_database_exists(engine):
try:
if DATABASE_URL.startswith('sqlite'):
db_path = DATABASE_URL.replace("sqlite:///", "")
os.makedirs(os.path.dirname(db_path), exist_ok=True)
if not os.path.exists(db_path):
logging.info(f"Database file not found. Creating new database at {db_path}")
open(db_path, 'a').close() # Create an empty file
else:
logging.info(f"Database file found at {db_path}")
# Create tables
Base.metadata.create_all(bind=engine)
logging.info("Database tables created successfully.")
return True
except Exception as e:
logging.error(f"Failed to ensure database exists: {str(e)}")
return False
def initialize_database(engine):
try:
# Create tables if they don't exist
Base.metadata.create_all(bind=engine)
logging.info("Database tables created successfully.")
# Check if the table is empty
db = SessionLocal()
try:
existing_idea = db.query(InnovativeIdea).first()
if existing_idea is None:
# Create an empty IdeaForm
empty_form = IdeaForm()
# Save the empty form to the database
db_idea = InnovativeIdea(**empty_form.dict())
db.add(db_idea)
db.commit()
db.refresh(db_idea)
logging.info("Empty idea form added to the database.")
else:
logging.info("Database already contains data.")
except Exception as e:
db.rollback()
logging.error(f"Error querying or inserting data: {str(e)}")
return False
finally:
db.close()
return True
except Exception as e:
logging.error(f"Failed to initialize database: {str(e)}")
return False
def verify_database(engine):
try:
inspector = inspect(engine)
if not inspector.has_table("innovative_ideas"):
logging.error("InnovativeIdea table does not exist")
return False
columns = inspector.get_columns("innovative_ideas")
expected_columns = [field for field in IdeaForm.__fields__] + ['id']
actual_columns = [column['name'] for column in columns]
if set(expected_columns) != set(actual_columns):
logging.error(f"Mismatch in table columns. Expected: {expected_columns}, Actual: {actual_columns}")
return False
# Check if there's at least one row in the table
db = SessionLocal()
try:
idea_count = db.query(InnovativeIdea).count()
if idea_count == 0:
logging.warning("No ideas found in the database")
# Instead of returning False, we'll create an initial idea
initialize_database(engine)
except Exception as e:
logging.error(f"Error querying database: {str(e)}")
return False
finally:
db.close()
return True
except Exception as e:
logging.error(f"Database verification failed: {str(e)}", exc_info=True)
return False
def init_db():
Base.metadata.create_all(bind=engine)
def get_example_idea() -> Dict[str, str]:
"""
Returns an example idea to guide users and the LLM.
"""
return {
"idea_name": "Sharks with Laserbeams",
"idea_overview": "My idea is to attach laserbeams to sharks to create the ultimate evil weapon. The laserbeams are powered by a battery that are recharged by the shark's swimming motion. The laserbeams are used to attack enemy submarines.",
"problem_solution": "Our customers' secret lairs are often attacked by submarines. If our clients had sharks with laserbeams they could attack and destroy the submarines, saving the secret lair from being discovered.",
"target_customers": "Evil geniuses, tyrannical dictators, rogue government agencies",
"market_value": "Defense Contractors, submarine manufacturers",
"existing_solutions": "Nuclear Attack submarines and giant squids are somewhat similar to Shark with Laserbeams.",
"unique_value": "Shark with Laserbeams are cheaper than Nuclear Attack submarines and the lasers are more powerful than squid tentacles.",
"technical_challenges": "We would need to design a waterproof housing for the laser and batteries. We might need to experiment with how powerful lasers need to be when underwater in order to melt submarine steel hulls. We need to explore how to capture sharks and install the waterproof harnesses for the lasers.",
"legal_barriers": "We will need to obtain a weapons export license if we want to sell outside the United States.",
"data_dependencies": "We might need to get data on underwater motion vectors for creating the aiming mechanism for the lasers.",
"team_roles": "Marine biologists, fishermen, electrical engineers, optical engineers, software engineers, project manager, QA tester",
"timeline": "It will take 2-3 weeks to capture a shark. It will take 2-3 weeks to modify the harness and calibrate the laserbeam. I estimate it will take 3 months to train the shark to attack submarines.",
"additional_info": "We might consider adding optional racing stripes or bioluminescent patterns to the sharks as an upgrade with charge the customer for. We could explore a subscription model where we send the customers a new shark each quarter to replace sharks that age out of the program."
}
def generate_prompt_for_stage(stage: str, current_form: Dict[str, Any]) -> str:
"""
Generates a prompt for the current stage, including the example idea.
"""
example_idea = get_example_idea()
prompt = f"We are currently in the '{stage}' stage of developing an innovative idea. "
prompt += "Please provide information for this stage, keeping in mind the following example:\n\n"
if stage == "Idea Name":
prompt += f"Example: {example_idea['idea_name']}\n"
prompt += "What's the name of your innovative idea? Keep it short and catchy."
elif stage == "Idea Overview":
prompt += f"Example: {example_idea['idea_overview']}\n"
prompt += "Provide a simple overview of your idea, as if explaining it to a 10-year-old child. What does your idea do?"
elif stage == "Problem Solution":
prompt += f"Example: {example_idea['problem_solution']}\n"
prompt += "What specific problem does your idea solve? How does it solve this problem?"
elif stage == "Target Customers":
prompt += f"Example: {example_idea['target_customers']}\n"
prompt += "Who are the main customers or users for your idea? Be as specific as possible."
elif stage == "Market Value":
prompt += f"Example: {example_idea['market_value']}\n"
prompt += "What industries or markets could benefit from your idea? How large is this market?"
elif stage == "Existing Solutions":
prompt += f"Example: {example_idea['existing_solutions']}\n"
prompt += "Are there any existing products or services similar to your idea? How are they different?"
elif stage == "Unique Value":
prompt += f"Example: {example_idea['unique_value']}\n"
prompt += "What makes your idea unique or better than existing solutions? Why would customers choose your solution?"
elif stage == "Technical Challenges":
prompt += f"Example: {example_idea['technical_challenges']}\n"
prompt += "What are the main technical challenges in developing your idea? How might these be overcome?"
elif stage == "Legal Barriers":
prompt += f"Example: {example_idea['legal_barriers']}\n"
prompt += "Are there any legal or regulatory issues that could affect your idea? How might these be addressed?"
elif stage == "Data Dependencies":
prompt += f"Example: {example_idea['data_dependencies']}\n"
prompt += "Does your idea rely on any specific data sources? How will you acquire or generate this data?"
elif stage == "Team Roles":
prompt += f"Example: {example_idea['team_roles']}\n"
prompt += "What key roles or expertise are needed to develop and implement your idea? List specific job titles or skills."
elif stage == "Timeline":
prompt += f"Example: {example_idea['timeline']}\n"
prompt += "Provide a rough timeline for developing your idea. What are the main milestones and how long might each take?"
elif stage == "Additional Info":
prompt += f"Example: {example_idea['additional_info']}\n"
prompt += "Is there any additional information that's important for understanding or implementing your idea?"
prompt += "\n\nPlease provide your response for this stage, considering the example provided and the current state of your idea."
return prompt
def clear_database(session):
try:
session.query(InnovativeIdea).delete()
session.commit()
except Exception as e:
session.rollback()
logging.error(f"Error clearing database: {str(e)}")