diff --git a/app.py b/app.py index 7fa4641..7e92065 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,6 @@ import wx -from app.gui import BTClockOTAUpdater +from app.main import BTClockOTAUpdater app = wx.App(False) frame = BTClockOTAUpdater(None, 'BTClock OTA updater') diff --git a/app/api.py b/app/api.py index 7ebba98..9235886 100644 --- a/app/api.py +++ b/app/api.py @@ -1,18 +1,24 @@ import time import requests +import json from threading import Thread + class ApiHandler: def identify_btclock(self, address): self.make_api_call(address, "api/identify") return - + def check_fs_hash(self, address): ret = self.run_api_call(address, "fs_hash.txt") return ret + def get_settings(self, address): + ret = json.loads(self.run_api_call(address, "api/settings")) + return ret + def make_api_call(self, address, path): - thread = Thread(target=self.run_api_call, args=(address,path)) + thread = Thread(target=self.run_api_call, args=(address, path)) thread.start() def run_api_call(self, address, path): @@ -21,4 +27,4 @@ class ApiHandler: response = requests.get(url) return response.text except requests.RequestException as e: - print("error") \ No newline at end of file + print("error") diff --git a/app/fw_update.py b/app/fw_update.py deleted file mode 100644 index 279ec9a..0000000 --- a/app/fw_update.py +++ /dev/null @@ -1,37 +0,0 @@ -import esptool -import serial - -class FwUpdate: - def get_serial_ports(self): - ports = serial.tools.list_ports.comports() - available_ports = [] - for port, desc, hwid in sorted(ports): - available_ports.append((port, desc, hwid)) - print(f"Port: {port}, Description: {desc}, Hardware ID: {hwid}") - return available_ports - - def flash_firmware(port, baud, firmware_path): - try: - # Initialize the serial port - serial_port = serial.Serial(port, baud) - - # Initialize the ESP32ROM with the serial port - esp = esptool.ESP32ROM(serial_port) - - # Connect to the ESP32 - esp.connect() - - # Perform the flashing operation - esp.flash_file(firmware_path, offset=0x1000) - - # Optionally, verify the flash - esp.verify_flash(firmware_path, offset=0x1000) - - print("Firmware flashed successfully!") - - except esptool.FatalError as e: - print(f"Failed to flash firmware: {e}") - finally: - # Ensure the serial port is closed - if serial_port.is_open: - serial_port.close() \ No newline at end of file diff --git a/app/fw_updater.py b/app/fw_updater.py new file mode 100644 index 0000000..1ca37af --- /dev/null +++ b/app/fw_updater.py @@ -0,0 +1,96 @@ +import os +import random +from threading import Thread +from app import espota +from app.espota import FLASH, SPIFFS +import esptool +import serial +import wx + + +class FwUpdater: + update_progress = None + currentlyUpdating = False + + def __init__(self, update_progress): + self.update_progress = update_progress + + def get_serial_ports(self): + ports = serial.tools.list_ports.comports() + available_ports = [] + for port, desc, hwid in sorted(ports): + available_ports.append((port, desc, hwid)) + print(f"Port: {port}, Description: {desc}, Hardware ID: {hwid}") + return available_ports + + def run_fs_update(self, address, firmware_file, type): + global PROGRESS + PROGRESS = True + espota.PROGRESS = True + global TIMEOUT + TIMEOUT = 10 + espota.TIMEOUT = 10 + + espota.serve(address, "0.0.0.0", 3232, random.randint( + 10000, 60000), "", firmware_file, type, self.update_progress) + + wx.CallAfter(self.update_progress, 1) + self.currentlyUpdating = False +# self.SetStatusText(f"Finished!") + + def flash_firmware(port, baud, firmware_path): + try: + # Initialize the serial port + serial_port = serial.Serial(port, baud) + + # Initialize the ESP32ROM with the serial port + esp = esptool.ESP32ROM(serial_port) + + # Connect to the ESP32 + esp.connect() + + # Perform the flashing operation + esp.flash_file(firmware_path, offset=0x1000) + + # Optionally, verify the flash + esp.verify_flash(firmware_path, offset=0x1000) + + print("Firmware flashed successfully!") + + except esptool.FatalError as e: + print(f"Failed to flash firmware: {e}") + finally: + # Ensure the serial port is closed + if serial_port.is_open: + serial_port.close() + + def start_firmware_update(self, release_name, address, hw_rev): +# self.SetStatusText(f"Starting firmware update") + + model_name = "lolin_s3_mini_213epd" + if (hw_rev == "REV_B_EPD_2_13"): + model_name = "btclock_rev_b_213epd" + + local_filename = f"firmware/{ + release_name}_{model_name}_firmware.bin" + + self.updatingName = address + self.currentlyUpdating = True + + if os.path.exists(os.path.abspath(local_filename)): + thread = Thread(target=self.run_fs_update, args=( + address, os.path.abspath(local_filename), FLASH)) + thread.start() + + def start_fs_update(self, release_name, address): + # Path to the firmware file + local_filename = f"firmware/{release_name}_littlefs.bin" + + self.updatingName = address + self.currentlyUpdating = True + + if os.path.exists(os.path.abspath(local_filename)): + thread = Thread(target=self.run_fs_update, args=( + address, os.path.abspath(local_filename), SPIFFS)) + thread.start() + diff --git a/app/gui.py b/app/gui.py deleted file mode 100644 index 5e8ebb0..0000000 --- a/app/gui.py +++ /dev/null @@ -1,372 +0,0 @@ -import random -from threading import Thread -import threading -import serial -import wx -import wx.lib.mixins.listctrl - -from zeroconf import ServiceBrowser, Zeroconf -import requests -import os -import webbrowser - -from app import espota -from app.api import ApiHandler -from app.fw_update import FwUpdate -from app.zeroconf_listener import ZeroconfListener - -from app.espota import FLASH,SPIFFS - -class SerialPortsComboBox(wx.ComboBox): - def __init__(self, parent, fw_update): - self.fw_update = fw_update - self.ports = serial.tools.list_ports.comports() - wx.ComboBox.__init__(self,parent, choices=[port.device for port in self.ports]) - -class DevicesPanel(wx.ListCtrl,wx.lib.mixins.listctrl.ColumnSorterMixin, wx.lib.mixins.listctrl.ListCtrlAutoWidthMixin, -): - def __init__(self, parent): - wx.ListCtrl.__init__(self, parent, style=wx.LC_REPORT) - self.column_headings = ["name", "Version", "SW Revision", "HW Revision", "IP", "FS Version"] - wx.lib.mixins.listctrl.ColumnSorterMixin.__init__( - self, - len(self.column_headings), - ) - wx.lib.mixins.listctrl.ListCtrlAutoWidthMixin.__init__(self) - - for column, heading in enumerate(self.column_headings): - self.AppendColumn(heading) - - self.itemDataMap = {} - - def OnSortOrderChanged(self): - """Method to handle changes to the sort order""" - - column, ascending = self.GetSortState() - self.ShowSortIndicator(column, ascending) - self.SortListItems(column, ascending) - - def GetListCtrl(self): - """Method required by the ColumnSorterMixin""" - return self - -class BTClockOTAUpdater(wx.Frame): - release_name = "" - commit_hash = "" - currentlyUpdating = False - updatingName = "" - - def __init__(self, parent, title): - wx.Frame.__init__(self, parent, title=title, size=(800,500)) - self.SetMinSize((800,500)) - ubuntu_it = wx.Font(32, wx.FONTFAMILY_MODERN, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD, False, faceName="Ubuntu") - # "Ubuntu-RI.ttf") - - self.fw_update = FwUpdate() - - panel = wx.Panel(self) - - self.title = wx.StaticText(panel, label="BTClock OTA firmware updater") - self.title.SetFont(ubuntu_it) - vbox = wx.BoxSizer(wx.VERTICAL) - vbox.Add(self.title, 0, wx.EXPAND | wx.ALL, 20, 0) - - # serialPorts = SerialPortsComboBox(panel, self.fw_update) - # vbox.Add(serialPorts, 0, wx.EXPAND | wx.ALL, 20, 0) - - self.device_list = DevicesPanel(panel) - self.device_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected) - self.device_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self.on_item_deselected) - - vbox.Add(self.device_list, proportion = 2, flag=wx.EXPAND | wx.ALL, border = 20) - hbox = wx.BoxSizer(wx.HORIZONTAL) - bbox = wx.BoxSizer(wx.HORIZONTAL) - - gs = wx.GridSizer(1, 4, 1, 1) - - self.fw_label = wx.StaticText(panel, label=f"Checking latest version...") - self.update_button = wx.Button(panel, label="Update Firmware") - self.update_button.Bind(wx.EVT_BUTTON, self.on_click_update_firmware) - self.update_fs_button = wx.Button(panel, label="Update Filesystem") - self.update_fs_button.Bind(wx.EVT_BUTTON, self.on_click_update_fs) - - self.identify_button = wx.Button(panel, label="Identify") - self.identify_button.Bind(wx.EVT_BUTTON, self.on_click_identify) - self.open_webif_button = wx.Button(panel, label="Open WebUI") - self.open_webif_button.Bind(wx.EVT_BUTTON, self.on_click_webui) - self.update_button.Disable() - self.update_fs_button.Disable() - self.identify_button.Disable() - self.open_webif_button.Disable() - - hbox.Add(self.fw_label, 1, wx.EXPAND | wx.ALL, 5) - bbox.Add(self.update_button) - bbox.Add(self.update_fs_button) - bbox.Add(self.identify_button) - bbox.Add(self.open_webif_button) - - hbox.AddStretchSpacer() - hbox.Add(bbox, 2, wx.EXPAND | wx.ALL, 5) - vbox.Add(hbox, 0, wx.EXPAND | wx.ALL, 20) - - self.progress_bar = wx.Gauge(panel, range=100) - vbox.Add(self.progress_bar, 0, wx.EXPAND | wx.ALL, 20) - - panel.SetSizer(vbox) - - filemenu= wx.Menu() - menuAbout = filemenu.Append(wx.ID_ABOUT, "&About"," Information about this program") - menuExit = filemenu.Append(wx.ID_EXIT,"E&xit"," Terminate the program") - - menuBar = wx.MenuBar() - menuBar.Append(filemenu,"&File") # Adding the "filemenu" to the MenuBar - self.SetMenuBar(menuBar) # Adding the MenuBar to the Frame content. - - - - self.Bind(wx.EVT_MENU, self.OnAbout, menuAbout) - self.Bind(wx.EVT_MENU, self.OnExit, menuExit) - self.status_bar = self.CreateStatusBar(2) - # self.StatusBar.SetFieldsCount(2) -# self.StatusBar.SetStatusWidths(-3, -1) - self.Show(True) - - self.zeroconf = Zeroconf() - self.listener = ZeroconfListener(self.on_zeroconf_state_change) - self.browser = ServiceBrowser(self.zeroconf, "_http._tcp.local.", self.listener) - self.api_handler = ApiHandler() - - wx.CallAfter(self.fetch_latest_release) - - def on_item_selected(self, event): - if self.release_name != "": - self.update_button.Enable() - self.update_fs_button.Enable() - self.identify_button.Enable() - self.open_webif_button.Enable() - - def on_item_deselected(self, event): - if self.device_list.GetFirstSelected() == -1: - self.update_button.Disable() - self.update_fs_button.Disable() - self.identify_button.Disable() - self.open_webif_button.Disable() - - def on_zeroconf_state_change(self, type, name, state, info): - index = self.device_list.FindItem(0, name) - - if state == "Added": - if index == wx.NOT_FOUND: - index = self.device_list.InsertItem(self.device_list.GetItemCount(), type) - self.device_list.SetItem(index, 0, name) - self.device_list.SetItem(index, 1, info.properties.get(b"version").decode()) - self.device_list.SetItem(index, 2, info.properties.get(b"rev").decode()) - if (info.properties.get(b"hw_rev") is not None): - self.device_list.SetItem(index, 3, info.properties.get(b"hw_rev").decode()) - self.device_list.SetItem(index, 4, info.parsed_addresses()[0]) - - else: - self.device_list.SetItem(index, 0, name) - self.device_list.SetItem(index, 1, info.properties.get(b"version").decode()) - self.device_list.SetItem(index, 2, info.properties.get(b"rev").decode()) - if (info.properties.get(b"hw_rev").decode()): - self.device_list.SetItem(index, 3, info.properties.get(b"hw_rev").decode()) - self.device_list.SetItem(index, 4, info.parsed_addresses()[0]) - self.device_list.SetItem(index, 5, self.api_handler.check_fs_hash(info.parsed_addresses()[0])) - self.device_list.SetItemData(index, index) - self.device_list.itemDataMap[index] = [name, info.properties.get(b"version").decode(), info.properties.get(b"rev").decode(), info.properties.get(b"hw_rev").decode(), info.parsed_addresses()[0]] - for col in range(0, len(self.device_list.column_headings)): - self.device_list.SetColumnWidth(col, wx.LIST_AUTOSIZE_USEHEADER) - elif state == "Removed": - if index != wx.NOT_FOUND: - self.device_list.DeleteItem(index) - - def on_click_update_firmware(self, event): - selected_index = self.device_list.GetFirstSelected() - if selected_index != -1: - service_name = self.device_list.GetItemText(selected_index, 0) - hw_rev = self.device_list.GetItemText(selected_index, 3) - - info = self.listener.services.get(service_name) - if info: - address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A" - self.start_firmware_update(address, hw_rev) - else: - wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR) - else: - wx.MessageBox("Please select a device to update", "Error", wx.ICON_ERROR) - - def on_click_webui(self, event): - selected_index = self.device_list.GetFirstSelected() - if selected_index != -1: - service_name = self.device_list.GetItemText(selected_index, 0) - info = self.listener.services.get(service_name) - if info: - address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A" - thread = threading.Thread(target=lambda: webbrowser.open(f"http://{address}")) - thread.start() - - def run_fs_update(self, address, firmware_file, type): - global PROGRESS - PROGRESS = True - espota.PROGRESS = True - global TIMEOUT - TIMEOUT = 10 - espota.TIMEOUT = 10 - - espota.serve(address, "0.0.0.0", 3232, random.randint(10000,60000), "", firmware_file, type, self.call_progress) - - wx.CallAfter(self.update_progress, 100) - self.currentlyUpdating = False - self.SetStatusText(f"Finished!") - - def call_progress(self, progress): - progressPerc = int(progress*100) - self.SetStatusText(f"{self.updatingName} - Progress: {progressPerc}%") - wx.CallAfter(self.update_progress, progress) - - def update_progress(self, progress): - self.progress_bar.SetValue(int(progress*100)) - wx.YieldIfNeeded() - - def on_click_update_fs(self, event): - selected_index = self.device_list.GetFirstSelected() - if selected_index != -1: - service_name = self.device_list.GetItemText(selected_index, 0) - info = self.listener.services.get(service_name) - if self.currentlyUpdating: - wx.MessageBox("Please wait, already updating", "Error", wx.ICON_ERROR) - return - - if info: - address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A" - self.start_fs_update(address) - else: - wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR) - else: - wx.MessageBox("Please select a device to update", "Error", wx.ICON_ERROR) - - def start_firmware_update(self, address, hw_rev): - self.SetStatusText(f"Starting firmware update") - - model_name = "lolin_s3_mini_213epd" - if (hw_rev == "REV_B_EPD_2_13"): - model_name = "btclock_rev_b_213epd" - - local_filename = f"firmware/{self.release_name}_{model_name}_firmware.bin" - - self.updatingName = address - self.currentlyUpdating = True - - if os.path.exists(os.path.abspath(local_filename)): - thread = Thread(target=self.run_fs_update, args=(address, os.path.abspath(local_filename), FLASH)) - thread.start() - - def start_fs_update(self, address): - # Path to the firmware file - self.SetStatusText(f"Starting filesystem update") - local_filename = f"firmware/{self.release_name}_littlefs.bin" - - self.updatingName = address - self.currentlyUpdating = True - - if os.path.exists(os.path.abspath(local_filename)): - thread = Thread(target=self.run_fs_update, args=(address, os.path.abspath(local_filename), SPIFFS)) - thread.start() - - wx.CallAfter(self.update_progress, 100) - - def on_click_identify(self, event): - selected_index = self.device_list.GetFirstSelected() - if selected_index != -1: - service_name = self.device_list.GetItemText(selected_index, 0) - info = self.listener.services.get(service_name) - if info: - address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A" - port = info.port - self.api_handler.identify_btclock(address) - else: - wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR) - else: - wx.MessageBox("Please select a device to make an API call", "Error", wx.ICON_ERROR) - - def fetch_latest_release(self): - repo = "btclock/btclock_v3" - - if not os.path.exists("firmware"): - os.makedirs("firmware") - filenames_to_download = ["lolin_s3_mini_213epd_firmware.bin", "btclock_rev_b_213epd_firmware.bin", "littlefs.bin"] - url = f"https://api.github.com/repos/{repo}/releases/latest" - try: - response = requests.get(url) - response.raise_for_status() - latest_release = response.json() - release_name = latest_release['tag_name'] - self.release_name = release_name - - - asset_url = None - asset_urls = [] - for asset in latest_release['assets']: - if asset['name'] in filenames_to_download: - asset_urls.append(asset['browser_download_url']) - if asset_urls: - for asset_url in asset_urls: - self.download_file(asset_url, release_name) - ref_url = f"https://api.github.com/repos/{repo}/git/ref/tags/{release_name}" - response = requests.get(ref_url) - response.raise_for_status() - ref_info = response.json() - if (ref_info["object"]["type"] == "commit"): - self.commit_hash = ref_info["object"]["sha"] - else: - tag_url = f"https://api.github.com/repos/{repo}/git/tags/{ref_info["object"]["sha"]}" - response = requests.get(tag_url) - response.raise_for_status() - tag_info = response.json() - self.commit_hash = tag_info["object"]["sha"] - - self.fw_label.SetLabelText(f"Downloaded firmware version: {self.release_name}\nCommit: {self.commit_hash}") - - - else: - wx.CallAfter(self.SetStatusText, f"File {filenames_to_download} not found in latest release") - except requests.RequestException as e: - wx.CallAfter(self.SetStatusText, f"Error fetching release: {e}") - - - - def download_file(self, url, release_name): - local_filename = f"{release_name}_{url.split('/')[-1]}" - response = requests.get(url, stream=True) - total_length = response.headers.get('content-length') - if not os.path.exists("firmware"): - os.makedirs("firmware") - if os.path.exists(f"firmware/{local_filename}"): - wx.CallAfter(self.SetStatusText, f"{local_filename} is already downloaded") - return - - if total_length is None: - wx.CallAfter(self.SetStatusText, "No content length header") - else: - total_length = int(total_length) - chunk_size = 1024 - num_chunks = total_length // chunk_size - with open(f"firmware/{local_filename}", 'wb') as f: - for i, chunk in enumerate(response.iter_content(chunk_size=chunk_size)): - if chunk: - f.write(chunk) - f.flush() - progress = int((i / num_chunks) * 100) - wx.CallAfter(self.update_progress, progress) - - wx.CallAfter(self.update_progress, 100) - wx.CallAfter(self.SetStatusText, "Download completed") - - def OnAbout(self,e): - dlg = wx.MessageDialog( self, "An updater for BTClocks", "About BTClock OTA Updater", wx.OK) - dlg.ShowModal() - dlg.Destroy() - - def OnExit(self,e): - self.Close(False) \ No newline at end of file diff --git a/app/gui/__init__.py b/app/gui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/gui/action_button_panel.py b/app/gui/action_button_panel.py new file mode 100644 index 0000000..5c28669 --- /dev/null +++ b/app/gui/action_button_panel.py @@ -0,0 +1,126 @@ +import threading +import webbrowser +from app.api import ApiHandler +from app.gui.devices_panel import DevicesPanel +from app.zeroconf_listener import ZeroconfListener +import wx + +class ActionButtonPanel(wx.Panel): + currentlyUpdating = False + + def __init__(self, parent:wx.Panel, parent_frame:wx.Frame, *args, **kwargs): + super(ActionButtonPanel, self).__init__(parent, *args, **kwargs) + + self.parent = parent + self.parent_frame = parent_frame + self.api_handler:ApiHandler = parent_frame.api_handler + self.device_list:DevicesPanel = parent_frame.device_list + self.listener:ZeroconfListener = parent_frame.listener + + self.device_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected) + self.device_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, + self.on_item_deselected) + self.InitUI() + + def InitUI(self): + sizer = wx.BoxSizer(wx.HORIZONTAL) + + self.update_button = wx.Button(self, label="Update Firmware") + self.update_button.Bind(wx.EVT_BUTTON, self.on_click_update_firmware) + self.update_fs_button = wx.Button(self, label="Update Filesystem") + self.update_fs_button.Bind(wx.EVT_BUTTON, self.on_click_update_fs) + + self.identify_button = wx.Button(self, label="Identify") + self.identify_button.Bind(wx.EVT_BUTTON, self.on_click_identify) + self.open_webif_button = wx.Button(self, label="Open WebUI") + self.open_webif_button.Bind(wx.EVT_BUTTON, self.on_click_webui) + self.update_button.Disable() + self.update_fs_button.Disable() + self.identify_button.Disable() + self.open_webif_button.Disable() + + sizer.Add(self.update_button) + sizer.Add(self.update_fs_button) + sizer.Add(self.identify_button) + sizer.Add(self.open_webif_button) + + self.SetSizer(sizer) + + def on_click_update_firmware(self, event): + selected_index = self.device_list.GetFirstSelected() + if selected_index != -1: + service_name = self.device_list.GetItemText(selected_index, 0) + hw_rev = self.device_list.GetItemText(selected_index, 3) + + info = self.listener.services.get(service_name) + if info: + address = info.parsed_addresses( + )[0] if info.parsed_addresses() else "N/A" + self.parent_frame.fw_updater.start_firmware_update(self.parent_frame.releaseChecker.release_name, address, hw_rev) + else: + wx.MessageBox( + "No service information available for selected device", "Error", wx.ICON_ERROR) + else: + wx.MessageBox("Please select a device to update", + "Error", wx.ICON_ERROR) + + def on_click_webui(self, event): + selected_index = self.device_list.GetFirstSelected() + if selected_index != -1: + service_name = self.device_list.GetItemText(selected_index, 0) + info = self.listener.services.get(service_name) + if info: + address = info.parsed_addresses( + )[0] if info.parsed_addresses() else "N/A" + thread = threading.Thread( + target=lambda: webbrowser.open(f"http://{address}")) + thread.start() + + def on_click_update_fs(self, event): + selected_index = self.device_list.GetFirstSelected() + if selected_index != -1: + service_name = self.device_list.GetItemText(selected_index, 0) + info = self.listener.services.get(service_name) + if self.currentlyUpdating: + wx.MessageBox("Please wait, already updating", + "Error", wx.ICON_ERROR) + return + + if info: + address = info.parsed_addresses( + )[0] if info.parsed_addresses() else "N/A" + self.parent_frame.fw_updater.start_fs_update(self.parent_frame.releaseChecker.release_name, address) + else: + wx.MessageBox( + "No service information available for selected device", "Error", wx.ICON_ERROR) + else: + wx.MessageBox("Please select a device to update", + "Error", wx.ICON_ERROR) + def on_click_identify(self, event): + selected_index = self.device_list.GetFirstSelected() + if selected_index != -1: + service_name = self.device_list.GetItemText(selected_index, 0) + info = self.listener.services.get(service_name) + if info: + address = info.parsed_addresses( + )[0] if info.parsed_addresses() else "N/A" + port = info.port + self.api_handler.identify_btclock(address) + else: + wx.MessageBox( + "No service information available for selected device", "Error", wx.ICON_ERROR) + else: + wx.MessageBox( + "Please select a device to make an API call", "Error", wx.ICON_ERROR) + def on_item_selected(self, event): + self.update_button.Enable() + self.update_fs_button.Enable() + self.identify_button.Enable() + self.open_webif_button.Enable() + + def on_item_deselected(self, event): + if self.device_list.GetFirstSelected() == -1: + self.update_button.Disable() + self.update_fs_button.Disable() + self.identify_button.Disable() + self.open_webif_button.Disable() diff --git a/app/gui/devices_panel.py b/app/gui/devices_panel.py new file mode 100644 index 0000000..bbf0df5 --- /dev/null +++ b/app/gui/devices_panel.py @@ -0,0 +1,31 @@ +import wx +import wx.lib.mixins.listctrl + + +class DevicesPanel(wx.ListCtrl, wx.lib.mixins.listctrl.ColumnSorterMixin, wx.lib.mixins.listctrl.ListCtrlAutoWidthMixin, + ): + def __init__(self, parent): + wx.ListCtrl.__init__(self, parent, style=wx.LC_REPORT) + self.column_headings = [ + "Hostname", "Version", "FW commit", "HW Revision", "IP", "WebUI commit"] + wx.lib.mixins.listctrl.ColumnSorterMixin.__init__( + self, + len(self.column_headings), + ) + wx.lib.mixins.listctrl.ListCtrlAutoWidthMixin.__init__(self) + + for column, heading in enumerate(self.column_headings): + self.AppendColumn(heading) + + self.itemDataMap = {} + + def OnSortOrderChanged(self): + """Method to handle changes to the sort order""" + + column, ascending = self.GetSortState() + self.ShowSortIndicator(column, ascending) + self.SortListItems(column, ascending) + + def GetListCtrl(self): + """Method required by the ColumnSorterMixin""" + return self diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..93383ec --- /dev/null +++ b/app/main.py @@ -0,0 +1,167 @@ +import concurrent.futures + +import serial +from app.gui.action_button_panel import ActionButtonPanel +from app.release_checker import ReleaseChecker +import wx + +from zeroconf import ServiceBrowser, Zeroconf +import os +import webbrowser + +from app import espota +from app.api import ApiHandler +from app.fw_updater import FwUpdater +from app.gui.devices_panel import DevicesPanel +from app.zeroconf_listener import ZeroconfListener + +from app.espota import FLASH, SPIFFS + +class SerialPortsComboBox(wx.ComboBox): + def __init__(self, parent, fw_update): + self.fw_update = fw_update + self.ports = serial.tools.list_ports.comports() + wx.ComboBox.__init__(self, parent, choices=[ + port.device for port in self.ports]) + + +class BTClockOTAUpdater(wx.Frame): + updatingName = "" + + def __init__(self, parent, title): + wx.Frame.__init__(self, parent, title=title, size=(800, 500)) + self.SetMinSize((800, 500)) + self.releaseChecker = ReleaseChecker() + self.zeroconf = Zeroconf() + self.listener = ZeroconfListener(self.on_zeroconf_state_change) + self.browser = ServiceBrowser( + self.zeroconf, "_http._tcp.local.", self.listener) + self.api_handler = ApiHandler() + self.fw_updater = FwUpdater(self.call_progress) + + panel = wx.Panel(self) + self.device_list = DevicesPanel(panel) + + vbox = wx.BoxSizer(wx.VERTICAL) + + vbox.Add(self.device_list, proportion=2, + flag=wx.EXPAND | wx.ALL, border=20) + hbox = wx.BoxSizer(wx.HORIZONTAL) + + self.fw_label = wx.StaticText( + panel, label=f"Fetching latest version from GitHub...") + hbox.Add(self.fw_label, 1, wx.EXPAND | wx.ALL, 5) + + self.actionButtons = ActionButtonPanel( + panel, self) + hbox.AddStretchSpacer() + + hbox.Add(self.actionButtons, 2, wx.EXPAND | wx.ALL, 5) + vbox.Add(hbox, 0, wx.EXPAND | wx.ALL, 20) + + self.progress_bar = wx.Gauge(panel, range=100) + vbox.Add(self.progress_bar, 0, wx.EXPAND | wx.ALL, 20) + + panel.SetSizer(vbox) + + self.setup_ui() + wx.CallAfter(self.fetch_latest_release_async) + + def setup_ui(self): + self.setup_menubar() + self.status_bar = self.CreateStatusBar(2) + self.Show(True) + self.Centre() + + def setup_menubar(self): + filemenu = wx.Menu() + menuAbout = filemenu.Append( + wx.ID_ABOUT, "&About", " Information about this program") + menuExit = filemenu.Append( + wx.ID_EXIT, "E&xit", " Terminate the program") + + menuBar = wx.MenuBar() + menuBar.Append(filemenu, "&File") + self.SetMenuBar(menuBar) + + self.Bind(wx.EVT_MENU, self.OnAbout, menuAbout) + self.Bind(wx.EVT_MENU, self.OnExit, menuExit) + + def on_zeroconf_state_change(self, type, name, state, info): + index = self.device_list.FindItem(0, name) + + if state == "Added": + deviceSettings = self.api_handler.get_settings( + info.parsed_addresses()[0]) + + version = info.properties.get(b"rev").decode() + + if 'gitTag' in deviceSettings: + version = deviceSettings["gitTag"] + + fwHash = info.properties.get(b"rev").decode()[:7] + fsHash = deviceSettings['fsRev'][:7] + address = info.parsed_addresses()[0] + + if index == wx.NOT_FOUND: + index = self.device_list.InsertItem( + self.device_list.GetItemCount(), type) + self.device_list.SetItem(index, 0, name) + self.device_list.SetItem(index, 1, version) + self.device_list.SetItem(index, 2, fwHash) + if (info.properties.get(b"hw_rev") is not None): + self.device_list.SetItem( + index, 3, info.properties.get(b"hw_rev").decode()) + self.device_list.SetItem(index, 4, address) + + else: + self.device_list.SetItem(index, 0, name) + self.device_list.SetItem(index, 1, version) + self.device_list.SetItem(index, 2, fwHash) + if (info.properties.get(b"hw_rev").decode()): + self.device_list.SetItem( + index, 3, info.properties.get(b"hw_rev").decode()) + self.device_list.SetItem(index, 4, address) + self.device_list.SetItem(index, 5, fsHash) + self.device_list.SetItemData(index, index) + self.device_list.itemDataMap[index] = [ + name, version, fwHash, info.properties.get(b"hw_rev").decode(), address, fsHash] + for col in range(0, len(self.device_list.column_headings)): + self.device_list.SetColumnWidth( + col, wx.LIST_AUTOSIZE_USEHEADER) + elif state == "Removed": + if index != wx.NOT_FOUND: + self.device_list.DeleteItem(index) + + def call_progress(self, progress): + progressPerc = int(progress*100) + self.SetStatusText(f"Progress: {progressPerc}%") + wx.CallAfter(self.update_progress, progress) + + def update_progress(self, progress): + progressPerc = int(progress*100) + self.progress_bar.SetValue(progressPerc) + wx.YieldIfNeeded() + + def fetch_latest_release_async(self): + # Start a new thread to execute fetch_latest_release + executor = concurrent.futures.ThreadPoolExecutor() + future = executor.submit(self.releaseChecker.fetch_latest_release) + future.add_done_callback(self.handle_latest_release) + + def handle_latest_release(self, future): + try: + latest_release = future.result() + self.fw_label.SetLabelText(f"Downloaded firmware version: { + latest_release}\nCommit: {self.releaseChecker.commit_hash}") + except Exception as e: + self.fw_label.SetLabel(f"Error occurred: {str(e)}") + + def OnAbout(self, e): + dlg = wx.MessageDialog( + self, "An updater for BTClocks", "About BTClock OTA Updater", wx.OK) + dlg.ShowModal() + dlg.Destroy() + + def OnExit(self, e): + self.Close(False) diff --git a/app/release_checker.py b/app/release_checker.py new file mode 100644 index 0000000..0e4ec9e --- /dev/null +++ b/app/release_checker.py @@ -0,0 +1,97 @@ +import os +import requests +import wx +from typing import Callable + +from app.utils import keep_latest_versions + + +class ReleaseChecker: + '''Release Checker for firmware updates''' + release_name = "" + commit_hash = "" + + def __init__(self): + self.progress_callback: Callable[[int], None] = None + + def fetch_latest_release(self): + '''Fetch latest firmware release from GitHub''' + repo = "btclock/btclock_v3" + + if not os.path.exists("firmware"): + os.makedirs("firmware") + filenames_to_download = ["lolin_s3_mini_213epd_firmware.bin", + "btclock_rev_b_213epd_firmware.bin", "littlefs.bin"] + url = f"https://api.github.com/repos/{repo}/releases/latest" + try: + response = requests.get(url) + response.raise_for_status() + latest_release = response.json() + release_name = latest_release['tag_name'] + self.release_name = release_name + + asset_url = None + asset_urls = [] + for asset in latest_release['assets']: + if asset['name'] in filenames_to_download: + asset_urls.append(asset['browser_download_url']) + if asset_urls: + for asset_url in asset_urls: + self.download_file(asset_url, release_name) + ref_url = f"https://api.github.com/repos/{ + repo}/git/ref/tags/{release_name}" + response = requests.get(ref_url) + response.raise_for_status() + ref_info = response.json() + if (ref_info["object"]["type"] == "commit"): + self.commit_hash = ref_info["object"]["sha"] + else: + tag_url = f"https://api.github.com/repos/{ + repo}/git/tags/{ref_info["object"]["sha"]}" + response = requests.get(tag_url) + response.raise_for_status() + tag_info = response.json() + self.commit_hash = tag_info["object"]["sha"] + + return self.release_name + + else: + raise ReleaseCheckerException( + f"File {filenames_to_download} not found in latest release") + except requests.RequestException as e: + raise ReleaseCheckerException( + f"Error fetching release: {e}") from e + + def download_file(self, url, release_name): + '''Downloads Fimware Files''' + local_filename = f"{release_name}_{url.split('/')[-1]}" + response = requests.get(url, stream=True) + total_length = response.headers.get('content-length') + if not os.path.exists("firmware"): + os.makedirs("firmware") + if os.path.exists(f"firmware/{local_filename}"): + return + + keep_latest_versions('firmware', 2) + + if total_length is None: + raise ReleaseCheckerException("No content length header") + else: + total_length = int(total_length) + chunk_size = 1024 + num_chunks = total_length // chunk_size + with open(f"firmware/{local_filename}", 'wb') as f: + for i, chunk in enumerate(response.iter_content(chunk_size=chunk_size)): + if chunk: + f.write(chunk) + f.flush() + progress = int((i / num_chunks) * 100) + if callable(self.progress_callback): + self.progress_callback(progress) + + if callable(self.progress_callback): + self.progress_callback(100) + + +class ReleaseCheckerException(Exception): + pass diff --git a/app/utils.py b/app/utils.py new file mode 100644 index 0000000..8b79405 --- /dev/null +++ b/app/utils.py @@ -0,0 +1,32 @@ +import os +import re +import shutil + + +def count_versions(folder_path): + versions = set() + version_pattern = re.compile(r'^(\d+\.\d+\.\d+)') + for file_name in os.listdir(folder_path): + match = version_pattern.match(file_name) + if match: + versions.add(match.group(1)) + return len(versions) + + +def keep_latest_versions(folder_path, num_versions_to_keep=2): + version_files = {} + version_pattern = re.compile(r'^(\d+\.\d+\.\d+)') + for file_name in os.listdir(folder_path): + match = version_pattern.match(file_name) + if match: + version = match.group(1) + if version not in version_files: + version_files[version] = [] + version_files[version].append(file_name) + + versions_sorted = sorted(version_files.keys(), reverse=True) + versions_to_remove = versions_sorted[num_versions_to_keep:] + + for version in versions_to_remove: + for file_name in version_files[version]: + os.remove(os.path.join(folder_path, file_name)) diff --git a/app/zeroconf_listener.py b/app/zeroconf_listener.py index 21204ea..bedf9d2 100644 --- a/app/zeroconf_listener.py +++ b/app/zeroconf_listener.py @@ -1,7 +1,9 @@ import wx -from zeroconf import Zeroconf, ServiceBrowser, ServiceStateChange +from zeroconf import Zeroconf + class ZeroconfListener: + '''Zeroconf Handler to find BTClocks in the network''' release_name = "" firmware_file = "" @@ -26,4 +28,4 @@ class ZeroconfListener: if (name.startswith('btclock-')): info = zeroconf.get_service_info(type, name) self.services[name] = info - wx.CallAfter(self.update_callback, type, name, "Added", info) \ No newline at end of file + wx.CallAfter(self.update_callback, type, name, "Added", info) diff --git a/requirements.txt b/requirements.txt index 6f72ad6..e24dba4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ Requests==2.32.3 -wxPython==4.2.1 +wxPython==4.2.2a1.dev5670+a207b407 zeroconf==0.132.2 esptool==4.7.0 pyserial==3.5