import csv
import logging
from io import StringIO
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


class CSVHandler:
    def __init__(self):
        self.clusters_file = 'clusters.csv'
        self.completed_posts_file = 'completed_posts.csv'

    def get_cluster_data(self) -> Optional[Dict]:
        """Return the first cluster whose Status is 'no', or None if none remain."""
        try:
            with open(self.clusters_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                # start=2 because file row 1 is the header
                for i, row in enumerate(reader, start=2):
                    if (row.get('Status') or '').lower() == 'no':
                        return {
                            'Keywords': row['Keywords'],
                            'Intent': row['Intent'],
                            'Primary Keyword': row['Primary Keyword'],
                            'row_number': i
                        }
            return None
        except Exception as e:
            logger.error(f"Error reading clusters CSV: {e}")
            return None

    def get_previous_posts(self) -> List[Dict]:
        """Return metadata for every post recorded in the completed-posts CSV."""
        try:
            posts = []
            with open(self.completed_posts_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                for row in reader:
                    posts.append({
                        'title': row['Title'],
                        'keywords': row['Keywords'],
                        'summary': row['Meta Description'],
                        'url': row['URL']
                    })
            return posts
        except Exception as e:
            logger.error(f"Error reading completed posts CSV: {e}")
            return []

    def mark_cluster_complete(self, row_number: int):
        """Set the Status column to 'completed' for the given row.

        row_number is the 1-based file row, matching the convention used by
        get_cluster_data and get_all_clusters (the header is row 1, so the
        first data row is row 2).
        """
        try:
            # Read all rows into memory
            with open(self.clusters_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.reader(file)
                header = next(reader)  # header row

                # Find the Status column index; append the column if missing
                if 'Status' in header:
                    status_index = header.index('Status')
                else:
                    header.append('Status')
                    status_index = len(header) - 1

                rows = [header]
                rows.extend(reader)

            # rows[0] is file row 1 (the header), so file row N lives at rows[N - 1]
            index = row_number - 1
            if 0 < index < len(rows):
                # Pad short rows so the Status column exists before writing to it
                while len(rows[index]) <= status_index:
                    rows[index].append('')
                rows[index][status_index] = 'completed'

            # Write all rows back
            with open(self.clusters_file, 'w', newline='', encoding='utf-8') as file:
                writer = csv.writer(file)
                writer.writerows(rows)
        except Exception as e:
            logger.error(f"Error updating cluster status: {e}")
            raise

    def log_completed_post(self, metadata: Dict):
        """Append a completed post's metadata to the completed-posts CSV."""
        try:
            with open(self.completed_posts_file, 'a', newline='', encoding='utf-8') as file:
                writer = csv.writer(file)
                writer.writerow([
                    metadata['title'],
                    metadata['keywords'],
                    metadata['meta_description'],
                    f"https://yourblog.com/{metadata['slug']}"
                ])
        except Exception as e:
            logger.error(f"Error logging completed post: {e}")

    def get_all_clusters(self) -> List[Dict]:
        """Get all uncompleted clusters from the CSV file."""
        clusters = []
        try:
            with open(self.clusters_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                # start=2 so row_number is the file row, as in get_cluster_data
                for row_number, row in enumerate(reader, start=2):
                    # A missing or empty Status column counts as not completed
                    status = (row.get('Status') or '').lower()
                    if status == 'completed':
                        continue
                    cluster_data = {
                        'Keywords': row.get('Keywords', ''),
                        'Intent': row.get('Intent', ''),
                        'Primary Keyword': row.get('Primary Keyword', ''),
                        'row_number': row_number,
                        'Status': status
                    }
                    # Validate required fields
                    if all(cluster_data[field] for field in ['Keywords', 'Intent', 'Primary Keyword']):
                        clusters.append(cluster_data)
                    else:
                        logger.warning(f"Row {row_number}: Missing required fields, skipping")
            return clusters
        except Exception as e:
            logger.error(f"Error reading clusters file: {e}")
            raise

    def _parse_cluster_rows(self, csv_text: str) -> List[Dict]:
        """Parse cluster rows out of a CSV string (shared by the public parsers)."""
        clusters = []
        # Treat the string as a file-like object for the CSV reader
        reader = csv.DictReader(StringIO(csv_text))
        required_columns = ['Keywords', 'Intent', 'Primary Keyword']

        for row_number, row in enumerate(reader, start=1):
            # Skip rows that are missing any required column
            if not all(col in row for col in required_columns):
                logger.error(f"Row {row_number}: Missing required columns. Required: {required_columns}")
                continue
            clusters.append({
                'Keywords': row['Keywords'],
                'Intent': row['Intent'],
                'Primary Keyword': row['Primary Keyword'],
                'row_number': row_number
            })
        return clusters

    def process_uploaded_csv(self, csv_content: str) -> List[Dict]:
        """
        Process uploaded CSV content and return cluster data for blog generation.

        Args:
            csv_content (str): The decoded CSV content as a string

        Returns:
            List[Dict]: List of cluster data dictionaries
        """
        try:
            clusters = self._parse_cluster_rows(csv_content)
            logger.info(f"Successfully processed {len(clusters)} clusters from uploaded CSV")
            return clusters
        except Exception as e:
            logger.error(f"Error processing uploaded CSV: {e}")
            raise

    def process_csv_text(self, csv_text: str) -> List[Dict]:
        """
        Process CSV content provided as a text string and return cluster data
        for blog generation.

        Args:
            csv_text (str): The CSV content as a string

        Returns:
            List[Dict]: List of cluster data dictionaries
        """
        try:
            clusters = self._parse_cluster_rows(csv_text.strip())
            logger.info(f"Successfully processed {len(clusters)} clusters from CSV text")
            return clusters
        except Exception as e:
            logger.error(f"Error processing CSV text: {e}")
            raise
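

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal demonstration of the intended workflow, assuming a clusters.csv
# next to this script with the columns Keywords, Intent, Primary Keyword,
# Status. The metadata values below are hypothetical placeholders; in a real
# pipeline they would come from the blog-generation step.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    handler = CSVHandler()

    # Fetch the next pending cluster, record a post for it, and mark it done
    cluster = handler.get_cluster_data()
    if cluster:
        print(f"Next cluster: {cluster['Primary Keyword']} (file row {cluster['row_number']})")
        handler.log_completed_post({
            'title': 'Example Post',                # hypothetical values
            'keywords': cluster['Keywords'],
            'meta_description': 'Example summary',
            'slug': 'example-post'
        })
        handler.mark_cluster_complete(cluster['row_number'])

    # Clusters can also be parsed straight from a CSV string
    sample = "Keywords,Intent,Primary Keyword\nseo tips,informational,seo\n"
    print(handler.process_csv_text(sample))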