- numpy - Pillow - micropip: - boto3 - paths: - ./canvas.py from pyodide import to_js, create_proxy from PIL import Image import io import time import base64 from collections import deque import numpy as np from js import ( console, document, parent, devicePixelRatio, ImageData, Uint8ClampedArray, CanvasRenderingContext2D as Context2d, requestAnimationFrame, window, encodeURIComponent, w2ui, update_eraser, update_scale, adjust_selection, update_count, enable_result_lst, setup_shortcut, update_undo_redo, alert, uploadImageToFirebase, firebase, aws, fetch, overrideW2uiStyles ) answer = aws("hello", 1, 2) console.log(answer) #addPhoto("demo") # async def get_latest_image_from_firebase(): # alert("get_latest_image_from_firebase called") # try: # database = firebase.database() # alert("try called") # latestImageRef = database.ref("latestImage") # latestImageSnapshot = await latestImageRef.once("value") # latestImageInfo = latestImageSnapshot.val() # download_url = latestImageInfo["downloadURL"] # with pyodide.open_url(download_url) as f: # img = Image.open(f) # print("Downloaded image:", str(img)) # return img # except Exception as e: # print("Error while getting the latest image from Firebase:", str(e)) # return None async def fetch_latest_image_url(database_url): console.log("fetch_latest_image called") # different methods to call response = await fetch(f"{database_url}/latestImage.json") console.log(f"response status: {response.status}, status text: {response.statusText}") latest_image_data = await response.json() latest_image_data = latest_image_data.to_py() image_url = latest_image_data["downloadURL"] image_name = latest_image_data["fileName"] console.log(f"Latest image URL: {image_url}") console.log(f"Latest image name: {image_name}") # Fetch the image data as ArrayBuffer image_response = await fetch(image_url) image_data = await image_response.arrayBuffer() return image_data, image_name from canvas import InfCanvas class History: def __init__(self,maxlen=10): self.idx=-1 self.undo_lst=deque([],maxlen=maxlen) self.redo_lst=deque([],maxlen=maxlen) self.state=None def undo(self): cur=None if len(self.undo_lst): cur=self.undo_lst.pop() self.redo_lst.appendleft(cur) return cur def redo(self): cur=None if len(self.redo_lst): cur=self.redo_lst.popleft() self.undo_lst.append(cur) return cur def check(self): return len(self.undo_lst)>0,len(self.redo_lst)>0 def append(self,state,update=True): self.redo_lst.clear() self.undo_lst.append(state) if update: update_undo_redo(*self.check()) history = History() base_lst = [None] async def draw_canvas() -> None: # alert("draw_canvas called") width=1024 height=700 canvas=InfCanvas(1024,700) update_eraser(canvas.eraser_size,min(canvas.selection_size_h,canvas.selection_size_w)) document.querySelector("#container").style.height= f"{height}px" document.querySelector("#container").style.width = f"{width}px" canvas.setup_mouse() canvas.clear_background() canvas.draw_buffer() canvas.draw_selection_box() base_lst[0]=canvas # latest_image = await get_latest_image_from_firebase() # if latest_image is not None: # Log the URL of the latest image to the console # console.log(f"Latest image URL: {latest_image.url}") # Request the parent window to display the latest image on the canvas # (commented out to fix the indentation error) # window.parent.postMessage({ type: "displayLatestImageOnCanvas", image: latest_image }, "*") # else: # print("No latest image found in Firebase.") # new draw_canvas scale method from PIL import ImageOps async def draw_canvas_func(event): # alert("draw_canvas gradio called") database_url = "https://nyucapstone-7c22c-default-rtdb.firebaseio.com" image_data, latest_image_name = await fetch_latest_image_url(database_url) pil_image = Image.open(io.BytesIO(image_data.to_py())) np_image = np.array(pil_image) # Get the dimensions of the input image img_height, img_width, _ = np_image.shape # Set the desired display dimensions display_width = 1024 display_height = 768 # Calculate the zoom level based on the input image dimensions and the desired display dimensions zoom_level = min(display_width / img_width, display_height / img_height) # Pad the input image to fit the canvas buffer while maintaining the aspect ratio padded_image = ImageOps.pad(pil_image, (int(display_width), int(display_height)), Image.ANTIALIAS, color=(0, 0, 0, 0)) padded_np_image = np.array(padded_image) # Set the canvas dimensions to match the desired display dimensions width = display_width height = display_height # selection_size = min(width, height) // 2 selection_size = int(min(img_width, img_height) * 0.25) document.querySelector("#container").style.width = f"{width}px" document.querySelector("#container").style.height = f"{height}px" canvas = InfCanvas(int(width), int(height), selection_size=int(selection_size), firebase_image_data=padded_np_image) canvas.setup_mouse() canvas.clear_background() canvas.draw_buffer() canvas.draw_selection_box() # Update the canvas buffer with the padded image data and redraw the buffer h, w, c = canvas.buffer.shape canvas.sync_to_buffer() canvas.buffer_dirty = True # Create a mask using the padded image shape mask = padded_np_image[:, :, 3:4].repeat(4, axis=2) canvas.buffer[mask > 0] = 0 canvas.buffer[:height, :width] += padded_np_image canvas.draw_buffer() base_lst[0] = canvas # alert("made it to end of draw_canvas gradio") # original draw_canvas async def test_canvas_func(event): # alert("draw_canvas gradio called") try: app=parent.document.querySelector("gradio-app") if app.shadowRoot: app=app.shadowRoot width=app.querySelector("#canvas_width input").value height=app.querySelector("#canvas_height input").value selection_size=app.querySelector("#selection_size input").value except: width=1024 height=768 selection_size=384 document.querySelector("#container").style.width = f"{width}px" document.querySelector("#container").style.height= f"{height}px" database_url = "https://nyucapstone-7c22c-default-rtdb.firebaseio.com" image_data, latest_image_name = await fetch_latest_image_url(database_url) pil_image = Image.open(io.BytesIO(image_data.to_py())) np_image = np.array(pil_image) canvas=InfCanvas(int(width),int(height),selection_size=int(selection_size),firebase_image_data=np_image) canvas.setup_mouse() canvas.clear_background() canvas.draw_buffer() canvas.draw_selection_box() # Update the canvas buffer with the new image data and redraw the buffer h, w, c = canvas.buffer.shape canvas.sync_to_buffer() canvas.buffer_dirty = True h_min = min(h, np_image.shape[0]) w_min = min(w, np_image.shape[1]) # mask = np_image[:, :, 3:4].repeat(4, axis=2) # canvas.buffer[mask > 0] = 0 # canvas.buffer[0:h, 0:w, :] += np_image mask = np_image[:h_min, :w_min, 3:4].repeat(4, axis=2) canvas.buffer[:h_min, :w_min][mask > 0] = 0 canvas.buffer[:h_min, :w_min] += np_image[:h_min, :w_min] canvas.draw_buffer() base_lst[0]=canvas # alert("made it to end of draw_canvas gradio") import js async def export_func(event): base = base_lst[0] arr = base.export() base.draw_buffer() base.canvas[2].clear() base64_str = base.numpy_to_base64(arr) time_str = time.strftime("%Y%m%d_%H%M%S") # The rest of the original export_func code link = document.createElement("a") if len(event.data) > 2 and event.data[2]: filename = event.data[2] else: filename = f"outpaint_{time_str}" link.download = f"{filename}.png" link.href = "data:image/png;base64," + base64_str link.click() console.log(f"Canvas saved to {filename}.png") img_candidate_lst=[None,0] async def outpaint_func(event): base=base_lst[0] if len(event.data)==2: app=parent.document.querySelector("gradio-app") if app.shadowRoot: app=app.shadowRoot base64_str_raw=app.querySelector("#output textarea").value base64_str_lst=base64_str_raw.split(",") img_candidate_lst[0]=base64_str_lst img_candidate_lst[1]=0 elif event.data[2]=="next": img_candidate_lst[1]+=1 elif event.data[2]=="prev": img_candidate_lst[1]-=1 enable_result_lst() if img_candidate_lst[0] is None: return lst=img_candidate_lst[0] idx=img_candidate_lst[1] update_count(idx%len(lst)+1,len(lst)) arr=base.base64_to_numpy(lst[idx%len(lst)]) base.fill_selection(arr) base.draw_selection_box() js.overrideW2uiStyles() async def undo_func(event): base=base_lst[0] img_candidate_lst[0]=None if base.sel_dirty: base.sel_buffer = np.zeros((base.selection_size_h, base.selection_size_w, 4), dtype=np.uint8) base.sel_dirty = False base.canvas[2].clear() async def commit_func(event): base = base_lst[0] img_candidate_lst[0] = None if base.sel_dirty: base.write_selection_to_buffer() base.draw_buffer() base.canvas[2].clear() if len(event.data) > 2: history.append(base.save()) # sending the image to firebase here arr = base.export() base64_str = base.numpy_to_base64(arr) time_str = time.strftime("%Y%m%d_%H%M%S") # Call the JavaScript function to upload the image to Firebase storage await js.uploadImageToFirebase(base64_str, time_str) async def history_undo_func(event): base=base_lst[0] if base.buffer_dirty or len(history.redo_lst)>0: state=history.undo() else: history.undo() state=history.undo() if state is not None: base.load(state) update_undo_redo(*history.check()) async def history_setup_func(event): base=base_lst[0] history.undo_lst.clear() history.redo_lst.clear() history.append(base.save(),update=False) async def history_redo_func(event): base=base_lst[0] if len(history.undo_lst)>0: state=history.redo() else: history.redo() state=history.redo() if state is not None: base.load(state) update_undo_redo(*history.check()) async def transfer_func(event): base=base_lst[0] base.read_selection_from_buffer() sel_buffer=base.sel_buffer sel_buffer_str=base.numpy_to_base64(sel_buffer) app=parent.document.querySelector("gradio-app") if app.shadowRoot: app=app.shadowRoot app.querySelector("#input textarea").value=sel_buffer_str app.querySelector("#proceed").click() async def upload_func(event): base=base_lst[0] # base64_str=event.data[1] # Retrieve the base64 encoded image string from the #upload_content HTML element base64_str=document.querySelector("#upload_content").value base64_str=base64_str.split(",")[-1] # base64_str=parent.document.querySelector("gradio-app").shadowRoot.querySelector("#upload textarea").value arr=base.base64_to_numpy(base64_str) h,w,c=base.buffer.shape base.sync_to_buffer() base.buffer_dirty=True mask=arr[:,:,3:4].repeat(4,axis=2) base.buffer[mask>0]=0 # in case mismatch base.buffer[0:h,0:w,:]+=arr #base.buffer[yo:yo+h,xo:xo+w,0:3]=arr[:,:,0:3] #base.buffer[yo:yo+h,xo:xo+w,-1]=arr[:,:,-1] base.draw_buffer() if len(event.data)>2: history.append(base.save()) async def setup_shortcut_func(event): setup_shortcut(event.data[1]) document.querySelector("#export").addEventListener("click",create_proxy(export_func)) document.querySelector("#undo").addEventListener("click",create_proxy(undo_func)) document.querySelector("#commit").addEventListener("click",create_proxy(commit_func)) document.querySelector("#outpaint").addEventListener("click",create_proxy(outpaint_func)) document.querySelector("#upload").addEventListener("click",create_proxy(upload_func)) document.querySelector("#transfer").addEventListener("click",create_proxy(transfer_func)) document.querySelector("#draw").addEventListener("click",create_proxy(draw_canvas_func)) async def setup_func(): document.querySelector("#setup").value="1" async def reset_func(event): base=base_lst[0] base.reset() async def load_func(event): base=base_lst[0] base.load(event.data[1]) async def save_func(event): base=base_lst[0] json_str=base.save() time_str = time.strftime("%Y%m%d_%H%M%S") link = document.createElement("a") if len(event.data)>2 and event.data[2]: filename = str(event.data[2]).strip() else: filename = f"outpaint_{time_str}" # link.download = f"sdinf_state_{time_str}.json" link.download = f"{filename}.sdinf" link.href = "data:text/json;charset=utf-8,"+encodeURIComponent(json_str) link.click() async def prev_result_func(event): base=base_lst[0] base.reset() async def next_result_func(event): base=base_lst[0] base.reset() async def zoom_in_func(event): base=base_lst[0] scale=base.scale if scale>=0.2: scale-=0.1 if len(event.data)>2: base.update_scale(scale,int(event.data[2]),int(event.data[3])) else: base.update_scale(scale) scale=base.scale update_scale(f"{base.width}x{base.height} ({round(100/scale)}%)") js.overrideW2uiStyles() async def zoom_out_func(event): base=base_lst[0] scale=base.scale if scale<10: scale+=0.1 console.log(len(event.data)) if len(event.data)>2: base.update_scale(scale,int(event.data[2]),int(event.data[3])) else: base.update_scale(scale) scale=base.scale update_scale(f"{base.width}x{base.height} ({round(100/scale)}%)") js.overrideW2uiStyles() async def sync_func(event): base=base_lst[0] base.sync_to_buffer() base.canvas[2].clear() js.overrideW2uiStyles() async def eraser_size_func(event): base=base_lst[0] eraser_size=min(int(event.data[1]),min(base.selection_size_h,base.selection_size_w)) eraser_size=max(8,eraser_size) base.eraser_size=eraser_size async def resize_selection_func(event): base=base_lst[0] cursor=base.cursor if len(event.data)>3: console.log(event.data) base.cursor[0]=int(event.data[1]) base.cursor[1]=int(event.data[2]) base.selection_size_w=int(event.data[3])//8*8 base.selection_size_h=int(event.data[4])//8*8 base.refine_selection() base.draw_selection_box() elif len(event.data)>2: base.draw_selection_box() else: base.canvas[-1].clear() adjust_selection(cursor[0],cursor[1],base.selection_size_w,base.selection_size_h) async def eraser_func(event): base=base_lst[0] if event.data[1]!="eraser": base.canvas[-2].clear() else: x,y=base.mouse_pos base.draw_eraser(x,y) async def resize_func(event): base=base_lst[0] width=int(event.data[1]) height=int(event.data[2]) if width>=256 and height>=256: if max(base.selection_size_h,base.selection_size_w)>min(width,height): base.selection_size_h=256 base.selection_size_w=256 base.resize(width,height) async def message_func(event): if event.data[0]=="click": if event.data[1]=="clear": await reset_func(event) elif event.data[1]=="save": await save_func(event) elif event.data[1]=="export": await export_func(event) elif event.data[1]=="accept": await commit_func(event) elif event.data[1]=="cancel": await undo_func(event) elif event.data[1]=="zoom_in": await zoom_in_func(event) elif event.data[1]=="zoom_out": await zoom_out_func(event) elif event.data[1]=="redo": await history_redo_func(event) elif event.data[1]=="undo": await history_undo_func(event) elif event.data[1]=="history": await history_setup_func(event) js.overrideW2uiStyles() elif event.data[0]=="sync": await sync_func(event) elif event.data[0]=="load": await load_func(event) elif event.data[0]=="upload": await upload_func(event) elif event.data[0]=="outpaint": await outpaint_func(event) js.overrideW2uiStyles() elif event.data[0]=="mode": if event.data[1]!="selection": await sync_func(event) await eraser_func(event) document.querySelector("#mode").value=event.data[1] elif event.data[0]=="transfer": await transfer_func(event) elif event.data[0]=="setup": await draw_canvas_func(event) elif event.data[0]=="eraser_size": await eraser_size_func(event) elif event.data[0]=="resize_selection": await resize_selection_func(event) elif event.data[0]=="shortcut": await setup_shortcut_func(event) elif event.data[0]=="resize": await resize_func(event) window.addEventListener("message",create_proxy(message_func)) import asyncio _ = await asyncio.gather( setup_func() )