Initial commit

This commit is contained in:
Djuri 2024-05-19 02:42:36 +02:00
commit 10e2c89b63
12 changed files with 1241 additions and 0 deletions

0
app/__init__.py Normal file
View file

24
app/api.py Normal file
View file

@ -0,0 +1,24 @@
import time
import requests
from threading import Thread
class ApiHandler:
def identify_btclock(self, address):
self.make_api_call(address, "api/identify")
return
def check_fs_hash(self, address):
ret = self.run_api_call(address, "fs_hash.txt")
return ret
def make_api_call(self, address, path):
thread = Thread(target=self.run_api_call, args=(address,path))
thread.start()
def run_api_call(self, address, path):
try:
url = f"http://{address}/{path}"
response = requests.get(url)
return response.text
except requests.RequestException as e:
print("error")

325
app/espota.py Normal file
View file

@ -0,0 +1,325 @@
#!/usr/bin/env python
#
# Original espota.py by Ivan Grokhotkov:
# https://gist.github.com/igrr/d35ab8446922179dc58c
#
# Modified since 2015-09-18 from Pascal Gollor (https://github.com/pgollor)
# Modified since 2015-11-09 from Hristo Gochkov (https://github.com/me-no-dev)
# Modified since 2016-01-03 from Matthew O'Gorman (https://githumb.com/mogorman)
#
# This script will push an OTA update to the ESP
# use it like:
# python espota.py -i <ESP_IP_addr> -I <Host_IP_addr> -p <ESP_port> -P <Host_port> [-a password] -f <sketch.bin>
# Or to upload SPIFFS image:
# python espota.py -i <ESP_IP_addr> -I <Host_IP_addr> -p <ESP_port> -P <HOST_port> [-a password] -s -f <spiffs.bin>
#
# Changes
# 2015-09-18:
# - Add option parser.
# - Add logging.
# - Send command to controller to differ between flashing and transmitting SPIFFS image.
#
# Changes
# 2015-11-09:
# - Added digest authentication
# - Enhanced error tracking and reporting
#
# Changes
# 2016-01-03:
# - Added more options to parser.
#
# Changes
# 2023-05-22:
# - Replaced the deprecated optparse module with argparse.
# - Adjusted the code style to conform to PEP 8 guidelines.
# - Used with statement for file handling to ensure proper resource cleanup.
# - Incorporated exception handling to catch and handle potential errors.
# - Made variable names more descriptive for better readability.
# - Introduced constants for better code maintainability.
from __future__ import print_function
import socket
import sys
import os
import argparse
import logging
import hashlib
import random
# Commands
FLASH = 0
SPIFFS = 100
AUTH = 200
# Constants
PROGRESS_BAR_LENGTH = 60
# update_progress(): Displays or updates a console progress bar
def update_progress(progress):
if PROGRESS:
status = ""
if isinstance(progress, int):
progress = float(progress)
if not isinstance(progress, float):
progress = 0
status = "Error: progress var must be float\r\n"
if progress < 0:
progress = 0
status = "Halt...\r\n"
if progress >= 1:
progress = 1
status = "Done...\r\n"
block = int(round(PROGRESS_BAR_LENGTH * progress))
text = "\rUploading: [{0}] {1}% {2}".format(
"=" * block + " " * (PROGRESS_BAR_LENGTH - block), int(progress * 100), status
)
sys.stderr.write(text)
sys.stderr.flush()
else:
sys.stderr.write(".")
sys.stderr.flush()
def serve(remote_addr, local_addr, remote_port, local_port, password, filename, command=FLASH, progress_handler=False): # noqa: C901
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = (local_addr, local_port)
logging.info("Starting on %s:%s", str(server_address[0]), str(server_address[1]))
try:
sock.bind(server_address)
sock.listen(1)
except Exception as e:
logging.error("Listen Failed: %s", str(e))
return 1
content_size = os.path.getsize(filename)
file_md5 = hashlib.md5(open(filename, "rb").read()).hexdigest()
logging.info("Upload size: %d", content_size)
message = "%d %d %d %s\n" % (command, local_port, content_size, file_md5)
# Wait for a connection
inv_tries = 0
data = ""
msg = "Sending invitation to %s " % remote_addr
sys.stderr.write(msg)
sys.stderr.flush()
while inv_tries < 10:
inv_tries += 1
sock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
remote_address = (remote_addr, int(remote_port))
try:
sent = sock2.sendto(message.encode(), remote_address) # noqa: F841
except: # noqa: E722
sys.stderr.write("failed\n")
sys.stderr.flush()
sock2.close()
logging.error("Host %s Not Found", remote_addr)
return 1
sock2.settimeout(TIMEOUT)
try:
data = sock2.recv(37).decode()
break
except: # noqa: E722
sys.stderr.write(".")
sys.stderr.flush()
sock2.close()
sys.stderr.write("\n")
sys.stderr.flush()
if inv_tries == 10:
logging.error("No response from the ESP")
return 1
if data != "OK":
if data.startswith("AUTH"):
nonce = data.split()[1]
cnonce_text = "%s%u%s%s" % (filename, content_size, file_md5, remote_addr)
cnonce = hashlib.md5(cnonce_text.encode()).hexdigest()
passmd5 = hashlib.md5(password.encode()).hexdigest()
result_text = "%s:%s:%s" % (passmd5, nonce, cnonce)
result = hashlib.md5(result_text.encode()).hexdigest()
sys.stderr.write("Authenticating...")
sys.stderr.flush()
message = "%d %s %s\n" % (AUTH, cnonce, result)
sock2.sendto(message.encode(), remote_address)
sock2.settimeout(10)
try:
data = sock2.recv(32).decode()
except: # noqa: E722
sys.stderr.write("FAIL\n")
logging.error("No Answer to our Authentication")
sock2.close()
return 1
if data != "OK":
sys.stderr.write("FAIL\n")
logging.error("%s", data)
sock2.close()
sys.exit(1)
return 1
sys.stderr.write("OK\n")
else:
logging.error("Bad Answer: %s", data)
sock2.close()
return 1
sock2.close()
logging.info("Waiting for device...")
try:
sock.settimeout(10)
connection, client_address = sock.accept()
sock.settimeout(None)
connection.settimeout(None)
except: # noqa: E722
logging.error("No response from device")
sock.close()
return 1
try:
with open(filename, "rb") as f:
if PROGRESS:
progress_handler(0)
else:
sys.stderr.write("Uploading")
sys.stderr.flush()
offset = 0
while True:
chunk = f.read(1024)
if not chunk:
break
offset += len(chunk)
progress_handler(offset / float(content_size))
connection.settimeout(10)
try:
connection.sendall(chunk)
res = connection.recv(10)
last_response_contained_ok = "OK" in res.decode()
except Exception as e:
sys.stderr.write("\n")
logging.error("Error Uploading: %s", str(e))
connection.close()
return 1
if last_response_contained_ok:
logging.info("Success")
connection.close()
return 0
sys.stderr.write("\n")
logging.info("Waiting for result...")
count = 0
while count < 5:
count += 1
connection.settimeout(60)
try:
data = connection.recv(32).decode()
logging.info("Result: %s", data)
if "OK" in data:
logging.info("Success")
connection.close()
return 0
except Exception as e:
logging.error("Error receiving result: %s", str(e))
connection.close()
return 1
logging.error("Error response from device")
connection.close()
return 1
finally:
connection.close()
sock.close()
return 1
def parse_args(unparsed_args):
parser = argparse.ArgumentParser(description="Transmit image over the air to the ESP32 module with OTA support.")
# destination ip and port
parser.add_argument("-i", "--ip", dest="esp_ip", action="store", help="ESP32 IP Address.", default=False)
parser.add_argument("-I", "--host_ip", dest="host_ip", action="store", help="Host IP Address.", default="0.0.0.0")
parser.add_argument("-p", "--port", dest="esp_port", type=int, help="ESP32 OTA Port. Default: 3232", default=3232)
parser.add_argument(
"-P",
"--host_port",
dest="host_port",
type=int,
help="Host server OTA Port. Default: random 10000-60000",
default=random.randint(10000, 60000),
)
# authentication
parser.add_argument("-a", "--auth", dest="auth", help="Set authentication password.", action="store", default="")
# image
parser.add_argument("-f", "--file", dest="image", help="Image file.", metavar="FILE", default=None)
parser.add_argument(
"-s",
"--spiffs",
dest="spiffs",
action="store_true",
help="Transmit a SPIFFS image and do not flash the module.",
default=False,
)
# output
parser.add_argument(
"-d",
"--debug",
dest="debug",
action="store_true",
help="Show debug output. Overrides loglevel with debug.",
default=False,
)
parser.add_argument(
"-r",
"--progress",
dest="progress",
action="store_true",
help="Show progress output. Does not work for Arduino IDE.",
default=False,
)
parser.add_argument(
"-t",
"--timeout",
dest="timeout",
type=int,
help="Timeout to wait for the ESP32 to accept invitation.",
default=10,
)
return parser.parse_args(unparsed_args)
def main(args):
options = parse_args(args)
log_level = logging.WARNING
if options.debug:
log_level = logging.DEBUG
logging.basicConfig(level=log_level, format="%(asctime)-8s [%(levelname)s]: %(message)s", datefmt="%H:%M:%S")
logging.debug("Options: %s", str(options))
# check options
global PROGRESS
PROGRESS = options.progress
global TIMEOUT
TIMEOUT = options.timeout
if not options.esp_ip or not options.image:
logging.critical("Not enough arguments.")
return 1
command = FLASH
if options.spiffs:
command = SPIFFS
return serve(
options.esp_ip, options.host_ip, options.esp_port, options.host_port, options.auth, options.image, command
)
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))

0
app/fw_update.py Normal file
View file

313
app/gui.py Normal file
View file

@ -0,0 +1,313 @@
import random
from threading import Thread
import threading
import wx
from zeroconf import ServiceBrowser, Zeroconf
import requests
import os
import webbrowser
from app import espota
from app.api import ApiHandler
from app.zeroconf_listener import ZeroconfListener
from espota import FLASH,SPIFFS
class DevicesPanel(wx.ListCtrl):
def __init__(self, parent):
wx.ListCtrl.__init__(self, parent, style=wx.LC_REPORT)
self.InsertColumn(0, 'Name', width=150)
self.InsertColumn(1, 'Version', width=50)
self.InsertColumn(2, 'SW Revision', width=310)
self.InsertColumn(3, 'HW Revision', width=130)
self.InsertColumn(4, 'IP', width=110)
self.InsertColumn(5, 'FS version', width=110)
class BTClockOTAUpdater(wx.Frame):
release_name = ""
commit_hash = ""
def __init__(self, parent, title):
wx.Frame.__init__(self, parent, title=title, size=(800,500))
self.SetMinSize((800,500))
ubuntu_it = wx.Font(32, wx.FONTFAMILY_MODERN, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD, False, faceName="Ubuntu")
# "Ubuntu-RI.ttf")
panel = wx.Panel(self)
self.title = wx.StaticText(panel, label="BTClock OTA firmware updater")
self.title.SetFont(ubuntu_it)
vbox = wx.BoxSizer(wx.VERTICAL)
vbox.Add(self.title, 0, wx.EXPAND | wx.ALL, 20, 0)
self.device_list = DevicesPanel(panel)
self.device_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected)
self.device_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self.on_item_deselected)
vbox.Add(self.device_list, proportion = 2, flag=wx.EXPAND | wx.ALL, border = 20)
hbox = wx.BoxSizer(wx.HORIZONTAL)
bbox = wx.BoxSizer(wx.HORIZONTAL)
gs = wx.GridSizer(1, 4, 1, 1)
self.fw_label = wx.StaticText(panel, label=f"Checking latest version...")
self.update_button = wx.Button(panel, label="Update Firmware")
self.update_button.Bind(wx.EVT_BUTTON, self.on_click_update_firmware)
self.update_fs_button = wx.Button(panel, label="Update Filesystem")
self.update_fs_button.Bind(wx.EVT_BUTTON, self.on_click_update_fs)
self.identify_button = wx.Button(panel, label="Identify")
self.identify_button.Bind(wx.EVT_BUTTON, self.on_click_identify)
self.open_webif_button = wx.Button(panel, label="Open WebUI")
self.open_webif_button.Bind(wx.EVT_BUTTON, self.on_click_webui)
self.update_button.Disable()
self.update_fs_button.Disable()
self.identify_button.Disable()
self.open_webif_button.Disable()
hbox.Add(self.fw_label, 1, wx.EXPAND | wx.ALL, 5)
bbox.Add(self.update_button)
bbox.Add(self.update_fs_button)
bbox.Add(self.identify_button)
bbox.Add(self.open_webif_button)
hbox.AddStretchSpacer()
hbox.Add(bbox, 2, wx.EXPAND | wx.ALL, 5)
vbox.Add(hbox, 0, wx.EXPAND | wx.ALL, 20)
self.progress_bar = wx.Gauge(panel, range=100)
vbox.Add(self.progress_bar, 0, wx.EXPAND | wx.ALL, 20)
panel.SetSizer(vbox)
filemenu= wx.Menu()
menuAbout = filemenu.Append(wx.ID_ABOUT, "&About"," Information about this program")
menuExit = filemenu.Append(wx.ID_EXIT,"E&xit"," Terminate the program")
menuBar = wx.MenuBar()
menuBar.Append(filemenu,"&File") # Adding the "filemenu" to the MenuBar
self.SetMenuBar(menuBar) # Adding the MenuBar to the Frame content.
self.Bind(wx.EVT_MENU, self.OnAbout, menuAbout)
self.Bind(wx.EVT_MENU, self.OnExit, menuExit)
self.status_bar = self.CreateStatusBar(2)
# self.StatusBar.SetFieldsCount(2)
# self.StatusBar.SetStatusWidths(-3, -1)
self.Show(True)
self.zeroconf = Zeroconf()
self.listener = ZeroconfListener(self.on_zeroconf_state_change)
self.browser = ServiceBrowser(self.zeroconf, "_http._tcp.local.", self.listener)
self.api_handler = ApiHandler()
wx.CallAfter(self.fetch_latest_release)
def on_item_selected(self, event):
self.update_button.Enable()
self.update_fs_button.Enable()
self.identify_button.Enable()
self.open_webif_button.Enable()
def on_item_deselected(self, event):
if self.device_list.GetFirstSelected() == -1:
self.update_button.Disable()
self.update_fs_button.Disable()
self.identify_button.Disable()
self.open_webif_button.Disable()
def on_zeroconf_state_change(self, type, name, state, info):
index = self.device_list.FindItem(0, name)
if state == "Added":
if index == wx.NOT_FOUND:
index = self.device_list.InsertItem(self.device_list.GetItemCount(), type)
self.device_list.SetItem(index, 0, name)
self.device_list.SetItem(index, 1, info.properties.get(b"version").decode())
self.device_list.SetItem(index, 2, info.properties.get(b"rev").decode())
if (info.properties.get(b"hw_rev") is not None):
self.device_list.SetItem(index, 3, info.properties.get(b"hw_rev").decode())
self.device_list.SetItem(index, 4, info.parsed_addresses()[0])
else:
self.device_list.SetItem(index, 0, name)
self.device_list.SetItem(index, 1, info.properties.get(b"version").decode())
self.device_list.SetItem(index, 2, info.properties.get(b"rev").decode())
if (info.properties.get(b"hw_rev").decode()):
self.device_list.SetItem(index, 3, info.properties.get(b"hw_rev").decode())
self.device_list.SetItem(index, 4, info.parsed_addresses()[0])
self.device_list.SetItem(index, 5, self.api_handler.check_fs_hash(info.parsed_addresses()[0]))
elif state == "Removed":
if index != wx.NOT_FOUND:
self.device_list.DeleteItem(index)
def on_click_update_firmware(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
hw_rev = self.device_list.GetItemText(selected_index, 3)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
self.start_firmware_update(address, hw_rev)
else:
wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update", "Error", wx.ICON_ERROR)
def on_click_webui(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
thread = threading.Thread(target=lambda: webbrowser.open(f"http://{address}"))
thread.start()
def run_fs_update(self, address, firmware_file, type):
global PROGRESS
PROGRESS = True
espota.PROGRESS = True
global TIMEOUT
TIMEOUT = 10
espota.TIMEOUT = 10
espota.serve(address, "0.0.0.0", 3232, random.randint(10000,60000), "", firmware_file, type, self.call_progress)
wx.CallAfter(self.update_progress, 100)
self.SetStatusText(f"Finished!")
def call_progress(self, progress):
progressPerc = int(progress*100)
self.SetStatusText(f"Progress: {progressPerc}%")
wx.CallAfter(self.update_progress, progress)
def update_progress(self, progress):
self.progress_bar.SetValue(int(progress*100))
wx.YieldIfNeeded()
def on_click_update_fs(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
self.start_fs_update(address)
else:
wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to update", "Error", wx.ICON_ERROR)
def start_firmware_update(self, address, hw_rev):
self.SetStatusText(f"Starting firmware update")
model_name = "lolin_s3_mini_213epd"
if (hw_rev == "REV_B_EPD_2_13"):
model_name = "btclock_rev_b_213epd"
local_filename = f"firmware/{self.release_name}_{model_name}_firmware.bin"
if os.path.exists(os.path.abspath(local_filename)):
thread = Thread(target=self.run_fs_update, args=(address, os.path.abspath(local_filename), FLASH))
thread.start()
def start_fs_update(self, address):
# Path to the firmware file
self.SetStatusText(f"Starting filesystem update")
local_filename = f"firmware/{self.release_name}_littlefs.bin"
if os.path.exists(os.path.abspath(local_filename)):
thread = Thread(target=self.run_fs_update, args=(address, os.path.abspath(local_filename), SPIFFS))
thread.start()
wx.CallAfter(self.update_progress, 100)
def on_click_identify(self, event):
selected_index = self.device_list.GetFirstSelected()
if selected_index != -1:
service_name = self.device_list.GetItemText(selected_index, 0)
info = self.listener.services.get(service_name)
if info:
address = info.parsed_addresses()[0] if info.parsed_addresses() else "N/A"
port = info.port
self.api_handler.identify_btclock(address)
else:
wx.MessageBox("No service information available for selected device", "Error", wx.ICON_ERROR)
else:
wx.MessageBox("Please select a device to make an API call", "Error", wx.ICON_ERROR)
def fetch_latest_release(self):
repo = "btclock/btclock_v3"
filenames_to_download = ["lolin_s3_mini_213epd_firmware.bin", "btclock_rev_b_213epd_firmware.bin", "littlefs.bin"]
url = f"https://api.github.com/repos/{repo}/releases/latest"
try:
response = requests.get(url)
response.raise_for_status()
latest_release = response.json()
release_name = latest_release['tag_name']
self.release_name = release_name
asset_url = None
asset_urls = []
for asset in latest_release['assets']:
if asset['name'] in filenames_to_download:
asset_urls.append(asset['browser_download_url'])
if asset_urls:
for asset_url in asset_urls:
self.download_file(asset_url, release_name)
ref_url = f"https://api.github.com/repos/{repo}/git/ref/tags/{release_name}"
response = requests.get(ref_url)
response.raise_for_status()
ref_info = response.json()
self.commit_hash = ref_info["object"]["sha"]
self.fw_label.SetLabelText(f"Downloaded firmware version: {self.release_name}\nCommit: {self.commit_hash}")
else:
wx.CallAfter(self.SetStatusText, f"File {filenames_to_download} not found in latest release")
except requests.RequestException as e:
wx.CallAfter(self.SetStatusText, f"Error fetching release: {e}")
def download_file(self, url, release_name):
local_filename = f"{release_name}_{url.split('/')[-1]}"
response = requests.get(url, stream=True)
total_length = response.headers.get('content-length')
if os.path.exists(f"firmware/{local_filename}"):
wx.CallAfter(self.SetStatusText, f"{local_filename} is already downloaded")
return
if total_length is None:
wx.CallAfter(self.SetStatusText, "No content length header")
else:
total_length = int(total_length)
chunk_size = 1024
num_chunks = total_length // chunk_size
with open(f"firmware/{local_filename}", 'wb') as f:
for i, chunk in enumerate(response.iter_content(chunk_size=chunk_size)):
if chunk:
f.write(chunk)
f.flush()
progress = int((i / num_chunks) * 100)
wx.CallAfter(self.update_progress, progress)
wx.CallAfter(self.update_progress, 100)
wx.CallAfter(self.SetStatusText, "Download completed")
def OnAbout(self,e):
dlg = wx.MessageDialog( self, "An updater for BTClocks", "About BTClock OTA Updater", wx.OK)
dlg.ShowModal()
dlg.Destroy()
def OnExit(self,e):
self.Close(True)

29
app/zeroconf_listener.py Normal file
View file

@ -0,0 +1,29 @@
import wx
from zeroconf import Zeroconf, ServiceBrowser, ServiceStateChange
class ZeroconfListener:
release_name = ""
firmware_file = ""
def __init__(self, update_callback):
self.update_callback = update_callback
# self.update_service = update_callback
self.services = {}
def update_service(self, zc: Zeroconf, type: str, name: str) -> None:
if (name.startswith('btclock-')):
info = zc.get_service_info(type, name)
self.services[name] = info
wx.CallAfter(self.update_callback, type, name, "Added", info)
def remove_service(self, zeroconf, type, name):
if name in self.services:
del self.services[name]
wx.CallAfter(self.update_callback, type, name, "Removed")
def add_service(self, zeroconf, type, name):
if (name.startswith('btclock-')):
info = zeroconf.get_service_info(type, name)
self.services[name] = info
wx.CallAfter(self.update_callback, type, name, "Added", info)