# automatedblogpostcreater / csv_handler.py
# (Upload metadata: Pamudu13 — "Upload 16 files", commit 53e65b7 verified)
import csv
from typing import Dict, List
import os
import logging
logger = logging.getLogger(__name__)
class CSVHandler:
    """CSV persistence helpers for the automated blog-post pipeline.

    Two files are managed:
      * ``clusters.csv`` — input keyword clusters; a ``Status`` column
        tracks which clusters have already been turned into posts.
      * ``completed_posts.csv`` — append-only log of published posts.
    """

    def __init__(self):
        self.clusters_file = 'clusters.csv'
        self.completed_posts_file = 'completed_posts.csv'
        # Instance logger so every method reports errors the same way
        # (the original mixed print() with module-level logging).
        self._logger = logging.getLogger(__name__)

    def get_cluster_data(self) -> Dict:
        """Return the first cluster whose Status is 'no', or None.

        The result carries ``row_number``: a 1-based data-row index
        (header excluded) that matches the indexing used by
        mark_cluster_complete() and get_all_clusters().
        """
        try:
            with open(self.clusters_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                # start=1 so row_number equals the index into the
                # header-prefixed row list built by mark_cluster_complete
                # (header sits at index 0, first data row at index 1).
                # The original started at 2, which made mark_cluster_complete
                # mark the row AFTER the one that was returned.
                for i, row in enumerate(reader, start=1):
                    # `or ''` guards short rows: DictReader fills missing
                    # trailing fields with None, which has no .lower().
                    if (row.get('Status') or '').lower() == 'no':
                        return {
                            'Keywords': row.get('Keywords', ''),
                            'Intent': row.get('Intent', ''),
                            'Primary Keyword': row.get('Primary Keyword', ''),
                            'row_number': i
                        }
                return None
        except Exception as e:
            self._logger.error(f"Error reading clusters CSV: {e}")
            return None

    def get_previous_posts(self) -> List[Dict]:
        """Return all previously published posts, in file order.

        Expects completed_posts.csv to have Title / Keywords /
        Meta Description / URL columns. Returns [] when the file is
        missing or malformed.
        """
        try:
            with open(self.completed_posts_file, 'r', newline='', encoding='utf-8') as file:
                return [
                    {
                        'title': row['Title'],
                        'keywords': row['Keywords'],
                        'summary': row['Meta Description'],
                        'url': row['URL'],
                    }
                    for row in csv.DictReader(file)
                ]
        except Exception as e:
            self._logger.error(f"Error reading completed posts CSV: {e}")
            return []

    def mark_cluster_complete(self, row_number: int):
        """Set the Status of data row *row_number* to 'completed'.

        ``row_number`` is 1-based over data rows (header excluded), as
        returned by get_cluster_data()/get_all_clusters(). A Status
        column is appended to the header if it does not exist yet.

        Raises:
            Exception: re-raised after logging on any read/write failure.
        """
        try:
            with open(self.clusters_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.reader(file)
                header = next(reader)
                # Locate the Status column, creating it when absent.
                if 'Status' in header:
                    status_index = header.index('Status')
                else:
                    header.append('Status')
                    status_index = len(header) - 1
                rows = [header]
                rows.extend(reader)
            # rows[0] is the header, so data row N lives at rows[N].
            # Lower bound guards against 0/negative clobbering the header.
            if 1 <= row_number < len(rows):
                # Pad short rows so the Status cell exists.
                while len(rows[row_number]) <= status_index:
                    rows[row_number].append('')
                rows[row_number][status_index] = 'completed'
            # Rewrite the whole file with the updated rows.
            with open(self.clusters_file, 'w', newline='', encoding='utf-8') as file:
                csv.writer(file).writerows(rows)
        except Exception as e:
            self._logger.error(f"Error updating cluster status: {e}")
            raise

    def log_completed_post(self, metadata: Dict):
        """Append one published post to completed_posts.csv.

        *metadata* must provide 'title', 'keywords', 'meta_description'
        and 'slug'; the stored URL is derived from the slug. Failures
        are logged and swallowed (best-effort logging).
        """
        try:
            with open(self.completed_posts_file, 'a', newline='', encoding='utf-8') as file:
                csv.writer(file).writerow([
                    metadata['title'],
                    metadata['keywords'],
                    metadata['meta_description'],
                    f"https://yourblog.com/{metadata['slug']}",
                ])
        except Exception as e:
            self._logger.error(f"Error logging completed post: {e}")

    def get_all_clusters(self):
        """Get all uncompleted clusters from the CSV file.

        Rows missing any of Keywords/Intent/Primary Keyword are skipped
        with a warning. Raises after logging on read/parse errors.
        """
        clusters = []
        try:
            with open(self.clusters_file, 'r', newline='', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                for row_number, row in enumerate(reader, start=1):
                    # Missing or empty Status counts as "not completed".
                    # `or ''` guards None restval from short rows.
                    status = (row.get('Status') or '').lower()
                    if status == 'completed':
                        continue
                    cluster_data = {
                        'Keywords': row.get('Keywords', ''),
                        'Intent': row.get('Intent', ''),
                        'Primary Keyword': row.get('Primary Keyword', ''),
                        'row_number': row_number,
                        'Status': status
                    }
                    # Only keep rows with all required fields populated.
                    if all(cluster_data[field] for field in ('Keywords', 'Intent', 'Primary Keyword')):
                        clusters.append(cluster_data)
                    else:
                        self._logger.warning(f"Row {row_number}: Missing required fields, skipping")
            return clusters
        except Exception as e:
            self._logger.error(f"Error reading clusters file: {e}")
            raise

    def _parse_cluster_rows(self, csv_file, source_desc: str) -> List[Dict]:
        """Parse cluster rows from an open file-like object.

        Shared by process_uploaded_csv() and process_csv_text(); rows
        missing any required column are logged and skipped.
        """
        clusters = []
        reader = csv.DictReader(csv_file)
        required_columns = ['Keywords', 'Intent', 'Primary Keyword']
        for row_number, row in enumerate(reader, start=1):
            if not all(col in row for col in required_columns):
                self._logger.error(f"Row {row_number}: Missing required columns. Required: {required_columns}")
                continue
            clusters.append({
                'Keywords': row['Keywords'],
                'Intent': row['Intent'],
                'Primary Keyword': row['Primary Keyword'],
                'row_number': row_number
            })
        self._logger.info(f"Successfully processed {len(clusters)} clusters from {source_desc}")
        return clusters

    def process_uploaded_csv(self, csv_content: str) -> List[Dict]:
        """
        Process an uploaded CSV content string and return cluster data for blog generation.

        Args:
            csv_content (str): The decoded CSV content as a string
        Returns:
            List[Dict]: List of cluster data dictionaries
        Raises:
            Exception: re-raised after logging on parse failure.
        """
        try:
            from io import StringIO
            return self._parse_cluster_rows(StringIO(csv_content), "uploaded CSV")
        except Exception as e:
            self._logger.error(f"Error processing uploaded CSV: {e}")
            raise

    def process_csv_text(self, csv_text: str) -> List[Dict]:
        """
        Process CSV content provided as a text string and return cluster data for blog generation.

        Args:
            csv_text (str): The CSV content as a string
        Returns:
            List[Dict]: List of cluster data dictionaries
        Raises:
            Exception: re-raised after logging on parse failure.
        """
        try:
            from io import StringIO
            # .strip() tolerates leading/trailing blank lines in pasted text.
            return self._parse_cluster_rows(StringIO(csv_text.strip()), "CSV text")
        except Exception as e:
            self._logger.error(f"Error processing CSV text: {e}")
            raise