import asyncio
import json
import os
import shutil
import traceback
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import aiofiles
import aiosmtplib
from tqdm import tqdm

from agent_monitor.monitor import analyze_agent_steps
from agent_monitor.failure_report import analyze_agent_performance, AsyncOpenAIClient


async def check_and_process_uploads():
    upload_dir = "evals_upload"
    processed_dir = "evals_processed"
    live_dir = "evals_live"

    # Make sure the output directories exist before we try to write into them.
    os.makedirs(processed_dir, exist_ok=True)
    os.makedirs(live_dir, exist_ok=True)

    new_uploads = [f for f in os.listdir(upload_dir) if f.endswith('.json')]
    if not new_uploads:
        print("No new uploads found.")
        return
    # For each new upload, check whether it is already in the live or processed
    # directory, and whether the contents are actually identical.
    unprocessed_uploads = []
    for upload in new_uploads:
        upload_path = os.path.join(upload_dir, upload)
        processed_path = os.path.join(processed_dir, upload)
        live_path = os.path.join(live_dir, upload)

        if not os.path.exists(live_path) and not os.path.exists(processed_path):
            unprocessed_uploads.append(upload)
        elif os.path.exists(processed_path):
            # TODO: compare the upload against the processed copy and reprocess
            # it if the contents differ (see the files_identical sketch below).
            print(f"Upload {upload} is already in processed directory.")
        elif os.path.exists(live_path):
            with open(upload_path, 'r') as f:
                new_data = json.load(f)
            with open(live_path, 'r') as f:
                live_data = json.load(f)
            # Reprocess only if the upload differs from the live copy.
            if new_data != live_data:
                unprocessed_uploads.append(upload)
            else:
                print(f"Upload {upload} is already in live directory.")
print(f"Processing {len(unprocessed_uploads)} new uploads.") | |
tasks = [] | |
for upload in tqdm(unprocessed_uploads): | |
upload_path = os.path.join(upload_dir, upload) | |
processed_path = os.path.join(processed_dir, upload) | |
# tasks.append(process_single_upload(upload_path, processed_path)) # for async processing | |
await process_single_upload(upload_path, processed_path) | |
# await asyncio.gather(*tasks) # for async processing | |
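

# Hedged sketch for the comparison TODO above: an exact-content check that
# hashes raw bytes instead of parsing JSON. The helper name files_identical is
# ours, not part of the original pipeline, and byte equality is stricter than
# the JSON equality used for the live copies (whitespace and key order count).
def files_identical(path_a, path_b, chunk_size=65536):
    import hashlib

    def digest(path):
        h = hashlib.sha256()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                h.update(chunk)
        return h.digest()

    return digest(path_a) == digest(path_b)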


async def process_single_upload(upload_path, processed_path):
    # Validate the structure of the upload before any expensive processing.
    check_result = await check_upload_structure(upload_path)
    if check_result['is_valid']:
        await process_upload(upload_path, processed_path)
        # Optionally move the raw upload out of the upload directory:
        # await asyncio.to_thread(shutil.move, upload_path, processed_path)
    else:
        print(f"Upload check failed for {upload_path}: {check_result['message']}")


async def check_upload_structure(file_path):
    try:
        async with aiofiles.open(file_path, 'r') as f:
            data = json.loads(await f.read())

        # Check for required top-level keys.
        required_keys = ['config', 'results', 'raw_eval_results', 'raw_logging_results']
        missing_keys = [key for key in required_keys if key not in data]
        if missing_keys:
            return {'is_valid': False, 'message': f"Missing required keys: {', '.join(missing_keys)}"}

        # Check the expected structure of raw_logging_results.
        if not isinstance(data['raw_logging_results'], list):
            return {'is_valid': False, 'message': "raw_logging_results should be a list"}
        for item in data['raw_logging_results']:
            if not all(key in item for key in ['weave_task_id', 'inputs', 'outputs']):
                return {'is_valid': False, 'message': "Each item in raw_logging_results should have weave_task_id, inputs, and outputs"}

        return {'is_valid': True, 'message': "File structure is valid"}
    except json.JSONDecodeError:
        return {'is_valid': False, 'message': "Invalid JSON format"}
    except Exception as e:
        return {'is_valid': False, 'message': f"Unexpected error: {str(e)}"}


async def process_upload(input_path, output_path):
    print(f"Processing {input_path}...")
    async with aiofiles.open(input_path, 'r') as f:
        data = json.loads(await f.read())
    assert 'raw_logging_results' in data, "raw_logging_results key not found in the file"

    openai_client = AsyncOpenAIClient(model="gpt-4o-mini")
    try:
        # Annotate each agent step and build a failure report for failed tasks.
        processed_calls = await analyze_agent_steps(data['raw_logging_results'], openai_client, llm_eval=True)
        failure_report = await analyze_agent_performance(data['raw_logging_results'], data['results']['failed_tasks'], openai_client)
        data['raw_logging_results'] = processed_calls
        data['failure_report'] = failure_report
    except Exception as e:
        traceback.print_exc()
        print(f"Error in processing: {str(e)}")
        return

    async with aiofiles.open(output_path, 'w') as f:
        await f.write(json.dumps(data, indent=4))
    print(f"Processing of {input_path} successful. Results saved to {output_path}")


async def send_email_notification(filename, check_result, status):
    # NOTE: this helper is defined but not yet called anywhere in the pipeline.
    # The credentials below are placeholders; in a real deployment they should
    # come from environment variables or a secrets store.
    sender_email = "[email protected]"
    receiver_email = "[email protected]"
    password = "your_password"

    message = MIMEMultipart()
    message["From"] = sender_email
    message["To"] = receiver_email
    message["Subject"] = f"Upload Processing Notification: {filename}"

    body = f"""
    File: {filename}
    Status: {status}
    Check Result: {check_result['message']}
    """
    message.attach(MIMEText(body, "plain"))

    # Port 465 with use_tls=True gives an implicit-TLS SMTP connection.
    await aiosmtplib.send(
        message,
        hostname="smtp.gmail.com",
        port=465,
        use_tls=True,
        username=sender_email,
        password=password,
    )
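

# The module defines coroutines but no entry point. A minimal way to run one
# pass over the upload directory (our addition, assuming the directories used
# by check_and_process_uploads sit alongside this script):
if __name__ == "__main__":
    asyncio.run(check_and_process_uploads())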