# Spaces:
# Running
# Running
import csv
import os
import random
from datetime import datetime, timedelta

import folium
from fastapi import FastAPI, HTTPException, Request, Form
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from folium.plugins import MarkerCluster
# FastAPI application object; it is handed to uvicorn in the __main__ guard.
app = FastAPI()

# Jinja2 environment that loads templates from the "templates" directory.
templates = Jinja2Templates(directory="templates")

# CSV file holding the signal readings. The path is relative, so it is
# resolved against the process's current working directory.
CSV_FILE = "wifi_signals.csv"
def generate_data():
    """Overwrite the CSV file with 100 randomly generated demo readings.

    Each row holds a latitude and longitude within 0.01 degrees of the base
    coordinates, a timestamp between 2024-08-01 00:00:00 and
    2024-08-07 23:59:59, and an integer signal strength from 0 to 4.
    Any existing content of ``CSV_FILE`` is replaced.

    Returns:
        dict: A ``message`` entry confirming that the data was generated.
    """
    base_latitude = 35.6837
    base_longitude = 139.6805
    start_date = datetime(2024, 8, 1)
    end_date = datetime(2024, 8, 7)
    delta = end_date - start_date

    with open(CSV_FILE, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['latitude', 'longitude', 'timestamp', 'signal_strength'])
        for _ in range(100):
            random_days = random.randint(0, delta.days)
            # randint() includes both endpoints. 86399 is the last second of
            # a day; 86400 would add a whole extra day and let the final day
            # spill over to 2024-08-08 00:00:00.
            random_seconds = random.randint(0, 86399)
            random_time = start_date + timedelta(days=random_days, seconds=random_seconds)
            random_latitude = base_latitude + random.uniform(-0.01, 0.01)
            random_longitude = base_longitude + random.uniform(-0.01, 0.01)
            random_signal_strength = random.randint(0, 4)
            writer.writerow([
                random_latitude,
                random_longitude,
                random_time.strftime('%Y-%m-%d %H:%M:%S'),
                random_signal_strength,
            ])

    return {"message": "Demo data generated successfully"}
def delete_data():
    """Delete the CSV file and recreate it containing only the header row.

    Returns:
        dict: A ``message`` entry confirming the reset.

    Raises:
        HTTPException: Status 500 if removing or recreating the file fails;
            the detail carries the text of the underlying error.
    """
    header = ['latitude', 'longitude', 'timestamp', 'signal_strength']
    try:
        if os.path.exists(CSV_FILE):
            os.remove(CSV_FILE)
        # Start over with a fresh file that holds nothing but the header.
        with open(CSV_FILE, mode='w', newline='') as csv_out:
            csv.writer(csv_out).writerow(header)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
    return {"message": "Data file deleted and reset successfully"}
async def upload_data(
    latitude: float = Form(...),
    longitude: float = Form(...),
    timestamp: str = Form(...),
    signal_strength: int = Form(...)
):
    """Append one signal reading, received as form fields, to the CSV file.

    Args:
        latitude: Latitude of the reading.
        longitude: Longitude of the reading.
        timestamp: Time of the reading; must parse as 'YYYY-MM-DD HH:MM:SS'.
        signal_strength: Signal strength of the reading.

    Returns:
        dict: A ``message`` entry confirming the upload.

    Raises:
        HTTPException: Status 400 if a ``ValueError`` is raised (the
            timestamp does not match the expected format); status 500 for
            any other error.
    """
    record = [latitude, longitude, timestamp, signal_strength]
    try:
        # Parsed only to validate; the string is stored exactly as received.
        datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
        with open(CSV_FILE, mode='a', newline='') as csv_out:
            csv.writer(csv_out).writerow(record)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid timestamp format. Use 'YYYY-MM-DD HH:MM:SS'")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
    else:
        return {"message": "Data uploaded successfully"}
def _parse_filter_day(value, field):
    """Parse a 'YYYY-MM-DD' filter value, raising HTTP 400 if it is malformed."""
    try:
        return datetime.strptime(value, '%Y-%m-%d')
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid {field} format. Use 'YYYY-MM-DD'",
        ) from None


def _load_signal_data(start_date, end_date):
    """Read (lat, lon, strength) tuples from CSV_FILE, filtered by date if both bounds are set."""
    window = None
    if start_date and end_date:
        # Parse the bounds once, not once per row.
        first_day = _parse_filter_day(start_date, 'start_date')
        # The end date is inclusive: accept readings up to, but not
        # including, midnight of the following day.
        end_exclusive = _parse_filter_day(end_date, 'end_date') + timedelta(days=1)
        window = (first_day, end_exclusive)

    signal_data = []
    if not os.path.exists(CSV_FILE):
        return signal_data

    with open(CSV_FILE, mode='r', newline='') as file:
        reader = csv.reader(file)
        next(reader, None)  # Skip the header row; tolerate an empty file.
        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines.
                continue
            lat, lon, timestamp, strength = row
            recorded_at = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
            if window is not None and not (window[0] <= recorded_at < window[1]):
                continue
            signal_data.append((float(lat), float(lon), int(strength)))
    return signal_data


def show_map(request: Request, start_date: str = None, end_date: str = None):
    """Render the map page with one circle marker per stored signal reading.

    Readings are loaded from ``CSV_FILE``. When both ``start_date`` and
    ``end_date`` are given, only readings from the start day through the end
    of the end day are shown; if either is missing, no filter is applied.
    Marker radius scales linearly from 5 to 15 between the weakest and the
    strongest reading shown, and is 15 when all readings are equally strong.

    Args:
        request: Incoming request, passed on to the template context.
        start_date: Optional first day to include, as 'YYYY-MM-DD'.
        end_date: Optional last day to include, as 'YYYY-MM-DD'.

    Returns:
        The rendered "map.html" template response, given ``map_html``,
        ``start_date`` and ``end_date`` in its context.

    Raises:
        HTTPException: Status 400 if both dates are given and either one is
            not in 'YYYY-MM-DD' format.
    """
    signal_data = _load_signal_data(start_date, end_date)

    if signal_data:
        strengths = [reading[2] for reading in signal_data]
        min_signal = min(strengths)
        max_signal = max(strengths)
    else:
        min_signal = max_signal = 1
    signal_range = max_signal - min_signal

    m = folium.Map(location=[35.6837, 139.6805], zoom_start=12)
    marker_cluster = MarkerCluster().add_to(m)

    for lat, lon, signal_strength in signal_data:
        if signal_range != 0:
            radius = 5 + 10 * ((signal_strength - min_signal) / signal_range)
        else:
            # All readings are equally strong; avoid dividing by zero.
            radius = 15
        folium.CircleMarker(
            location=[lat, lon],
            radius=radius,
            popup=f'Signal Strength: {signal_strength}',
            color='blue',
            fill=True,
            fill_opacity=0.6,
        ).add_to(marker_cluster)

    map_html = m._repr_html_()
    return templates.TemplateResponse(
        "map.html",
        {
            "request": request,
            "map_html": map_html,
            "start_date": start_date,
            "end_date": end_date,
        },
    )
if __name__ == "__main__":
    # Runs only when this file is executed directly: serve the app with
    # uvicorn on all network interfaces, port 5000.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=5000,
    )