major-tom-explorer / components.py
MarcSkovMadsen's picture
Add core-1 dataset
57681d6
raw
history blame
10.7 kB
import holoviews as hv
import numpy as np
import pandas as pd
import panel as pn
import param
from holoviews.operation.datashader import dynspread, rasterize
from utils import (
DATASET_COLUMNS,
DATASETS,
DATASHADER_LOGO,
DATASHADER_URL,
DEFAULT_DATASET,
DESCRIPTION,
ESA_EASTING,
ESA_NORTHING,
MAJOR_TOM_LOGO,
MAJOR_TOM_LYRICS,
MAJOR_TOM_PICTURE,
MAJOR_TOM_REF_URL,
PANEL_LOGO,
PANEL_URL,
get_closest_rows,
get_image,
get_meta_data,
)
class DatasetInput(pn.viewable.Viewer):
    """Dataset picker that also exposes the chosen dataset's metadata.

    ``value`` is the selected dataset name. Each time it changes (and once at
    construction) ``columns`` is looked up in ``DATASET_COLUMNS`` and ``data``
    is loaded through ``get_meta_data`` wrapped in ``pn.cache``.
    """

    value = param.Selector(
        objects=DATASETS,
        default=DEFAULT_DATASET,
        allow_None=False,
        label="Dataset",
        doc="""The name of the dataset""",
    )
    data = param.DataFrame(doc="""The metadata dataset""", allow_None=False)
    columns = param.Dict(doc="""The columns of the dataset""", allow_None=False)

    def __panel__(self):
        """Render ``value`` as an outlined radio button group."""
        selector = pn.widgets.RadioButtonGroup.from_param(
            self.param.value, button_style="outline"
        )
        return selector

    @pn.depends("value", watch=True, on_init=True)
    def _update_data(self):
        """Refresh ``columns`` and ``data`` for the currently selected dataset."""
        load_meta_data = pn.cache(get_meta_data)
        # Both parameters are assigned in a single ``param.update`` call.
        self.param.update(
            columns=DATASET_COLUMNS[self.value],
            data=load_meta_data(dataset=self.value),
        )
class MapInput(pn.viewable.Viewer):
    """Rasterized map of the sample locations with view-range and tap selection.

    ``data`` supplies the rows to plot; its ``centre_easting`` and
    ``centre_northing`` columns are used as the point coordinates.
    ``data_in_view`` holds the rows inside the current plot ranges and
    ``data_selected`` holds the result of ``get_closest_rows`` for the last
    tapped position. ``updating`` is set while the plot is being rebuilt and
    drives the pane's loading indicator.
    """

    data = param.DataFrame(allow_refs=True, allow_None=False)
    data_in_view = param.DataFrame(allow_None=False)
    data_selected = param.DataFrame(allow_None=False)

    # The overlay shown in the pane and the streams attached to it. All of
    # them are replaced by ``_handle_data_dask_change`` when ``data`` changes.
    _plot = param.Parameter(allow_None=False)
    _pointer_x = param.Parameter(allow_None=False)
    _pointer_y = param.Parameter(allow_None=False)
    _range_xy = param.Parameter(allow_None=False)
    _tap = param.Parameter(allow_None=False)

    updating = param.Boolean()

    def __panel__(self):
        """Return the map pane stacked above the row-count description."""
        return pn.Column(
            pn.pane.HoloViews(
                # Pass the parameter rather than its current value so the pane
                # follows the overlay that is rebuilt on every ``data`` change;
                # a plain ``self._plot`` would freeze the first overlay.
                self.param._plot,
                height=550,
                width=800,
                loading=self.param.updating,
            ),
            self._description,
        )

    @param.depends("data", watch=True, on_init=True)
    def _handle_data_dask_change(self):
        """Rebuild the plot and its streams for the current ``data``.

        Creates the points element, the range/tap/pointer streams sourced on
        it, the rasterized and spread layers, crosshair lines and the tile
        layer, stores them on the private parameters, and then binds and runs
        the ``data_in_view`` / ``data_selected`` updaters once.
        """
        with self.param.update(updating=True):
            data = self.data[["centre_easting", "centre_northing"]].copy()
            points = hv.Points(
                data, kdims=["centre_easting", "centre_northing"], vdims=[]
            )

            rangexy = hv.streams.RangeXY(source=points)
            # The tap stream starts at the ESA_EASTING / ESA_NORTHING position.
            tap = hv.streams.Tap(source=points, x=ESA_EASTING, y=ESA_NORTHING)

            agg = rasterize(
                points, link_inputs=True, x_sampling=0.0001, y_sampling=0.0001
            )
            dyn = dynspread(agg)
            dyn.opts(cmap="kr_r", colorbar=True)

            # Crosshair lines that follow the pointer position.
            pointerx = hv.streams.PointerX(x=ESA_EASTING, source=points)
            pointery = hv.streams.PointerY(y=ESA_NORTHING, source=points)
            vline = hv.DynamicMap(lambda x: hv.VLine(x), streams=[pointerx])
            hline = hv.DynamicMap(lambda y: hv.HLine(y), streams=[pointery])

            tiles = hv.Tiles(
                "https://tile.openstreetmap.org/{Z}/{X}/{Y}.png", name="OSM"
            ).opts(xlabel="Longitude", ylabel="Latitude")

            self.param.update(
                _plot=tiles * agg * dyn * hline * vline,
                _pointer_x=pointerx,
                _pointer_y=pointery,
                _range_xy=rangexy,
                _tap=tap,
            )

            # Keep ``data_in_view`` in sync with the plot ranges, and compute
            # it once right away for the initial ranges.
            update_viewed = pn.bind(
                self._update_data_in_view,
                rangexy.param.x_range,
                rangexy.param.y_range,
                watch=True,
            )
            update_viewed()

            # Same for ``data_selected`` and the tap position.
            update_selected = pn.bind(
                self._update_data_selected, tap.param.x, tap.param.y, watch=True
            )
            update_selected()

    def _update_data_in_view(self, x_range, y_range):
        """Set ``data_in_view`` to the rows whose centre lies in both ranges.

        When either range is missing or empty, all of ``data`` is used.
        """
        if not x_range or not y_range:
            self.data_in_view = self.data
            return

        data = self.data
        data = data[
            (data.centre_easting.between(*x_range))
            & (data.centre_northing.between(*y_range))
        ]
        self.data_in_view = data.reset_index(drop=True)

    def _update_data_selected(self, tap_x, tap_y):
        """Set ``data_selected`` from ``get_closest_rows`` for the tapped point."""
        self.data_selected = get_closest_rows(self.data, tap_x, tap_y)

    @pn.depends("data_in_view")
    def _description(self):
        """Return the number of rows currently in view as display text."""
        return f"Rows: {len(self.data_in_view):,}"
class ImageInput(pn.viewable.Viewer):
    """Viewer for the image belonging to one selected sample.

    ``data`` holds the selected metadata rows and ``columns`` maps display
    names to column identifiers. The user picks a timestamp and an image type;
    the class then fills ``meta_data`` (the chosen row), ``image`` (the result
    of ``get_image``) and ``plot`` (a HoloViews element of the image array).
    ``updating`` is set while the image is being loaded.
    """

    data = param.DataFrame(
        allow_refs=True, allow_None=False, doc="""The metadata selected"""
    )
    columns = param.Dict(
        allow_refs=True, allow_None=False, doc="""The list of columns of the dataset"""
    )
    column_name = param.Selector(
        label="Image Type",
        allow_None=False,
        doc="""The name of the image type to view""",
    )
    updating = param.Boolean()

    meta_data = param.DataFrame()
    image = param.Parameter()
    plot = param.Parameter()

    _timestamp = param.Selector(
        label="Timestamp",
        objects=[None],
        doc="""The timestamp of the sample to view""",
    )

    def __panel__(self):
        """Return the selector row above the image / metadata / code tabs."""
        return pn.Column(
            pn.Row(
                pn.widgets.RadioButtonGroup.from_param(
                    self.param._timestamp,
                    button_style="outline",
                    align="end",
                    disabled=self.param.updating,
                ),
                pn.widgets.Select.from_param(
                    self.param.column_name, disabled=self.param.updating
                ),
            ),
            pn.Tabs(
                pn.pane.HoloViews(
                    self.param.plot,
                    height=800,
                    width=800,
                    name="Interactive Image",
                ),
                pn.pane.Image(
                    self.param.image,
                    name="Static Image",
                    width=800,
                ),
                pn.widgets.Tabulator(
                    self.param.meta_data,
                    name="Meta Data",
                    disabled=True,
                ),
                pn.pane.Markdown(self.code, name="Code"),
                dynamic=True,
                loading=self.param.updating,
            ),
        )

    @pn.depends("data", watch=True, on_init=True)
    def _update_timestamp(self):
        """Sync the timestamp options and selection with the current ``data``."""
        if self.data.empty:
            default_value = None
            options = [None]
        else:
            options = sorted(self.data["timestamp"].unique())
            default_value = options[0]

        self.param._timestamp.objects = options
        if self._timestamp not in options:
            self._timestamp = default_value
        else:
            # The selected timestamp is still valid, so assigning it would not
            # emit a change event. The rows behind it may differ though, and
            # ``_update_plot`` only watches ``_timestamp`` / ``column_name``,
            # so force an event to refresh the image and metadata.
            self.param.trigger("_timestamp")

    @pn.depends("columns", watch=True, on_init=True)
    def _update_column_names(self):
        """Sync the image-type options and selection with ``columns``."""
        options = sorted(self.columns)
        self.param.column_name.objects = options
        if options and self.column_name not in options:
            # Prefer "Thumbnail", but never select a name that is not a key of
            # ``columns``: the ``column`` lookup would fail for it.
            self.column_name = "Thumbnail" if "Thumbnail" in options else options[0]

    @property
    def column(self):
        """The ``columns`` entry for the selected ``column_name``."""
        return self.columns[self.column_name]

    @pn.depends("_timestamp", "column_name", watch=True, on_init=True)
    def _update_plot(self):
        """Load the image for the selected timestamp and image type.

        With no data, timestamp or image type, ``meta_data`` is set to the
        transposed ``data``, ``image`` to ``None`` and ``plot`` to an empty RGB
        element. Otherwise the first row matching the timestamp is used:
        2-D image arrays are shown as ``hv.Image``, all others as ``hv.RGB``.
        """
        if self.data.empty or not self._timestamp or not self.column_name:
            self.meta_data = self.data.T
            self.image = None
            self.plot = hv.RGB(np.array([]))
            return

        with self.param.update(updating=True):
            row = self.data[self.data.timestamp == self._timestamp].iloc[0]
            self.meta_data = pd.DataFrame(row)
            self.image = image = pn.cache(get_image)(row, self.column)
            image_array = np.array(image)
            if image_array.ndim == 2:
                self.plot = hv.Image(image_array).opts(
                    cmap="gray_r", xaxis=None, yaxis=None, colorbar=True
                )
            else:
                self.plot = hv.RGB(image_array).opts(xaxis=None, yaxis=None)

    @pn.depends("meta_data", "column_name")
    def code(self):
        """Return Markdown with a script that reproduces the shown image.

        Returns an empty string when there is no metadata to build it from.
        """
        if self.meta_data is None or self.meta_data.empty:
            return ""

        parquet_url = self.meta_data.T["parquet_url"].iloc[0]
        parquet_row = self.meta_data.T["parquet_row"].iloc[0]
        # The template stays at column 0 so the rendered Markdown has no extra
        # leading whitespace; the Python snippet inside it carries its own
        # block indentation so that it runs when copied.
        return f"""\
```bash
pip install aiohttp fsspec holoviews numpy panel pyarrow requests
```
```python
from io import BytesIO
import holoviews as hv
import numpy as np
import panel as pn
import pyarrow.parquet as pq
from fsspec.parquet import open_parquet_file
from PIL import Image
pn.extension()
parquet_url = "{parquet_url}"
parquet_row = {parquet_row}
column = "{self.column}"
with open_parquet_file(parquet_url, columns=[column]) as f:
    with pq.ParquetFile(f) as pf:
        first_row_group = pf.read_row_group(parquet_row, columns=[column])
        stream = BytesIO(first_row_group[column][0].as_py())
image = Image.open(stream)
image_array = np.array(image)
if image_array.ndim==2:
    plot = hv.Image(image_array).opts(cmap="gray", colorbar=True)
else:
    plot = hv.RGB(image_array)
plot.opts(xaxis=None, yaxis=None)
pn.panel(plot).servable()
```
```bash
panel serve app.py --autoreload
```
"""
class App(param.Parameterized):
    """Top-level container holding the ``sidebar`` and ``main`` areas.

    ``main`` starts out as a loading placeholder; the real content is built
    by ``_update_main``, which is registered with ``pn.state.onload``.
    """

    sidebar = param.Parameter()
    main = param.Parameter()

    def __init__(self, **params):
        """Build the sidebar and placeholder, then schedule the main content."""
        super().__init__(**params)

        self.sidebar = self._create_sidebar()

        loading_row = pn.Row(
            pn.indicators.LoadingSpinner(value=True, size=50),
            "**Loading data...**",
        )
        placeholder = pn.Column(loading_row, MAJOR_TOM_LYRICS)
        self.main = pn.FlexBox(placeholder)

        pn.state.onload(self._update_main)

    def _create_sidebar(self):
        """Return the column of logos, picture and description for the sidebar."""
        major_tom_logo = pn.pane.Image(
            MAJOR_TOM_LOGO,
            link_url=MAJOR_TOM_REF_URL,
            height=60,
            sizing_mode="stretch_width",
        )
        major_tom_picture = pn.pane.Image(
            MAJOR_TOM_PICTURE,
            link_url=MAJOR_TOM_REF_URL,
            sizing_mode="stretch_width",
        )
        panel_logo = pn.pane.Image(
            PANEL_LOGO, link_url=PANEL_URL, width=200, margin=(10, 20)
        )
        datashader_logo = pn.pane.Image(
            DATASHADER_LOGO, link_url=DATASHADER_URL, width=200, margin=(10, 20)
        )
        return pn.Column(
            major_tom_logo,
            major_tom_picture,
            DESCRIPTION,
            panel_logo,
            datashader_logo,
        )

    def _create_main_content(self):
        """Create the linked inputs and return ``(left column, image view)``."""
        dataset_input = DatasetInput()
        # Parameters are passed as references so each component follows the
        # upstream component's output.
        map_view = MapInput(data=dataset_input.param.data)
        image_view = ImageInput(
            data=map_view.param.data_selected,
            columns=dataset_input.param.columns,
        )
        left_column = pn.Column(dataset_input, map_view)
        return left_column, image_view

    def _update_main(self):
        """Replace the contents of ``main`` with the real main content."""
        self.main[:] = [*self._create_main_content()]