lightbulb / agent.py
RobbiePasquale's picture
Upload 20 files
e1392d6 verified
raw
history blame
45 kB
# agent.py
# agent.py
import numpy as np
from mcts import MCTS
from ranking import train_ranking_model
from bs4 import BeautifulSoup
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque, OrderedDict
import random
from sklearn.metrics.pairwise import cosine_similarity
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from sentence_transformers import SentenceTransformer
import hashlib
from twisted.internet import defer
import logging
import json
import os
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
# ==========================
# Prioritized Experience Replay
# ==========================
class SumTree:
"""
SumTree data structure where the parent’s value is the sum of its children.
Leaf nodes contain the priorities of experiences.
"""
def __init__(self, capacity):
self.capacity = capacity
self.tree = np.zeros(2 * capacity - 1)
self.data = np.zeros(capacity, dtype=object)
self.write = 0
self.n_entries = 0
def _propagate(self, idx, change):
parent = (idx - 1) // 2
self.tree[parent] += change
if parent != 0:
self._propagate(parent, change)
def _retrieve(self, idx, s):
left = 2 * idx + 1
right = left + 1
if left >= len(self.tree):
return idx
if s <= self.tree[left]:
return self._retrieve(left, s)
else:
return self._retrieve(right, s - self.tree[left])
def total(self):
return self.tree[0]
def add(self, p, data):
idx = self.write + self.capacity - 1
self.data[self.write] = data
self.update(idx, p)
self.write += 1
if self.write >= self.capacity:
self.write = 0
if self.n_entries < self.capacity:
self.n_entries += 1
def update(self, idx, p):
change = p - self.tree[idx]
self.tree[idx] = p
self._propagate(idx, change)
def get(self, s):
idx = self._retrieve(0, s)
data_idx = idx - self.capacity + 1
return (idx, self.tree[idx], self.data[data_idx])
class PrioritizedReplayMemory:
def __init__(self, capacity, alpha=0.6):
self.tree = SumTree(capacity)
self.alpha = alpha # [0,1] convert the importance of TD error to priority
self.epsilon = 1e-6 # small amount to avoid zero priority
def add(self, error, sample):
p = (np.abs(error) + self.epsilon) ** self.alpha
self.tree.add(p, sample)
def sample(self, batch_size, beta=0.4):
batch = []
idxs = []
segment = self.tree.total() / batch_size
priorities = []
for i in range(batch_size):
a = segment * i
b = segment * (i + 1)
s = random.uniform(a, b)
idx, p, data = self.tree.get(s)
batch.append(data)
idxs.append(idx)
priorities.append(p)
total = self.tree.total()
probs = priorities / total
weights = (self.tree.n_entries * probs) ** (-beta)
weights /= weights.max()
return batch, idxs, weights
def update(self, idx, error):
p = (np.abs(error) + self.epsilon) ** self.alpha
self.tree.update(idx, p)
# ==========================
# Hierarchical Reinforcement Learning (HRL)
# ==========================
class ManagerModel(nn.Module):
"""
High-level policy model (Manager) that decides which option to execute.
"""
def __init__(self, input_size, hidden_size, num_options):
super(ManagerModel, self).__init__()
self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
self.fc = nn.Linear(hidden_size, num_options)
self.layer_norm = nn.LayerNorm(hidden_size)
def forward(self, x, hidden=None):
if x.dim() == 2:
x = x.unsqueeze(1) # Add a time dimension
out, hidden = self.lstm(x, hidden)
last_output = out[:, -1, :]
last_output = self.layer_norm(last_output)
option_scores = self.fc(last_output)
return option_scores, hidden
class WorkerModel(nn.Module):
"""
Low-level policy model (Worker) that executes actions based on the selected option.
"""
def __init__(self, input_size, hidden_size, action_size):
super(WorkerModel, self).__init__()
self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
self.fc = nn.Linear(hidden_size, action_size)
self.layer_norm = nn.LayerNorm(hidden_size)
self.action_size = action_size # Store action_size for reference
def forward(self, x, hidden=None):
if x.dim() == 2:
x = x.unsqueeze(1) # Add a time dimension
out, hidden = self.lstm(x, hidden)
last_output = out[:, -1, :]
last_output = self.layer_norm(last_output)
action_scores = self.fc(last_output)
return action_scores, hidden
def act(self, state, epsilon=0.1):
"""
Selects an action using epsilon-greedy policy.
"""
if random.random() < epsilon:
action = random.randint(0, self.action_size - 1)
return action
state = torch.FloatTensor(state).unsqueeze(0).to(next(self.parameters()).device)
with torch.no_grad():
action_scores, _ = self(state)
action = torch.argmax(action_scores, dim=1).item()
return action
# ==========================
# RAGSummarizer Class
# ==========================
class RAGSummarizer:
def __init__(self, model_name='gpt2', embedding_model='all-MiniLM-L6-v2',
max_length=150, cache_capacity=100, persistent_cache_path='rag_cache.json'):
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.tokenizer = GPT2Tokenizer.from_pretrained(model_name)
self.model = GPT2LMHeadModel.from_pretrained(model_name).to(self.device)
# Explicitly set the device for SentenceTransformer
self.embedding_model = SentenceTransformer(embedding_model, device=self.device)
self.max_length = max_length
self.cache = LRUCache(cache_capacity)
self.persistent_cache_path = persistent_cache_path
self.load_persistent_cache()
def load_persistent_cache(self):
if os.path.exists(self.persistent_cache_path):
with open(self.persistent_cache_path, 'r', encoding='utf-8') as f:
try:
persistent_data = json.load(f)
for key, value in persistent_data.items():
self.cache.put(key, value)
logger.info(f"Loaded persistent cache with {len(persistent_data)} entries.")
except json.JSONDecodeError:
logger.warning("Persistent cache file is corrupted. Initializing empty cache.")
else:
logger.info("No persistent cache found. Starting with empty cache.")
def save_persistent_cache(self):
with open(self.persistent_cache_path, 'w', encoding='utf-8') as f:
json.dump(self.cache.cache, f, indent=2)
logger.info(f"Saved persistent cache with {len(self.cache.cache)} entries.")
def save_rag_data(self, query, chunks, embeddings):
data = {
"query": query,
"chunks": chunks,
"embeddings": embeddings.tolist()
}
os.makedirs("rag_data", exist_ok=True)
filename = f"rag_data/{hash(query)}.json"
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Saved RAG data to {filename}")
def split_into_chunks(self, text, chunk_size=200):
words = text.split()
return [' '.join(words[i:i+chunk_size]) for i in range(0, len(words), chunk_size)]
def retrieve_relevant_chunks(self, query, chunks, embeddings, top_k=3):
if embeddings.size(0) == 0:
logger.warning("Embeddings are empty. Cannot retrieve relevant chunks.")
return []
query_embedding = self.embedding_model.encode([query], convert_to_tensor=True)
cosine_scores = cosine_similarity(query_embedding.cpu().numpy(), embeddings.cpu().numpy())[0]
top_indices = cosine_scores.argsort()[-top_k:][::-1]
# Ensure indices are within bounds
top_indices = [idx for idx in top_indices if idx < len(chunks)]
return [chunks[i] for i in top_indices]
def get_embeddings(self, chunks):
# Implement batch processing
batch_size = 32
embeddings = []
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i+batch_size]
batch_embeddings = self.embedding_model.encode(batch, convert_to_tensor=True)
embeddings.append(batch_embeddings)
if embeddings:
return torch.cat(embeddings, dim=0)
else:
return torch.tensor([])
def generate_summary(self, query, relevant_chunks):
cache_key = hashlib.md5((query + ''.join(relevant_chunks)).encode()).hexdigest()
cached_summary = self.cache.get(cache_key)
if cached_summary:
return cached_summary
context = " ".join(relevant_chunks)
prompt = f"Summarize the following content in relation to '{query}': {context}\n\nSummary:"
input_ids = self.tokenizer.encode(prompt, return_tensors='pt').to(self.device)
try:
output = self.model.generate(
input_ids,
max_length=input_ids.shape[1] + self.max_length,
num_return_sequences=1,
no_repeat_ngram_size=2,
top_k=50,
top_p=0.95,
temperature=0.7,
early_stopping=True
)
except Exception as e:
logger.error(f"Error during summary generation: {str(e)}")
return "Summary generation failed."
self.save_rag_data(query, relevant_chunks, self.get_embeddings(relevant_chunks))
summary = self.tokenizer.decode(output[0], skip_special_tokens=True)
summary = summary.split("Summary:")[-1].strip()
self.cache.put(cache_key, summary)
self.save_persistent_cache()
return summary
# ==========================
# WorldModel Class
# ==========================
class WorldModel(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.3):
super(WorldModel, self).__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers,
batch_first=True, dropout=dropout)
self.fc = nn.Linear(hidden_size, output_size)
self.value_head = nn.Linear(hidden_size, 1)
self.layer_norm = nn.LayerNorm(hidden_size)
def forward(self, x, hidden=None):
if x.dim() == 2:
x = x.unsqueeze(1) # Add a time dimension
out, hidden = self.lstm(x, hidden)
last_output = out[:, -1, :]
last_output = self.layer_norm(last_output)
action_scores = self.fc(last_output)
state_value = self.value_head(last_output)
return action_scores, state_value, hidden
# ==========================
# Manager and Worker Classes for HRL
# ==========================
class Manager:
def __init__(self, state_size, num_options, hidden_size=128, learning_rate=0.001, gamma=0.99,
epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01, memory_capacity=1000, device=torch.device("cpu")):
self.state_size = state_size
self.num_options = num_options
self.gamma = gamma
self.epsilon = epsilon
self.epsilon_decay = epsilon_decay
self.epsilon_min = epsilon_min
self.device = device
self.model = ManagerModel(state_size, hidden_size, num_options).to(self.device)
self.target_model = ManagerModel(state_size, hidden_size, num_options).to(self.device)
self.optimizer = optim.AdamW(self.model.parameters(), lr=learning_rate, weight_decay=1e-5)
self.loss_fn = nn.MSELoss()
self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(self.optimizer, 'min', patience=5, factor=0.5, verbose=True)
self.memory = PrioritizedReplayMemory(capacity=memory_capacity, alpha=0.6)
self.update_target_model()
def update_target_model(self):
self.target_model.load_state_dict(self.model.state_dict())
def remember(self, state, option, reward, next_state, done, td_error):
sample = (state, option, reward, next_state, done)
self.memory.add(td_error, sample)
def act(self, state):
if random.random() < self.epsilon:
option = random.randint(0, self.num_options - 1)
return option
state = torch.FloatTensor(state).unsqueeze(0).to(self.model.lstm.weight.device)
with torch.no_grad():
option_scores, _ = self.model(state)
option = torch.argmax(option_scores).item()
return option
def replay(self, batch_size, beta=0.4):
if self.memory.tree.n_entries < batch_size:
return
batch, idxs, weights = self.memory.sample(batch_size, beta)
states, options, rewards, next_states, dones = zip(*batch)
states = torch.FloatTensor(states).to(self.model.lstm.weight.device)
next_states = torch.FloatTensor(next_states).to(self.model.lstm.weight.device)
options = torch.LongTensor(options).unsqueeze(1).to(self.model.lstm.weight.device)
rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.model.lstm.weight.device)
dones = torch.FloatTensor(dones).unsqueeze(1).to(self.model.lstm.weight.device)
weights = torch.FloatTensor(weights).unsqueeze(1).to(self.model.lstm.weight.device)
# Current Q values
current_q_values, _ = self.model(states)
current_q_values = current_q_values.gather(1, options)
# Target Q values
with torch.no_grad():
next_q_values, _ = self.target_model(next_states)
max_next_q_values = next_q_values.max(1)[0].unsqueeze(1)
target_q_values = rewards + (self.gamma * max_next_q_values * (1 - dones))
# Compute TD errors
td_errors = target_q_values - current_q_values
# Compute loss with importance-sampling weights
loss = (td_errors.pow(2) * weights).mean()
# Optimize the model
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
self.optimizer.step()
self.scheduler.step(loss.item())
# Update priorities
td_errors_np = td_errors.detach().cpu().numpy().squeeze()
for idx, td_error in zip(idxs, td_errors_np):
self.memory.update(idx, np.abs(td_error))
# Decay epsilon
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
# ==========================
# AutonomousWebAgent Class
# ==========================
def truncate_text(text, max_length=1024):
tokens = text.split()
if len(tokens) > max_length:
return ' '.join(tokens[:max_length])
return text
class AutonomousWebAgent:
def __init__(self, state_size, action_size, num_options, hidden_size=64, learning_rate=0.001,
gamma=0.99, epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01,
knowledge_base_path='knowledge_base.json'):
self.state_size = state_size
self.action_size = action_size
self.num_options = num_options # Number of high-level options for HRL
self.gamma = gamma
self.epsilon = epsilon
self.epsilon_decay = epsilon_decay
self.epsilon_min = epsilon_min
# Initialize RAGSummarizer first to get the device
self.summarizer = RAGSummarizer()
self.device = self.summarizer.device
# Initialize SentenceTransformer with the correct device
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device=self.device)
# Low-level (Worker) Model
self.worker_model = WorldModel(state_size, hidden_size, action_size).to(self.device)
self.worker_target_model = WorldModel(state_size, hidden_size, action_size).to(self.device)
self.worker_optimizer = optim.AdamW(self.worker_model.parameters(), lr=learning_rate, weight_decay=1e-5)
self.worker_loss_fn = nn.MSELoss()
self.worker_scheduler = optim.lr_scheduler.ReduceLROnPlateau(self.worker_optimizer, 'min', patience=5, factor=0.5, verbose=True)
self.worker_memory = PrioritizedReplayMemory(capacity=2000, alpha=0.6)
self.update_worker_target_model()
# High-level (Manager) Model
self.manager = Manager(state_size, num_options, hidden_size=128, learning_rate=learning_rate,
gamma=gamma, epsilon=epsilon, epsilon_decay=epsilon_decay,
epsilon_min=epsilon_min, memory_capacity=1000, device=self.device)
self.mcts = MCTS(initial_state="")
logger.info(f"Initialized AutonomousWebAgent with state_size={state_size}, action_size={action_size}, num_options={num_options}")
self.site_performance = {} # {(site, query): performance_score}
# List of all search sites (base URLs without the query)
self.all_search_sites = [
"https://en.wikibooks.org/w/index.php?search=",
"https://en.wikiversity.org/w/index.php?search=",
"https://commons.wikimedia.org/w/index.php?search=",
"https://stackexchange.com/search?q=",
"https://arxiv.org/search/?query=",
"https://www.ncbi.nlm.nih.gov/pmc/?term=",
"https://www.gutenberg.org/ebooks/search/?query=",
"https://openlibrary.org/search?q=",
"https://doaj.org/search/articles?ref=homepage&q=",
"https://www.ted.com/search?q=",
"https://en.citizendium.org/wiki?search=",
"https://www.jstor.org/action/doBasicSearch?Query=",
"https://archive.org/search.php?query=",
"https://search.scielo.org/?q=",
"https://paperswithcode.com/search?q=",
"https://www.reddit.com/search/?q=",
"https://huggingface.co./models?search=",
"https://huggingface.co./datasets?search=",
"https://machinelearningmastery.com/?s=",
"https://www.kaggle.com/search?q=",
"https://towardsdatascience.com/search?q=",
"https://github.com/search?q=",
"https://stackoverflow.com/search?q=",
"https://www.youtube.com/results?search_query=",
"https://www.slideshare.net/search/slideshow?searchfrom=header&q="
]
# Initialize Knowledge Base
self.knowledge_base_path = knowledge_base_path
self.knowledge_base = []
self.kb_embeddings = None
self.load_knowledge_base()
# Additional Features for State Representation
self.additional_features = ['image_count', 'script_count', 'css_count']
def save(self, filename):
"""Save the entire agent state."""
state = {
'worker_model': self.worker_model.state_dict(),
'manager_model': self.manager.model.state_dict(),
'worker_optimizer': self.worker_optimizer.state_dict(),
'manager_optimizer': self.manager.optimizer.state_dict(),
'epsilon': self.epsilon
}
torch.save(state, filename)
logger.info(f"Saved agent state to {filename}")
def load(self, filename):
"""Load the entire agent state."""
state = torch.load(filename, map_location=self.device)
self.worker_model.load_state_dict(state['worker_model'])
self.manager.model.load_state_dict(state['manager_model'])
self.worker_optimizer.load_state_dict(state['worker_optimizer'])
self.manager.optimizer.load_state_dict(state['manager_optimizer'])
self.epsilon = state['epsilon']
logger.info(f"Loaded agent state from {filename}")
# ==========================
# Text Generation
# ==========================
def generate_text(self, prompt):
# Use the RAGSummarizer to generate text
chunks = self.summarizer.split_into_chunks(prompt)
embeddings = self.summarizer.get_embeddings(chunks)
relevant_chunks = self.summarizer.retrieve_relevant_chunks(query=prompt, chunks=chunks, embeddings=embeddings)
generated_text = self.summarizer.generate_summary(prompt, relevant_chunks)
return generated_text
# ==========================
# Knowledge Base Management
# ==========================
def load_knowledge_base(self):
if not os.path.exists(self.knowledge_base_path):
logger.warning(f"Knowledge base file {self.knowledge_base_path} does not exist. Initializing empty KB.")
self.knowledge_base = []
self.kb_embeddings = torch.tensor([]).to(self.device)
return
with open(self.knowledge_base_path, 'r', encoding='utf-8') as f:
self.knowledge_base = json.load(f)
if self.knowledge_base:
texts = [doc['content'] for doc in self.knowledge_base]
self.kb_embeddings = self.embedding_model.encode(texts, convert_to_tensor=True)
logger.info(f"Loaded {len(self.knowledge_base)} documents into the knowledge base.")
else:
self.kb_embeddings = torch.tensor([]).to(self.device)
logger.info("Knowledge base is empty.")
def save_knowledge_base(self):
with open(self.knowledge_base_path, 'w', encoding='utf-8') as f:
json.dump(self.knowledge_base, f, indent=2)
logger.info(f"Knowledge base saved with {len(self.knowledge_base)} documents.")
def add_document_to_kb(self, title, content, metadata=None):
document = {
"title": title,
"content": content,
"metadata": metadata or {}
}
self.knowledge_base.append(document)
# Update embeddings
new_embedding = self.embedding_model.encode([content], convert_to_tensor=True).to(self.device)
if self.kb_embeddings.numel() == 0:
self.kb_embeddings = new_embedding
else:
self.kb_embeddings = torch.cat([self.kb_embeddings, new_embedding], dim=0)
# Save to knowledge base
self.save_knowledge_base()
logger.info(f"Added new document to knowledge base: {title}")
def retrieve_from_kb(self, query, top_k=5):
if not self.knowledge_base:
logger.warning("Knowledge base is empty. No documents to retrieve.")
return []
query_embedding = self.embedding_model.encode([query], convert_to_tensor=True).to(self.device)
if self.kb_embeddings is None or self.kb_embeddings.numel() == 0:
logger.warning("Knowledge base embeddings are empty. No documents to retrieve.")
return []
if query_embedding.size(1) != self.kb_embeddings.size(1):
logger.error("Dimension mismatch between query embedding and KB embeddings.")
return []
cosine_scores = cosine_similarity(query_embedding.cpu().numpy(), self.kb_embeddings.cpu().numpy())[0]
top_indices = cosine_scores.argsort()[-top_k:][::-1]
# Ensure indices are within the knowledge_base length
top_indices = [idx for idx in top_indices if idx < len(self.knowledge_base)]
retrieved_docs = []
for idx in top_indices:
doc = self.knowledge_base[idx]
doc['score'] = cosine_scores[idx]
retrieved_docs.append(doc)
logger.info(f"Retrieved top {len(retrieved_docs)} documents from Knowledge Base for the query.")
return retrieved_docs
# ==========================
# RAG Integration
# ==========================
def retrieve_from_web(self, query, top_k=5):
logger.info(f"Performing web search for query: {query}")
mcts_iterations = self.calculate_mcts_iterations(np.zeros(self.state_size, dtype=np.float32))
self.mcts = MCTS(initial_state=query, num_simulations=mcts_iterations)
try:
new_query = yield self.mcts.run()
logger.debug(f"New query from MCTS: {new_query}")
# Select search sites
search_sites = self.select_search_sites(new_query)
results = yield self.mcts.web_search(new_query, search_sites)
logger.debug(f"Web search completed. Found {len(results)} results")
return results[:top_k] if results else []
except Exception as e:
logger.error(f"Error during MCTS or web search: {str(e)}", exc_info=True)
return []
def combine_documents(self, kb_docs, web_docs):
combined = kb_docs + web_docs
logger.info(f"Combined {len(kb_docs)} KB documents and {len(web_docs)} Web documents.")
return combined
def save_llm_training_data(self, query, content, summary=None, link=None, title=None):
data = {
"query": query,
"search_result": {
"link": link,
"title": title
},
"content": content,
"description": summary
}
os.makedirs("llm_training_data", exist_ok=True)
file_path = "llm_training_data/llm_training_data.jsonl"
# Append the new data as a new line in the JSONL file
with open(file_path, 'a', encoding='utf-8') as f:
json.dump(data, f)
f.write('\n')
logger.info(f"Appended LLM training data to {file_path}")
# ==========================
# Hierarchical RL Integration
# ==========================
def remember_manager(self, state, option, reward, next_state, done, td_error):
self.manager.remember(state, option, reward, next_state, done, td_error)
def remember_worker(self, state, action, reward, next_state, done):
self.worker_memory.add(reward, (state, action, reward, next_state, done))
# ==========================
# Action Selection and Execution
# ==========================
def act_manager(self, state):
option = self.manager.act(state)
return option
def act_worker(self, state):
action = self.worker_model.act(state, epsilon=self.epsilon)
return action
# ==========================
# Replay Methods
# ==========================
def replay_manager(self, batch_size=32, beta=0.4):
self.manager.replay(batch_size, beta)
def replay_worker(self, batch_size=32, beta=0.4):
result = self.worker_memory.replay(batch_size, beta)
if result is None:
return
batch, idxs, weights = result
if len(self.worker_memory.tree.data) >= batch_size:
batch, idxs, weights = self.worker_memory.sample(batch_size, beta)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.FloatTensor(states).to(self.worker_model.lstm.weight.device)
next_states = torch.FloatTensor(next_states).to(self.worker_model.lstm.weight.device)
actions = torch.LongTensor(actions).unsqueeze(1).to(self.worker_model.lstm.weight.device)
rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.worker_model.lstm.weight.device)
dones = torch.FloatTensor(dones).unsqueeze(1).to(self.worker_model.lstm.weight.device)
weights = torch.FloatTensor(weights).unsqueeze(1).to(self.worker_model.lstm.weight.device)
# Current Q values
current_q_values, _ = self.worker_model(states)
current_q_values = current_q_values.gather(1, actions)
# Target Q values
with torch.no_grad():
next_q_values, _ = self.worker_target_model(next_states)
max_next_q_values = next_q_values.max(1)[0].unsqueeze(1)
target_q_values = rewards + (self.gamma * max_next_q_values * (1 - dones))
# Compute TD errors
td_errors = target_q_values - current_q_values
# Compute loss with importance-sampling weights
loss = (td_errors.pow(2) * weights).mean()
# Optimize the model
self.worker_optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.worker_model.parameters(), max_norm=1.0)
self.worker_optimizer.step()
self.worker_scheduler.step(loss.item())
# Update priorities
td_errors_np = td_errors.detach().cpu().numpy().squeeze()
for idx, td_error in zip(idxs, td_errors_np):
self.worker_memory.update(idx, np.abs(td_error))
# Decay epsilon
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
logger.debug(f"Updated epsilon to: {self.epsilon}")
# ==========================
# Load and Save Models
# ==========================
def load_worker_model(self, name):
self.worker_model.load_state_dict(torch.load(name, map_location=self.device))
logger.info(f"Loaded worker model weights from {name}")
def save_worker_model(self, name):
torch.save(self.worker_model.state_dict(), name)
logger.info(f"Saved worker model weights to {name}")
def load_manager_model(self, name):
self.manager.model.load_state_dict(torch.load(name, map_location=self.device))
self.manager.update_target_model()
logger.info(f"Loaded manager model weights from {name}")
def save_manager_model(self, name):
torch.save(self.manager.model.state_dict(), name)
logger.info(f"Saved manager model weights to {name}")
# ==========================
# Update Target Models
# ==========================
def update_worker_target_model(self):
self.worker_target_model.load_state_dict(self.worker_model.state_dict())
logger.info("Updated worker target model with current model weights")
def update_manager_target_model(self):
self.manager.update_target_model()
logger.info("Updated manager target model with current model weights")
# ==========================
# Feature Extraction
# ==========================
def extract_features(self, content, query):
content = truncate_text(content)
query = truncate_text(query)
soup = BeautifulSoup(content, 'html.parser')
text = soup.get_text()
word_count = len(text.split())
link_count = len(soup.find_all('a'))
header_count = len(soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']))
# Calculate semantic similarity
text_embedding = self.embedding_model.encode([text], convert_to_tensor=True).to(self.device)
query_embedding = self.embedding_model.encode([query], convert_to_tensor=True).to(self.device)
semantic_similarity = cosine_similarity(text_embedding.cpu().numpy(), query_embedding.cpu().numpy())[0][0]
# Additional Features
image_count = len(soup.find_all('img'))
script_count = len(soup.find_all('script'))
css_count = len(soup.find_all('link', rel='stylesheet'))
return np.array([word_count, link_count, header_count, semantic_similarity, image_count, script_count, css_count])
# ==========================
# Reward Calculation
# ==========================
def calculate_reward(self, content, query):
try:
ranked_results = train_ranking_model(query, [{'content': content}])
logger.debug(f"Ranked results: {ranked_results}")
if ranked_results and isinstance(ranked_results[0], dict) and 'predicted_score' in ranked_results[0]:
reward = max(ranked_results[0]['predicted_score'], 0)
logger.debug(f"Calculated reward: {reward}")
return reward
else:
logger.warning(f"Invalid ranked results: {ranked_results}")
return 0
except Exception as e:
logger.error(f"Error in calculate_reward: {str(e)}", exc_info=True)
return 0
# ==========================
# Search Site Selection
# ==========================
def select_search_sites(self, query, num_sites=5):
# Select top sites based on past performance for this query
site_scores = {}
for (site, q), score in self.site_performance.items():
if q == query:
site_scores[site] = site_scores.get(site, 0) + score
if site_scores:
sorted_sites = sorted(site_scores.items(), key=lambda x: x[1], reverse=True)
top_sites = [site for site, score in sorted_sites[:num_sites]]
else:
# If no past data, select random sites
top_sites = random.sample(self.all_search_sites, num_sites)
# Construct full URLs with query
search_sites = [site + query for site in top_sites]
return search_sites
# ==========================
# Search Method with HRL
# ==========================
@defer.inlineCallbacks
def search(self, query, max_steps=2):
logger.info(f"Starting search for query: {query}")
state = np.zeros(self.state_size, dtype=np.float32)
total_reward = 0
content = ""
done = False
results = None
try:
# High-Level: Manager selects an option
option = self.act_manager(state)
logger.debug(f"Manager selected option: {option}")
# Execute the selected option
if option == 0: # Search Option
logger.debug("Executing Search Option")
results = yield self.retrieve_from_web(query)
if results:
content = results[0]['content']
site = urlparse(results[0]['link']).netloc
self.save_llm_training_data(
query,
content,
summary=results[0].get('summary'),
link=results[0].get('link'),
title=results[0].get('title')
)
self.add_document_to_kb(title=results[0].get('title', 'No Title'), content=content, metadata=results[0].get('meta', {}))
next_state = self.extract_features(content, query)
reward = self.calculate_reward(content, query)
logger.debug(f"Extracted features: {next_state}, Reward: {reward}")
# Update site performance
key = (site, query)
self.site_performance[key] = self.site_performance.get(key, 0) + reward
# Remember Manager's experience
self.remember_manager(state, option, reward, next_state, done, td_error=reward)
# Remember Worker's experience
self.remember_worker(state, 0, reward, next_state, done)
state = next_state.astype(np.float32)
total_reward += reward
else:
reward = -1
logger.warning(f"No results for query: {query}")
# Remember Manager's experience
self.remember_manager(state, option, reward, state, True, td_error=reward)
elif option == 1: # Summarize Option
logger.debug("Executing Summarize Option")
if content:
summary = self.summarizer.generate_summary(content, query)
self.save_llm_training_data(
query,
content,
summary=summary,
link=results[0].get('link') if results else None,
title=results[0].get('title') if results else None
)
reward = self.calculate_reward(summary, query)
next_state = self.extract_features(summary, query)
logger.info(f"Summary:\n{summary}")
logger.info(f"Summarized content. Reward: {reward}")
# Remember Manager's experience
self.remember_manager(state, option, reward, next_state, done, td_error=reward)
# Remember Worker's experience
self.remember_worker(state, 1, reward, next_state, done)
state = next_state.astype(np.float32)
total_reward += reward
else:
reward = -1
logger.warning("No content to summarize")
# Remember Manager's experience
self.remember_manager(state, option, reward, state, True, td_error=reward)
elif option == 2: # RAG-based Generation Option
logger.debug("Executing RAG-based Generation Option")
kb_docs = self.retrieve_from_kb(query, top_k=5)
web_docs = [] # Assuming web_docs are already retrieved
combined_docs = self.combine_documents(kb_docs, web_docs)
generated_output = self.generate_rag_response(query, combined_docs)
logger.info(f"Generated Output:\n{generated_output}")
self.save_llm_training_data(
query,
generated_output,
summary=None,
link=None,
title="RAG-generated response"
)
reward = self.calculate_reward(generated_output, query)
next_state = self.extract_features(generated_output, query)
# Remember Manager's experience
self.remember_manager(state, option, reward, next_state, done, td_error=reward)
# Remember Worker's experience
self.remember_worker(state, 2, reward, next_state, done)
state = next_state.astype(np.float32)
total_reward += reward
else:
logger.warning(f"Unknown option selected by Manager: {option}")
# Perform replay for both Manager and Worker
self.replay_manager(batch_size=32, beta=0.4)
self.replay_worker(batch_size=32, beta=0.4)
# Update target models periodically
self.update_worker_target_model()
self.update_manager_target_model()
logger.info(f"Search completed. Total reward: {total_reward}")
defer.returnValue(total_reward)
except Exception as e:
logger.error(f"Error during search: {str(e)}", exc_info=True)
defer.returnValue(-1) # Return a negative reward on error
# ==========================
# Summarization Method
# ==========================
def summarize(self, content, query):
chunks = self.summarizer.split_into_chunks(content)
embeddings = self.summarizer.get_embeddings(chunks)
relevant_chunks = self.summarizer.retrieve_relevant_chunks(query, chunks, embeddings)
summary = self.summarizer.generate_summary(query, relevant_chunks)
# Save RAG data
self.summarizer.save_rag_data(query, chunks, embeddings)
return summary
# ==========================
# MCTS Iterations Calculation
# ==========================
def calculate_mcts_iterations(self, state):
# Calculate MCTS iterations based on state complexity
base_iterations = 2
complexity_factor = np.mean(state) / 100 # Normalize state values
iterations = int(base_iterations * (1 + complexity_factor))
max_iterations = 5 # Set a reasonable maximum
return min(iterations, max_iterations)
# ==========================
# RAG-based Response Generation
# ==========================
def generate_rag_response(self, query, combined_docs):
if not combined_docs:
logger.warning("No documents available for RAG-based generation.")
return "I'm sorry, I couldn't find any relevant information."
# Prepare context for the generator
context = "\n\n".join([f"Title: {doc.get('title', 'No Title')}\nContent: {doc.get('content', '')}" for doc in combined_docs])
prompt = f"Query: {query}\n\nContext:\n{context}\n\nAnswer:"
# Check cache first
cache_key = hashlib.md5(prompt.encode()).hexdigest()
cached_response = self.summarizer.cache.get(cache_key)
if cached_response:
logger.debug("Using cached RAG response.")
return cached_response
# Generate response
input_ids = self.summarizer.tokenizer.encode(prompt, return_tensors='pt').to(self.summarizer.device)
try:
output = self.summarizer.model.generate(
input_ids,
max_length=input_ids.shape[1] + self.summarizer.max_length,
num_return_sequences=1,
no_repeat_ngram_size=2,
top_k=50,
top_p=0.95,
temperature=0.7,
early_stopping=True
)
except Exception as e:
logger.error(f"Error during RAG response generation: {str(e)}")
return "RAG response generation failed."
response = self.summarizer.tokenizer.decode(output[0], skip_special_tokens=True)
answer = response.split("Answer:")[-1].strip()
# Cache the response
self.summarizer.cache.put(cache_key, answer)
self.summarizer.save_persistent_cache()
return answer
# ==========================
# Manager and Worker Interaction
# ==========================
def select_option(self, option):
"""
Define the mapping of options to their corresponding actions.
"""
# This can be expanded based on the number of options
option_actions = {
0: self.perform_search,
1: self.perform_summarization,
2: self.perform_rag_generation
}
action = option_actions.get(option, None)
if action:
return action
else:
logger.error(f"No action defined for option: {option}")
return None
def perform_search(self, query):
"""
Perform the search action.
"""
# Implementation is handled in the 'search' method
pass
def perform_summarization(self, content, query):
"""
Perform the summarization action.
"""
# Implementation is handled in the 'summarize' method
pass
def perform_rag_generation(self, query, combined_docs):
"""
Perform the RAG-based generation action.
"""
# Implementation is handled in the 'generate_rag_response' method
pass
# ==========================
# LRUCache Class
# ==========================
class LRUCache:
def __init__(self, capacity):
self.cache = OrderedDict()
self.capacity = capacity
def get(self, key):
if key not in self.cache:
return None
self.cache.move_to_end(key)
return self.cache[key]
def put(self, key, value):
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
if len(self.cache) > self.capacity:
self.cache.popitem(last=False)