mirror of
https://github.com/daveallie/crosspoint-reader.git
synced 2026-02-04 14:47:37 +03:00
feat: Debugging monitor script (#555)
## Summary * **What is the goal of this PR?** Add a debugging script to help developers monitor the ESP32 serial port directly from a PC. * **What changes are included?** Added a new script: scripts/debugging_monitor.py ## Additional Context While working on a new Crosspoint-Reader feature, it quickly became clear that watching the ESP32 serial output without any visual cues was inconvenient and easy to mess up. This script improves the debugging experience by reading data from the serial port and providing: 1. A timestamp prefix for every log line (instead of milliseconds since power-up) 2. Color-coded output for different message types 3. A secondary window displaying a live graph of RAM usage, which is especially useful for tracking the memory impact of new features <img width="1916" height="1049" alt="Screenshot_20260126_183811" src="https://github.com/user-attachments/assets/6291887f-ac17-43ac-9e43-f5dec8a7097e" /> --- ### AI Usage Did you use AI tools to help write this code? _**< PARTIALLY >**_ I wrote the initial version of the script. Gemini was used to help add the Matplotlib-based graphing and threading logic.
This commit is contained in:
parent
4bb8a869e7
commit
4436e44666
14
README.md
14
README.md
@ -95,6 +95,20 @@ Connect your Xteink X4 to your computer via USB-C and run the following command.
|
||||
```sh
|
||||
pio run --target upload
|
||||
```
|
||||
### Debugging
|
||||
|
||||
After flashing the new features, it’s recommended to capture detailed logs from the serial port.
|
||||
|
||||
First, make sure all required Python packages are installed:
|
||||
|
||||
```python
|
||||
python3 -m pip install serial colorama matplotlib
|
||||
```
|
||||
after that run the script:
|
||||
```sh
|
||||
python3 scripts/debugging_monitor.py
|
||||
```
|
||||
This was tested on Debian and should work on most Linux systems. Minor adjustments may be required for Windows or macOS.
|
||||
|
||||
## Internals
|
||||
|
||||
|
||||
214
scripts/debugging_monitor.py
Executable file
214
scripts/debugging_monitor.py
Executable file
@ -0,0 +1,214 @@
|
||||
import sys
|
||||
import argparse
|
||||
import re
|
||||
import threading
|
||||
from datetime import datetime
|
||||
from collections import deque
|
||||
import time
|
||||
|
||||
# Try to import potentially missing packages
|
||||
try:
|
||||
import serial
|
||||
from colorama import init, Fore, Style
|
||||
import matplotlib.pyplot as plt
|
||||
import matplotlib.animation as animation
|
||||
except ImportError as e:
|
||||
missing_package = e.name
|
||||
print("\n" + "!" * 50)
|
||||
print(f" Error: The required package '{missing_package}' is not installed.")
|
||||
print("!" * 50)
|
||||
|
||||
print(f"\nTo fix this, please run the following command in your terminal:\n")
|
||||
|
||||
install_cmd = "pip install "
|
||||
packages = []
|
||||
if 'serial' in str(e): packages.append("pyserial")
|
||||
if 'colorama' in str(e): packages.append("colorama")
|
||||
if 'matplotlib' in str(e): packages.append("matplotlib")
|
||||
|
||||
print(f" {install_cmd}{' '.join(packages)}")
|
||||
|
||||
print("\nExiting...")
|
||||
sys.exit(1)
|
||||
|
||||
# --- Global Variables for Data Sharing ---
|
||||
# Store last 50 data points
|
||||
MAX_POINTS = 50
|
||||
time_data = deque(maxlen=MAX_POINTS)
|
||||
free_mem_data = deque(maxlen=MAX_POINTS)
|
||||
total_mem_data = deque(maxlen=MAX_POINTS)
|
||||
data_lock = threading.Lock() # Prevent reading while writing
|
||||
|
||||
# Initialize colors
|
||||
init(autoreset=True)
|
||||
|
||||
def get_color_for_line(line):
|
||||
"""
|
||||
Classify log lines by type and assign appropriate colors.
|
||||
"""
|
||||
line_upper = line.upper()
|
||||
|
||||
if any(keyword in line_upper for keyword in ["ERROR", "[ERR]", "[SCT]", "FAILED", "WARNING"]):
|
||||
return Fore.RED
|
||||
if "[MEM]" in line_upper or "FREE:" in line_upper:
|
||||
return Fore.CYAN
|
||||
if any(keyword in line_upper for keyword in ["[GFX]", "[ERS]", "DISPLAY", "RAM WRITE", "RAM COMPLETE", "REFRESH", "POWERING ON", "FRAME BUFFER", "LUT"]):
|
||||
return Fore.MAGENTA
|
||||
if any(keyword in line_upper for keyword in ["[EBP]", "[BMC]", "[ZIP]", "[PARSER]", "[EHP]", "LOADING EPUB", "CACHE", "DECOMPRESSED", "PARSING"]):
|
||||
return Fore.GREEN
|
||||
if "[ACT]" in line_upper or "ENTERING ACTIVITY" in line_upper or "EXITING ACTIVITY" in line_upper:
|
||||
return Fore.YELLOW
|
||||
if any(keyword in line_upper for keyword in ["RENDERED PAGE", "[LOOP]", "DURATION", "WAIT COMPLETE"]):
|
||||
return Fore.BLUE
|
||||
if any(keyword in line_upper for keyword in ["[CPS]", "SETTINGS", "[CLEAR_CACHE]"]):
|
||||
return Fore.LIGHTYELLOW_EX
|
||||
if any(keyword in line_upper for keyword in ["ESP-ROM", "BUILD:", "RST:", "BOOT:", "SPIWP:", "MODE:", "LOAD:", "ENTRY", "[SD]", "STARTING CROSSPOINT", "VERSION"]):
|
||||
return Fore.LIGHTBLACK_EX
|
||||
if "[RBS]" in line_upper:
|
||||
return Fore.LIGHTCYAN_EX
|
||||
if "[KRS]" in line_upper:
|
||||
return Fore.LIGHTMAGENTA_EX
|
||||
if any(keyword in line_upper for keyword in ["EINKDISPLAY:", "STATIC FRAME", "INITIALIZING", "SPI INITIALIZED", "GPIO PINS", "RESETTING", "SSD1677", "E-INK"]):
|
||||
return Fore.LIGHTMAGENTA_EX
|
||||
if any(keyword in line_upper for keyword in ["[FNS]", "FOOTNOTE"]):
|
||||
return Fore.LIGHTGREEN_EX
|
||||
if any(keyword in line_upper for keyword in ["[CHAP]", "[OPDS]", "[COF]"]):
|
||||
return Fore.LIGHTYELLOW_EX
|
||||
|
||||
return Fore.WHITE
|
||||
|
||||
def parse_memory_line(line):
|
||||
"""
|
||||
Extracts Free and Total bytes from the specific log line.
|
||||
Format: [MEM] Free: 196344 bytes, Total: 226412 bytes, Min Free: 112620 bytes
|
||||
"""
|
||||
# Regex to find 'Free: <digits>' and 'Total: <digits>'
|
||||
match = re.search(r"Free:\s*(\d+).*Total:\s*(\d+)", line)
|
||||
if match:
|
||||
try:
|
||||
free_bytes = int(match.group(1))
|
||||
total_bytes = int(match.group(2))
|
||||
return free_bytes, total_bytes
|
||||
except ValueError:
|
||||
return None, None
|
||||
return None, None
|
||||
|
||||
def serial_worker(port, baud):
|
||||
"""
|
||||
Runs in a background thread. Handles reading serial, printing to console,
|
||||
and updating the data lists.
|
||||
"""
|
||||
print(f"{Fore.CYAN}--- Opening {port} at {baud} baud ---{Style.RESET_ALL}")
|
||||
|
||||
try:
|
||||
ser = serial.Serial(port, baud, timeout=0.1)
|
||||
ser.dtr = False
|
||||
ser.rts = False
|
||||
except serial.SerialException as e:
|
||||
print(f"{Fore.RED}Error opening port: {e}{Style.RESET_ALL}")
|
||||
return
|
||||
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
raw_data = ser.readline().decode('utf-8', errors='replace')
|
||||
|
||||
if not raw_data:
|
||||
continue
|
||||
|
||||
clean_line = raw_data.strip()
|
||||
if not clean_line:
|
||||
continue
|
||||
|
||||
# Add PC timestamp
|
||||
pc_time = datetime.now().strftime("%H:%M:%S")
|
||||
formatted_line = re.sub(r"^\[\d+\]", f"[{pc_time}]", clean_line)
|
||||
|
||||
# Check for Memory Line
|
||||
if "[MEM]" in formatted_line:
|
||||
free_val, total_val = parse_memory_line(formatted_line)
|
||||
if free_val is not None:
|
||||
with data_lock:
|
||||
time_data.append(pc_time)
|
||||
free_mem_data.append(free_val / 1024) # Convert to KB
|
||||
total_mem_data.append(total_val / 1024) # Convert to KB
|
||||
|
||||
# Print to console
|
||||
line_color = get_color_for_line(formatted_line)
|
||||
print(f"{line_color}{formatted_line}")
|
||||
|
||||
except OSError:
|
||||
print(f"{Fore.RED}Device disconnected.{Style.RESET_ALL}")
|
||||
break
|
||||
except Exception as e:
|
||||
# If thread is killed violently (e.g. main exit), silence errors
|
||||
pass
|
||||
finally:
|
||||
if 'ser' in locals() and ser.is_open:
|
||||
ser.close()
|
||||
|
||||
def update_graph(frame):
|
||||
"""
|
||||
Called by Matplotlib animation to redraw the chart.
|
||||
"""
|
||||
with data_lock:
|
||||
if not time_data:
|
||||
return
|
||||
|
||||
# Convert deques to lists for plotting
|
||||
x = list(time_data)
|
||||
y_free = list(free_mem_data)
|
||||
y_total = list(total_mem_data)
|
||||
|
||||
plt.cla() # Clear axis
|
||||
|
||||
# Plot Total RAM
|
||||
plt.plot(x, y_total, label='Total RAM (KB)', color='red', linestyle='--')
|
||||
|
||||
# Plot Free RAM
|
||||
plt.plot(x, y_free, label='Free RAM (KB)', color='green', marker='o')
|
||||
|
||||
# Fill area under Free RAM
|
||||
plt.fill_between(x, y_free, color='green', alpha=0.1)
|
||||
|
||||
plt.title("ESP32 Memory Monitor")
|
||||
plt.ylabel("Memory (KB)")
|
||||
plt.xlabel("Time")
|
||||
plt.legend(loc='upper left')
|
||||
plt.grid(True, linestyle=':', alpha=0.6)
|
||||
|
||||
# Rotate date labels
|
||||
plt.xticks(rotation=45, ha='right')
|
||||
plt.tight_layout()
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="ESP32 Monitor with Graph")
|
||||
parser.add_argument("port", nargs="?", default="/dev/ttyACM0", help="Serial port")
|
||||
parser.add_argument("--baud", type=int, default=115200, help="Baud rate")
|
||||
args = parser.parse_args()
|
||||
|
||||
# 1. Start the Serial Reader in a separate thread
|
||||
# Daemon=True means this thread dies when the main program closes
|
||||
t = threading.Thread(target=serial_worker, args=(args.port, args.baud), daemon=True)
|
||||
t.start()
|
||||
|
||||
# 2. Set up the Graph (Main Thread)
|
||||
try:
|
||||
plt.style.use('light_background')
|
||||
except:
|
||||
pass
|
||||
|
||||
fig = plt.figure(figsize=(10, 6))
|
||||
|
||||
# Update graph every 1000ms
|
||||
ani = animation.FuncAnimation(fig, update_graph, interval=1000)
|
||||
|
||||
try:
|
||||
print(f"{Fore.YELLOW}Starting Graph Window... (Close window to exit){Style.RESET_ALL}")
|
||||
plt.show()
|
||||
except KeyboardInterrupt:
|
||||
print(f"\n{Fore.YELLOW}Exiting...{Style.RESET_ALL}")
|
||||
plt.close('all') # Force close any lingering plot windows
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Reference in New Issue
Block a user