ota-flasher/app/gui.py
2024-05-19 02:42:36 +02:00

313 lines
No EOL
13 KiB
Python

import random
from threading import Thread
import threading
import wx
from zeroconf import ServiceBrowser, Zeroconf
import requests
import os
import webbrowser
from app import espota
from app.api import ApiHandler
from app.zeroconf_listener import ZeroconfListener
from espota import FLASH,SPIFFS
class DevicesPanel(wx.ListCtrl):
def __init__(self, parent):
wx.ListCtrl.__init__(self, parent, style=wx.LC_REPORT)
self.InsertColumn(0, 'Name', width=150)
self.InsertColumn(1, 'Version', width=50)
self.InsertColumn(2, 'SW Revision', width=310)
self.InsertColumn(3, 'HW Revision', width=130)
self.InsertColumn(4, 'IP', width=110)
self.InsertColumn(5, 'FS version', width=110)
class BTClockOTAUpdater(wx.Frame):
release_name = ""
commit_hash = ""
def __init__(self, parent, title):
wx.Frame.__init__(self, parent, title=title, size=(800,500))
self.SetMinSize((800,500))
ubuntu_it = wx.Font(32, wx.FONTFAMILY_MODERN, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD, False, faceName="Ubuntu")
# "Ubuntu-RI.ttf")
panel = wx.Panel(self)
self.title = wx.StaticText(panel, label="BTClock OTA firmware updater")
self.title.SetFont(ubuntu_it)
vbox = wx.BoxSizer(wx.VERTICAL)
vbox.Add(self.title, 0, wx.EXPAND | wx.ALL, 20, 0)
self.device_list = DevicesPanel(panel)
self.device_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected)
self.device_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self.on_item_deselected)
vbox.Add(self.device_list, proportion = 2, flag=wx.EXPAND | wx.ALL, border = 20)
hbox = wx.BoxSizer(wx.HORIZONTAL)
bbox = wx.BoxSizer(wx.HORIZONTAL)
gs = wx.GridSizer(1, 4, 1, 1)
self.fw_label = wx.StaticText(panel, label=f"Checking latest version...")
self.update_button = wx.Button(panel, label="Update Firmware")
self.update_button.Bind(wx.EVT_BUTTON, self.on_click_update_firmware)
self.update_fs_button = wx.Button(panel, label="Update Filesystem")
self.update_fs_button.Bind(wx.EVT_BUTTON, self.on_click_update_fs)
self.identify_button = wx.Button(panel, label="Identify")
self.identify_button.Bind(wx.EVT_BUTTON, self.on_click_identify)
self.open_webif_button = wx.Button(panel, label="Open WebUI")
self.open_webif_button.Bind(wx.EVT_BUTTON, self.on_click_webui)
self.update_button.Disable()
self.update_fs_button.Disable()
self.identify_button.Disable()
self.open_webif_button.Disable()
hbox.Add(self.fw_label, 1, wx.EXPAND | wx.ALL, 5)
bbox.Add(self.update_button)
bbox.Add(self.update_fs_button)
bbox.Add(self.identify_button)
bbox.Add(self.open_webif_button)
hbox.AddStretchSpacer()
hbox.Add(bbox, 2, wx.EXPAND | wx.ALL, 5)
vbox.Add(hbox, 0, wx.EXPAND | wx.ALL, 20)
self.progress_bar = wx.Gauge(panel, range=100)
vbox.Add(self.progress_bar, 0, wx.EXPAND | wx.ALL, 20)
panel.SetSizer(vbox)
filemenu= wx.Menu()
menuAbout = filemenu.Append(wx.ID_ABOUT, "&About"," Information about this program")
menuExit = filemenu.Append(wx.ID_EXIT,"E&xit"," Terminate the program")
menuBar = wx.MenuBar()
menuBar.Append(filemenu,"&File") # Adding the "filemenu" to the MenuBar
self.SetMenuBar(menuBar) # Adding the MenuBar to the Frame content.
self.Bind(wx.EVT_MENU, self.OnAbout, menuAbout)
self.Bind(wx.EVT_MENU, self.OnExit, menuExit)
self.status_bar = self.CreateStatusBar(2)
# self.StatusBar.SetFieldsCount(2)
# self.StatusBar.SetStatusWidths(-3, -1)
self.Show(True)
self.zeroconf = Zeroconf()
self.listener = ZeroconfListener(self.on_zeroconf_state_change)
self.browser = ServiceBrowser(self.zeroconf, "_http._tcp.local.", self.listener)
self.api_handler = ApiHandler()
wx.CallAfter(self.fetch_latest_release)
def on_item_selected(self, event):
self.update_button.Enable()
self.update_fs_button.Enable()
self.identify_button.Enable()
self.open_webif_button.Enable()
def on_item_deselected(self, event):
if self.device_list.GetFirstSelected() == -1:
self.update_button.Disable()
self.update_fs_button.Disable()
self.identify_button.Disable()
self.open_webif_button.Disable()
def on_zeroconf_state_change(self, type, name, state, info):
index = self.device_list.FindItem(0, name)
if state == "Added":
if index == wx.NOT_FOUND:
index = self.device_list.InsertItem(self.device_list.GetItemCount(), type)
self.device_list.SetItem(index, 0, name)
self.device_list.SetItem(index, 1, info.properties.get(b"version").decode())
self.device_list.SetItem(index, 2, info.properties.get(b"rev").decode())
if (info.properties.get(b"hw_rev") is not None):
self.device_list.SetItem(index, 3, info.properties.get(b"hw_rev").decode())
self.device_list.SetItem(index, 4, info.parsed_addresses()[0])
else:
self.device_list.SetItem(index, 0, name)
self.device_list.SetItem(index, 1, info.properties.get(b"version").decode())
self.device_list.SetItem(index, 2, info.properties.get(b"rev").decode())
if (info.properties.get(b"hw_rev").decode()):
self.device_list.SetItem(index, 3, info.properties.get(b"hw_rev").decode())
self.device_list.SetItem(index, 4, info.parsed_addresses()[0])
self.device_list.SetItem(index, 5, self.api_handler.check_fs_hash(info.parsed_addresses()[0]))
elif state == "Removed":
if index != wx.NOT_FOUND:
self.device_list.DeleteItem(index)
def on_click_update_firmware(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
hw_rev = self.device_list.GetItemText(selected_index, 3)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
self.start_firmware_update(address, hw_rev)
else:
wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update", "Error", wx.ICON_ERROR)
def on_click_webui(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
thread = threading.Thread(target=lambda: webbrowser.open(f"http://{address}"))
thread.start()
def run_fs_update(self, address, firmware_file, type):
global PROGRESS
PROGRESS = True
espota.PROGRESS = True
global TIMEOUT
TIMEOUT = 10
espota.TIMEOUT = 10
espota.serve(address, "0.0.0.0", 3232, random.randint(10000,60000), "", firmware_file, type, self.call_progress)
wx.CallAfter(self.update_progress, 100)
self.SetStatusText(f"Finished!")
def call_progress(self, progress):
progressPerc = int(progress*100)
self.SetStatusText(f"Progress: {progressPerc}%")
wx.CallAfter(self.update_progress, progress)
def update_progress(self, progress):
self.progress_bar.SetValue(int(progress*100))
wx.YieldIfNeeded()
def on_click_update_fs(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
self.start_fs_update(address)
else:
wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update", "Error", wx.ICON_ERROR)
def start_firmware_update(self, address, hw_rev):
self.SetStatusText(f"Starting firmware update")
model_name = "lolin_s3_mini_213epd"
if (hw_rev == "REV_B_EPD_2_13"):
model_name = "btclock_rev_b_213epd"
local_filename = f"firmware/{self.release_name}_{model_name}_firmware.bin"
if os.path.exists(os.path.abspath(local_filename)):
thread = Thread(target=self.run_fs_update, args=(address, os.path.abspath(local_filename), FLASH))
thread.start()
def start_fs_update(self, address):
# Path to the firmware file
self.SetStatusText(f"Starting filesystem update")
local_filename = f"firmware/{self.release_name}_littlefs.bin"
if os.path.exists(os.path.abspath(local_filename)):
thread = Thread(target=self.run_fs_update, args=(address, os.path.abspath(local_filename), SPIFFS))
thread.start()
wx.CallAfter(self.update_progress, 100)
def on_click_identify(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
port = info.port
self.api_handler.identify_btclock(address)
else:
wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to make an API call", "Error", wx.ICON_ERROR)
def fetch_latest_release(self):
repo = "btclock/btclock_v3"
filenames_to_download = ["lolin_s3_mini_213epd_firmware.bin", "btclock_rev_b_213epd_firmware.bin", "littlefs.bin"]
url = f"https://api.github.com/repos/{repo}/releases/latest"
try:
response = requests.get(url)
response.raise_for_status()
latest_release = response.json()
release_name = latest_release['tag_name']
self.release_name = release_name
asset_url = None
asset_urls = []
for asset in latest_release['assets']:
if asset['name'] in filenames_to_download:
asset_urls.append(asset['browser_download_url'])
if asset_urls:
for asset_url in asset_urls:
self.download_file(asset_url, release_name)
ref_url = f"https://api.github.com/repos/{repo}/git/ref/tags/{release_name}"
response = requests.get(ref_url)
response.raise_for_status()
ref_info = response.json()
self.commit_hash = ref_info["object"]["sha"]
self.fw_label.SetLabelText(f"Downloaded firmware version: {self.release_name}\nCommit: {self.commit_hash}")
else:
wx.CallAfter(self.SetStatusText, f"File {filenames_to_download} not found in latest release")
except requests.RequestException as e:
wx.CallAfter(self.SetStatusText, f"Error fetching release: {e}")
def download_file(self, url, release_name):
local_filename = f"{release_name}_{url.split('/')[-1]}"
response = requests.get(url, stream=True)
total_length = response.headers.get('content-length')
if os.path.exists(f"firmware/{local_filename}"):
wx.CallAfter(self.SetStatusText, f"{local_filename} is already downloaded")
return
if total_length is None:
wx.CallAfter(self.SetStatusText, "No content length header")
else:
total_length = int(total_length)
chunk_size = 1024
num_chunks = total_length // chunk_size
with open(f"firmware/{local_filename}", 'wb') as f:
for i, chunk in enumerate(response.iter_content(chunk_size=chunk_size)):
if chunk:
f.write(chunk)
f.flush()
progress = int((i / num_chunks) * 100)
wx.CallAfter(self.update_progress, progress)
wx.CallAfter(self.update_progress, 100)
wx.CallAfter(self.SetStatusText, "Download completed")
def OnAbout(self,e):
dlg = wx.MessageDialog( self, "An updater for BTClocks", "About BTClock OTA Updater", wx.OK)
dlg.ShowModal()
dlg.Destroy()
def OnExit(self,e):
self.Close(True)