Uncluttered code

This commit is contained in:
Djuri Baars 2024-06-09 19:38:32 +02:00
parent d356e089c9
commit 3eb23209e8
13 changed files with 564 additions and 416 deletions

2
app.py
View file

@ -1,6 +1,6 @@
import wx
from app.gui import BTClockOTAUpdater
from app.main import BTClockOTAUpdater
app = wx.App(False)
frame = BTClockOTAUpdater(None, 'BTClock OTA updater')

View file

@ -1,18 +1,24 @@
import time
import requests
import json
from threading import Thread
class ApiHandler:
def identify_btclock(self, address):
self.make_api_call(address, "api/identify")
return
def check_fs_hash(self, address):
ret = self.run_api_call(address, "fs_hash.txt")
return ret
def get_settings(self, address):
ret = json.loads(self.run_api_call(address, "api/settings"))
return ret
def make_api_call(self, address, path):
thread = Thread(target=self.run_api_call, args=(address,path))
thread = Thread(target=self.run_api_call, args=(address, path))
thread.start()
def run_api_call(self, address, path):
@ -21,4 +27,4 @@ class ApiHandler:
response = requests.get(url)
return response.text
except requests.RequestException as e:
print("error")
print("error")

View file

@ -1,37 +0,0 @@
import esptool
import serial
class FwUpdate:
def get_serial_ports(self):
ports = serial.tools.list_ports.comports()
available_ports = []
for port, desc, hwid in sorted(ports):
available_ports.append((port, desc, hwid))
print(f"Port: {port}, Description: {desc}, Hardware ID: {hwid}")
return available_ports
def flash_firmware(port, baud, firmware_path):
try:
# Initialize the serial port
serial_port = serial.Serial(port, baud)
# Initialize the ESP32ROM with the serial port
esp = esptool.ESP32ROM(serial_port)
# Connect to the ESP32
esp.connect()
# Perform the flashing operation
esp.flash_file(firmware_path, offset=0x1000)
# Optionally, verify the flash
esp.verify_flash(firmware_path, offset=0x1000)
print("Firmware flashed successfully!")
except esptool.FatalError as e:
print(f"Failed to flash firmware: {e}")
finally:
# Ensure the serial port is closed
if serial_port.is_open:
serial_port.close()

96
app/fw_updater.py Normal file
View file

@ -0,0 +1,96 @@
import os
import random
from threading import Thread
from app import espota
from app.espota import FLASH, SPIFFS
import esptool
import serial
import wx
class FwUpdater:
update_progress = None
currentlyUpdating = False
def __init__(self, update_progress):
self.update_progress = update_progress
def get_serial_ports(self):
ports = serial.tools.list_ports.comports()
available_ports = []
for port, desc, hwid in sorted(ports):
available_ports.append((port, desc, hwid))
print(f"Port: {port}, Description: {desc}, Hardware ID: {hwid}")
return available_ports
def run_fs_update(self, address, firmware_file, type):
global PROGRESS
PROGRESS = True
espota.PROGRESS = True
global TIMEOUT
TIMEOUT = 10
espota.TIMEOUT = 10
espota.serve(address, "0.0.0.0", 3232, random.randint(
10000, 60000), "", firmware_file, type, self.update_progress)
wx.CallAfter(self.update_progress, 1)
self.currentlyUpdating = False
# self.SetStatusText(f"Finished!")
def flash_firmware(port, baud, firmware_path):
try:
# Initialize the serial port
serial_port = serial.Serial(port, baud)
# Initialize the ESP32ROM with the serial port
esp = esptool.ESP32ROM(serial_port)
# Connect to the ESP32
esp.connect()
# Perform the flashing operation
esp.flash_file(firmware_path, offset=0x1000)
# Optionally, verify the flash
esp.verify_flash(firmware_path, offset=0x1000)
print("Firmware flashed successfully!")
except esptool.FatalError as e:
print(f"Failed to flash firmware: {e}")
finally:
# Ensure the serial port is closed
if serial_port.is_open:
serial_port.close()
def start_firmware_update(self, release_name, address, hw_rev):
# self.SetStatusText(f"Starting firmware update")
model_name = "lolin_s3_mini_213epd"
if (hw_rev == "REV_B_EPD_2_13"):
model_name = "btclock_rev_b_213epd"
local_filename = f"firmware/{
release_name}_{model_name}_firmware.bin"
self.updatingName = address
self.currentlyUpdating = True
if os.path.exists(os.path.abspath(local_filename)):
thread = Thread(target=self.run_fs_update, args=(
address, os.path.abspath(local_filename), FLASH))
thread.start()
def start_fs_update(self, release_name, address):
# Path to the firmware file
local_filename = f"firmware/{release_name}_littlefs.bin"
self.updatingName = address
self.currentlyUpdating = True
if os.path.exists(os.path.abspath(local_filename)):
thread = Thread(target=self.run_fs_update, args=(
address, os.path.abspath(local_filename), SPIFFS))
thread.start()

View file

@ -1,372 +0,0 @@
import random
from threading import Thread
import threading
import serial
import wx
import wx.lib.mixins.listctrl
from zeroconf import ServiceBrowser, Zeroconf
import requests
import os
import webbrowser
from app import espota
from app.api import ApiHandler
from app.fw_update import FwUpdate
from app.zeroconf_listener import ZeroconfListener
from app.espota import FLASH,SPIFFS
class SerialPortsComboBox(wx.ComboBox):
def __init__(self, parent, fw_update):
self.fw_update = fw_update
self.ports = serial.tools.list_ports.comports()
wx.ComboBox.__init__(self,parent, choices=[port.device for port in self.ports])
class DevicesPanel(wx.ListCtrl,wx.lib.mixins.listctrl.ColumnSorterMixin, wx.lib.mixins.listctrl.ListCtrlAutoWidthMixin,
):
def __init__(self, parent):
wx.ListCtrl.__init__(self, parent, style=wx.LC_REPORT)
self.column_headings = ["name", "Version", "SW Revision", "HW Revision", "IP", "FS Version"]
wx.lib.mixins.listctrl.ColumnSorterMixin.__init__(
self,
len(self.column_headings),
)
wx.lib.mixins.listctrl.ListCtrlAutoWidthMixin.__init__(self)
for column, heading in enumerate(self.column_headings):
self.AppendColumn(heading)
self.itemDataMap = {}
def OnSortOrderChanged(self):
"""Method to handle changes to the sort order"""
column, ascending = self.GetSortState()
self.ShowSortIndicator(column, ascending)
self.SortListItems(column, ascending)
def GetListCtrl(self):
"""Method required by the ColumnSorterMixin"""
return self
class BTClockOTAUpdater(wx.Frame):
release_name = ""
commit_hash = ""
currentlyUpdating = False
updatingName = ""
def __init__(self, parent, title):
wx.Frame.__init__(self, parent, title=title, size=(800,500))
self.SetMinSize((800,500))
ubuntu_it = wx.Font(32, wx.FONTFAMILY_MODERN, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD, False, faceName="Ubuntu")
# "Ubuntu-RI.ttf")
self.fw_update = FwUpdate()
panel = wx.Panel(self)
self.title = wx.StaticText(panel, label="BTClock OTA firmware updater")
self.title.SetFont(ubuntu_it)
vbox = wx.BoxSizer(wx.VERTICAL)
vbox.Add(self.title, 0, wx.EXPAND | wx.ALL, 20, 0)
# serialPorts = SerialPortsComboBox(panel, self.fw_update)
# vbox.Add(serialPorts, 0, wx.EXPAND | wx.ALL, 20, 0)
self.device_list = DevicesPanel(panel)
self.device_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected)
self.device_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self.on_item_deselected)
vbox.Add(self.device_list, proportion = 2, flag=wx.EXPAND | wx.ALL, border = 20)
hbox = wx.BoxSizer(wx.HORIZONTAL)
bbox = wx.BoxSizer(wx.HORIZONTAL)
gs = wx.GridSizer(1, 4, 1, 1)
self.fw_label = wx.StaticText(panel, label=f"Checking latest version...")
self.update_button = wx.Button(panel, label="Update Firmware")
self.update_button.Bind(wx.EVT_BUTTON, self.on_click_update_firmware)
self.update_fs_button = wx.Button(panel, label="Update Filesystem")
self.update_fs_button.Bind(wx.EVT_BUTTON, self.on_click_update_fs)
self.identify_button = wx.Button(panel, label="Identify")
self.identify_button.Bind(wx.EVT_BUTTON, self.on_click_identify)
self.open_webif_button = wx.Button(panel, label="Open WebUI")
self.open_webif_button.Bind(wx.EVT_BUTTON, self.on_click_webui)
self.update_button.Disable()
self.update_fs_button.Disable()
self.identify_button.Disable()
self.open_webif_button.Disable()
hbox.Add(self.fw_label, 1, wx.EXPAND | wx.ALL, 5)
bbox.Add(self.update_button)
bbox.Add(self.update_fs_button)
bbox.Add(self.identify_button)
bbox.Add(self.open_webif_button)
hbox.AddStretchSpacer()
hbox.Add(bbox, 2, wx.EXPAND | wx.ALL, 5)
vbox.Add(hbox, 0, wx.EXPAND | wx.ALL, 20)
self.progress_bar = wx.Gauge(panel, range=100)
vbox.Add(self.progress_bar, 0, wx.EXPAND | wx.ALL, 20)
panel.SetSizer(vbox)
filemenu= wx.Menu()
menuAbout = filemenu.Append(wx.ID_ABOUT, "&About"," Information about this program")
menuExit = filemenu.Append(wx.ID_EXIT,"E&xit"," Terminate the program")
menuBar = wx.MenuBar()
menuBar.Append(filemenu,"&File") # Adding the "filemenu" to the MenuBar
self.SetMenuBar(menuBar) # Adding the MenuBar to the Frame content.
self.Bind(wx.EVT_MENU, self.OnAbout, menuAbout)
self.Bind(wx.EVT_MENU, self.OnExit, menuExit)
self.status_bar = self.CreateStatusBar(2)
# self.StatusBar.SetFieldsCount(2)
# self.StatusBar.SetStatusWidths(-3, -1)
self.Show(True)
self.zeroconf = Zeroconf()
self.listener = ZeroconfListener(self.on_zeroconf_state_change)
self.browser = ServiceBrowser(self.zeroconf, "_http._tcp.local.", self.listener)
self.api_handler = ApiHandler()
wx.CallAfter(self.fetch_latest_release)
def on_item_selected(self, event):
if self.release_name != "":
self.update_button.Enable()
self.update_fs_button.Enable()
self.identify_button.Enable()
self.open_webif_button.Enable()
def on_item_deselected(self, event):
if self.device_list.GetFirstSelected() == -1:
self.update_button.Disable()
self.update_fs_button.Disable()
self.identify_button.Disable()
self.open_webif_button.Disable()
def on_zeroconf_state_change(self, type, name, state, info):
index = self.device_list.FindItem(0, name)
if state == "Added":
if index == wx.NOT_FOUND:
index = self.device_list.InsertItem(self.device_list.GetItemCount(), type)
self.device_list.SetItem(index, 0, name)
self.device_list.SetItem(index, 1, info.properties.get(b"version").decode())
self.device_list.SetItem(index, 2, info.properties.get(b"rev").decode())
if (info.properties.get(b"hw_rev") is not None):
self.device_list.SetItem(index, 3, info.properties.get(b"hw_rev").decode())
self.device_list.SetItem(index, 4, info.parsed_addresses()[0])
else:
self.device_list.SetItem(index, 0, name)
self.device_list.SetItem(index, 1, info.properties.get(b"version").decode())
self.device_list.SetItem(index, 2, info.properties.get(b"rev").decode())
if (info.properties.get(b"hw_rev").decode()):
self.device_list.SetItem(index, 3, info.properties.get(b"hw_rev").decode())
self.device_list.SetItem(index, 4, info.parsed_addresses()[0])
self.device_list.SetItem(index, 5, self.api_handler.check_fs_hash(info.parsed_addresses()[0]))
self.device_list.SetItemData(index, index)
self.device_list.itemDataMap[index] = [name, info.properties.get(b"version").decode(), info.properties.get(b"rev").decode(), info.properties.get(b"hw_rev").decode(), info.parsed_addresses()[0]]
for col in range(0, len(self.device_list.column_headings)):
self.device_list.SetColumnWidth(col, wx.LIST_AUTOSIZE_USEHEADER)
elif state == "Removed":
if index != wx.NOT_FOUND:
self.device_list.DeleteItem(index)
def on_click_update_firmware(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
hw_rev = self.device_list.GetItemText(selected_index, 3)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
self.start_firmware_update(address, hw_rev)
else:
wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update", "Error", wx.ICON_ERROR)
def on_click_webui(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
thread = threading.Thread(target=lambda: webbrowser.open(f"http://{address}"))
thread.start()
def run_fs_update(self, address, firmware_file, type):
global PROGRESS
PROGRESS = True
espota.PROGRESS = True
global TIMEOUT
TIMEOUT = 10
espota.TIMEOUT = 10
espota.serve(address, "0.0.0.0", 3232, random.randint(10000,60000), "", firmware_file, type, self.call_progress)
wx.CallAfter(self.update_progress, 100)
self.currentlyUpdating = False
self.SetStatusText(f"Finished!")
def call_progress(self, progress):
progressPerc = int(progress*100)
self.SetStatusText(f"{self.updatingName} - Progress: {progressPerc}%")
wx.CallAfter(self.update_progress, progress)
def update_progress(self, progress):
self.progress_bar.SetValue(int(progress*100))
wx.YieldIfNeeded()
def on_click_update_fs(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if self.currentlyUpdating:
wx.MessageBox("Please wait, already updating", "Error", wx.ICON_ERROR)
return
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
self.start_fs_update(address)
else:
wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update", "Error", wx.ICON_ERROR)
def start_firmware_update(self, address, hw_rev):
self.SetStatusText(f"Starting firmware update")
model_name = "lolin_s3_mini_213epd"
if (hw_rev == "REV_B_EPD_2_13"):
model_name = "btclock_rev_b_213epd"
local_filename = f"firmware/{self.release_name}_{model_name}_firmware.bin"
self.updatingName = address
self.currentlyUpdating = True
if os.path.exists(os.path.abspath(local_filename)):
thread = Thread(target=self.run_fs_update, args=(address, os.path.abspath(local_filename), FLASH))
thread.start()
def start_fs_update(self, address):
# Path to the firmware file
self.SetStatusText(f"Starting filesystem update")
local_filename = f"firmware/{self.release_name}_littlefs.bin"
self.updatingName = address
self.currentlyUpdating = True
if os.path.exists(os.path.abspath(local_filename)):
thread = Thread(target=self.run_fs_update, args=(address, os.path.abspath(local_filename), SPIFFS))
thread.start()
wx.CallAfter(self.update_progress, 100)
def on_click_identify(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
port = info.port
self.api_handler.identify_btclock(address)
else:
wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to make an API call", "Error", wx.ICON_ERROR)
def fetch_latest_release(self):
repo = "btclock/btclock_v3"
if not os.path.exists("firmware"):
os.makedirs("firmware")
filenames_to_download = ["lolin_s3_mini_213epd_firmware.bin", "btclock_rev_b_213epd_firmware.bin", "littlefs.bin"]
url = f"https://api.github.com/repos/{repo}/releases/latest"
try:
response = requests.get(url)
response.raise_for_status()
latest_release = response.json()
release_name = latest_release['tag_name']
self.release_name = release_name
asset_url = None
asset_urls = []
for asset in latest_release['assets']:
if asset['name'] in filenames_to_download:
asset_urls.append(asset['browser_download_url'])
if asset_urls:
for asset_url in asset_urls:
self.download_file(asset_url, release_name)
ref_url = f"https://api.github.com/repos/{repo}/git/ref/tags/{release_name}"
response = requests.get(ref_url)
response.raise_for_status()
ref_info = response.json()
if (ref_info["object"]["type"] == "commit"):
self.commit_hash = ref_info["object"]["sha"]
else:
tag_url = f"https://api.github.com/repos/{repo}/git/tags/{ref_info["object"]["sha"]}"
response = requests.get(tag_url)
response.raise_for_status()
tag_info = response.json()
self.commit_hash = tag_info["object"]["sha"]
self.fw_label.SetLabelText(f"Downloaded firmware version: {self.release_name}\nCommit: {self.commit_hash}")
else:
wx.CallAfter(self.SetStatusText, f"File {filenames_to_download} not found in latest release")
except requests.RequestException as e:
wx.CallAfter(self.SetStatusText, f"Error fetching release: {e}")
def download_file(self, url, release_name):
local_filename = f"{release_name}_{url.split('/')[-1]}"
response = requests.get(url, stream=True)
total_length = response.headers.get('content-length')
if not os.path.exists("firmware"):
os.makedirs("firmware")
if os.path.exists(f"firmware/{local_filename}"):
wx.CallAfter(self.SetStatusText, f"{local_filename} is already downloaded")
return
if total_length is None:
wx.CallAfter(self.SetStatusText, "No content length header")
else:
total_length = int(total_length)
chunk_size = 1024
num_chunks = total_length // chunk_size
with open(f"firmware/{local_filename}", 'wb') as f:
for i, chunk in enumerate(response.iter_content(chunk_size=chunk_size)):
if chunk:
f.write(chunk)
f.flush()
progress = int((i / num_chunks) * 100)
wx.CallAfter(self.update_progress, progress)
wx.CallAfter(self.update_progress, 100)
wx.CallAfter(self.SetStatusText, "Download completed")
def OnAbout(self,e):
dlg = wx.MessageDialog( self, "An updater for BTClocks", "About BTClock OTA Updater", wx.OK)
dlg.ShowModal()
dlg.Destroy()
def OnExit(self,e):
self.Close(False)

0
app/gui/__init__.py Normal file
View file

View file

@ -0,0 +1,126 @@
import threading
import webbrowser
from app.api import ApiHandler
from app.gui.devices_panel import DevicesPanel
from app.zeroconf_listener import ZeroconfListener
import wx
class ActionButtonPanel(wx.Panel):
currentlyUpdating = False
def __init__(self, parent:wx.Panel, parent_frame:wx.Frame, *args, **kwargs):
super(ActionButtonPanel, self).__init__(parent, *args, **kwargs)
self.parent = parent
self.parent_frame = parent_frame
self.api_handler:ApiHandler = parent_frame.api_handler
self.device_list:DevicesPanel = parent_frame.device_list
self.listener:ZeroconfListener = parent_frame.listener
self.device_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected)
self.device_list.Bind(wx.EVT_LIST_ITEM_DESELECTED,
self.on_item_deselected)
self.InitUI()
def InitUI(self):
sizer = wx.BoxSizer(wx.HORIZONTAL)
self.update_button = wx.Button(self, label="Update Firmware")
self.update_button.Bind(wx.EVT_BUTTON, self.on_click_update_firmware)
self.update_fs_button = wx.Button(self, label="Update Filesystem")
self.update_fs_button.Bind(wx.EVT_BUTTON, self.on_click_update_fs)
self.identify_button = wx.Button(self, label="Identify")
self.identify_button.Bind(wx.EVT_BUTTON, self.on_click_identify)
self.open_webif_button = wx.Button(self, label="Open WebUI")
self.open_webif_button.Bind(wx.EVT_BUTTON, self.on_click_webui)
self.update_button.Disable()
self.update_fs_button.Disable()
self.identify_button.Disable()
self.open_webif_button.Disable()
sizer.Add(self.update_button)
sizer.Add(self.update_fs_button)
sizer.Add(self.identify_button)
sizer.Add(self.open_webif_button)
self.SetSizer(sizer)
def on_click_update_firmware(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
hw_rev = self.device_list.GetItemText(selected_index, 3)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses(
)[0] if info.parsed_addresses() else "N/A"
self.parent_frame.fw_updater.start_firmware_update(self.parent_frame.releaseChecker.release_name, address, hw_rev)
else:
wx.MessageBox(
"No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update",
"Error", wx.ICON_ERROR)
def on_click_webui(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses(
)[0] if info.parsed_addresses() else "N/A"
thread = threading.Thread(
target=lambda: webbrowser.open(f"http://{address}"))
thread.start()
def on_click_update_fs(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if self.currentlyUpdating:
wx.MessageBox("Please wait, already updating",
"Error", wx.ICON_ERROR)
return
if info:
address = info.parsed_addresses(
)[0] if info.parsed_addresses() else "N/A"
self.parent_frame.fw_updater.start_fs_update(self.parent_frame.releaseChecker.release_name, address)
else:
wx.MessageBox(
"No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update",
"Error", wx.ICON_ERROR)
def on_click_identify(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses(
)[0] if info.parsed_addresses() else "N/A"
port = info.port
self.api_handler.identify_btclock(address)
else:
wx.MessageBox(
"No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox(
"Please select a device to make an API call", "Error", wx.ICON_ERROR)
def on_item_selected(self, event):
self.update_button.Enable()
self.update_fs_button.Enable()
self.identify_button.Enable()
self.open_webif_button.Enable()
def on_item_deselected(self, event):
if self.device_list.GetFirstSelected() == -1:
self.update_button.Disable()
self.update_fs_button.Disable()
self.identify_button.Disable()
self.open_webif_button.Disable()

31
app/gui/devices_panel.py Normal file
View file

@ -0,0 +1,31 @@
import wx
import wx.lib.mixins.listctrl
class DevicesPanel(wx.ListCtrl, wx.lib.mixins.listctrl.ColumnSorterMixin, wx.lib.mixins.listctrl.ListCtrlAutoWidthMixin,
):
def __init__(self, parent):
wx.ListCtrl.__init__(self, parent, style=wx.LC_REPORT)
self.column_headings = [
"Hostname", "Version", "FW commit", "HW Revision", "IP", "WebUI commit"]
wx.lib.mixins.listctrl.ColumnSorterMixin.__init__(
self,
len(self.column_headings),
)
wx.lib.mixins.listctrl.ListCtrlAutoWidthMixin.__init__(self)
for column, heading in enumerate(self.column_headings):
self.AppendColumn(heading)
self.itemDataMap = {}
def OnSortOrderChanged(self):
"""Method to handle changes to the sort order"""
column, ascending = self.GetSortState()
self.ShowSortIndicator(column, ascending)
self.SortListItems(column, ascending)
def GetListCtrl(self):
"""Method required by the ColumnSorterMixin"""
return self

167
app/main.py Normal file
View file

@ -0,0 +1,167 @@
import concurrent.futures
import serial
from app.gui.action_button_panel import ActionButtonPanel
from app.release_checker import ReleaseChecker
import wx
from zeroconf import ServiceBrowser, Zeroconf
import os
import webbrowser
from app import espota
from app.api import ApiHandler
from app.fw_updater import FwUpdater
from app.gui.devices_panel import DevicesPanel
from app.zeroconf_listener import ZeroconfListener
from app.espota import FLASH, SPIFFS
class SerialPortsComboBox(wx.ComboBox):
def __init__(self, parent, fw_update):
self.fw_update = fw_update
self.ports = serial.tools.list_ports.comports()
wx.ComboBox.__init__(self, parent, choices=[
port.device for port in self.ports])
class BTClockOTAUpdater(wx.Frame):
updatingName = ""
def __init__(self, parent, title):
wx.Frame.__init__(self, parent, title=title, size=(800, 500))
self.SetMinSize((800, 500))
self.releaseChecker = ReleaseChecker()
self.zeroconf = Zeroconf()
self.listener = ZeroconfListener(self.on_zeroconf_state_change)
self.browser = ServiceBrowser(
self.zeroconf, "_http._tcp.local.", self.listener)
self.api_handler = ApiHandler()
self.fw_updater = FwUpdater(self.call_progress)
panel = wx.Panel(self)
self.device_list = DevicesPanel(panel)
vbox = wx.BoxSizer(wx.VERTICAL)
vbox.Add(self.device_list, proportion=2,
flag=wx.EXPAND | wx.ALL, border=20)
hbox = wx.BoxSizer(wx.HORIZONTAL)
self.fw_label = wx.StaticText(
panel, label=f"Fetching latest version from GitHub...")
hbox.Add(self.fw_label, 1, wx.EXPAND | wx.ALL, 5)
self.actionButtons = ActionButtonPanel(
panel, self)
hbox.AddStretchSpacer()
hbox.Add(self.actionButtons, 2, wx.EXPAND | wx.ALL, 5)
vbox.Add(hbox, 0, wx.EXPAND | wx.ALL, 20)
self.progress_bar = wx.Gauge(panel, range=100)
vbox.Add(self.progress_bar, 0, wx.EXPAND | wx.ALL, 20)
panel.SetSizer(vbox)
self.setup_ui()
wx.CallAfter(self.fetch_latest_release_async)
def setup_ui(self):
self.setup_menubar()
self.status_bar = self.CreateStatusBar(2)
self.Show(True)
self.Centre()
def setup_menubar(self):
filemenu = wx.Menu()
menuAbout = filemenu.Append(
wx.ID_ABOUT, "&About", " Information about this program")
menuExit = filemenu.Append(
wx.ID_EXIT, "E&xit", " Terminate the program")
menuBar = wx.MenuBar()
menuBar.Append(filemenu, "&File")
self.SetMenuBar(menuBar)
self.Bind(wx.EVT_MENU, self.OnAbout, menuAbout)
self.Bind(wx.EVT_MENU, self.OnExit, menuExit)
def on_zeroconf_state_change(self, type, name, state, info):
index = self.device_list.FindItem(0, name)
if state == "Added":
deviceSettings = self.api_handler.get_settings(
info.parsed_addresses()[0])
version = info.properties.get(b"rev").decode()
if 'gitTag' in deviceSettings:
version = deviceSettings["gitTag"]
fwHash = info.properties.get(b"rev").decode()[:7]
fsHash = deviceSettings['fsRev'][:7]
address = info.parsed_addresses()[0]
if index == wx.NOT_FOUND:
index = self.device_list.InsertItem(
self.device_list.GetItemCount(), type)
self.device_list.SetItem(index, 0, name)
self.device_list.SetItem(index, 1, version)
self.device_list.SetItem(index, 2, fwHash)
if (info.properties.get(b"hw_rev") is not None):
self.device_list.SetItem(
index, 3, info.properties.get(b"hw_rev").decode())
self.device_list.SetItem(index, 4, address)
else:
self.device_list.SetItem(index, 0, name)
self.device_list.SetItem(index, 1, version)
self.device_list.SetItem(index, 2, fwHash)
if (info.properties.get(b"hw_rev").decode()):
self.device_list.SetItem(
index, 3, info.properties.get(b"hw_rev").decode())
self.device_list.SetItem(index, 4, address)
self.device_list.SetItem(index, 5, fsHash)
self.device_list.SetItemData(index, index)
self.device_list.itemDataMap[index] = [
name, version, fwHash, info.properties.get(b"hw_rev").decode(), address, fsHash]
for col in range(0, len(self.device_list.column_headings)):
self.device_list.SetColumnWidth(
col, wx.LIST_AUTOSIZE_USEHEADER)
elif state == "Removed":
if index != wx.NOT_FOUND:
self.device_list.DeleteItem(index)
def call_progress(self, progress):
progressPerc = int(progress*100)
self.SetStatusText(f"Progress: {progressPerc}%")
wx.CallAfter(self.update_progress, progress)
def update_progress(self, progress):
progressPerc = int(progress*100)
self.progress_bar.SetValue(progressPerc)
wx.YieldIfNeeded()
def fetch_latest_release_async(self):
# Start a new thread to execute fetch_latest_release
executor = concurrent.futures.ThreadPoolExecutor()
future = executor.submit(self.releaseChecker.fetch_latest_release)
future.add_done_callback(self.handle_latest_release)
def handle_latest_release(self, future):
try:
latest_release = future.result()
self.fw_label.SetLabelText(f"Downloaded firmware version: {
latest_release}\nCommit: {self.releaseChecker.commit_hash}")
except Exception as e:
self.fw_label.SetLabel(f"Error occurred: {str(e)}")
def OnAbout(self, e):
dlg = wx.MessageDialog(
self, "An updater for BTClocks", "About BTClock OTA Updater", wx.OK)
dlg.ShowModal()
dlg.Destroy()
def OnExit(self, e):
self.Close(False)

97
app/release_checker.py Normal file
View file

@ -0,0 +1,97 @@
import os
import requests
import wx
from typing import Callable
from app.utils import keep_latest_versions
class ReleaseChecker:
'''Release Checker for firmware updates'''
release_name = ""
commit_hash = ""
def __init__(self):
self.progress_callback: Callable[[int], None] = None
def fetch_latest_release(self):
'''Fetch latest firmware release from GitHub'''
repo = "btclock/btclock_v3"
if not os.path.exists("firmware"):
os.makedirs("firmware")
filenames_to_download = ["lolin_s3_mini_213epd_firmware.bin",
"btclock_rev_b_213epd_firmware.bin", "littlefs.bin"]
url = f"https://api.github.com/repos/{repo}/releases/latest"
try:
response = requests.get(url)
response.raise_for_status()
latest_release = response.json()
release_name = latest_release['tag_name']
self.release_name = release_name
asset_url = None
asset_urls = []
for asset in latest_release['assets']:
if asset['name'] in filenames_to_download:
asset_urls.append(asset['browser_download_url'])
if asset_urls:
for asset_url in asset_urls:
self.download_file(asset_url, release_name)
ref_url = f"https://api.github.com/repos/{
repo}/git/ref/tags/{release_name}"
response = requests.get(ref_url)
response.raise_for_status()
ref_info = response.json()
if (ref_info["object"]["type"] == "commit"):
self.commit_hash = ref_info["object"]["sha"]
else:
tag_url = f"https://api.github.com/repos/{
repo}/git/tags/{ref_info["object"]["sha"]}"
response = requests.get(tag_url)
response.raise_for_status()
tag_info = response.json()
self.commit_hash = tag_info["object"]["sha"]
return self.release_name
else:
raise ReleaseCheckerException(
f"File {filenames_to_download} not found in latest release")
except requests.RequestException as e:
raise ReleaseCheckerException(
f"Error fetching release: {e}") from e
def download_file(self, url, release_name):
'''Downloads Fimware Files'''
local_filename = f"{release_name}_{url.split('/')[-1]}"
response = requests.get(url, stream=True)
total_length = response.headers.get('content-length')
if not os.path.exists("firmware"):
os.makedirs("firmware")
if os.path.exists(f"firmware/{local_filename}"):
return
keep_latest_versions('firmware', 2)
if total_length is None:
raise ReleaseCheckerException("No content length header")
else:
total_length = int(total_length)
chunk_size = 1024
num_chunks = total_length // chunk_size
with open(f"firmware/{local_filename}", 'wb') as f:
for i, chunk in enumerate(response.iter_content(chunk_size=chunk_size)):
if chunk:
f.write(chunk)
f.flush()
progress = int((i / num_chunks) * 100)
if callable(self.progress_callback):
self.progress_callback(progress)
if callable(self.progress_callback):
self.progress_callback(100)
class ReleaseCheckerException(Exception):
pass

32
app/utils.py Normal file
View file

@ -0,0 +1,32 @@
import os
import re
import shutil
def count_versions(folder_path):
versions = set()
version_pattern = re.compile(r'^(\d+\.\d+\.\d+)')
for file_name in os.listdir(folder_path):
match = version_pattern.match(file_name)
if match:
versions.add(match.group(1))
return len(versions)
def keep_latest_versions(folder_path, num_versions_to_keep=2):
version_files = {}
version_pattern = re.compile(r'^(\d+\.\d+\.\d+)')
for file_name in os.listdir(folder_path):
match = version_pattern.match(file_name)
if match:
version = match.group(1)
if version not in version_files:
version_files[version] = []
version_files[version].append(file_name)
versions_sorted = sorted(version_files.keys(), reverse=True)
versions_to_remove = versions_sorted[num_versions_to_keep:]
for version in versions_to_remove:
for file_name in version_files[version]:
os.remove(os.path.join(folder_path, file_name))

View file

@ -1,7 +1,9 @@
import wx
from zeroconf import Zeroconf, ServiceBrowser, ServiceStateChange
from zeroconf import Zeroconf
class ZeroconfListener:
'''Zeroconf Handler to find BTClocks in the network'''
release_name = ""
firmware_file = ""
@ -26,4 +28,4 @@ class ZeroconfListener:
if (name.startswith('btclock-')):
info = zeroconf.get_service_info(type, name)
self.services[name] = info
wx.CallAfter(self.update_callback, type, name, "Added", info)
wx.CallAfter(self.update_callback, type, name, "Added", info)

View file

@ -1,5 +1,5 @@
Requests==2.32.3
wxPython==4.2.1
wxPython==4.2.2a1.dev5670+a207b407
zeroconf==0.132.2
esptool==4.7.0
pyserial==3.5