"""Streamlit app that builds a "Calendar Wrapped" summary from Google Calendar.

Calendar access goes through Composio; the commentary on the computed
statistics is generated with an OpenAI model via LlamaIndex.
"""

import json
import os
from collections import Counter, defaultdict
from datetime import datetime, timedelta

import streamlit as st
from composio_llamaindex import Action, App, ComposioToolSet
from dotenv import load_dotenv
from llama_index.llms.openai import OpenAI

# Load environment variables
load_dotenv()

DEFAULT_REDIRECT_URL = "https://calendar-wrapped-eight.vercel.app/"

# Fallback commentary used when an LLM answer is missing or cannot be parsed.
_FALLBACK_SCHEDULE_ANALYSIS = {"name": "Unknown", "reason": "Analysis unavailable"}
_INSIGHT_KEYS = (
    "total_meetings_comment",
    "time_spent_comment",
    "busiest_times_comment",
    "collaborator_comment",
    "habits_comment",
)


def _parse_json_object(text):
    """Return the JSON object contained in *text*.

    The text is first parsed as-is. If that fails, the span from the first
    ``{`` to the last ``}`` is parsed instead, so answers wrapped in prose or
    in Markdown code fences are still accepted.

    Raises:
        ValueError: if *text* is not a string, holds no parsable JSON, or the
            parsed value is not a JSON object.
    """
    if not isinstance(text, str):
        raise ValueError("LLM response is not text")
    try:
        parsed = json.loads(text)
    except ValueError:
        first = text.find("{")
        last = text.rfind("}")
        if first == -1 or last <= first:
            raise ValueError("No JSON object found in LLM response") from None
        parsed = json.loads(text[first:last + 1])
    if not isinstance(parsed, dict):
        raise ValueError("LLM response is not a JSON object")
    return parsed


def _extract_events(response_data):
    """Return the event list nested at data -> event_data -> event_data.

    Any missing, ``None`` or wrongly-typed level yields an empty list instead
    of raising.
    """
    node = response_data
    for key in ('data', 'event_data', 'event_data'):
        if not isinstance(node, dict):
            return []
        node = node.get(key)
    return node if isinstance(node, list) else []


def _parse_event_time(raw):
    """Parse an ISO-8601 timestamp string, accepting a trailing ``Z``."""
    return datetime.fromisoformat(raw.replace('Z', '+00:00'))


def _parse_timed_event(event):
    """Return ``(start, duration, participant_emails)`` for a timed event.

    Returns ``None`` when the event has no ``dateTime`` on its start or end
    (per the Google Calendar API, all-day events carry ``date`` instead).
    Participants are the non-declined attendees plus the organizer, listed
    once per event and without empty emails.

    Raises:
        ValueError, TypeError, AttributeError: if the event is malformed.
    """
    start_raw = (event.get('start') or {}).get('dateTime')
    end_raw = (event.get('end') or {}).get('dateTime')
    if not start_raw or not end_raw:
        return None

    start = _parse_event_time(start_raw)
    end = _parse_event_time(end_raw)

    # A dict serves as an insertion-ordered set so that tie-breaking in
    # Counter.most_common stays deterministic.
    people = {}
    for attendee in event.get('attendees') or []:
        if attendee.get('responseStatus') != 'declined':
            email = attendee.get('email')
            if email:
                people[email] = None
    organizer_email = (event.get('organizer') or {}).get('email')
    if organizer_email:
        people[organizer_email] = None

    return start, end - start, list(people)


class CalendarService:
    """Wraps the Composio Google Calendar connection and the stats pipeline."""

    def __init__(self):
        self.toolset = ComposioToolSet(api_key=os.getenv('COMPOSIO_API_KEY'))
        self.connection_request = None

    def analyze_calendar_events(self, response_data):
        """
        Analyze calendar events and return statistics about meetings.

        Only timed events that start in the current year are counted.
        Events without a ``dateTime`` are skipped silently; events that fail
        to parse are reported with ``st.error`` and skipped. An event is
        fully parsed before any counter is touched, so a bad event never
        contributes partial data.

        Args:
            response_data: response dict holding the events under
                ``data -> event_data -> event_data``.

        Returns:
            dict with the keys ``total_meetings_this_year``,
            ``total_time_spent``, ``busiest_month``, ``busiest_day``,
            ``most_frequent_participant``, ``average_meeting_duration``,
            ``most_common_meeting_time``, ``monthly_breakdown`` and
            ``daily_breakdown``.
        """
        current_year = datetime.now().year
        total_meetings = 0
        participants = []
        meeting_times = []
        total_duration = timedelta()
        monthly_meetings = defaultdict(int)
        daily_meetings = defaultdict(int)

        for event in _extract_events(response_data):
            try:
                parsed = _parse_timed_event(event)
            except (ValueError, TypeError, AttributeError) as e:
                st.error(f"Error processing event: {e}")
                continue
            if parsed is None:
                continue

            start, duration, people = parsed
            if start.year != current_year:
                continue

            total_meetings += 1
            total_duration += duration
            monthly_meetings[start.strftime('%B')] += 1
            daily_meetings[start.strftime('%A')] += 1
            meeting_times.append(start.strftime('%H:%M'))
            participants.extend(people)

        stats = {
            "total_meetings_this_year": total_meetings,
            "total_time_spent": "0:00:00",
            "busiest_month": "N/A",
            "busiest_day": "N/A",
            "most_frequent_participant": "N/A",
            "average_meeting_duration": "0:00:00",
            "most_common_meeting_time": "N/A",
            "monthly_breakdown": {},
            "daily_breakdown": {},
        }
        if total_meetings == 0:
            return stats

        # Round to whole seconds so the displayed value has no microseconds.
        average = timedelta(
            seconds=round((total_duration / total_meetings).total_seconds())
        )
        stats.update({
            "total_time_spent": str(total_duration),
            "busiest_month": max(monthly_meetings, key=monthly_meetings.get),
            "busiest_day": max(daily_meetings, key=daily_meetings.get),
            "most_frequent_participant": (
                Counter(participants).most_common(1)[0][0] if participants else "N/A"
            ),
            "average_meeting_duration": str(average),
            "most_common_meeting_time": Counter(meeting_times).most_common(1)[0][0],
            "monthly_breakdown": dict(monthly_meetings),
            "daily_breakdown": dict(daily_meetings),
        })
        return stats

    def initiate_connection(self, entity_id: str, redirect_url: str = DEFAULT_REDIRECT_URL) -> dict:
        """Start the Google Calendar connection flow for *entity_id*.

        Args:
            entity_id: Composio entity to connect.
            redirect_url: URL forwarded to Composio as the post-auth redirect.

        Returns:
            ``{'success': True, 'data': {'redirect_url', 'message'}}`` on
            success, otherwise ``{'success': False, 'error': str}``.
        """
        try:
            # Forward the caller's redirect URL; it was previously accepted
            # by this method but never passed on.
            self.connection_request = self.toolset.initiate_connection(
                entity_id=entity_id,
                app=App.GOOGLECALENDAR,
                redirect_url=redirect_url,
            )
            return {
                'success': True,
                'data': {
                    'redirect_url': self.connection_request.redirectUrl,
                    'message': "Please authenticate using the provided link."
                }
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def check_connection_status(self, entity_id: str) -> dict:
        """Look up the status of the entity's Google Calendar connection.

        Returns:
            ``{'success': True, 'data': {'status', 'message'}}`` on success,
            otherwise ``{'success': False, 'error': str}``.
        """
        try:
            entity = self.toolset.get_entity(id=entity_id)
            connection = entity.get_connection(app=App.GOOGLECALENDAR)
            status = connection.status
            return {
                'success': True,
                'data': {
                    'status': status,
                    'message': f"Connection status: {status}"
                }
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def generate_wrapped(self, entity_id: str) -> dict:
        """Fetch this year's events, compute stats and add LLM commentary.

        Returns:
            ``{'success': True, 'data': stats}`` where *stats* is the result
            of :meth:`analyze_calendar_events` extended with
            ``schedule_analysis`` and ``metric_insights``; otherwise
            ``{'success': False, 'error': str}``.
        """
        try:
            current_year = datetime.now().year
            request_params = {
                "calendar_id": "primary",
                "timeMin": f"{current_year},1,1,0,0,0",
                "timeMax": f"{current_year},12,31,23,59,59",
                "single_events": True,
                "max_results": 2500,
                "order_by": "startTime"
            }
            events_response = self.toolset.execute_action(
                action=Action.GOOGLECALENDAR_FIND_EVENT,
                params=request_params,
                entity_id=entity_id
            )

            # The original code read the misspelled "successfull" key; also
            # accept "successful" so a correctly spelled flag is not treated
            # as a failure (or a KeyError).
            succeeded = events_response.get("successfull") or events_response.get("successful")
            if not succeeded:
                return {
                    'success': False,
                    'error': events_response.get("error") or "Failed to fetch calendar events"
                }

            stats = self.analyze_calendar_events(events_response)
            llm = OpenAI(model='gpt-4', api_key=os.getenv('OPENAI_API_KEY'))

            # Each answer falls back on its own, so one bad reply does not
            # discard the other.
            stats["schedule_analysis"] = self._complete_json(
                llm,
                self._build_billionaire_prompt(stats),
                dict(_FALLBACK_SCHEDULE_ANALYSIS),
            )
            stats["metric_insights"] = self._complete_json(
                llm,
                self._build_stats_prompt(stats),
                dict.fromkeys(_INSIGHT_KEYS, ""),
            )
            return {'success': True, 'data': stats}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    @staticmethod
    def _build_billionaire_prompt(stats):
        """Build the prompt asking which billionaire's schedule matches."""
        return f"""Based on these calendar stats, which tech billionaire's schedule does this most resemble and why?
            Stats:
            - {stats['total_meetings_this_year']} total meetings
            - {stats['total_time_spent']} total time in meetings
            - Most active on {stats['busiest_day']}s
            - Busiest month is {stats['busiest_month']}
            - Average meeting duration: {stats['average_meeting_duration']}
            Suggest a different billionaire each time, dont say elon.
            Return as JSON with format: {{"name": "billionaire name", "reason": "explanation"}}
            """

    @staticmethod
    def _build_stats_prompt(stats):
        """Build the prompt asking for a one-sentence comment per metric."""
        return f"""Analyze these calendar stats and write a brief, insightful one-sentence comment for each metric:
            - Total meetings: {stats['total_meetings_this_year']}
            - Total time in meetings: {stats['total_time_spent']}
            - Busiest month: {stats['busiest_month']}
            - Busiest day: {stats['busiest_day']}
            - Average meeting duration: {stats['average_meeting_duration']}
            - Most common meeting time: {stats['most_common_meeting_time']}
            - Most frequent participant: {stats['most_frequent_participant']}
            Return as JSON with format: {{"total_meetings_comment": "", "time_spent_comment": "", "busiest_times_comment": "", "collaborator_comment": "", "habits_comment": ""}}
            """

    @staticmethod
    def _complete_json(llm, prompt, fallback):
        """Run *prompt* through *llm* and parse the reply as a JSON object.

        Any failure is reported with ``st.error`` and *fallback* is returned.
        """
        try:
            return _parse_json_object(llm.complete(prompt).text)
        except Exception as e:
            st.error(f"Error processing LLM responses: {e}")
            return fallback


def main():
    """Render the three-tab Streamlit UI: connect, check status, generate."""
    st.set_page_config(page_title="Calendar Wrapped", layout="wide")
    st.title("Calendar Wrapped")

    service = CalendarService()
    tab1, tab2, tab3 = st.tabs(["Connect", "Check Status", "Generate Wrapped"])

    with tab1:
        st.header("Initialize Connection")
        entity_id = st.text_input("Entity ID")
        redirect_url = st.text_input("Redirect URL", value=DEFAULT_REDIRECT_URL)
        if st.button("Initialize Connection"):
            if entity_id:
                result = service.initiate_connection(entity_id, redirect_url)
                if result['success']:
                    st.success(result['data']['message'])
                    st.markdown(f"[Click here to authenticate]({result['data']['redirect_url']})")
                else:
                    st.error(f"Error: {result['error']}")
            else:
                st.warning("Please enter an Entity ID")

    with tab2:
        st.header("Check Connection Status")
        status_entity_id = st.text_input("Entity ID", key="status_entity_id")
        if st.button("Check Status"):
            if status_entity_id:
                result = service.check_connection_status(status_entity_id)
                if result['success']:
                    st.success(result['data']['message'])
                else:
                    st.error(f"Error: {result['error']}")
            else:
                st.warning("Please enter an Entity ID")

    with tab3:
        st.header("Generate Calendar Wrapped")
        wrapped_entity_id = st.text_input("Entity ID", key="wrapped_entity_id")
        if st.button("Generate Wrapped"):
            if wrapped_entity_id:
                with st.spinner("Generating your Calendar Wrapped..."):
                    result = service.generate_wrapped(wrapped_entity_id)
                if result['success']:
                    data = result['data']

                    # Display basic stats
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.metric("Total Meetings", data['total_meetings_this_year'])
                    with col2:
                        st.metric("Total Time in Meetings", data['total_time_spent'])
                    with col3:
                        st.metric("Average Meeting Duration", data['average_meeting_duration'])

                    # Display schedule analysis
                    st.subheader("Schedule Analysis")
                    analysis = data.get('schedule_analysis')
                    if analysis:
                        # .get guards against an LLM reply that omits a key.
                        st.write(f"Your schedule most resembles: **{analysis.get('name', 'Unknown')}**")
                        st.write(f"*{analysis.get('reason', 'Analysis unavailable')}*")

                    # Display insights
                    st.subheader("Insights")
                    insights = data.get('metric_insights')
                    if insights:
                        for value in insights.values():
                            if value:  # Only display non-empty insights
                                st.write(f"- {value}")

                    # Display breakdowns
                    col1, col2 = st.columns(2)
                    with col1:
                        st.subheader("Monthly Breakdown")
                        st.bar_chart(data['monthly_breakdown'])
                    with col2:
                        st.subheader("Daily Breakdown")
                        st.bar_chart(data['daily_breakdown'])
                else:
                    st.error(f"Error: {result['error']}")
            else:
                st.warning("Please enter an Entity ID")


if __name__ == "__main__":
    main()