import asyncio
import json
import os
import shutil
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import aiofiles
import aiosmtplib

from agent_monitor.monitor import analyze_agent_steps, AsyncOpenAIClient


async def check_and_process_uploads(
    upload_dir="evals_upload",
    processed_dir="evals_processed",
    live_dir="evals_live",
):
    """Find new ``.json`` uploads and process the ones not handled yet.

    An upload is treated as already handled when a file with the same name
    exists in ``processed_dir`` or ``live_dir``. All remaining uploads are
    validated and processed concurrently.

    Args:
        upload_dir: Directory scanned for incoming ``.json`` uploads.
        processed_dir: Directory that receives the processed results.
        live_dir: Directory holding uploads that are already live.

    Raises:
        FileNotFoundError: If ``upload_dir`` does not exist (from ``os.listdir``).
    """
    # sorted() only makes the processing/log order deterministic.
    new_uploads = sorted(f for f in os.listdir(upload_dir) if f.endswith('.json'))
    if not new_uploads:
        print("No new uploads found.")
        return

    # Skip uploads whose file name already exists in the processed or live
    # directory. Only existence is checked; file contents are not parsed, so
    # a malformed upload cannot abort the scan here.
    # TODO: also compare contents so that a changed re-upload under the same
    # name is processed again (needs a better comparison than exact equality
    # of the parsed JSON).
    unprocessed_uploads = []
    for upload in new_uploads:
        if os.path.exists(os.path.join(processed_dir, upload)):
            print(f"Upload {upload} is already in processed directory.")
        elif os.path.exists(os.path.join(live_dir, upload)):
            print(f"Upload {upload} is already in live directory.")
        else:
            unprocessed_uploads.append(upload)

    print(f"Processing {len(unprocessed_uploads)} new uploads.")
    await asyncio.gather(*(
        process_single_upload(
            os.path.join(upload_dir, upload),
            os.path.join(processed_dir, upload),
        )
        for upload in unprocessed_uploads
    ))


async def process_single_upload(upload_path, processed_path):
    """Validate one upload and, if it is valid, process it into ``processed_path``.

    Args:
        upload_path: Path of the uploaded ``.json`` file.
        processed_path: Destination path for the processed result.
    """
    check_result = await check_upload_structure(upload_path)
    if not check_result['is_valid']:
        print(f"Upload check failed for {upload_path}: {check_result['message']}")
        # TODO: notify about the failed check, e.g.
        # await send_email_notification(
        #     os.path.basename(upload_path), check_result, "Upload check failed")
        return

    await process_upload(upload_path, processed_path)
    # TODO: archive the original upload and send a success notification, e.g.
    # await send_email_notification(
    #     os.path.basename(upload_path), check_result, "Processing successful")
    # NOTE: do not move the upload to ``processed_path`` with shutil.move --
    # process_upload has just written the processed result there, and the
    # move would overwrite it with the raw upload.


async def check_upload_structure(file_path):
    """Check that an upload has the JSON structure the processor expects.

    Args:
        file_path: Path of the upload to inspect.

    Returns:
        A dict with ``is_valid`` (bool) and ``message`` (str). Errors raised
        while reading or parsing the file are caught and reported through
        this dict instead of being raised.
    """
    required_keys = ['config', 'results', 'raw_eval_results', 'raw_logging_results']
    required_item_keys = ['weave_task_id', 'inputs', 'outputs']
    try:
        async with aiofiles.open(file_path, 'r') as f:
            data = json.loads(await f.read())

        # Key-membership checks below are only meaningful on a JSON object.
        if not isinstance(data, dict):
            return {'is_valid': False, 'message': "Upload should be a JSON object"}

        missing_keys = [key for key in required_keys if key not in data]
        if missing_keys:
            return {'is_valid': False, 'message': f"Missing required keys: {', '.join(missing_keys)}"}

        # Check for specific structure in raw_logging_results
        if not isinstance(data['raw_logging_results'], list):
            return {'is_valid': False, 'message': "raw_logging_results should be a list"}

        for item in data['raw_logging_results']:
            # The isinstance guard stops `key in item` from silently becoming a
            # substring test when an item is a string.
            if not isinstance(item, dict) or not all(key in item for key in required_item_keys):
                return {'is_valid': False, 'message': "Each item in raw_logging_results should have weave_task_id, inputs, and outputs"}

        return {'is_valid': True, 'message': "File structure is valid"}
    except json.JSONDecodeError:
        return {'is_valid': False, 'message': "Invalid JSON format"}
    except Exception as e:
        return {'is_valid': False, 'message': f"Unexpected error: {str(e)}"}


async def process_upload(input_path, output_path, max_calls=None):
    """Run agent-step analysis on an upload and save the result.

    Loads the upload, replaces its ``raw_logging_results`` with the output of
    ``analyze_agent_steps`` and writes the JSON (indent=4) to ``output_path``.

    Args:
        input_path: Path of the (already validated) upload.
        output_path: Path the processed JSON is written to. Its parent
            directory is created if it does not exist.
        max_calls: Optional cap on how many ``raw_logging_results`` entries
            are analyzed, e.g. for cheap trial runs. ``None`` analyzes all of
            them. When capped, only the analyzed entries are saved.

    Raises:
        ValueError: If the upload has no ``raw_logging_results`` key.
    """
    print(f"Processing {input_path}...")
    async with aiofiles.open(input_path, 'r') as f:
        data = json.loads(await f.read())

    # An explicit check instead of `assert`, which is stripped under `python -O`.
    if 'raw_logging_results' not in data:
        raise ValueError("raw_logging_results key not found in the file")

    calls = data['raw_logging_results']
    if max_calls is not None:
        calls = calls[:max_calls]

    openai_client = AsyncOpenAIClient(model="gpt-4o-mini")
    processed_calls = await analyze_agent_steps(calls, openai_client)

    # Save the processed data
    data['raw_logging_results'] = processed_calls
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    async with aiofiles.open(output_path, 'w') as f:
        await f.write(json.dumps(data, indent=4))
    print(f"Processing of {input_path} successful. Results saved to {output_path}")


async def send_email_notification(filename, check_result, status):
    """Email a short processing report for one upload.

    Addresses and the SMTP password are read from the environment variables
    ``NOTIFY_SENDER_EMAIL``, ``NOTIFY_RECEIVER_EMAIL`` and
    ``NOTIFY_SMTP_PASSWORD``. When a variable is unset, the previous
    placeholder value is used, so credentials never need to live in source.
    The message is sent to smtp.gmail.com:465 with implicit TLS
    (``use_tls=True``).

    Args:
        filename: Upload name shown in the subject and body.
        check_result: Dict with at least a ``message`` key.
        status: Short status text, e.g. "Processing successful".
    """
    sender_email = os.environ.get("NOTIFY_SENDER_EMAIL", "your_email@example.com")
    receiver_email = os.environ.get("NOTIFY_RECEIVER_EMAIL", "receiver@example.com")
    password = os.environ.get("NOTIFY_SMTP_PASSWORD", "your_password")

    message = MIMEMultipart()
    message["From"] = sender_email
    message["To"] = receiver_email
    message["Subject"] = f"Upload Processing Notification: {filename}"

    body = (
        f"File: {filename}\n"
        f"Status: {status}\n"
        f"Check Result: {check_result['message']}\n"
    )
    message.attach(MIMEText(body, "plain"))

    await aiosmtplib.send(
        message,
        hostname="smtp.gmail.com",
        port=465,
        use_tls=True,
        username=sender_email,
        password=password,
    )