# core_leaderboard/utils/processing.py
# benediktstroebl's picture
# Update
# ff06039
# raw
# history blame
# 6.26 kB
import os
import json
import shutil
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import asyncio
import aiofiles
import aiosmtplib
from agent_monitor.monitor import analyze_agent_steps
from agent_monitor.failure_report import analyze_agent_performance, AsyncOpenAIClient
import traceback
async def check_and_process_uploads():
    """Find JSON uploads that have not been handled yet and process them.

    Every ``*.json`` file in ``evals_upload`` is looked up by file name in
    ``evals_processed`` and ``evals_live``. A name that already exists in
    either directory is reported and skipped; only the name is compared,
    the file contents are not inspected. All remaining uploads are passed to
    ``process_single_upload`` and awaited together.

    Returns:
        None. Progress is reported with ``print``.
    """
    upload_dir = "evals_upload"
    processed_dir = "evals_processed"
    live_dir = "evals_live"

    new_uploads = [f for f in os.listdir(upload_dir) if f.endswith('.json')]
    if not new_uploads:
        print("No new uploads found.")
        return

    unprocessed_uploads = []
    for upload in new_uploads:
        # TODO: compare file contents as well, so that a changed re-upload
        # under an existing name gets processed again.
        # The upload is deliberately not parsed here: a malformed file must
        # not abort the scan, it is rejected later by the structure check.
        if os.path.exists(os.path.join(processed_dir, upload)):
            print(f"Upload {upload} is already in processed directory.")
        elif os.path.exists(os.path.join(live_dir, upload)):
            print(f"Upload {upload} is already in live directory.")
        else:
            unprocessed_uploads.append(upload)

    print(f"Processing {len(unprocessed_uploads)} new uploads.")
    tasks = [
        process_single_upload(
            os.path.join(upload_dir, upload),
            os.path.join(processed_dir, upload),
        )
        for upload in unprocessed_uploads
    ]
    await asyncio.gather(*tasks)
async def process_single_upload(upload_path, processed_path):
    """Validate one upload and, if it passes, run the processing step on it.

    Args:
        upload_path: Path of the uploaded JSON file.
        processed_path: Path the processed result is written to.

    Returns:
        None. A failed validation is reported with ``print``.
    """
    validation = await check_upload_structure(upload_path)

    if not validation['is_valid']:
        print(f"Upload check failed for {upload_path}: {validation['message']}")
        # Email notification about the failed check is currently disabled.
        return

    await process_upload(upload_path, processed_path)
    # Moving the upload into the processed directory and the success email
    # notification are currently disabled.
async def check_upload_structure(file_path):
    """Check that an uploaded JSON file has the layout the pipeline expects.

    The file must contain a JSON object with the keys ``config``,
    ``results``, ``raw_eval_results`` and ``raw_logging_results``.
    ``raw_logging_results`` must be a list of objects that each carry the
    keys ``weave_task_id``, ``inputs`` and ``outputs``.

    Args:
        file_path: Path of the JSON file to validate.

    Returns:
        A dict with ``is_valid`` (bool) and ``message`` (str). Read and parse
        failures are reported through this dict rather than raised.
    """
    try:
        async with aiofiles.open(file_path, 'r') as f:
            data = json.loads(await f.read())
    except json.JSONDecodeError:
        return {'is_valid': False, 'message': "Invalid JSON format"}
    except Exception as e:
        return {'is_valid': False, 'message': f"Unexpected error: {str(e)}"}

    # A top-level list, string or number cannot hold the required keys.
    if not isinstance(data, dict):
        return {'is_valid': False, 'message': "Top-level JSON value should be an object"}

    # Check for required keys
    required_keys = ['config', 'results', 'raw_eval_results', 'raw_logging_results']
    missing_keys = [key for key in required_keys if key not in data]
    if missing_keys:
        return {'is_valid': False, 'message': f"Missing required keys: {', '.join(missing_keys)}"}

    # Check for specific structure in raw_logging_results
    if not isinstance(data['raw_logging_results'], list):
        return {'is_valid': False, 'message': "raw_logging_results should be a list"}

    item_keys = ('weave_task_id', 'inputs', 'outputs')
    for item in data['raw_logging_results']:
        # The isinstance guard matters: `key in item` on a string is a
        # substring test and on a number raises TypeError.
        if not isinstance(item, dict) or not all(key in item for key in item_keys):
            return {'is_valid': False, 'message': "Each item in raw_logging_results should have weave_task_id, inputs, and outputs"}

    return {'is_valid': True, 'message': "File structure is valid"}
async def process_upload(input_path, output_path):
    """Run the agent-step analysis on one upload and save the result.

    The file at ``input_path`` is loaded as JSON, its ``raw_logging_results``
    entry is replaced by the output of ``analyze_agent_steps`` and the whole
    document is written to ``output_path`` as JSON indented by four spaces.

    Args:
        input_path: Path of the uploaded JSON file.
        output_path: Path the processed JSON document is written to.

    Returns:
        None. If the analysis step raises, the traceback is printed and no
        output file is written.

    Raises:
        AssertionError: If the file has no ``raw_logging_results`` key.
    """
    print(f"Processing {input_path}...")

    # Use aiofiles so that reading a large upload does not block the event
    # loop while other uploads are being processed concurrently.
    async with aiofiles.open(input_path, 'r') as f:
        data = json.loads(await f.read())

    # Raised explicitly instead of using `assert`, which is stripped when
    # Python runs with -O; the exception type is kept for existing callers.
    if 'raw_logging_results' not in data:
        raise AssertionError("raw_logging_results key not found in the file")

    openai_client = AsyncOpenAIClient(model="gpt-4o-mini")

    try:
        processed_calls = await analyze_agent_steps(data['raw_logging_results'], openai_client, llm_eval=False)
        data['raw_logging_results'] = processed_calls
        # The failure report (analyze_agent_performance) is currently disabled.
    except Exception as e:
        # Best effort: report the failure and leave the upload unprocessed.
        traceback.print_exc()
        print(f"Error in processing: {str(e)}")
        return

    async with aiofiles.open(output_path, 'w') as f:
        await f.write(json.dumps(data, indent=4))

    print(f"Processing of {input_path} successful. Results saved to {output_path}")
async def send_email_notification(filename, check_result, status):
    """Send a plain-text email summarising the outcome for one upload.

    Args:
        filename: Name of the upload, used in the subject and the body.
        check_result: Mapping whose ``message`` entry is put in the body.
        status: Short status text put in the body.

    Returns:
        None. The message is sent with ``aiosmtplib.send``.
    """
    # NOTE(review): the addresses and the password look like placeholders;
    # confirm where the real credentials are supposed to come from.
    sender_email = "[email protected]"
    receiver_email = "[email protected]"
    password = "your_password"

    email = MIMEMultipart()
    headers = (
        ("From", sender_email),
        ("To", receiver_email),
        ("Subject", f"Upload Processing Notification: {filename}"),
    )
    for header_name, header_value in headers:
        email[header_name] = header_value

    text = f"""
File: {filename}
Status: {status}
Check Result: {check_result['message']}
"""
    email.attach(MIMEText(text, "plain"))

    smtp_options = {
        "hostname": "smtp.gmail.com",
        "port": 465,
        "use_tls": True,
        "username": sender_email,
        "password": password,
    }
    await aiosmtplib.send(email, **smtp_options)