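# Gradio interview chatbot for WorkGenius CV evaluation: loads a job description
# and resume from Azure Blob Storage / Cosmos DB, conducts an AI-led interview via
# Azure OpenAI, and e-mails the recruiter a transcript and summary through SendGrid.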
import base64
import io
import json
import os

import gradio as gr
import openai
import sendgrid
from azure.cosmos import CosmosClient, exceptions
from azure.storage.blob import BlobServiceClient
from PyPDF2 import PdfReader
from sendgrid.helpers.mail import Mail, Attachment, FileContent, FileName, FileType, Disposition

# Azure OpenAI configuration
openai.api_key = os.getenv("OPENAI_API_KEY")
openai.api_base = "https://tensora-oai-france.openai.azure.com/"
openai.api_type = "azure"
openai.api_version = "2023-12-01-preview"

# Secrets from the environment
os_connection_string = os.getenv("CONNECTION")
os_mail_password = os.getenv("MAIL_PASSWORD")

# System prompts for the interview (with and without pre-generated questions)
with open("sys_prompt.txt") as f:
    sys_prompt = f.read()
with open("sys_prompt_pre_questions.txt") as fg:
    sys_prompt_pre_generated = fg.read()

# JavaScript hook that reads the URL query parameters (the candidate/job id)
# and passes them into the Gradio app on load.
get_window_url_params = """
function(job, resume, job_params) {
    console.log(job, job_params);
    const params = new URLSearchParams(window.location.search);
    job_params = Object.fromEntries(params);
    return [job, resume, job_params];
}
"""

# Fetch the job record for `job_id` from Cosmos DB and return the fields needed by
# the interview (title, evaluation e-mail, predefined questions, candidate id and,
# for newer records, the pre-generated questions).
def get_job_data_from_db(job_id, can_id):
    try:
        endpoint = "https://wg-candidate-data.documents.azure.com:443/"
        key = os.getenv("CONNECTION_DB")
        client = CosmosClient(endpoint, key)
        database = client.get_database_client("ToDoList")
        container = database.get_container_client("JobData")
        query = f"""
        SELECT TOP 1 *
        FROM c
        WHERE c.id = '{job_id}'
        """
        job_data = list(container.query_items(query=query, enable_cross_partition_query=True))[0]
        if "pre_generated" in job_data:
            return job_data["title"], job_data["evaluation_email"], job_data["question_one"], job_data["question_two"], job_data["question_three"], can_id, job_data["pre_generated"], job_data["custom_questions"]
        return job_data["title"], job_data["evaluation_email"], job_data["question_one"], job_data["question_two"], job_data["question_three"], can_id
    except Exception as e:
        print(f"Error loading the job data: {str(e)}")
        return None, None
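
# Mark the candidate's interview as conducted in Cosmos DB, store the AI summary,
# and flag the job as fully processed once no open interviews remain for it.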
def write_job_and_candidate_db_data(can_id, ai_summary):
    try:
        job_id = can_id.split("cv")[0][:-1]
        endpoint = "https://wg-candidate-data.documents.azure.com:443/"
        key = os.getenv("CONNECTION_DB")
        client = CosmosClient(endpoint, key)
        database = client.get_database_client("ToDoList")
        container = database.get_container_client("Items")
        job_container = database.get_container_client("JobData")
        candidate_item = container.read_item(item=can_id, partition_key="wg-candidate-data-v1")
        candidate_item["interview_conducted"] = True
        candidate_item["ai_summary"] = ai_summary
        container.replace_item(item=candidate_item, body=candidate_item)
        candidate_query = f"""
        SELECT *
        FROM c
        WHERE c.job_description_id = '{candidate_item["job_description_id"]}' AND c.interview_conducted = false
        """
        query_result = list(container.query_items(query=candidate_query, enable_cross_partition_query=True))
        print("Successfully updated the candidate item")
        if len(query_result) < 1:
            query = f"""
            SELECT TOP 1 *
            FROM c
            WHERE c.id = '{job_id}'
            """
            job_item = list(job_container.query_items(query=query, enable_cross_partition_query=True))[0]
            job_item["every_interview_conducted"] = True
            job_container.replace_item(item=job_item, body=job_item)
            print("Successfully updated the job item")
    except Exception as e:
        print(f"Error updating the data: {str(e)}")
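
# Fallback for older jobs: download the job's JSON metadata blob and return the
# title, e-mail address and the three predefined questions.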
def download_and_parse_json_blob(storage_connection_string, container_name, blob_name, encoding='utf-8'):
    try:
        # Connect to the blob service
        blob_service_client = BlobServiceClient.from_connection_string(storage_connection_string)
        # Create container and blob clients
        container_client = blob_service_client.get_container_client(container_name)
        blob_client = container_client.get_blob_client(blob_name)
        # Download the blob
        blob_data = blob_client.download_blob()
        blob_bytes = blob_data.readall()
        # Decode the JSON bytes into a Python object
        json_text = blob_bytes.decode(encoding)
        json_data = json.loads(json_text)
        # Extract "title", "email" and the predefined questions from the JSON data
        title = json_data.get("title", "")
        email = json_data.get("email", "")
        question_one = json_data.get("question_one", "")
        question_two = json_data.get("question_two", "")
        question_three = json_data.get("question_three", "")
        return title, email, question_one, question_two, question_three
    except Exception as e:
        print(f"Error downloading and parsing the JSON file: {str(e)}")
        return None, None
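
# Download a PDF blob from Azure Blob Storage and return its extracted plain text.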
def download_pdf_blob_as_text(storage_connection_string, container_name, blob_name):
    try:
        # Connect to the blob service
        blob_service_client = BlobServiceClient.from_connection_string(storage_connection_string)
        # Create container and blob clients
        container_client = blob_service_client.get_container_client(container_name)
        blob_client = container_client.get_blob_client(blob_name)
        # Download the blob as raw bytes
        blob_data = blob_client.download_blob()
        pdf_bytes = blob_data.readall()
        # Extract the text of every PDF page
        pdf_text = ""
        pdf_reader = PdfReader(io.BytesIO(pdf_bytes))
        for page in pdf_reader.pages:
            pdf_text += page.extract_text()
        return pdf_text
    except Exception as e:
        print(f"Error downloading and converting the file: {str(e)}")
        return None
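
# Runs on app load: derive the blob and record names from the "job" URL parameter,
# download the job description and resume PDFs, and fetch the job metadata from
# Cosmos DB (falling back to the legacy JSON blob).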
def load_job_data(job, resume, job_params):
    if not job:
        print("JOB_PDF: " + job_params["job"].split("cv")[0][:-1] + ".pdf")
        print("Resume: " + job_params["job"] + ".pdf")
        try:
            job_id = job_params["job"].split("cv")[0][:-1]
            pdf_filename_jobdescription = job_params["job"].split("cv")[0][:-1] + ".pdf"
            pdf_filename_cv = job_params["job"] + ".pdf"
            json_filename = job_params["job"].split("cv")[0][:-1] + "_jsondata.json"
            storage_connection_string = os_connection_string
            container_name = "jobdescriptions"  # Name of the blob container
            job = download_pdf_blob_as_text(storage_connection_string, container_name, pdf_filename_jobdescription)
            resume = download_pdf_blob_as_text(storage_connection_string, container_name, pdf_filename_cv)
            try:
                job_params = get_job_data_from_db(job_id, job_params["job"])
            except:
                job_params = download_and_parse_json_blob(storage_connection_string, container_name, json_filename)
            print(job_params)
            return job, resume, job_params, gr.Label.update("Evaluation for the job: " + job_params[0])
        except:
            gr.Error("An error occurred, the job description could not be loaded. Please contact the recruiter.")
            return job, resume, job_params, gr.Label.update("An error occurred and the job description could not be loaded. Please contact the recruiter.", color="red")
    # print(job)
    # print(job_params)
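
# Handler for the upload button (currently commented out in the UI): extract the
# text of an uploaded PDF or text file and store it as the resume.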
def add_file(file, chat, job, resume):
    if file.name.endswith(".pdf"):
        doc = PdfReader(file.name)
        text = ""
        for page in doc.pages:
            text += page.extract_text()
    else:
        with open(file.name) as f:
            text = f.read()
    if job:
        print("in the cv branch")
        chat += [["📄 " + file.name.split("/")[-1], None]]
        resume = text
    # else:
    #     print("in the job branch")
    #     chat += [["📄 " + file.name.split("/")[-1], "Thanks. Please upload the resume."]]
    #     job = text
    return chat, job, resume

def user(message, history):
    # Append the user's message to the chat history and clear the textbox.
    return "", history + [[message, None]]
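
# Main chat handler: build the system prompt from the job description, resume and
# any recruiter-defined questions, request the next interview turn from Azure
# OpenAI, and trigger the evaluation e-mail once the closing message appears.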
def bot(history, job, resume, job_params):
    if not resume or not job:
        yield history
        return
    extension_text = ""
    if len(job_params[2]) > 0 or len(job_params[3]) > 0 or len(job_params[4]) > 0:
        print("There is at least one predefined question")
        questions_combined = ""
        if len(job_params[2]) > 0:
            questions_combined += "-" + job_params[2] + "\n"
        if len(job_params[3]) > 0:
            questions_combined += "-" + job_params[3] + "\n"
        if len(job_params[4]) > 0:
            questions_combined += "-" + job_params[4] + "\n"
        extension_text = "The recruiter has predefined the following question(s): \n\n" + questions_combined + "\nPlease ask these questions one after the other first and only then generate your own questions so that you get a total of 10 questions together with the predefined ones."
    if len(job_params) > 7:
        print("New data structure")
        if job_params[6]:
            print("With pre-generated questions")
            final_question_string = "\n".join(job_params[7])
            system = sys_prompt_pre_generated.format(job=job, resume=resume, questions=final_question_string)
        else:
            print("New data structure, no pre-generated questions")
            system = sys_prompt.format(job=job, resume=resume, n=10, extension=extension_text)
    else:
        print("Old data structure")
        system = sys_prompt.format(job=job, resume=resume, n=10, extension=extension_text)
    messages = [{"role": "system", "content": system}]
    for user, assistant in history:
        if user:
            messages.append({"role": "user", "content": user})
        if assistant:
            messages.append({"role": "assistant", "content": assistant})
    response = openai.ChatCompletion.create(
        engine="gpt-4-1106",
        messages=messages,
        temperature=0.0,
        # stream=True,
    )
    print(response["choices"][0]["message"]["content"])
    history[-1][1] = response["choices"][0]["message"]["content"]
    yield history
    # for chunk in response:
    #     print(chunk)
    #     if len(chunk["choices"][0]["delta"]) != 0 and hasattr(chunk["choices"][0]["delta"], "content"):
    #         history[-1][1] = history[-1][1] + chunk["choices"][0]["delta"]["content"]
    #         yield history
    if history[-1][1] == "Thank you for conducting the evaluation! We will get back to you shortly.":
        print("finished")
        send_evaluation(history, job, resume, job_params)
    return
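
# Summarize the completed interview with Azure OpenAI, update Cosmos DB, and
# e-mail the recruiter the chat transcript, resume and AI summary via SendGrid.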
def send_evaluation(history, job, resume, job_params):
    # Convert the chat history into a plain-text transcript
    chat_text = ""
    for entry in history:
        if entry[0]:
            chat_text += "Applicant: " + entry[0] + "\n"
        if entry[1]:
            chat_text += "Chatbot: " + entry[1] + "\n"
    # SendGrid client
    sg = sendgrid.SendGridAPIClient(api_key=os.environ.get('SENDGRID_API'))
    # Sender and recipient e-mail addresses
    sender_email = "[email protected]"
    receiver_email = job_params[1]
    print(receiver_email)
    ai_summary = "TEST Summary"
    prompt = "You are a professional recruiter who has been given a CV and a job description and has created questions based on them. The applicant has entered their answers to the questions. Based on these answers, evaluate whether the applicant fits the job in principle. This is the case when about 70 percent of all questions have been answered satisfactorily and positively. Keep in mind that an answer must always be fact-based, so if, for example, a question asks for examples, the applicant must actually give such examples. Please also provide details of which questions were answered positively and why."
    res = openai.ChatCompletion.create(
        engine="gpt-4-1106",
        temperature=0.2,
        messages=[
            {
                "role": "system",
                "content": prompt,
            },
            {"role": "system", "content": "Job description: " + job + "; Resume: " + resume},
            {"role": "system", "content": "Chat history: " + chat_text},
        ],
    )
    ai_summary = res.choices[0]["message"]["content"]
    # Build the e-mail message
    subject = "Evaluation for the job: " + job_params[0]
    message = f"""Dear Recruiter,

Please find attached the complete chat history for this evaluation, the resume and the AI summary.

The AI-generated summary of the evaluation:

{ai_summary}

Sincerely,
Your Evaluation Tool
"""
    if len(job_params) > 5:
        write_job_and_candidate_db_data(job_params[5], ai_summary)
    # Write the chat history to a text file
    chat_file_path = "chat_history.txt"
    with open(chat_file_path, 'w', encoding='utf-8') as chat_file:
        chat_file.write(chat_text)
    # Write the resume text to a TXT file so it can be attached
    resume_file_path = "resume.txt"
    with open(resume_file_path, 'wb') as resume_file:
        resume_file.write(resume.encode('utf-8'))
    # Write the AI summary to a TXT file
    summary_file_path = "summary.txt"
    with open(summary_file_path, 'wb') as summary_file:
        summary_file.write(ai_summary.encode('utf-8'))
    # Build the SendGrid e-mail
    message = Mail(
        from_email=sender_email,
        to_emails=receiver_email,
        subject=subject,
        plain_text_content=message,
    )
    # Attach the chat history
    with open(chat_file_path, 'rb') as chat_file:
        encode_file_chat = base64.b64encode(chat_file.read()).decode()
    chat_attachment = Attachment()
    chat_attachment.file_content = FileContent(encode_file_chat)
    chat_attachment.file_name = FileName('chat_history.txt')
    chat_attachment.file_type = FileType('text/plain')
    chat_attachment.disposition = Disposition('attachment')
    message.attachment = chat_attachment
    # Attach the resume
    with open(resume_file_path, 'rb') as resume_file:
        encode_file_resume = base64.b64encode(resume_file.read()).decode()
    resume_attachment = Attachment()
    resume_attachment.file_content = FileContent(encode_file_resume)
    resume_attachment.file_name = FileName('resume.txt')
    resume_attachment.file_type = FileType('text/plain')
    resume_attachment.disposition = Disposition('attachment')
    message.attachment = resume_attachment
    # Attach the AI summary
    with open(summary_file_path, 'rb') as summary_file:
        encode_file_summary = base64.b64encode(summary_file.read()).decode()
    summary_attachment = Attachment()
    summary_attachment.file_content = FileContent(encode_file_summary)
    summary_attachment.file_name = FileName('ai_summary.txt')
    summary_attachment.file_type = FileType('text/plain')
    summary_attachment.disposition = Disposition('attachment')
    message.attachment = summary_attachment
    try:
        response = sg.send(message)
        print("E-mail sent successfully. Status code:", response.status_code)
    except Exception as e:
        print("Error sending the e-mail:", str(e))

def handle_start_click(welcome_label, start):
    # Hide the welcome text and disable the start button once the interview begins.
    return gr.Label.update("", visible=False), gr.Button.update("Interview started", scale=0, interactive=False)

css = "footer {visibility: hidden} #component-0{height: 90vh !important} #chatbot{height: 85vh !important} #welcome_label {text-align: center!important}"
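
# Gradio UI: header, job title label, welcome text, chatbot window and the
# message / start / clear controls, wired to the handlers above.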
with gr.Blocks(css=css) as app:
    job_params = gr.JSON({}, visible=False, label="URL Params")
    job = gr.State("")
    resume = gr.State("")
    gr.Markdown(
        "<div style='display: flex; justify-content: space-between;align-items: center;margin-bottom: 1rem' ><h1>CV Evaluation</h1><img width='150' src='https://www.workgenius.com/wp-content/uploads/2023/03/WorkGenius_navy-1.svg' alt='WorkGeniusLogo' /></div>"
    )
    job_title_label = gr.Label("An error occurred and the job description could not be loaded. Please contact the recruiter.", show_label=False)
    welcome_label = gr.Label("Welcome to the interview chatbot from WorkGenius. After you click the 'Start Interview' button, you will be asked a few questions to discuss whether you are a good fit for the position. This process will take approximately 10-15 minutes. Please answer the questions honestly and thoroughly. Good luck!", show_label=False, elem_id="welcome_label")
    app.load(load_job_data, [job, resume, job_params], [job, resume, job_params, job_title_label], _js=get_window_url_params)
    chat = gr.Chatbot(
        [[None, None]], height=600, elem_id="chatbot"
    )
    with gr.Row():
        clr = gr.Button("Clear", scale=0)
        msg = gr.Textbox(lines=1, show_label=False, scale=1)
        start = gr.Button("Start interview", scale=0)
        # file = gr.UploadButton("Browse", file_types=[".pdf", ".txt"], scale=0)
    msg.submit(user, [msg, chat], [msg, chat], queue=False).then(
        bot, [chat, job, resume, job_params], chat
    )
    # file.upload(
    #     add_file, [file, chat, job, resume], [chat, job, resume], queue=False
    # ).then(bot, [chat, job, resume, job_params], chat)
    start.click(handle_start_click, [welcome_label, start], [welcome_label, start]).then(bot, [chat, job, resume, job_params], chat)
    clr.click(lambda: None, None, chat, queue=False)

app.queue()
app.launch()