ota-flasher/app/gui.py
2024-06-09 13:18:14 +02:00

372 lines
No EOL
15 KiB
Python

import random
from threading import Thread
import threading
import serial
import wx
import wx.lib.mixins.listctrl
from zeroconf import ServiceBrowser, Zeroconf
import requests
import os
import webbrowser
from app import espota
from app.api import ApiHandler
from app.fw_update import FwUpdate
from app.zeroconf_listener import ZeroconfListener
from app.espota import FLASH,SPIFFS
class SerialPortsComboBox(wx.ComboBox):
def __init__(self, parent, fw_update):
self.fw_update = fw_update
self.ports = serial.tools.list_ports.comports()
wx.ComboBox.__init__(self,parent, choices=[port.device for port in self.ports])
class DevicesPanel(wx.ListCtrl,wx.lib.mixins.listctrl.ColumnSorterMixin, wx.lib.mixins.listctrl.ListCtrlAutoWidthMixin,
):
def __init__(self, parent):
wx.ListCtrl.__init__(self, parent, style=wx.LC_REPORT)
self.column_headings = ["name", "Version", "SW Revision", "HW Revision", "IP", "FS Version"]
wx.lib.mixins.listctrl.ColumnSorterMixin.__init__(
self,
len(self.column_headings),
)
wx.lib.mixins.listctrl.ListCtrlAutoWidthMixin.__init__(self)
for column, heading in enumerate(self.column_headings):
self.AppendColumn(heading)
self.itemDataMap = {}
def OnSortOrderChanged(self):
"""Method to handle changes to the sort order"""
column, ascending = self.GetSortState()
self.ShowSortIndicator(column, ascending)
self.SortListItems(column, ascending)
def GetListCtrl(self):
"""Method required by the ColumnSorterMixin"""
return self
class BTClockOTAUpdater(wx.Frame):
release_name = ""
commit_hash = ""
currentlyUpdating = False
updatingName = ""
def __init__(self, parent, title):
wx.Frame.__init__(self, parent, title=title, size=(800,500))
self.SetMinSize((800,500))
ubuntu_it = wx.Font(32, wx.FONTFAMILY_MODERN, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD, False, faceName="Ubuntu")
# "Ubuntu-RI.ttf")
self.fw_update = FwUpdate()
panel = wx.Panel(self)
self.title = wx.StaticText(panel, label="BTClock OTA firmware updater")
self.title.SetFont(ubuntu_it)
vbox = wx.BoxSizer(wx.VERTICAL)
vbox.Add(self.title, 0, wx.EXPAND | wx.ALL, 20, 0)
# serialPorts = SerialPortsComboBox(panel, self.fw_update)
# vbox.Add(serialPorts, 0, wx.EXPAND | wx.ALL, 20, 0)
self.device_list = DevicesPanel(panel)
self.device_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected)
self.device_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self.on_item_deselected)
vbox.Add(self.device_list, proportion = 2, flag=wx.EXPAND | wx.ALL, border = 20)
hbox = wx.BoxSizer(wx.HORIZONTAL)
bbox = wx.BoxSizer(wx.HORIZONTAL)
gs = wx.GridSizer(1, 4, 1, 1)
self.fw_label = wx.StaticText(panel, label=f"Checking latest version...")
self.update_button = wx.Button(panel, label="Update Firmware")
self.update_button.Bind(wx.EVT_BUTTON, self.on_click_update_firmware)
self.update_fs_button = wx.Button(panel, label="Update Filesystem")
self.update_fs_button.Bind(wx.EVT_BUTTON, self.on_click_update_fs)
self.identify_button = wx.Button(panel, label="Identify")
self.identify_button.Bind(wx.EVT_BUTTON, self.on_click_identify)
self.open_webif_button = wx.Button(panel, label="Open WebUI")
self.open_webif_button.Bind(wx.EVT_BUTTON, self.on_click_webui)
self.update_button.Disable()
self.update_fs_button.Disable()
self.identify_button.Disable()
self.open_webif_button.Disable()
hbox.Add(self.fw_label, 1, wx.EXPAND | wx.ALL, 5)
bbox.Add(self.update_button)
bbox.Add(self.update_fs_button)
bbox.Add(self.identify_button)
bbox.Add(self.open_webif_button)
hbox.AddStretchSpacer()
hbox.Add(bbox, 2, wx.EXPAND | wx.ALL, 5)
vbox.Add(hbox, 0, wx.EXPAND | wx.ALL, 20)
self.progress_bar = wx.Gauge(panel, range=100)
vbox.Add(self.progress_bar, 0, wx.EXPAND | wx.ALL, 20)
panel.SetSizer(vbox)
filemenu= wx.Menu()
menuAbout = filemenu.Append(wx.ID_ABOUT, "&About"," Information about this program")
menuExit = filemenu.Append(wx.ID_EXIT,"E&xit"," Terminate the program")
menuBar = wx.MenuBar()
menuBar.Append(filemenu,"&File") # Adding the "filemenu" to the MenuBar
self.SetMenuBar(menuBar) # Adding the MenuBar to the Frame content.
self.Bind(wx.EVT_MENU, self.OnAbout, menuAbout)
self.Bind(wx.EVT_MENU, self.OnExit, menuExit)
self.status_bar = self.CreateStatusBar(2)
# self.StatusBar.SetFieldsCount(2)
# self.StatusBar.SetStatusWidths(-3, -1)
self.Show(True)
self.zeroconf = Zeroconf()
self.listener = ZeroconfListener(self.on_zeroconf_state_change)
self.browser = ServiceBrowser(self.zeroconf, "_http._tcp.local.", self.listener)
self.api_handler = ApiHandler()
wx.CallAfter(self.fetch_latest_release)
def on_item_selected(self, event):
if self.release_name != "":
self.update_button.Enable()
self.update_fs_button.Enable()
self.identify_button.Enable()
self.open_webif_button.Enable()
def on_item_deselected(self, event):
if self.device_list.GetFirstSelected() == -1:
self.update_button.Disable()
self.update_fs_button.Disable()
self.identify_button.Disable()
self.open_webif_button.Disable()
def on_zeroconf_state_change(self, type, name, state, info):
index = self.device_list.FindItem(0, name)
if state == "Added":
if index == wx.NOT_FOUND:
index = self.device_list.InsertItem(self.device_list.GetItemCount(), type)
self.device_list.SetItem(index, 0, name)
self.device_list.SetItem(index, 1, info.properties.get(b"version").decode())
self.device_list.SetItem(index, 2, info.properties.get(b"rev").decode())
if (info.properties.get(b"hw_rev") is not None):
self.device_list.SetItem(index, 3, info.properties.get(b"hw_rev").decode())
self.device_list.SetItem(index, 4, info.parsed_addresses()[0])
else:
self.device_list.SetItem(index, 0, name)
self.device_list.SetItem(index, 1, info.properties.get(b"version").decode())
self.device_list.SetItem(index, 2, info.properties.get(b"rev").decode())
if (info.properties.get(b"hw_rev").decode()):
self.device_list.SetItem(index, 3, info.properties.get(b"hw_rev").decode())
self.device_list.SetItem(index, 4, info.parsed_addresses()[0])
self.device_list.SetItem(index, 5, self.api_handler.check_fs_hash(info.parsed_addresses()[0]))
self.device_list.SetItemData(index, index)
self.device_list.itemDataMap[index] = [name, info.properties.get(b"version").decode(), info.properties.get(b"rev").decode(), info.properties.get(b"hw_rev").decode(), info.parsed_addresses()[0]]
for col in range(0, len(self.device_list.column_headings)):
self.device_list.SetColumnWidth(col, wx.LIST_AUTOSIZE_USEHEADER)
elif state == "Removed":
if index != wx.NOT_FOUND:
self.device_list.DeleteItem(index)
def on_click_update_firmware(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
hw_rev = self.device_list.GetItemText(selected_index, 3)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
self.start_firmware_update(address, hw_rev)
else:
wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update", "Error", wx.ICON_ERROR)
def on_click_webui(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
thread = threading.Thread(target=lambda: webbrowser.open(f"http://{address}"))
thread.start()
def run_fs_update(self, address, firmware_file, type):
global PROGRESS
PROGRESS = True
espota.PROGRESS = True
global TIMEOUT
TIMEOUT = 10
espota.TIMEOUT = 10
espota.serve(address, "0.0.0.0", 3232, random.randint(10000,60000), "", firmware_file, type, self.call_progress)
wx.CallAfter(self.update_progress, 100)
self.currentlyUpdating = False
self.SetStatusText(f"Finished!")
def call_progress(self, progress):
progressPerc = int(progress*100)
self.SetStatusText(f"{self.updatingName} - Progress: {progressPerc}%")
wx.CallAfter(self.update_progress, progress)
def update_progress(self, progress):
self.progress_bar.SetValue(int(progress*100))
wx.YieldIfNeeded()
def on_click_update_fs(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if self.currentlyUpdating:
wx.MessageBox("Please wait, already updating", "Error", wx.ICON_ERROR)
return
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
self.start_fs_update(address)
else:
wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update", "Error", wx.ICON_ERROR)
def start_firmware_update(self, address, hw_rev):
self.SetStatusText(f"Starting firmware update")
model_name = "lolin_s3_mini_213epd"
if (hw_rev == "REV_B_EPD_2_13"):
model_name = "btclock_rev_b_213epd"
local_filename = f"firmware/{self.release_name}_{model_name}_firmware.bin"
self.updatingName = address
self.currentlyUpdating = True
if os.path.exists(os.path.abspath(local_filename)):
thread = Thread(target=self.run_fs_update, args=(address, os.path.abspath(local_filename), FLASH))
thread.start()
def start_fs_update(self, address):
# Path to the firmware file
self.SetStatusText(f"Starting filesystem update")
local_filename = f"firmware/{self.release_name}_littlefs.bin"
self.updatingName = address
self.currentlyUpdating = True
if os.path.exists(os.path.abspath(local_filename)):
thread = Thread(target=self.run_fs_update, args=(address, os.path.abspath(local_filename), SPIFFS))
thread.start()
wx.CallAfter(self.update_progress, 100)
def on_click_identify(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
port = info.port
self.api_handler.identify_btclock(address)
else:
wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to make an API call", "Error", wx.ICON_ERROR)
def fetch_latest_release(self):
repo = "btclock/btclock_v3"
if not os.path.exists("firmware"):
os.makedirs("firmware")
filenames_to_download = ["lolin_s3_mini_213epd_firmware.bin", "btclock_rev_b_213epd_firmware.bin", "littlefs.bin"]
url = f"https://api.github.com/repos/{repo}/releases/latest"
try:
response = requests.get(url)
response.raise_for_status()
latest_release = response.json()
release_name = latest_release['tag_name']
self.release_name = release_name
asset_url = None
asset_urls = []
for asset in latest_release['assets']:
if asset['name'] in filenames_to_download:
asset_urls.append(asset['browser_download_url'])
if asset_urls:
for asset_url in asset_urls:
self.download_file(asset_url, release_name)
ref_url = f"https://api.github.com/repos/{repo}/git/ref/tags/{release_name}"
response = requests.get(ref_url)
response.raise_for_status()
ref_info = response.json()
if (ref_info["object"]["type"] == "commit"):
self.commit_hash = ref_info["object"]["sha"]
else:
tag_url = f"https://api.github.com/repos/{repo}/git/tags/{ref_info["object"]["sha"]}"
response = requests.get(tag_url)
response.raise_for_status()
tag_info = response.json()
self.commit_hash = tag_info["object"]["sha"]
self.fw_label.SetLabelText(f"Downloaded firmware version: {self.release_name}\nCommit: {self.commit_hash}")
else:
wx.CallAfter(self.SetStatusText, f"File {filenames_to_download} not found in latest release")
except requests.RequestException as e:
wx.CallAfter(self.SetStatusText, f"Error fetching release: {e}")
def download_file(self, url, release_name):
local_filename = f"{release_name}_{url.split('/')[-1]}"
response = requests.get(url, stream=True)
total_length = response.headers.get('content-length')
if not os.path.exists("firmware"):
os.makedirs("firmware")
if os.path.exists(f"firmware/{local_filename}"):
wx.CallAfter(self.SetStatusText, f"{local_filename} is already downloaded")
return
if total_length is None:
wx.CallAfter(self.SetStatusText, "No content length header")
else:
total_length = int(total_length)
chunk_size = 1024
num_chunks = total_length // chunk_size
with open(f"firmware/{local_filename}", 'wb') as f:
for i, chunk in enumerate(response.iter_content(chunk_size=chunk_size)):
if chunk:
f.write(chunk)
f.flush()
progress = int((i / num_chunks) * 100)
wx.CallAfter(self.update_progress, progress)
wx.CallAfter(self.update_progress, 100)
wx.CallAfter(self.SetStatusText, "Download completed")
def OnAbout(self,e):
dlg = wx.MessageDialog( self, "An updater for BTClocks", "About BTClock OTA Updater", wx.OK)
dlg.ShowModal()
dlg.Destroy()
def OnExit(self,e):
self.Close(False)