designfailure commited on
Commit
90dba83
·
verified ·
1 Parent(s): f54b02f

Upload 11 files

Browse files
Files changed (11) hide show
  1. agent.py +28 -0
  2. gradio_app.py +24 -0
  3. helper.py +47 -0
  4. insurance_utils.py +53 -0
  5. main.py +60 -0
  6. orchestrator.py +22 -0
  7. research_agent.py +29 -0
  8. run_app.py +20 -0
  9. sales_agent.py +31 -0
  10. tools.py +67 -0
  11. underwriting_agent.py +30 -0
agent.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from magentic import Agent, prompt
2
+ from tools import ImageCaptioning, WebSearch, GoogleSheets
3
+
4
+ class ResearchAgent(Agent):
5
+ def __init__(self, name: str, description: str):
6
+ super().__init__(name=name, description=description)
7
+ self.tools = {
8
+ "image_caption": ImageCaptioning(),
9
+ "web_search": WebSearch(),
10
+ "sheets": GoogleSheets()
11
+ }
12
+
13
+ @prompt("Analiziraj sliko in identificiraj objekte ter dejavnike tveganja")
14
+ async def process_image(self, image):
15
+ """Obdelava slike z uporabo magentic-one promptov"""
16
+ return await self.tools["image_caption"].analyze(image)
17
+
18
+ class UnderwritingAgent(Agent):
19
+ def __init__(self, name: str, description: str):
20
+ super().__init__(name=name, description=description)
21
+
22
+ @prompt("Oceni tveganje in določi ustrezno kritje")
23
+ async def assess_risk(self, data):
24
+ """Ocena tveganja z uporabo magentic-one promptov"""
25
+ # Implementacija ocene tveganja
26
+ pass
27
+
28
+ # Podobno implementirajte še SalesAgent in DelegationAgent
gradio_app.py ADDED
@@ -0,0 +1,24 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import gradio as gr
2
+ from main import initialize_agents
3
+
4
+ def create_interface():
5
+ agents = initialize_agents()
6
+
7
+ with gr.Blocks() as app:
8
+ gr.Markdown("# InsurTech MGA Aplikacija")
9
+
10
+ with gr.Tab("Ocena tveganja"):
11
+ image_input = gr.Image()
12
+ text_output = gr.Textbox()
13
+
14
+ image_input.change(
15
+ fn=agents["research"].process_image,
16
+ inputs=image_input,
17
+ outputs=text_output
18
+ )
19
+
20
+ return app
21
+
22
+ if __name__ == "__main__":
23
+ interface = create_interface()
24
+ interface.launch()
helper.py ADDED
@@ -0,0 +1,47 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from dotenv import load_dotenv
3
+ from typing import Dict, Any
4
+ import logging
5
+
6
+ def setup_logging(logs_dir: str) -> None:
7
+ """Nastavitev beleženja dogodkov"""
8
+ if not os.path.exists(logs_dir):
9
+ os.makedirs(logs_dir)
10
+
11
+ logging.basicConfig(
12
+ level=logging.INFO,
13
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
14
+ handlers=[
15
+ logging.FileHandler(os.path.join(logs_dir, 'app.log')),
16
+ logging.StreamHandler()
17
+ ]
18
+ )
19
+
20
+ def load_environment() -> None:
21
+ """Nalaganje okoljskih spremenljivk"""
22
+ load_dotenv()
23
+ required_vars = [
24
+ 'OPENAI_API_KEY',
25
+ 'TOGETHER_API_KEY',
26
+ 'STRIPE_API_KEY'
27
+ ]
28
+
29
+ missing = [var for var in required_vars if not os.getenv(var)]
30
+ if missing:
31
+ raise EnvironmentError(f"Manjkajoče okoljske spremenljivke: {', '.join(missing)}")
32
+
33
+ def process_image_data(image_path: str) -> Dict[str, Any]:
34
+ """Obdelava vhodne slike"""
35
+ from PIL import Image
36
+
37
+ try:
38
+ with Image.open(image_path) as img:
39
+ return {
40
+ "path": image_path,
41
+ "size": img.size,
42
+ "format": img.format,
43
+ "mode": img.mode
44
+ }
45
+ except Exception as e:
46
+ logging.error(f"Napaka pri obdelavi slike: {str(e)}")
47
+ return {}
insurance_utils.py ADDED
@@ -0,0 +1,53 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Dict, Any
2
+ from datetime import datetime, timedelta
3
+ import json
4
+
5
+ class PolicyGenerator:
6
+ def __init__(self):
7
+ self.policy_counter = 0
8
+
9
+ def generate_policy(self, offer_data: Dict[str, Any]) -> Dict[str, Any]:
10
+ """Generiranje zavarovalne police"""
11
+ self.policy_counter += 1
12
+
13
+ start_date = datetime.now()
14
+ end_date = start_date + timedelta(days=365)
15
+
16
+ return {
17
+ "policy_number": f"POL-{datetime.now().year}-{self.policy_counter:04d}",
18
+ "start_date": start_date.isoformat(),
19
+ "end_date": end_date.isoformat(),
20
+ "premium": offer_data["premium"],
21
+ "coverage": offer_data["coverage"],
22
+ "terms": offer_data["terms"]
23
+ }
24
+
25
+ def save_policy(self, policy: Dict[str, Any], filepath: str) -> None:
26
+ """Shranjevanje police v JSON formatu"""
27
+ with open(filepath, 'w', encoding='utf-8') as f:
28
+ json.dump(policy, f, indent=2, ensure_ascii=False)
29
+
30
+ class ClaimsHandler:
31
+ def __init__(self):
32
+ self.valid_claim_types = {"damage", "theft", "liability"}
33
+
34
+ def validate_claim(self, claim_data: Dict[str, Any]) -> bool:
35
+ """Preverjanje veljavnosti zahtevka"""
36
+ return all([
37
+ claim_data.get("type") in self.valid_claim_types,
38
+ claim_data.get("policy_number"),
39
+ claim_data.get("date_of_incident"),
40
+ claim_data.get("description")
41
+ ])
42
+
43
+ def process_claim(self, claim_data: Dict[str, Any]) -> Dict[str, Any]:
44
+ """Obdelava zahtevka"""
45
+ if not self.validate_claim(claim_data):
46
+ return {"status": "rejected", "reason": "Neveljavni podatki zahtevka"}
47
+
48
+ # Tukaj bi dodali logiko za oceno zahtevka
49
+ return {
50
+ "status": "processing",
51
+ "claim_id": f"CLM-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
52
+ "estimated_payout": 0.0
53
+ }
main.py ADDED
@@ -0,0 +1,60 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import logging
3
+ import os
4
+ from autogen_core import EVENT_LOGGER_NAME, AgentId, AgentProxy, SingleThreadedAgentRuntime
5
+ from autogen_magentic_one.agents.orchestrator import LedgerOrchestrator
6
+ from autogen_magentic_one.utils import LogHandler, create_completion_client_from_env
7
+ from agents.research_agent import ResearchAgent
8
+ from agents.underwriting_agent import UnderwritingAgent
9
+ from agents.sales_agent import SalesAgent
10
+ from runtime.orchestrator import InsuranceOrchestrator
11
+
12
+ async def main(logs_dir: str) -> None:
13
+ # Ustvarimo runtime
14
+ runtime = SingleThreadedAgentRuntime()
15
+
16
+ # Ustvarimo client za LLM
17
+ client = create_completion_client_from_env(model="gpt-4")
18
+
19
+ # Registriramo agente
20
+ await ResearchAgent.register(runtime, "ResearchAgent",
21
+ lambda: ResearchAgent(model_client=client))
22
+ research = AgentProxy(AgentId("ResearchAgent", "default"), runtime)
23
+
24
+ await UnderwritingAgent.register(runtime, "UnderwritingAgent",
25
+ lambda: UnderwritingAgent(model_client=client))
26
+ underwriting = AgentProxy(AgentId("UnderwritingAgent", "default"), runtime)
27
+
28
+ await SalesAgent.register(runtime, "SalesAgent",
29
+ lambda: SalesAgent(model_client=client))
30
+ sales = AgentProxy(AgentId("SalesAgent", "default"), runtime)
31
+
32
+ agent_list = [research, underwriting, sales]
33
+
34
+ # Registriramo orchestrator
35
+ await InsuranceOrchestrator.register(
36
+ runtime,
37
+ "Orchestrator",
38
+ lambda: InsuranceOrchestrator(
39
+ agents=agent_list,
40
+ model_client=client,
41
+ max_rounds=30,
42
+ max_time=25 * 60,
43
+ return_final_answer=True,
44
+ )
45
+ )
46
+
47
+ runtime.start()
48
+ await runtime.stop_when_idle()
49
+
50
+ if __name__ == "__main__":
51
+ logs_dir = "logs"
52
+ if not os.path.exists(logs_dir):
53
+ os.makedirs(logs_dir)
54
+
55
+ logger = logging.getLogger(EVENT_LOGGER_NAME)
56
+ logger.setLevel(logging.INFO)
57
+ log_handler = LogHandler(filename=os.path.join(logs_dir, "insurance_log.jsonl"))
58
+ logger.handlers = [log_handler]
59
+
60
+ asyncio.run(main(logs_dir))
orchestrator.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from autogen_magentic_one.agents.orchestrator import LedgerOrchestrator
2
+ from typing import List
3
+ from autogen_core import AgentProxy
4
+
5
+ class InsuranceOrchestrator(LedgerOrchestrator):
6
+ def __init__(self, agents: List[AgentProxy], **kwargs):
7
+ super().__init__(agents=agents, **kwargs)
8
+
9
+ async def orchestrate_workflow(self, initial_task: dict):
10
+ """Koordinacija delovnega toka zavarovanja"""
11
+ steps = [
12
+ ("research", "analyze_risk"),
13
+ ("underwriting", "calculate_premium"),
14
+ ("sales", "prepare_offer")
15
+ ]
16
+
17
+ results = {}
18
+ for agent_name, action in steps:
19
+ agent = self.get_agent(agent_name)
20
+ results[action] = await agent.execute_action(action, initial_task)
21
+
22
+ return results
research_agent.py ADDED
@@ -0,0 +1,29 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from autogen_magentic_one.agents.base import BaseAgent
2
+ from autogen_core.code_executor import CodeBlock
3
+ from typing import Dict, Any
4
+
5
+ class ResearchAgent(BaseAgent):
6
+ def __init__(self, model_client):
7
+ super().__init__(
8
+ name="ResearchAgent",
9
+ description="Agent za analizo slik in raziskovanje zavarovalnih primerov",
10
+ model_client=model_client
11
+ )
12
+
13
+ async def process_image(self, image_data: Dict[str, Any]) -> Dict[str, Any]:
14
+ """Analiza slike in identifikacija objektov"""
15
+ prompt = f"""Analiziraj sliko in identificiraj:
16
+ 1. Vse vidne objekte
17
+ 2. Potencialne dejavnike tveganja
18
+ 3. Stanje objektov
19
+
20
+ Slika: {image_data}
21
+ """
22
+
23
+ response = await self.model_client.complete(prompt)
24
+ return self._parse_response(response)
25
+
26
+ def _parse_response(self, response: str) -> Dict[str, Any]:
27
+ """Pretvorba odgovora v strukturirane podatke"""
28
+ # Implementacija parsanja
29
+ return {}
run_app.py ADDED
@@ -0,0 +1,20 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+
3
+ def main():
4
+ parser = argparse.ArgumentParser()
5
+ parser.add_argument('--interface', type=str, choices=['gradio', 'streamlit'],
6
+ default='gradio', help='Izbira uporabniškega vmesnika')
7
+ args = parser.parse_args()
8
+
9
+ if args.interface == 'gradio':
10
+ from gradio_app import create_interface
11
+ app = create_interface()
12
+ app.launch(share=True)
13
+ else:
14
+ import streamlit.cli as stcli
15
+ import sys
16
+ sys.argv = ["streamlit", "run", "streamlit_app.py"]
17
+ stcli.main()
18
+
19
+ if __name__ == "__main__":
20
+ main()
sales_agent.py ADDED
@@ -0,0 +1,31 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from autogen_magentic_one.agents.base import BaseAgent
2
+ from typing import Dict, Any
3
+
4
+ class SalesAgent(BaseAgent):
5
+ def __init__(self, model_client):
6
+ super().__init__(
7
+ name="SalesAgent",
8
+ description="Agent za pripravo ponudbe in komunikacijo s strankami",
9
+ model_client=model_client
10
+ )
11
+
12
+ async def prepare_offer(self, insurance_data: Dict[str, Any]) -> Dict[str, Any]:
13
+ """Priprava zavarovalne ponudbe"""
14
+ prompt = f"""Pripravi ponudbo za zavarovanje:
15
+ 1. Premium: {insurance_data.get('final_premium')}
16
+ 2. Kritje: {insurance_data.get('coverage_details')}
17
+ 3. Posebni pogoji: {insurance_data.get('special_conditions', [])}
18
+ """
19
+
20
+ response = await self.model_client.complete(prompt)
21
+ return self._format_offer(response, insurance_data)
22
+
23
+ def _format_offer(self, offer_text: str, data: Dict[str, Any]) -> Dict[str, Any]:
24
+ """Oblikovanje končne ponudbe"""
25
+ return {
26
+ "offer_id": "OFF-" + str(hash(offer_text))[:8],
27
+ "premium": data.get('final_premium'),
28
+ "coverage": data.get('coverage_details'),
29
+ "terms": offer_text,
30
+ "valid_until": "30 days"
31
+ }
tools.py ADDED
@@ -0,0 +1,67 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Dict, Any
2
+ import requests
3
+ import stripe
4
+ from PIL import Image
5
+ import google.auth
6
+ from google.oauth2 import service_account
7
+ from googleapiclient.discovery import build
8
+
9
+ class ImageCaptioning:
10
+ def __init__(self):
11
+ self.model = None # Initialize your image captioning model
12
+
13
+ async def analyze(self, image: Image) -> Dict[str, Any]:
14
+ """Analiza slike in vračanje kontekstualnih informacij"""
15
+ results = {
16
+ "objects": [], # seznam zaznanih objektov
17
+ "count": {}, # število posameznih objektov
18
+ "risk_factors": [] # zaznani dejavniki tveganja
19
+ }
20
+ return results
21
+
22
+ class WebSearch:
23
+ def __init__(self):
24
+ self.session = requests.Session()
25
+
26
+ async def search(self, query: str) -> list:
27
+ """Izvajanje spletnega iskanja in luščenja podatkov"""
28
+ results = []
29
+ # Implementacija spletnega iskanja
30
+ return results
31
+
32
+ class GoogleSheets:
33
+ def __init__(self):
34
+ self.SCOPES = ['https://www.googleapis.com/auth/spreadsheets.readonly']
35
+ self.creds = None
36
+
37
+ async def get_pricing_data(self, coverage_type: str) -> Dict[str, float]:
38
+ """Pridobivanje podatkov o cenah iz Google Sheets"""
39
+ pricing_data = {}
40
+ # Implementacija branja podatkov
41
+ return pricing_data
42
+
43
+ class StripePayment:
44
+ def __init__(self, api_key: str):
45
+ stripe.api_key = api_key
46
+
47
+ async def create_payment(self, amount: float, currency: str) -> Dict[str, Any]:
48
+ """Ustvarjanje Stripe plačila"""
49
+ try:
50
+ payment_intent = stripe.PaymentIntent.create(
51
+ amount=int(amount * 100), # Stripe uporablja najmanjše denarne enote
52
+ currency=currency
53
+ )
54
+ return {"status": "success", "client_secret": payment_intent.client_secret}
55
+ except stripe.error.StripeError as e:
56
+ return {"status": "error", "message": str(e)}
57
+
58
+ class WeatherAPI:
59
+ def __init__(self):
60
+ self.api_key = None
61
+ self.base_url = "https://api.weatherapi.com/v1"
62
+
63
+ async def get_weather_forecast(self, location: str, days: int = 7) -> Dict[str, Any]:
64
+ """Pridobivanje vremenske napovedi"""
65
+ weather_data = {}
66
+ # Implementacija vremenske napovedi
67
+ return weather_data
underwriting_agent.py ADDED
@@ -0,0 +1,30 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from autogen_magentic_one.agents.base import BaseAgent
2
+ from typing import Dict, Any
3
+
4
+ class UnderwritingAgent(BaseAgent):
5
+ def __init__(self, model_client):
6
+ super().__init__(
7
+ name="UnderwritingAgent",
8
+ description="Agent za oceno tveganja in izračun premije",
9
+ model_client=model_client
10
+ )
11
+
12
+ async def calculate_premium(self, risk_data: Dict[str, Any]) -> Dict[str, Any]:
13
+ """Izračun zavarovalne premije na podlagi ocene tveganja"""
14
+ prompt = f"""Oceni tveganje in izračunaj premijo za:
15
+ 1. Identificirani objekti: {risk_data.get('objects', [])}
16
+ 2. Dejavniki tveganja: {risk_data.get('risk_factors', [])}
17
+ 3. Lokacija: {risk_data.get('location', 'Unknown')}
18
+ """
19
+
20
+ response = await self.model_client.complete(prompt)
21
+ return self._calculate_final_premium(response)
22
+
23
+ def _calculate_final_premium(self, assessment: str) -> Dict[str, Any]:
24
+ """Izračun končne premije na podlagi ocene"""
25
+ return {
26
+ "base_premium": 0.0,
27
+ "risk_multiplier": 1.0,
28
+ "final_premium": 0.0,
29
+ "coverage_details": {}
30
+ }