# CSV persistence helpers for blog cluster planning and completed-post tracking.
import csv
import logging
import os
from io import StringIO
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)
class CSVHandler:
    """Persistence layer for blog-cluster planning data.

    Reads pending keyword clusters from ``clusters.csv`` and records
    finished posts in ``completed_posts.csv``. Row numbers exchanged
    between methods are 1-based *data* rows (the header is not counted),
    matching ``mark_cluster_complete``'s indexing.
    """

    def __init__(self):
        # Paths are relative to the current working directory.
        self.clusters_file = 'clusters.csv'
        self.completed_posts_file = 'completed_posts.csv'

    def get_cluster_data(self) -> Optional[Dict]:
        """Return the first cluster whose Status column is 'no'.

        Returns:
            Dict with 'Keywords', 'Intent', 'Primary Keyword' and
            'row_number' (1-based data row, suitable for
            ``mark_cluster_complete``), or None when no pending cluster
            exists or the file cannot be read.
        """
        try:
            with open(self.clusters_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                # start=1: first data row is row 1, matching
                # mark_cluster_complete (rows[0] there is the header).
                for i, row in enumerate(reader, start=1):
                    if row.get('Status', '').lower() == 'no':
                        return {
                            'Keywords': row.get('Keywords', ''),
                            'Intent': row.get('Intent', ''),
                            'Primary Keyword': row.get('Primary Keyword', ''),
                            'row_number': i,
                        }
            return None
        except Exception as e:
            logger.error(f"Error reading clusters CSV: {e}")
            return None

    def get_previous_posts(self) -> List[Dict]:
        """Return all previously completed posts, or [] on any error."""
        try:
            with open(self.completed_posts_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                return [
                    {
                        'title': row['Title'],
                        'keywords': row['Keywords'],
                        'summary': row['Meta Description'],
                        'url': row['URL'],
                    }
                    for row in reader
                ]
        except Exception as e:
            logger.error(f"Error reading completed posts CSV: {e}")
            return []

    def mark_cluster_complete(self, row_number: int):
        """Set the Status cell of the given 1-based data row to 'completed'.

        Adds a Status column to the header when missing and pads short
        rows so the status cell always exists. Rows out of range are
        silently ignored; the file is rewritten in full either way.

        Raises:
            Exception: re-raised after logging if the file cannot be
            read or rewritten.
        """
        try:
            rows = []
            with open(self.clusters_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.reader(file)
                header = next(reader)
                # Locate the Status column; append one if absent.
                status_index = header.index('Status') if 'Status' in header else -1
                if status_index == -1:
                    header.append('Status')
                    status_index = len(header) - 1
                rows = [header]
                rows.extend(reader)
            # rows[0] is the header, so data row N lives at rows[N].
            if row_number < len(rows):
                # Pad the row so the status cell is writable.
                while len(rows[row_number]) <= status_index:
                    rows[row_number].append('')
                rows[row_number][status_index] = 'completed'
            with open(self.clusters_file, 'w', newline='', encoding='utf-8') as file:
                csv.writer(file).writerows(rows)
        except Exception as e:
            logger.error(f"Error updating cluster status: {e}")
            raise

    def log_completed_post(self, metadata: Dict):
        """Append one completed post (title, keywords, description, URL).

        Expects keys 'title', 'keywords', 'meta_description', 'slug';
        the URL is derived from the slug. Errors are logged, not raised.
        """
        try:
            with open(self.completed_posts_file, 'a', newline='', encoding='utf-8') as file:
                writer = csv.writer(file)
                writer.writerow([
                    metadata['title'],
                    metadata['keywords'],
                    metadata['meta_description'],
                    f"https://yourblog.com/{metadata['slug']}",
                ])
        except Exception as e:
            logger.error(f"Error logging completed post: {e}")

    def get_all_clusters(self) -> List[Dict]:
        """Get all uncompleted clusters from the CSV file.

        A row counts as uncompleted when its Status is anything other
        than 'completed' (including a missing or empty Status column).
        Rows lacking any required field are skipped with a warning.

        Raises:
            Exception: re-raised after logging on read failure.
        """
        clusters = []
        try:
            with open(self.clusters_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                for row_number, row in enumerate(reader, start=1):
                    status = row.get('Status', '').lower()
                    if status == 'completed':
                        continue
                    cluster_data = {
                        'Keywords': row.get('Keywords', ''),
                        'Intent': row.get('Intent', ''),
                        'Primary Keyword': row.get('Primary Keyword', ''),
                        'row_number': row_number,
                        'Status': status,
                    }
                    if all(cluster_data[field] for field in ('Keywords', 'Intent', 'Primary Keyword')):
                        clusters.append(cluster_data)
                    else:
                        logger.warning(f"Row {row_number}: Missing required fields, skipping")
            return clusters
        except Exception as e:
            logger.error(f"Error reading clusters file: {e}")
            raise

    def _parse_clusters(self, csv_text: str, source_label: str) -> List[Dict]:
        """Parse cluster rows from CSV text; skip rows missing required columns.

        Args:
            csv_text: Raw CSV content including a header row.
            source_label: Human-readable origin used in the success log line.
        """
        clusters = []
        reader = csv.DictReader(StringIO(csv_text))
        required_columns = ['Keywords', 'Intent', 'Primary Keyword']
        for row_number, row in enumerate(reader, start=1):
            if not all(col in row for col in required_columns):
                logger.error(f"Row {row_number}: Missing required columns. Required: {required_columns}")
                continue
            clusters.append({
                'Keywords': row['Keywords'],
                'Intent': row['Intent'],
                'Primary Keyword': row['Primary Keyword'],
                'row_number': row_number,
            })
        logger.info(f"Successfully processed {len(clusters)} clusters from {source_label}")
        return clusters

    def process_uploaded_csv(self, csv_content: str) -> List[Dict]:
        """
        Process an uploaded CSV content string and return cluster data for blog generation.

        Args:
            csv_content (str): The decoded CSV content as a string

        Returns:
            List[Dict]: List of cluster data dictionaries

        Raises:
            Exception: re-raised after logging on parse failure.
        """
        try:
            return self._parse_clusters(csv_content, "uploaded CSV")
        except Exception as e:
            logger.error(f"Error processing uploaded CSV: {e}")
            raise

    def process_csv_text(self, csv_text: str) -> List[Dict]:
        """
        Process CSV content provided as a text string and return cluster data for blog generation.

        Args:
            csv_text (str): The CSV content as a string

        Returns:
            List[Dict]: List of cluster data dictionaries

        Raises:
            Exception: re-raised after logging on parse failure.
        """
        try:
            # Leading/trailing blank lines would confuse DictReader's header detection.
            return self._parse_clusters(csv_text.strip(), "CSV text")
        except Exception as e:
            logger.error(f"Error processing CSV text: {e}")
            raise