import random from threading import Thread import threading import serial import wx import wx.lib.mixins.listctrl from zeroconf import ServiceBrowser, Zeroconf import requests import os import webbrowser from app import espota from app.api import ApiHandler from app.fw_update import FwUpdate from app.zeroconf_listener import ZeroconfListener from app.espota import FLASH,SPIFFS class SerialPortsComboBox(wx.ComboBox): def __init__(self, parent, fw_update): self.fw_update = fw_update self.ports = serial.tools.list_ports.comports() wx.ComboBox.__init__(self,parent, choices=[port.device for port in self.ports]) class DevicesPanel(wx.ListCtrl,wx.lib.mixins.listctrl.ColumnSorterMixin, wx.lib.mixins.listctrl.ListCtrlAutoWidthMixin, ): def __init__(self, parent): wx.ListCtrl.__init__(self, parent, style=wx.LC_REPORT) self.column_headings = ["name", "Version", "SW Revision", "HW Revision", "IP", "FS Version"] wx.lib.mixins.listctrl.ColumnSorterMixin.__init__( self, len(self.column_headings), ) wx.lib.mixins.listctrl.ListCtrlAutoWidthMixin.__init__(self) for column, heading in enumerate(self.column_headings): self.AppendColumn(heading) self.itemDataMap = {} def OnSortOrderChanged(self): """Method to handle changes to the sort order""" column, ascending = self.GetSortState() self.ShowSortIndicator(column, ascending) self.SortListItems(column, ascending) def GetListCtrl(self): """Method required by the ColumnSorterMixin""" return self class BTClockOTAUpdater(wx.Frame): release_name = "" commit_hash = "" currentlyUpdating = False updatingName = "" def __init__(self, parent, title): wx.Frame.__init__(self, parent, title=title, size=(800,500)) self.SetMinSize((800,500)) ubuntu_it = wx.Font(32, wx.FONTFAMILY_MODERN, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD, False, faceName="Ubuntu") # "Ubuntu-RI.ttf") self.fw_update = FwUpdate() panel = wx.Panel(self) self.title = wx.StaticText(panel, label="BTClock OTA firmware updater") self.title.SetFont(ubuntu_it) vbox = wx.BoxSizer(wx.VERTICAL) vbox.Add(self.title, 0, wx.EXPAND | wx.ALL, 20, 0) # serialPorts = SerialPortsComboBox(panel, self.fw_update) # vbox.Add(serialPorts, 0, wx.EXPAND | wx.ALL, 20, 0) self.device_list = DevicesPanel(panel) self.device_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected) self.device_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self.on_item_deselected) vbox.Add(self.device_list, proportion = 2, flag=wx.EXPAND | wx.ALL, border = 20) hbox = wx.BoxSizer(wx.HORIZONTAL) bbox = wx.BoxSizer(wx.HORIZONTAL) gs = wx.GridSizer(1, 4, 1, 1) self.fw_label = wx.StaticText(panel, label=f"Checking latest version...") self.update_button = wx.Button(panel, label="Update Firmware") self.update_button.Bind(wx.EVT_BUTTON, self.on_click_update_firmware) self.update_fs_button = wx.Button(panel, label="Update Filesystem") self.update_fs_button.Bind(wx.EVT_BUTTON, self.on_click_update_fs) self.identify_button = wx.Button(panel, label="Identify") self.identify_button.Bind(wx.EVT_BUTTON, self.on_click_identify) self.open_webif_button = wx.Button(panel, label="Open WebUI") self.open_webif_button.Bind(wx.EVT_BUTTON, self.on_click_webui) self.update_button.Disable() self.update_fs_button.Disable() self.identify_button.Disable() self.open_webif_button.Disable() hbox.Add(self.fw_label, 1, wx.EXPAND | wx.ALL, 5) bbox.Add(self.update_button) bbox.Add(self.update_fs_button) bbox.Add(self.identify_button) bbox.Add(self.open_webif_button) hbox.AddStretchSpacer() hbox.Add(bbox, 2, wx.EXPAND | wx.ALL, 5) vbox.Add(hbox, 0, wx.EXPAND | wx.ALL, 20) self.progress_bar = wx.Gauge(panel, range=100) vbox.Add(self.progress_bar, 0, wx.EXPAND | wx.ALL, 20) panel.SetSizer(vbox) filemenu= wx.Menu() menuAbout = filemenu.Append(wx.ID_ABOUT, "&About"," Information about this program") menuExit = filemenu.Append(wx.ID_EXIT,"E&xit"," Terminate the program") menuBar = wx.MenuBar() menuBar.Append(filemenu,"&File") # Adding the "filemenu" to the MenuBar self.SetMenuBar(menuBar) # Adding the MenuBar to the Frame content. self.Bind(wx.EVT_MENU, self.OnAbout, menuAbout) self.Bind(wx.EVT_MENU, self.OnExit, menuExit) self.status_bar = self.CreateStatusBar(2) # self.StatusBar.SetFieldsCount(2) # self.StatusBar.SetStatusWidths(-3, -1) self.Show(True) self.zeroconf = Zeroconf() self.listener = ZeroconfListener(self.on_zeroconf_state_change) self.browser = ServiceBrowser(self.zeroconf, "_http._tcp.local.", self.listener) self.api_handler = ApiHandler() wx.CallAfter(self.fetch_latest_release) def on_item_selected(self, event): if self.release_name != "": self.update_button.Enable() self.update_fs_button.Enable() self.identify_button.Enable() self.open_webif_button.Enable() def on_item_deselected(self, event): if self.device_list.GetFirstSelected() == -1: self.update_button.Disable() self.update_fs_button.Disable() self.identify_button.Disable() self.open_webif_button.Disable() def on_zeroconf_state_change(self, type, name, state, info): index = self.device_list.FindItem(0, name) if state == "Added": if index == wx.NOT_FOUND: index = self.device_list.InsertItem(self.device_list.GetItemCount(), type) self.device_list.SetItem(index, 0, name) self.device_list.SetItem(index, 1, info.properties.get(b"version").decode()) self.device_list.SetItem(index, 2, info.properties.get(b"rev").decode()) if (info.properties.get(b"hw_rev") is not None): self.device_list.SetItem(index, 3, info.properties.get(b"hw_rev").decode()) self.device_list.SetItem(index, 4, info.parsed_addresses()[0]) else: self.device_list.SetItem(index, 0, name) self.device_list.SetItem(index, 1, info.properties.get(b"version").decode()) self.device_list.SetItem(index, 2, info.properties.get(b"rev").decode()) if (info.properties.get(b"hw_rev").decode()): self.device_list.SetItem(index, 3, info.properties.get(b"hw_rev").decode()) self.device_list.SetItem(index, 4, info.parsed_addresses()[0]) self.device_list.SetItem(index, 5, self.api_handler.check_fs_hash(info.parsed_addresses()[0])) self.device_list.SetItemData(index, index) self.device_list.itemDataMap[index] = [name, info.properties.get(b"version").decode(), info.properties.get(b"rev").decode(), info.properties.get(b"hw_rev").decode(), info.parsed_addresses()[0]] for col in range(0, len(self.device_list.column_headings)): self.device_list.SetColumnWidth(col, wx.LIST_AUTOSIZE_USEHEADER) elif state == "Removed": if index != wx.NOT_FOUND: self.device_list.DeleteItem(index) def on_click_update_firmware(self, event): selected_index = self.device_list.GetFirstSelected() if selected_index != -1: service_name = self.device_list.GetItemText(selected_index, 0) hw_rev = self.device_list.GetItemText(selected_index, 3) info = self.listener.services.get(service_name) if info: address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A" self.start_firmware_update(address, hw_rev) else: wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR) else: wx.MessageBox("Please select a device to update", "Error", wx.ICON_ERROR) def on_click_webui(self, event): selected_index = self.device_list.GetFirstSelected() if selected_index != -1: service_name = self.device_list.GetItemText(selected_index, 0) info = self.listener.services.get(service_name) if info: address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A" thread = threading.Thread(target=lambda: webbrowser.open(f"http://{address}")) thread.start() def run_fs_update(self, address, firmware_file, type): global PROGRESS PROGRESS = True espota.PROGRESS = True global TIMEOUT TIMEOUT = 10 espota.TIMEOUT = 10 espota.serve(address, "0.0.0.0", 3232, random.randint(10000,60000), "", firmware_file, type, self.call_progress) wx.CallAfter(self.update_progress, 100) self.currentlyUpdating = False self.SetStatusText(f"Finished!") def call_progress(self, progress): progressPerc = int(progress*100) self.SetStatusText(f"{self.updatingName} - Progress: {progressPerc}%") wx.CallAfter(self.update_progress, progress) def update_progress(self, progress): self.progress_bar.SetValue(int(progress*100)) wx.YieldIfNeeded() def on_click_update_fs(self, event): selected_index = self.device_list.GetFirstSelected() if selected_index != -1: service_name = self.device_list.GetItemText(selected_index, 0) info = self.listener.services.get(service_name) if self.currentlyUpdating: wx.MessageBox("Please wait, already updating", "Error", wx.ICON_ERROR) return if info: address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A" self.start_fs_update(address) else: wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR) else: wx.MessageBox("Please select a device to update", "Error", wx.ICON_ERROR) def start_firmware_update(self, address, hw_rev): self.SetStatusText(f"Starting firmware update") model_name = "lolin_s3_mini_213epd" if (hw_rev == "REV_B_EPD_2_13"): model_name = "btclock_rev_b_213epd" local_filename = f"firmware/{self.release_name}_{model_name}_firmware.bin" self.updatingName = address self.currentlyUpdating = True if os.path.exists(os.path.abspath(local_filename)): thread = Thread(target=self.run_fs_update, args=(address, os.path.abspath(local_filename), FLASH)) thread.start() def start_fs_update(self, address): # Path to the firmware file self.SetStatusText(f"Starting filesystem update") local_filename = f"firmware/{self.release_name}_littlefs.bin" self.updatingName = address self.currentlyUpdating = True if os.path.exists(os.path.abspath(local_filename)): thread = Thread(target=self.run_fs_update, args=(address, os.path.abspath(local_filename), SPIFFS)) thread.start() wx.CallAfter(self.update_progress, 100) def on_click_identify(self, event): selected_index = self.device_list.GetFirstSelected() if selected_index != -1: service_name = self.device_list.GetItemText(selected_index, 0) info = self.listener.services.get(service_name) if info: address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A" port = info.port self.api_handler.identify_btclock(address) else: wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR) else: wx.MessageBox("Please select a device to make an API call", "Error", wx.ICON_ERROR) def fetch_latest_release(self): repo = "btclock/btclock_v3" if not os.path.exists("firmware"): os.makedirs("firmware") filenames_to_download = ["lolin_s3_mini_213epd_firmware.bin", "btclock_rev_b_213epd_firmware.bin", "littlefs.bin"] url = f"https://api.github.com/repos/{repo}/releases/latest" try: response = requests.get(url) response.raise_for_status() latest_release = response.json() release_name = latest_release['tag_name'] self.release_name = release_name asset_url = None asset_urls = [] for asset in latest_release['assets']: if asset['name'] in filenames_to_download: asset_urls.append(asset['browser_download_url']) if asset_urls: for asset_url in asset_urls: self.download_file(asset_url, release_name) ref_url = f"https://api.github.com/repos/{repo}/git/ref/tags/{release_name}" response = requests.get(ref_url) response.raise_for_status() ref_info = response.json() if (ref_info["object"]["type"] == "commit"): self.commit_hash = ref_info["object"]["sha"] else: tag_url = f"https://api.github.com/repos/{repo}/git/tags/{ref_info["object"]["sha"]}" response = requests.get(tag_url) response.raise_for_status() tag_info = response.json() self.commit_hash = tag_info["object"]["sha"] self.fw_label.SetLabelText(f"Downloaded firmware version: {self.release_name}\nCommit: {self.commit_hash}") else: wx.CallAfter(self.SetStatusText, f"File {filenames_to_download} not found in latest release") except requests.RequestException as e: wx.CallAfter(self.SetStatusText, f"Error fetching release: {e}") def download_file(self, url, release_name): local_filename = f"{release_name}_{url.split('/')[-1]}" response = requests.get(url, stream=True) total_length = response.headers.get('content-length') if not os.path.exists("firmware"): os.makedirs("firmware") if os.path.exists(f"firmware/{local_filename}"): wx.CallAfter(self.SetStatusText, f"{local_filename} is already downloaded") return if total_length is None: wx.CallAfter(self.SetStatusText, "No content length header") else: total_length = int(total_length) chunk_size = 1024 num_chunks = total_length // chunk_size with open(f"firmware/{local_filename}", 'wb') as f: for i, chunk in enumerate(response.iter_content(chunk_size=chunk_size)): if chunk: f.write(chunk) f.flush() progress = int((i / num_chunks) * 100) wx.CallAfter(self.update_progress, progress) wx.CallAfter(self.update_progress, 100) wx.CallAfter(self.SetStatusText, "Download completed") def OnAbout(self,e): dlg = wx.MessageDialog( self, "An updater for BTClocks", "About BTClock OTA Updater", wx.OK) dlg.ShowModal() dlg.Destroy() def OnExit(self,e): self.Close(False)