# monitor.py import os import utils import streamlit as st import geopandas as gpd from authentication import greeting, check_password from senHub import SenHub from datetime import datetime from sentinelhub import SHConfig import requests import process from zipfile import ZipFile import plotly.express as px def check_authentication(): if not check_password(): st.stop() config = SHConfig() config.instance_id = '6c220beb-90c4-4131-b658-10cddd8d97b9' config.sh_client_id = '17e7c154-7f2d-4139-b1af-cef762385079' config.sh_client_secret = 'KvbQMKZB85ZWEgWuxqiWIVEvTAQEfoF9' def select_field(gdf): st.markdown(""" """, unsafe_allow_html=True) names = gdf['name'].tolist() names.append("Select Field") field_name = st.selectbox("Select Field", options=names, key="field_name_monitor", help="Select the field to edit", index=len(names)-1) return field_name def calculate_bbox(df, field): bbox = df.loc[df['name'] == field].bounds r = bbox.iloc[0] return [r.minx, r.miny, r.maxx, r.maxy] def get_available_dates_for_field(df, field, year, start_date='', end_date=''): bbox = calculate_bbox(df, field) token = SenHub(config).token headers = utils.get_bearer_token_headers(token) if start_date == '' or end_date == '': start_date = f'{year}-01-01' end_date = f'{year}-12-31' data = f'{{ "collections": [ "sentinel-2-l2a" ], "datetime": "{start_date}T00:00:00Z/{end_date}T23:59:59Z", "bbox": {bbox}, "limit": 100, "distinct": "date" }}' response = requests.post('https://services.sentinel-hub.com/api/v1/catalog/search', headers=headers, data=data) try: features = response.json()['features'] except: print(response.json()) features = [] return features @st.cache_data def get_and_cache_available_dates(_df, field, year, start_date, end_date): dates = get_available_dates_for_field(_df, field, year, start_date, end_date) print(f'Caching Dates for {field}') return dates def get_cuarted_df_for_field(df, field, date, metric, clientName): curated_date_path = utils.get_curated_location_img_path(clientName, metric, date, field) if curated_date_path is not None: curated_df = gpd.read_file(curated_date_path) else: process.Download_image_in_given_date(clientName, metric, df, field, date) process.mask_downladed_image(clientName, metric, df, field, date) process.convert_maske_image_to_geodataframe(clientName, metric, df, field, date, df.crs) curated_date_path = utils.get_curated_location_img_path(clientName, metric, date, field) curated_df = gpd.read_file(curated_date_path) return curated_df def track(metric, field_name, src_df, client_name): st.title(":green[Select Date and Start Monitoring]") dates = [] date = -1 if 'dates' not in st.session_state: st.session_state['dates'] = dates else: dates = st.session_state['dates'] if 'date' not in st.session_state: st.session_state['date'] = date else: date = st.session_state['date'] if True: start_date = '2024-01-01' today = datetime.today() end_date = today.strftime('%Y-%m-%d') year = '2024' dates = get_and_cache_available_dates(src_df, field_name, year, start_date, end_date) # Add None to the end of the list to be used as a default value #sort the dates from earliest to today dates = sorted(dates) #Add the dates to the session state st.session_state['dates'] = dates # Display the dropdown menu if len(dates) > 0: st.markdown(""" """, unsafe_allow_html=True) date = st.selectbox('Select Observation Date: ', dates, index=len(dates)-1, key=f'Select Date Dropdown Menu - {metric}') if date != -1: st.write('You selected:', date) #Add the date to the session state st.session_state['date'] = date else: st.write('Please Select A Date') else: st.info('No dates available for the selected field and dates range, select a different range or click the button to fetch the dates again') st.markdown('---') st.header('Show Field Data') # If a field and a date are selected, display the field data if date != -1: # Get the field data at the selected date with st.spinner('Loading Field Data...'): # Get the metric data and cloud cover data for the selected field and date metric_data = get_cuarted_df_for_field(src_df, field_name, date, metric, client_name) cloud_cover_data = get_cuarted_df_for_field(src_df, field_name, date, 'CLP', client_name) #Merge the metric and cloud cover data on the geometry column field_data = metric_data.merge(cloud_cover_data, on='geometry') # Display the field data avg_clp = field_data[f'CLP_{date}'].mean() *100 avg_metric = field_data[f'{metric}_{date}'].mean() st.write(f'Field Data for (Field ID: {field_name}) on {date}') col1,col3,col5,col2,col4 = st.columns(5) col1.metric(f":orange[Average {metric}]", value=f"{avg_metric :.2f}") col2.metric(":green[Cloud Cover]", value=f"{avg_clp :.2f}%") #Get Avarage Cloud Cover # If the avarage cloud cover is greater than 80%, display a warning message if avg_clp > 80: st.warning(f'⚠️ The Avarage Cloud Cover is {avg_clp}%') st.info('Please Select A Different Date') df = field_data.copy() df['latitude'] = df['geometry'].y df['longitude'] = df['geometry'].x # Create a scatter plot fig = px.scatter_mapbox( df, lat='latitude', lon='longitude', color=f'{metric}_{date}', color_continuous_scale='RdYlGn', range_color=(0, 1), width= 800, height=600, size_max=15, zoom=13, ) # Add the base map token = open("token.mapbox_token").read() fig.update_layout(mapbox_style="satellite", mapbox_accesstoken=token) st.plotly_chart(fig) #Dwonload Links # If the field data is not empty, display the download links if len(field_data) > 0: # Create two columns for the download links download_as_shp_col, download_as_tiff_col = st.columns(2) # Create a shapefile of the field data and add a download link with download_as_shp_col: #Set the shapefile name and path based on the field id, metric and date extension = 'shp' shapefilename = f"{field_name}_{metric}_{date}.{extension}" path = f'./shapefiles/{field_name}/{metric}/{extension}' # Create the target directory if it doesn't exist os.makedirs(path, exist_ok=True) # Save the field data as a shapefile field_data.to_file(f'{path}/{shapefilename}') # Create a zip file of the shapefile files = [] for i in os.listdir(path): if os.path.isfile(os.path.join(path,i)): if i[0:len(shapefilename)] == shapefilename: files.append(os.path.join(path,i)) zipFileName = f'{path}/{field_name}_{metric}_{date}.zip' zipObj = ZipFile(zipFileName, 'w') for file in files: zipObj.write(file) zipObj.close() # Add a download link for the zip file with open(zipFileName, 'rb') as f: st.download_button('Download as ShapeFile', f,file_name=zipFileName) # Get the tiff file path and create a download link with download_as_tiff_col: #get the tiff file path tiff_path = utils.get_masked_location_img_path(client_name, metric, date, field_name) # Add a download link for the tiff file donwnload_filename = f'{metric}_{field_name}_{date}.tiff' with open(tiff_path, 'rb') as f: st.download_button('Download as Tiff File', f,file_name=donwnload_filename) else: st.info('Please Select A Field and A Date') def monitor_fields(): row1,row2 = st.columns([1,2]) with row1: st.title(":orange[Field Monitoring]") current_user = greeting("Let's take a look how these fields are doing") if os.path.exists(f"fields_{current_user}.parquet"): gdf = gpd.read_parquet(f"fields_{current_user}.parquet") field_name = select_field(gdf) if field_name == "Select Field": st.info("No Field Selected Yet!") else: metric = st.radio("Select Metric to Monitor", ["NDVI", "LAI", "CAB"], key="metric", index=0, help="Select the metric to monitor") st.success(f"Monitoring {metric} for {field_name}") with st.expander("Metrics Explanation", expanded=False): st.write("NDVI: Normalized Difference Vegetation Index, Mainly used to monitor the health of vegetation") st.write("LAI: Leaf Area Index, Mainly used to monitor the productivity of vegetation") st.write("CAB: Chlorophyll Absorption in the Blue band, Mainly used to monitor the chlorophyll content in vegetation") # st.write("NDMI: Normalized Difference Moisture Index, Mainly used to monitor the moisture content in vegetation") st.info("More metrics and analysis features will be added soon") else: st.info("No Fields Added Yet!") return with row2: if field_name != "Select Field": track(metric, field_name, gdf, current_user) if __name__ == '__main__': check_authentication() monitor_fields()