mirror of
https://github.com/daveallie/crosspoint-reader.git
synced 2026-02-05 15:17:37 +03:00
523 lines
16 KiB
Python
523 lines
16 KiB
Python
import asyncio
|
|
import base64
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import signal
|
|
import sys
|
|
from http import HTTPStatus
|
|
from pathlib import Path
|
|
from typing import Set
|
|
|
|
import websockets
|
|
from websockets.http11 import Response
|
|
from websockets.server import WebSocketServerProtocol
|
|
|
|
|
|
"""
|
|
This script spawns a QEMU process, then forward its stdout and stderr to WebSocket clients.
|
|
"""
|
|
|
|
|
|
|
|
# SD card emulation directory
|
|
sdcard_path = os.environ.get("SDCARD_PATH", "/sdcard")
|
|
if not os.path.exists(sdcard_path):
|
|
os.makedirs(sdcard_path)
|
|
|
|
|
|
class SdCardHandler:
|
|
"""Handle SD card filesystem commands from the emulated device."""
|
|
|
|
def __init__(self, base_path: str):
|
|
self.base_path = base_path
|
|
|
|
def _resolve_path(self, path: str) -> str:
|
|
"""Resolve a device path to a host filesystem path."""
|
|
# Remove leading slash and join with base path
|
|
if path.startswith("/"):
|
|
path = path[1:]
|
|
return os.path.join(self.base_path, path)
|
|
|
|
def handle_fs_list(self, path: str, max_files: str = "200") -> list[str]:
|
|
"""List files in a directory. Returns list of filenames."""
|
|
resolved = self._resolve_path(path)
|
|
max_count = int(max_files) if max_files else 200
|
|
|
|
if not os.path.exists(resolved):
|
|
return []
|
|
|
|
if not os.path.isdir(resolved):
|
|
return []
|
|
|
|
try:
|
|
entries = []
|
|
for entry in os.listdir(resolved):
|
|
if len(entries) >= max_count:
|
|
break
|
|
full_path = os.path.join(resolved, entry)
|
|
# Append "/" for directories to match device behavior
|
|
if os.path.isdir(full_path):
|
|
entries.append(entry + "/")
|
|
else:
|
|
entries.append(entry)
|
|
return entries
|
|
except Exception as e:
|
|
print(f"[SdCard] Error listing {path}: {e}", file=sys.stderr)
|
|
return []
|
|
|
|
def handle_fs_read(self, path: str, offset: str = "0", length: str = "-1") -> str:
|
|
"""Read file contents. Returns base64-encoded data."""
|
|
resolved = self._resolve_path(path)
|
|
offset_int = int(offset) if offset else 0
|
|
length_int = int(length) if length else -1
|
|
|
|
if not os.path.exists(resolved) or os.path.isdir(resolved):
|
|
return ""
|
|
|
|
try:
|
|
with open(resolved, "rb") as f:
|
|
if offset_int > 0:
|
|
f.seek(offset_int)
|
|
if length_int == -1:
|
|
data = f.read()
|
|
else:
|
|
data = f.read(length_int)
|
|
return base64.b64encode(data).decode("ascii")
|
|
except Exception as e:
|
|
print(f"[SdCard] Error reading {path}: {e}", file=sys.stderr)
|
|
return ""
|
|
|
|
def handle_fs_stat(self, path: str) -> int:
|
|
"""Get file size. Returns -1 if not found, -2 if directory."""
|
|
resolved = self._resolve_path(path)
|
|
|
|
if not os.path.exists(resolved):
|
|
return -1
|
|
|
|
if os.path.isdir(resolved):
|
|
return -2
|
|
|
|
try:
|
|
return os.path.getsize(resolved)
|
|
except Exception as e:
|
|
print(f"[SdCard] Error stat {path}: {e}", file=sys.stderr)
|
|
return -1
|
|
|
|
def handle_fs_write(self, path: str, b64_data: str, offset: str = "0", inplace: str = "0") -> int:
|
|
"""Write data to file. Returns bytes written."""
|
|
resolved = self._resolve_path(path)
|
|
offset_int = int(offset) if offset else 0
|
|
is_inplace = inplace == "1"
|
|
|
|
try:
|
|
data = base64.b64decode(b64_data)
|
|
except Exception as e:
|
|
print(f"[SdCard] Error decoding base64 for {path}: {e}", file=sys.stderr)
|
|
return 0
|
|
|
|
try:
|
|
# Ensure parent directory exists
|
|
parent = os.path.dirname(resolved)
|
|
if parent and not os.path.exists(parent):
|
|
os.makedirs(parent)
|
|
|
|
if is_inplace and os.path.exists(resolved):
|
|
# In-place write at offset
|
|
with open(resolved, "r+b") as f:
|
|
f.seek(offset_int)
|
|
f.write(data)
|
|
else:
|
|
# Overwrite or create new file
|
|
mode = "r+b" if os.path.exists(resolved) and offset_int > 0 else "wb"
|
|
with open(resolved, mode) as f:
|
|
if offset_int > 0:
|
|
f.seek(offset_int)
|
|
f.write(data)
|
|
|
|
return len(data)
|
|
except Exception as e:
|
|
print(f"[SdCard] Error writing {path}: {e}", file=sys.stderr)
|
|
return 0
|
|
|
|
def handle_fs_mkdir(self, path: str) -> int:
|
|
"""Create directory. Returns 0 on success."""
|
|
resolved = self._resolve_path(path)
|
|
|
|
try:
|
|
os.makedirs(resolved, exist_ok=True)
|
|
return 0
|
|
except Exception as e:
|
|
print(f"[SdCard] Error mkdir {path}: {e}", file=sys.stderr)
|
|
return -1
|
|
|
|
def handle_fs_rm(self, path: str) -> int:
|
|
"""Remove file or directory. Returns 0 on success."""
|
|
resolved = self._resolve_path(path)
|
|
|
|
if not os.path.exists(resolved):
|
|
return -1
|
|
|
|
try:
|
|
if os.path.isdir(resolved):
|
|
shutil.rmtree(resolved)
|
|
else:
|
|
os.remove(resolved)
|
|
return 0
|
|
except Exception as e:
|
|
print(f"[SdCard] Error removing {path}: {e}", file=sys.stderr)
|
|
return -1
|
|
|
|
|
|
# Global SD card handler instance
|
|
sdcard_handler = SdCardHandler(sdcard_path)
|
|
|
|
|
|
# WebSocket clients
|
|
connected_clients: Set[WebSocketServerProtocol] = set()
|
|
|
|
# QEMU process
|
|
qemu_process: asyncio.subprocess.Process | None = None
|
|
|
|
# Command pattern: $$CMD_(COMMAND):(ARG0)[:(ARG1)][:(ARG2)][:(ARG3)]$$
|
|
CMD_PATTERN = re.compile(r'^\$\$CMD_([A-Z_]+):(.+)\$\$$')
|
|
|
|
|
|
def parse_command(message: str) -> tuple[str, list[str]] | None:
|
|
"""Parse a command message. Returns (command, args) or None if not a command."""
|
|
match = CMD_PATTERN.match(message)
|
|
if not match:
|
|
return None
|
|
command = match.group(1)
|
|
args_str = match.group(2)
|
|
# Split args by ':' but the last arg might contain ':'
|
|
args = args_str.split(':')
|
|
return (command, args)
|
|
|
|
|
|
async def send_response(response: str):
|
|
"""Send a response back to the QEMU process via stdin."""
|
|
global qemu_process
|
|
if qemu_process and qemu_process.stdin:
|
|
qemu_process.stdin.write((response + "\n").encode())
|
|
await qemu_process.stdin.drain()
|
|
|
|
|
|
LAST_DISPLAY_BUFFER = None
|
|
|
|
async def handle_command(command: str, args: list[str]) -> bool:
|
|
"""Handle a command from the device. Returns True if handled."""
|
|
global sdcard_handler, LAST_DISPLAY_BUFFER
|
|
|
|
try:
|
|
if command == "DISPLAY":
|
|
# Display command - no response needed
|
|
# arg0: base64-encoded buffer
|
|
print(f"[Emulator] DISPLAY command received (buffer size: {len(args[0]) if args else 0})", flush=True)
|
|
LAST_DISPLAY_BUFFER = args[0] if args else None
|
|
return True
|
|
|
|
elif command == "FS_LIST":
|
|
# arg0: path, arg1: max files (optional)
|
|
path = args[0] if len(args) > 0 else "/"
|
|
max_files = args[1] if len(args) > 1 else "200"
|
|
entries = sdcard_handler.handle_fs_list(path, max_files)
|
|
print(f"[Emulator] FS_LIST {path} -> {len(entries)} entries", flush=True)
|
|
# Send each entry as a line, then empty line to terminate
|
|
for entry in entries:
|
|
await send_response(entry)
|
|
await send_response("") # Empty line terminates the list
|
|
return True
|
|
|
|
elif command == "FS_READ":
|
|
# arg0: path, arg1: offset, arg2: length
|
|
path = args[0] if len(args) > 0 else ""
|
|
offset = args[1] if len(args) > 1 else "0"
|
|
length = args[2] if len(args) > 2 else "-1"
|
|
result = sdcard_handler.handle_fs_read(path, offset, length)
|
|
print(f"[Emulator] FS_READ {path} offset={offset} len={length} -> {len(result)} bytes (b64)", flush=True)
|
|
await send_response(result)
|
|
return True
|
|
|
|
elif command == "FS_STAT":
|
|
# arg0: path
|
|
path = args[0] if len(args) > 0 else ""
|
|
result = sdcard_handler.handle_fs_stat(path)
|
|
print(f"[Emulator] FS_STAT {path} -> {result}", flush=True)
|
|
await send_response(str(result))
|
|
return True
|
|
|
|
elif command == "FS_WRITE":
|
|
# arg0: path, arg1: base64-encoded data, arg2: offset, arg3: is inplace
|
|
path = args[0] if len(args) > 0 else ""
|
|
b64_data = args[1] if len(args) > 1 else ""
|
|
offset = args[2] if len(args) > 2 else "0"
|
|
inplace = args[3] if len(args) > 3 else "0"
|
|
result = sdcard_handler.handle_fs_write(path, b64_data, offset, inplace)
|
|
print(f"[Emulator] FS_WRITE {path} offset={offset} inplace={inplace} -> {result} bytes", flush=True)
|
|
await send_response(str(result))
|
|
return True
|
|
|
|
elif command == "FS_MKDIR":
|
|
# arg0: path
|
|
path = args[0] if len(args) > 0 else ""
|
|
result = sdcard_handler.handle_fs_mkdir(path)
|
|
print(f"[Emulator] FS_MKDIR {path} -> {result}", flush=True)
|
|
await send_response(str(result))
|
|
return True
|
|
|
|
elif command == "FS_RM":
|
|
# arg0: path
|
|
path = args[0] if len(args) > 0 else ""
|
|
result = sdcard_handler.handle_fs_rm(path)
|
|
print(f"[Emulator] FS_RM {path} -> {result}", flush=True)
|
|
await send_response(str(result))
|
|
return True
|
|
|
|
else:
|
|
print(f"[Emulator] Unknown command: {command}", flush=True)
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"[Emulator] Error handling command {command}: {e}", file=sys.stderr)
|
|
return False
|
|
|
|
|
|
def process_message(message: str, for_ui: bool) -> str:
|
|
if message.startswith("$$CMD_"):
|
|
if for_ui:
|
|
return message
|
|
else:
|
|
# $$CMD_(COMMAND)[:(ARG0)][:(ARG1)][:(ARG2)]$$
|
|
command = message[2:].split(':')[0]
|
|
return "[Emulator] Received command: " + command
|
|
return message
|
|
|
|
|
|
async def broadcast_message(msg_type: str, data: str):
|
|
"""Broadcast a message to all connected WebSocket clients."""
|
|
if not connected_clients:
|
|
return
|
|
|
|
message = json.dumps({"type": msg_type, "data": data})
|
|
|
|
# Send to all clients, remove disconnected ones
|
|
disconnected = set()
|
|
for client in connected_clients:
|
|
try:
|
|
await client.send(message)
|
|
except websockets.exceptions.ConnectionClosed:
|
|
disconnected.add(client)
|
|
|
|
connected_clients.difference_update(disconnected)
|
|
|
|
|
|
async def read_stream(stream: asyncio.StreamReader, stream_type: str):
|
|
"""Read from a stream line by line and broadcast to clients."""
|
|
buffer = b""
|
|
|
|
while True:
|
|
try:
|
|
chunk = await stream.read(1024)
|
|
if not chunk:
|
|
break
|
|
|
|
buffer += chunk
|
|
|
|
# Process complete lines
|
|
while b"\n" in buffer:
|
|
line, buffer = buffer.split(b"\n", 1)
|
|
try:
|
|
decoded_line = line.decode("utf-8", errors="replace")
|
|
except Exception:
|
|
decoded_line = line.decode("latin-1", errors="replace")
|
|
|
|
# Check if this is a command that needs handling
|
|
parsed = parse_command(decoded_line)
|
|
if parsed:
|
|
command, args = parsed
|
|
await handle_command(command, args)
|
|
# Still broadcast to UI for visibility
|
|
await broadcast_message(stream_type, decoded_line)
|
|
continue
|
|
|
|
# Forward to parent process
|
|
if stream_type == "stdout":
|
|
print(process_message(decoded_line, for_ui=False), flush=True)
|
|
else:
|
|
print(process_message(decoded_line, for_ui=False), file=sys.stderr, flush=True)
|
|
|
|
# Broadcast to WebSocket clients
|
|
await broadcast_message(stream_type, process_message(decoded_line, for_ui=True))
|
|
|
|
except Exception as e:
|
|
print(f"Error reading {stream_type}: {e}", file=sys.stderr)
|
|
break
|
|
|
|
# Process remaining buffer
|
|
if buffer:
|
|
try:
|
|
decoded_line = buffer.decode("utf-8", errors="replace")
|
|
except Exception:
|
|
decoded_line = buffer.decode("latin-1", errors="replace")
|
|
|
|
if stream_type == "stdout":
|
|
print(decoded_line, flush=True)
|
|
else:
|
|
print(decoded_line, file=sys.stderr, flush=True)
|
|
|
|
await broadcast_message(stream_type, decoded_line)
|
|
|
|
|
|
async def spawn_qemu():
|
|
"""Spawn the QEMU process and capture its output."""
|
|
global qemu_process
|
|
|
|
# Build the command
|
|
cmd = [
|
|
"qemu-system-riscv32",
|
|
"-nographic",
|
|
"-M", "esp32c3",
|
|
"-drive", "file=flash.bin,if=mtd,format=raw",
|
|
# "-global", "driver=timer.esp32c3.timg,property=wdt_disable,value=true", # got panic if we don't disable WDT, why?
|
|
]
|
|
|
|
# Get working directory from environment or use /tmp
|
|
work_dir = os.getcwd()
|
|
|
|
print(f"Starting QEMU in {work_dir}...", flush=True)
|
|
print(f"Command: {' '.join(cmd)}", flush=True)
|
|
|
|
try:
|
|
qemu_process = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
stdin=asyncio.subprocess.PIPE,
|
|
cwd=work_dir,
|
|
env=os.environ.copy()
|
|
)
|
|
|
|
# Read stdout and stderr concurrently
|
|
await asyncio.gather(
|
|
read_stream(qemu_process.stdout, "stdout"),
|
|
read_stream(qemu_process.stderr, "stderr")
|
|
)
|
|
|
|
# Wait for process to complete
|
|
await qemu_process.wait()
|
|
print(f"QEMU process exited with code {qemu_process.returncode}", flush=True)
|
|
|
|
except FileNotFoundError:
|
|
print("Error: qemu-system-riscv32 not found. Make sure it's in PATH.", file=sys.stderr)
|
|
await broadcast_message("stderr", "Error: qemu-system-riscv32 not found")
|
|
except Exception as e:
|
|
print(f"Error spawning QEMU: {e}", file=sys.stderr)
|
|
await broadcast_message("stderr", f"Error spawning QEMU: {e}")
|
|
|
|
|
|
async def websocket_handler(websocket: WebSocketServerProtocol):
|
|
"""Handle a WebSocket connection."""
|
|
connected_clients.add(websocket)
|
|
print(f"Client connected. Total clients: {len(connected_clients)}", flush=True)
|
|
|
|
try:
|
|
# Send a welcome message
|
|
await websocket.send(json.dumps({
|
|
"type": "info",
|
|
"data": "Connected to CrossPoint emulator"
|
|
}))
|
|
|
|
# Send the last display buffer if available
|
|
if LAST_DISPLAY_BUFFER is not None:
|
|
await websocket.send(json.dumps({
|
|
"type": "stdout",
|
|
"data": f"$$CMD_DISPLAY:{LAST_DISPLAY_BUFFER}$$"
|
|
}))
|
|
|
|
# Handle incoming messages (for stdin forwarding)
|
|
async for message in websocket:
|
|
try:
|
|
data = json.loads(message)
|
|
if data.get("type") == "stdin" and qemu_process and qemu_process.stdin:
|
|
input_data = data.get("data", "")
|
|
qemu_process.stdin.write((input_data + "\n").encode())
|
|
await qemu_process.stdin.drain()
|
|
except json.JSONDecodeError:
|
|
pass
|
|
except Exception as e:
|
|
print(f"Error handling client message: {e}", file=sys.stderr)
|
|
|
|
except websockets.exceptions.ConnectionClosed:
|
|
pass
|
|
finally:
|
|
connected_clients.discard(websocket)
|
|
print(f"Client disconnected. Total clients: {len(connected_clients)}", flush=True)
|
|
|
|
|
|
async def main():
|
|
"""Main entry point."""
|
|
host = os.environ.get("HOST", "0.0.0.0")
|
|
port = int(os.environ.get("PORT", "8090"))
|
|
|
|
print(f"Starting WebSocket server on {host}:{port}", flush=True)
|
|
|
|
# Start WebSocket server
|
|
async with websockets.serve(
|
|
websocket_handler, host, port,
|
|
process_request=process_request,
|
|
origins=None,
|
|
):
|
|
print(f"WebSocket server running on ws://{host}:{port}/ws", flush=True)
|
|
print(f"Web UI available at http://{host}:{port}/", flush=True)
|
|
|
|
# Spawn QEMU process
|
|
await spawn_qemu()
|
|
|
|
|
|
def signal_handler(signum, frame):
|
|
"""Handle shutdown signals."""
|
|
print("\nShutting down...", flush=True)
|
|
if qemu_process:
|
|
qemu_process.terminate()
|
|
sys.exit(0)
|
|
|
|
|
|
def process_request(connection, request):
|
|
"""Handle HTTP requests for serving static files."""
|
|
path = request.path
|
|
|
|
if path == "/" or path == "/web_ui.html":
|
|
# Serve the web_ui.html file
|
|
html_path = Path(__file__).parent / "web_ui.html"
|
|
try:
|
|
content = html_path.read_bytes()
|
|
return Response(
|
|
HTTPStatus.OK,
|
|
"OK",
|
|
websockets.Headers([
|
|
("Content-Type", "text/html; charset=utf-8"),
|
|
("Content-Length", str(len(content))),
|
|
]),
|
|
content
|
|
)
|
|
except FileNotFoundError:
|
|
return Response(HTTPStatus.NOT_FOUND, "Not Found", websockets.Headers(), b"web_ui.html not found")
|
|
|
|
if path == "/ws":
|
|
# Return None to continue with WebSocket handshake
|
|
return None
|
|
|
|
# Return 404 for other paths
|
|
return Response(HTTPStatus.NOT_FOUND, "Not Found", websockets.Headers(), b"Not found")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Set up signal handlers
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
# Run the main loop
|
|
asyncio.run(main())
|