add storage emulation

This commit is contained in:
Xuan Son Nguyen 2026-01-23 02:06:06 +01:00
parent dc2649775a
commit db359b2798
9 changed files with 1206 additions and 319 deletions

179
lib/hal/EmulationUtils.h Normal file
View File

@ -0,0 +1,179 @@
#pragma once
#include <Arduino.h>
#include <vector>
namespace EmulationUtils {
// Packet format (from device to host):
// $$CMD_(COMMAND):[(ARG0)][:(ARG1)][:(ARG2)]$$\n
// Host response with either a base64-encoded payload or int64, terminated by newline:
// (BASE64_ENCODED_PAYLOAD)\n
// 123\n
static const char* CMD_DISPLAY = "DISPLAY"; // arg0: base64-encoded buffer -- no return value
static const char* CMD_FS_LIST = "FS_LIST"; // arg0: path, arg1: max files -- returns list of filenames, one per line, terminated by empty line
static const char* CMD_FS_READ = "FS_READ"; // arg0: path, arg1: offset, arg2: length (-1 means read all) -- returns base64-encoded file contents
static const char* CMD_FS_STAT = "FS_STAT"; // arg0: path -- return file size int64_t; -1 means not found; -2 means is a directory
static const char* CMD_FS_WRITE = "FS_WRITE"; // arg0: path, arg1: base64-encoded data, arg2: offset, arg3: is inplace (0/1) -- return int64_t bytes written
static const char* CMD_FS_MKDIR = "FS_MKDIR"; // arg0: path -- return int64_t: 0=success
static const char* CMD_FS_RM = "FS_RM"; // arg0: path -- return int64_t: 0=success
static std::string base64_encode(const char* buf, unsigned int bufLen);
static std::vector<uint8_t> base64_decode(const char* encoded_string, unsigned int in_len);
static void sendCmd(const char* cmd, const char* arg0 = nullptr, const char* arg1 = nullptr, const char* arg2 = nullptr, const char* arg3 = nullptr) {
Serial.printf("[%lu] [EMU] Sending command: %s\n", millis(), cmd);
Serial.print("$$CMD_");
Serial.print(cmd);
if (arg0 != nullptr) {
Serial.print(":");
Serial.print(arg0);
} else {
Serial.print(":"); // ensure at least one colon
}
if (arg1 != nullptr) {
Serial.print(":");
Serial.print(arg1);
}
if (arg2 != nullptr) {
Serial.print(":");
Serial.print(arg2);
}
if (arg3 != nullptr) {
Serial.print(":");
Serial.print(arg3);
}
Serial.print("$$\n");
}
static String recvRespStr() {
unsigned long startMillis = millis();
String line;
const unsigned long timeoutMs = 5000;
while (millis() - startMillis < (unsigned long)timeoutMs) {
if (Serial.available()) {
char c = Serial.read();
if (c == '\n') {
return line;
} else {
line += c;
}
}
}
// should never reach here
Serial.println("FATAL: Timeout waiting for response");
return String();
}
static std::vector<uint8_t> recvRespBuf() {
String respStr = recvRespStr();
return base64_decode(respStr.c_str(), respStr.length());
}
static int64_t recvRespInt64() {
String respStr = recvRespStr();
return respStr.toInt();
}
static const std::string base64_chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";
static inline bool is_base64(char c) {
return (isalnum(c) || (c == '+') || (c == '/'));
}
static std::string base64_encode(const char* buf, unsigned int bufLen) {
std::string ret;
ret.reserve(bufLen * 4 / 3 + 4); // reserve enough space
int i = 0;
int j = 0;
char char_array_3[3];
char char_array_4[4];
while (bufLen--) {
char_array_3[i++] = *(buf++);
if (i == 3) {
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;
for(i = 0; (i < 4) ; i++) {
ret += base64_chars[char_array_4[i]];
}
i = 0;
}
}
if (i) {
for(j = i; j < 3; j++) {
char_array_3[j] = '\0';
}
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;
for (j = 0; (j < i + 1); j++) {
ret += base64_chars[char_array_4[j]];
}
while((i++ < 3)) {
ret += '=';
}
}
return ret;
}
static std::vector<uint8_t> base64_decode(const char* encoded_string, unsigned int in_len) {
int i = 0;
int j = 0;
int in_ = 0;
char char_array_4[4], char_array_3[3];
std::vector<uint8_t> ret;
while (in_len-- && ( encoded_string[in_] != '=') && is_base64(encoded_string[in_])) {
char_array_4[i++] = encoded_string[in_]; in_++;
if (i ==4) {
for (i = 0; i < 4; i++) {
char_array_4[i] = base64_chars.find(char_array_4[i]);
}
char_array_3[0] = (char_array_4[0] << 2) + ((char_array_4[1] & 0x30) >> 4);
char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2);
char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3];
for (i = 0; (i < 3); i++) {
ret.push_back(char_array_3[i]);
}
i = 0;
}
}
if (i) {
for (j = i; j < 4; j++) {
char_array_4[j] = 0;
}
for (j = 0; j < 4; j++) {
char_array_4[j] = base64_chars.find(char_array_4[j]);
}
char_array_3[0] = (char_array_4[0] << 2) + ((char_array_4[1] & 0x30) >> 4);
char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2);
char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3];
for (j = 0; (j < i - 1); j++) {
ret.push_back(char_array_3[j]);
}
}
return ret;
}
} // namespace EmulationUtils

View File

@ -1,8 +1,7 @@
#include <HalDisplay.h>
#include <EmulationUtils.h>
#include <string>
std::string base64_encode(char* buf, unsigned int bufLen);
HalDisplay::HalDisplay(int8_t sclk, int8_t mosi, int8_t cs, int8_t dc, int8_t rst, int8_t busy) : einkDisplay(sclk, mosi, cs, dc, rst, busy) {
if (is_emulated) {
emuFramebuffer0 = new uint8_t[BUFFER_SIZE];
@ -71,10 +70,9 @@ void HalDisplay::displayBuffer(RefreshMode mode) {
einkDisplay.displayBuffer(mode);
} else {
Serial.printf("[%lu] [ ] Emulated display buffer with mode %d\n", millis(), static_cast<int>(mode));
std::string b64 = base64_encode(reinterpret_cast<char*>(emuFramebuffer0), BUFFER_SIZE);
Serial.printf("$$DATA:DISPLAY:{\"mode\":%d,\"buffer\":\"", static_cast<int>(mode));
Serial.print(b64.c_str());
Serial.print("\"}$$\n");
std::string b64 = EmulationUtils::base64_encode(reinterpret_cast<char*>(emuFramebuffer0), BUFFER_SIZE);
EmulationUtils::sendCmd(EmulationUtils::CMD_DISPLAY, b64.c_str());
// no response expected
}
}
@ -155,58 +153,3 @@ void HalDisplay::displayGrayBuffer() {
// TODO: not sure what this does
}
}
//
// Base64 utilities
//
static const std::string base64_chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";
static inline bool is_base64(char c) {
return (isalnum(c) || (c == '+') || (c == '/'));
}
std::string base64_encode(char* buf, unsigned int bufLen) {
std::string ret;
ret.reserve(bufLen * 4 / 3 + 4); // reserve enough space
int i = 0;
int j = 0;
char char_array_3[3];
char char_array_4[4];
while (bufLen--) {
char_array_3[i++] = *(buf++);
if (i == 3) {
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;
for(i = 0; (i < 4) ; i++)
ret += base64_chars[char_array_4[i]];
i = 0;
}
}
if (i) {
for(j = i; j < 3; j++) {
char_array_3[j] = '\0';
}
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;
for (j = 0; (j < i + 1); j++)
ret += base64_chars[char_array_4[j]];
while((i++ < 3))
ret += '=';
}
return ret;
}

View File

@ -0,0 +1,327 @@
#include "HalStorage.h"
#include "EmulationUtils.h"
#include <vector>
#if CROSSPOINT_EMULATED == 0
#include <SDCardManager.h>
#endif
HalStorage HalStorage::instance;
HalStorage::HalStorage() {}
bool HalStorage::begin() {
#if CROSSPOINT_EMULATED == 0
return SdMan.begin();
#else
// no-op
return true;
#endif
}
bool HalStorage::ready() const {
#if CROSSPOINT_EMULATED == 0
return SdMan.ready();
#else
// no-op
return true;
#endif
}
std::vector<String> HalStorage::listFiles(const char* path, int maxFiles) {
#if CROSSPOINT_EMULATED == 0
return SdMan.listFiles(path, maxFiles);
#else
Serial.printf("[%lu] [FS ] Emulated listFiles: %s\n", millis(), path);
EmulationUtils::sendCmd(EmulationUtils::CMD_FS_LIST, path);
std::vector<String> output;
for (int i = 0; i < maxFiles; ++i) {
auto resp = EmulationUtils::recvRespStr();
if (resp.length() == 0) {
break;
}
output.push_back(resp);
}
return output;
#endif
}
String HalStorage::readFile(const char* path) {
#if CROSSPOINT_EMULATED == 0
return SdMan.readFile(path);
#else
Serial.printf("[%lu] [FS ] Emulated readFile: %s\n", millis(), path);
EmulationUtils::sendCmd(EmulationUtils::CMD_FS_READ, path, "0", "-1");
return EmulationUtils::recvRespStr();
#endif
}
static int64_t getFileSizeEmulated(const char* path) {
EmulationUtils::sendCmd(EmulationUtils::CMD_FS_STAT, path);
return EmulationUtils::recvRespInt64();
}
bool HalStorage::readFileToStream(const char* path, Print& out, size_t chunkSize) {
#if CROSSPOINT_EMULATED == 0
return SdMan.readFileToStream(path, out, chunkSize);
#else
Serial.printf("[%lu] [FS ] Emulated readFileToStream: %s\n", millis(), path);
auto size = getFileSizeEmulated(path);
if (size == -1) {
Serial.printf("[%lu] [FS ] File not found: %s\n", millis(), path);
return false;
}
if (size == -2) {
Serial.printf("[%lu] [FS ] Path is a directory, not a file: %s\n", millis(), path);
return false;
}
size_t bytesRead = 0;
while (bytesRead < static_cast<size_t>(size)) {
size_t toRead = std::min(chunkSize, static_cast<size_t>(size) - bytesRead);
EmulationUtils::sendCmd(EmulationUtils::CMD_FS_READ, path, String(bytesRead).c_str(), String(toRead).c_str());
auto buf = EmulationUtils::recvRespBuf();
out.write(buf.data(), buf.size());
bytesRead += buf.size();
}
return true;
#endif
}
size_t HalStorage::readFileToBuffer(const char* path, char* buffer, size_t bufferSize, size_t maxBytes) {
#if CROSSPOINT_EMULATED == 0
return SdMan.readFileToBuffer(path, buffer, bufferSize, maxBytes);
#else
Serial.printf("[%lu] [FS ] Emulated readFileToBuffer: %s\n", millis(), path);
auto size = getFileSizeEmulated(path);
if (size == -1) {
Serial.printf("[%lu] [FS ] File not found: %s\n", millis(), path);
return 0;
}
if (size == -2) {
Serial.printf("[%lu] [FS ] Path is a directory, not a file: %s\n", millis(), path);
return 0;
}
size_t toRead = static_cast<size_t>(size);
if (maxBytes > 0 && maxBytes < toRead) {
toRead = maxBytes;
}
if (toRead >= bufferSize) {
toRead = bufferSize - 1; // leave space for null terminator
}
EmulationUtils::sendCmd(EmulationUtils::CMD_FS_READ, path, "0", String(toRead).c_str());
auto buf = EmulationUtils::recvRespBuf();
size_t bytesRead = buf.size();
memcpy(buffer, buf.data(), bytesRead);
buffer[bytesRead] = '\0'; // null-terminate
return bytesRead;
#endif
}
bool HalStorage::writeFile(const char* path, const String& content) {
#if CROSSPOINT_EMULATED == 0
return SdMan.writeFile(path, content);
#else
Serial.printf("[%lu] [FS ] Emulated writeFile: %s\n", millis(), path);
std::string b64 = EmulationUtils::base64_encode((char*)content.c_str(), content.length());
EmulationUtils::sendCmd(EmulationUtils::CMD_FS_WRITE, path, b64.c_str(), "0", "0");
EmulationUtils::recvRespInt64(); // unused for now
return true;
#endif
}
bool HalStorage::ensureDirectoryExists(const char* path) {
#if CROSSPOINT_EMULATED == 0
return SdMan.ensureDirectoryExists(path);
#else
Serial.printf("[%lu] [FS ] Emulated ensureDirectoryExists: %s\n", millis(), path);
EmulationUtils::sendCmd(EmulationUtils::CMD_FS_MKDIR, path);
EmulationUtils::recvRespInt64(); // unused for now
return true;
#endif
}
FsFile HalStorage::open(const char* path, const oflag_t oflag) {
#if CROSSPOINT_EMULATED == 0
return SdMan.open(path, oflag);
#else
// TODO: do we need to check existence or create the file?
return FsFile(path, oflag);
#endif
}
bool HalStorage::mkdir(const char* path, const bool pFlag) {
#if CROSSPOINT_EMULATED == 0
return SdMan.mkdir(path, pFlag);
#else
EmulationUtils::sendCmd(EmulationUtils::CMD_FS_MKDIR, path);
EmulationUtils::recvRespInt64(); // unused for now
return true;
#endif
}
bool HalStorage::exists(const char* path) {
#if CROSSPOINT_EMULATED == 0
return SdMan.exists(path);
#else
Serial.printf("[%lu] [FS ] Emulated exists: %s\n", millis(), path);
auto size = getFileSizeEmulated(path);
return size != -1;
#endif
}
bool HalStorage::remove(const char* path) {
#if CROSSPOINT_EMULATED == 0
return SdMan.remove(path);
#else
Serial.printf("[%lu] [FS ] Emulated remove: %s\n", millis(), path);
EmulationUtils::sendCmd(EmulationUtils::CMD_FS_RM, path);
EmulationUtils::recvRespInt64(); // unused for now
return true;
#endif
}
bool HalStorage::rmdir(const char* path) {
#if CROSSPOINT_EMULATED == 0
return SdMan.rmdir(path);
#else
Serial.printf("[%lu] [FS ] Emulated rmdir: %s\n", millis(), path);
EmulationUtils::sendCmd(EmulationUtils::CMD_FS_RM, path);
EmulationUtils::recvRespInt64(); // unused for now
return true;
#endif
}
bool HalStorage::openFileForRead(const char* moduleName, const char* path, FsFile& file) {
#if CROSSPOINT_EMULATED == 0
return SdMan.openFileForRead(moduleName, path, file);
#else
Serial.printf("[%lu] [FS ] Emulated openFileForRead: %s\n", millis(), path);
auto size = getFileSizeEmulated(path);
if (size == -1) {
Serial.printf("[%lu] [FS ] File not found: %s\n", millis(), path);
return false;
}
if (size == -2) {
Serial.printf("[%lu] [FS ] Path is a directory, not a file: %s\n", millis(), path);
return false;
}
return true;
#endif
}
bool HalStorage::openFileForRead(const char* moduleName, const std::string& path, FsFile& file) {
return openFileForRead(moduleName, path.c_str(), file);
}
bool HalStorage::openFileForRead(const char* moduleName, const String& path, FsFile& file) {
return openFileForRead(moduleName, path.c_str(), file);
}
bool HalStorage::openFileForWrite(const char* moduleName, const char* path, FsFile& file) {
#if CROSSPOINT_EMULATED == 0
return SdMan.openFileForWrite(moduleName, path, file);
#else
Serial.printf("[%lu] [FS ] Emulated openFileForWrite: %s\n", millis(), path);
auto size = getFileSizeEmulated(path);
if (size == -1) {
Serial.printf("[%lu] [FS ] File not found: %s\n", millis(), path);
return false;
}
if (size == -2) {
Serial.printf("[%lu] [FS ] Path is a directory, not a file: %s\n", millis(), path);
return false;
}
return true;
#endif
}
bool HalStorage::openFileForWrite(const char* moduleName, const std::string& path, FsFile& file) {
return openFileForWrite(moduleName, path.c_str(), file);
}
bool HalStorage::openFileForWrite(const char* moduleName, const String& path, FsFile& file) {
return openFileForWrite(moduleName, path.c_str(), file);
}
bool HalStorage::removeDir(const char* path) {
#if CROSSPOINT_EMULATED == 0
return SdMan.removeDir(path);
#else
// to be implemented
return false;
#endif
}
#if CROSSPOINT_EMULATED == 1
//
// FsFile emulation methods
//
FsFile::FsFile(const char* path, oflag_t oflag) : path(path), oflag(oflag) {
Serial.printf("[%lu] [FS ] Emulated FsFile open: %s\n", millis(), path);
auto size = getFileSizeEmulated(path);
if (size == -1) {
Serial.printf("[%lu] [FS ] File not found: %s\n", millis(), path);
open = false;
fileSizeBytes = 0;
} else if (isDir) {
Serial.printf("[%lu] [FS ] Path is a directory: %s\n", millis(), path);
open = true;
// get directory entries
EmulationUtils::sendCmd(EmulationUtils::CMD_FS_LIST, path);
dirEntries.clear();
while (true) {
auto resp = EmulationUtils::recvRespStr();
if (resp.length() == 0) {
break;
}
dirEntries.push_back(resp);
}
Serial.printf("[%lu] [FS ] Directory has %u entries\n", millis(), (unsigned)dirEntries.size());
dirIndex = 0;
fileSizeBytes = 0;
} else {
open = true;
fileSizeBytes = static_cast<size_t>(size);
}
}
int FsFile::read(void* buf, size_t count) {
if (!open || isDir) return -1;
size_t bytesAvailable = (fileSizeBytes > filePos) ? (fileSizeBytes - filePos) : 0;
if (bytesAvailable == 0) return 0;
size_t toRead = std::min(count, bytesAvailable);
EmulationUtils::sendCmd(EmulationUtils::CMD_FS_READ, path.c_str(), String(filePos).c_str(), String(toRead).c_str());
auto data = EmulationUtils::recvRespBuf();
size_t bytesRead = data.size();
memcpy(buf, data.data(), bytesRead);
filePos += bytesRead;
return static_cast<int>(bytesRead);
}
int FsFile::read() {
uint8_t b;
int result = read(&b, 1);
if (result <= 0) return -1;
return b;
}
size_t FsFile::write(const uint8_t* buffer, size_t size) {
if (!open || isDir) return 0;
std::string b64 = EmulationUtils::base64_encode((const char*)buffer, size);
EmulationUtils::sendCmd(EmulationUtils::CMD_FS_WRITE, path.c_str(), b64.c_str(), String(filePos).c_str(), "1");
EmulationUtils::recvRespInt64(); // unused for now
filePos += size;
if (filePos > fileSizeBytes) {
fileSizeBytes = filePos;
}
return size;
}
size_t FsFile::write(uint8_t b) {
return write(&b, 1);
}
#endif

View File

@ -1,8 +1,76 @@
#pragma once
#include <SDCardManager.h>
#include <Arduino.h>
#include <vector>
#if CROSSPOINT_EMULATED == 0
#include <SDCardManager.h>
#else
typedef int oflag_t;
#define O_RDONLY 0 /* +1 == FREAD */
#define O_WRONLY 1 /* +1 == FWRITE */
#define O_RDWR 2 /* +1 == FREAD|FWRITE */
class FsFile : public Print {
String path;
String name;
oflag_t oflag; // unused for now
bool open = false;
// directory state
bool isDir = false;
std::vector<String> dirEntries;
size_t dirIndex = 0;
// file state
size_t filePos = 0;
size_t fileSizeBytes = 0;
public:
FsFile() = default;
FsFile(const char* path, oflag_t oflag);
~FsFile() = default;
void flush() { /* no-op */ }
size_t getName(char* name, size_t len) {
String n = path;
if (n.length() >= len) {
n = n.substring(0, len - 1);
}
n.toCharArray(name, len);
return n.length();
}
size_t size() { return fileSizeBytes; }
size_t fileSize() { return size(); }
size_t seek(size_t pos) {
filePos = pos;
return filePos;
}
size_t seekCur(int64_t offset) { return seek(filePos + offset); }
size_t seekSet(size_t offset) { return seek(offset); }
int available() const { return (fileSizeBytes > filePos) ? (fileSizeBytes - filePos) : 0; }
size_t position() const { return filePos; }
int read(void* buf, size_t count);
int read(); // read a single byte
size_t write(const uint8_t* buffer, size_t size);
size_t write(uint8_t b) override;
bool isDirectory() const { return isDir; }
int rewindDirectory() {
if (!isDir) return -1;
dirIndex = 0;
return 0;
}
bool close() { open = false; return true; }
FsFile openNextFile() {
if (!isDir || dirIndex >= dirEntries.size()) {
return FsFile();
}
FsFile f(dirEntries[dirIndex].c_str(), O_RDONLY);
dirIndex++;
return f;
}
bool isOpen() const { return open; }
operator bool() const { return isOpen(); }
};
#endif
class HalStorage {
public:
HalStorage();
@ -42,8 +110,9 @@ class HalStorage {
private:
static HalStorage instance;
bool initialized = false;
SdFat sd;
bool is_emulated = CROSSPOINT_EMULATED;
};
#define HAL_STORAGE HalStorage::getInstance()
#if CROSSPOINT_EMULATED == 1
#define SdMan HalStorage::getInstance()
#endif

View File

@ -4,6 +4,7 @@ services:
container_name: crosspoint-emulator
volumes:
- ../..:/app:ro
- ../../.sdcard:/sdcard:rw
tmpfs:
- /tmp
ports:
@ -31,4 +32,4 @@ services:
python3 -m pip install websockets --break-system-packages
cp /app/scripts/emulation/web_ui.html .
python3 /app/scripts/emulation/web_server.py
python3 /app/scripts/emulation/server.py

522
scripts/emulation/server.py Normal file
View File

@ -0,0 +1,522 @@
import asyncio
import base64
import json
import os
import re
import shutil
import signal
import sys
from http import HTTPStatus
from pathlib import Path
from typing import Set
import websockets
from websockets.http11 import Response
from websockets.server import WebSocketServerProtocol
"""
This script spawns a QEMU process, then forward its stdout and stderr to WebSocket clients.
"""
# SD card emulation directory
sdcard_path = os.environ.get("SDCARD_PATH", "/sdcard")
if not os.path.exists(sdcard_path):
os.makedirs(sdcard_path)
class SdCardHandler:
"""Handle SD card filesystem commands from the emulated device."""
def __init__(self, base_path: str):
self.base_path = base_path
def _resolve_path(self, path: str) -> str:
"""Resolve a device path to a host filesystem path."""
# Remove leading slash and join with base path
if path.startswith("/"):
path = path[1:]
return os.path.join(self.base_path, path)
def handle_fs_list(self, path: str, max_files: str = "200") -> list[str]:
"""List files in a directory. Returns list of filenames."""
resolved = self._resolve_path(path)
max_count = int(max_files) if max_files else 200
if not os.path.exists(resolved):
return []
if not os.path.isdir(resolved):
return []
try:
entries = []
for entry in os.listdir(resolved):
if len(entries) >= max_count:
break
full_path = os.path.join(resolved, entry)
# Append "/" for directories to match device behavior
if os.path.isdir(full_path):
entries.append(entry + "/")
else:
entries.append(entry)
return entries
except Exception as e:
print(f"[SdCard] Error listing {path}: {e}", file=sys.stderr)
return []
def handle_fs_read(self, path: str, offset: str = "0", length: str = "-1") -> str:
"""Read file contents. Returns base64-encoded data."""
resolved = self._resolve_path(path)
offset_int = int(offset) if offset else 0
length_int = int(length) if length else -1
if not os.path.exists(resolved) or os.path.isdir(resolved):
return ""
try:
with open(resolved, "rb") as f:
if offset_int > 0:
f.seek(offset_int)
if length_int == -1:
data = f.read()
else:
data = f.read(length_int)
return base64.b64encode(data).decode("ascii")
except Exception as e:
print(f"[SdCard] Error reading {path}: {e}", file=sys.stderr)
return ""
def handle_fs_stat(self, path: str) -> int:
"""Get file size. Returns -1 if not found, -2 if directory."""
resolved = self._resolve_path(path)
if not os.path.exists(resolved):
return -1
if os.path.isdir(resolved):
return -2
try:
return os.path.getsize(resolved)
except Exception as e:
print(f"[SdCard] Error stat {path}: {e}", file=sys.stderr)
return -1
def handle_fs_write(self, path: str, b64_data: str, offset: str = "0", inplace: str = "0") -> int:
"""Write data to file. Returns bytes written."""
resolved = self._resolve_path(path)
offset_int = int(offset) if offset else 0
is_inplace = inplace == "1"
try:
data = base64.b64decode(b64_data)
except Exception as e:
print(f"[SdCard] Error decoding base64 for {path}: {e}", file=sys.stderr)
return 0
try:
# Ensure parent directory exists
parent = os.path.dirname(resolved)
if parent and not os.path.exists(parent):
os.makedirs(parent)
if is_inplace and os.path.exists(resolved):
# In-place write at offset
with open(resolved, "r+b") as f:
f.seek(offset_int)
f.write(data)
else:
# Overwrite or create new file
mode = "r+b" if os.path.exists(resolved) and offset_int > 0 else "wb"
with open(resolved, mode) as f:
if offset_int > 0:
f.seek(offset_int)
f.write(data)
return len(data)
except Exception as e:
print(f"[SdCard] Error writing {path}: {e}", file=sys.stderr)
return 0
def handle_fs_mkdir(self, path: str) -> int:
"""Create directory. Returns 0 on success."""
resolved = self._resolve_path(path)
try:
os.makedirs(resolved, exist_ok=True)
return 0
except Exception as e:
print(f"[SdCard] Error mkdir {path}: {e}", file=sys.stderr)
return -1
def handle_fs_rm(self, path: str) -> int:
"""Remove file or directory. Returns 0 on success."""
resolved = self._resolve_path(path)
if not os.path.exists(resolved):
return -1
try:
if os.path.isdir(resolved):
shutil.rmtree(resolved)
else:
os.remove(resolved)
return 0
except Exception as e:
print(f"[SdCard] Error removing {path}: {e}", file=sys.stderr)
return -1
# Global SD card handler instance
sdcard_handler = SdCardHandler(sdcard_path)
# WebSocket clients
connected_clients: Set[WebSocketServerProtocol] = set()
# QEMU process
qemu_process: asyncio.subprocess.Process | None = None
# Command pattern: $$CMD_(COMMAND):(ARG0)[:(ARG1)][:(ARG2)][:(ARG3)]$$
CMD_PATTERN = re.compile(r'^\$\$CMD_([A-Z_]+):(.+)\$\$$')
def parse_command(message: str) -> tuple[str, list[str]] | None:
"""Parse a command message. Returns (command, args) or None if not a command."""
match = CMD_PATTERN.match(message)
if not match:
return None
command = match.group(1)
args_str = match.group(2)
# Split args by ':' but the last arg might contain ':'
args = args_str.split(':')
return (command, args)
async def send_response(response: str):
"""Send a response back to the QEMU process via stdin."""
global qemu_process
if qemu_process and qemu_process.stdin:
qemu_process.stdin.write((response + "\n").encode())
await qemu_process.stdin.drain()
LAST_DISPLAY_BUFFER = None
async def handle_command(command: str, args: list[str]) -> bool:
"""Handle a command from the device. Returns True if handled."""
global sdcard_handler, LAST_DISPLAY_BUFFER
try:
if command == "DISPLAY":
# Display command - no response needed
# arg0: base64-encoded buffer
print(f"[Emulator] DISPLAY command received (buffer size: {len(args[0]) if args else 0})", flush=True)
LAST_DISPLAY_BUFFER = args[0] if args else None
return True
elif command == "FS_LIST":
# arg0: path, arg1: max files (optional)
path = args[0] if len(args) > 0 else "/"
max_files = args[1] if len(args) > 1 else "200"
entries = sdcard_handler.handle_fs_list(path, max_files)
print(f"[Emulator] FS_LIST {path} -> {len(entries)} entries", flush=True)
# Send each entry as a line, then empty line to terminate
for entry in entries:
await send_response(entry)
await send_response("") # Empty line terminates the list
return True
elif command == "FS_READ":
# arg0: path, arg1: offset, arg2: length
path = args[0] if len(args) > 0 else ""
offset = args[1] if len(args) > 1 else "0"
length = args[2] if len(args) > 2 else "-1"
result = sdcard_handler.handle_fs_read(path, offset, length)
print(f"[Emulator] FS_READ {path} offset={offset} len={length} -> {len(result)} bytes (b64)", flush=True)
await send_response(result)
return True
elif command == "FS_STAT":
# arg0: path
path = args[0] if len(args) > 0 else ""
result = sdcard_handler.handle_fs_stat(path)
print(f"[Emulator] FS_STAT {path} -> {result}", flush=True)
await send_response(str(result))
return True
elif command == "FS_WRITE":
# arg0: path, arg1: base64-encoded data, arg2: offset, arg3: is inplace
path = args[0] if len(args) > 0 else ""
b64_data = args[1] if len(args) > 1 else ""
offset = args[2] if len(args) > 2 else "0"
inplace = args[3] if len(args) > 3 else "0"
result = sdcard_handler.handle_fs_write(path, b64_data, offset, inplace)
print(f"[Emulator] FS_WRITE {path} offset={offset} inplace={inplace} -> {result} bytes", flush=True)
await send_response(str(result))
return True
elif command == "FS_MKDIR":
# arg0: path
path = args[0] if len(args) > 0 else ""
result = sdcard_handler.handle_fs_mkdir(path)
print(f"[Emulator] FS_MKDIR {path} -> {result}", flush=True)
await send_response(str(result))
return True
elif command == "FS_RM":
# arg0: path
path = args[0] if len(args) > 0 else ""
result = sdcard_handler.handle_fs_rm(path)
print(f"[Emulator] FS_RM {path} -> {result}", flush=True)
await send_response(str(result))
return True
else:
print(f"[Emulator] Unknown command: {command}", flush=True)
return False
except Exception as e:
print(f"[Emulator] Error handling command {command}: {e}", file=sys.stderr)
return False
def process_message(message: str, for_ui: bool) -> str:
if message.startswith("$$CMD_"):
if for_ui:
return message
else:
# $$CMD_(COMMAND)[:(ARG0)][:(ARG1)][:(ARG2)]$$
command = message[2:].split(':')[0]
return "[Emulator] Received command: " + command
return message
async def broadcast_message(msg_type: str, data: str):
"""Broadcast a message to all connected WebSocket clients."""
if not connected_clients:
return
message = json.dumps({"type": msg_type, "data": data})
# Send to all clients, remove disconnected ones
disconnected = set()
for client in connected_clients:
try:
await client.send(message)
except websockets.exceptions.ConnectionClosed:
disconnected.add(client)
connected_clients.difference_update(disconnected)
async def read_stream(stream: asyncio.StreamReader, stream_type: str):
"""Read from a stream line by line and broadcast to clients."""
buffer = b""
while True:
try:
chunk = await stream.read(1024)
if not chunk:
break
buffer += chunk
# Process complete lines
while b"\n" in buffer:
line, buffer = buffer.split(b"\n", 1)
try:
decoded_line = line.decode("utf-8", errors="replace")
except Exception:
decoded_line = line.decode("latin-1", errors="replace")
# Check if this is a command that needs handling
parsed = parse_command(decoded_line)
if parsed:
command, args = parsed
await handle_command(command, args)
# Still broadcast to UI for visibility
await broadcast_message(stream_type, decoded_line)
continue
# Forward to parent process
if stream_type == "stdout":
print(process_message(decoded_line, for_ui=False), flush=True)
else:
print(process_message(decoded_line, for_ui=False), file=sys.stderr, flush=True)
# Broadcast to WebSocket clients
await broadcast_message(stream_type, process_message(decoded_line, for_ui=True))
except Exception as e:
print(f"Error reading {stream_type}: {e}", file=sys.stderr)
break
# Process remaining buffer
if buffer:
try:
decoded_line = buffer.decode("utf-8", errors="replace")
except Exception:
decoded_line = buffer.decode("latin-1", errors="replace")
if stream_type == "stdout":
print(decoded_line, flush=True)
else:
print(decoded_line, file=sys.stderr, flush=True)
await broadcast_message(stream_type, decoded_line)
async def spawn_qemu():
"""Spawn the QEMU process and capture its output."""
global qemu_process
# Build the command
cmd = [
"qemu-system-riscv32",
"-nographic",
"-M", "esp32c3",
"-drive", "file=flash.bin,if=mtd,format=raw",
"-global", "driver=timer.esp32c3.timg,property=wdt_disable,value=true", # got panic if we don't disable WDT, why?
]
# Get working directory from environment or use /tmp
work_dir = os.getcwd()
print(f"Starting QEMU in {work_dir}...", flush=True)
print(f"Command: {' '.join(cmd)}", flush=True)
try:
qemu_process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
cwd=work_dir,
env=os.environ.copy()
)
# Read stdout and stderr concurrently
await asyncio.gather(
read_stream(qemu_process.stdout, "stdout"),
read_stream(qemu_process.stderr, "stderr")
)
# Wait for process to complete
await qemu_process.wait()
print(f"QEMU process exited with code {qemu_process.returncode}", flush=True)
except FileNotFoundError:
print("Error: qemu-system-riscv32 not found. Make sure it's in PATH.", file=sys.stderr)
await broadcast_message("stderr", "Error: qemu-system-riscv32 not found")
except Exception as e:
print(f"Error spawning QEMU: {e}", file=sys.stderr)
await broadcast_message("stderr", f"Error spawning QEMU: {e}")
async def websocket_handler(websocket: WebSocketServerProtocol):
"""Handle a WebSocket connection."""
connected_clients.add(websocket)
print(f"Client connected. Total clients: {len(connected_clients)}", flush=True)
try:
# Send a welcome message
await websocket.send(json.dumps({
"type": "info",
"data": "Connected to CrossPoint emulator"
}))
# Send the last display buffer if available
if LAST_DISPLAY_BUFFER is not None:
await websocket.send(json.dumps({
"type": "stdout",
"data": f"$$CMD_DISPLAY:{LAST_DISPLAY_BUFFER}$$"
}))
# Handle incoming messages (for stdin forwarding)
async for message in websocket:
try:
data = json.loads(message)
if data.get("type") == "stdin" and qemu_process and qemu_process.stdin:
input_data = data.get("data", "")
qemu_process.stdin.write((input_data + "\n").encode())
await qemu_process.stdin.drain()
except json.JSONDecodeError:
pass
except Exception as e:
print(f"Error handling client message: {e}", file=sys.stderr)
except websockets.exceptions.ConnectionClosed:
pass
finally:
connected_clients.discard(websocket)
print(f"Client disconnected. Total clients: {len(connected_clients)}", flush=True)
async def main():
"""Main entry point."""
host = os.environ.get("HOST", "0.0.0.0")
port = int(os.environ.get("PORT", "8090"))
print(f"Starting WebSocket server on {host}:{port}", flush=True)
# Start WebSocket server
async with websockets.serve(
websocket_handler, host, port,
process_request=process_request,
origins=None,
):
print(f"WebSocket server running on ws://{host}:{port}/ws", flush=True)
print(f"Web UI available at http://{host}:{port}/", flush=True)
# Spawn QEMU process
await spawn_qemu()
def signal_handler(signum, frame):
"""Handle shutdown signals."""
print("\nShutting down...", flush=True)
if qemu_process:
qemu_process.terminate()
sys.exit(0)
def process_request(connection, request):
"""Handle HTTP requests for serving static files."""
path = request.path
if path == "/" or path == "/web_ui.html":
# Serve the web_ui.html file
html_path = Path(__file__).parent / "web_ui.html"
try:
content = html_path.read_bytes()
return Response(
HTTPStatus.OK,
"OK",
websockets.Headers([
("Content-Type", "text/html; charset=utf-8"),
("Content-Length", str(len(content))),
]),
content
)
except FileNotFoundError:
return Response(HTTPStatus.NOT_FOUND, "Not Found", websockets.Headers(), b"web_ui.html not found")
if path == "/ws":
# Return None to continue with WebSocket handshake
return None
# Return 404 for other paths
return Response(HTTPStatus.NOT_FOUND, "Not Found", websockets.Headers(), b"Not found")
if __name__ == "__main__":
# Set up signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Run the main loop
asyncio.run(main())

View File

@ -1,242 +0,0 @@
import asyncio
import json
import os
import signal
import sys
from http import HTTPStatus
from pathlib import Path
from typing import Set
import websockets
from websockets.http11 import Response
from websockets.server import WebSocketServerProtocol
"""
This script spawns a QEMU process, then forward its stdout and stderr to WebSocket clients.
"""
# WebSocket clients
connected_clients: Set[WebSocketServerProtocol] = set()
# QEMU process
qemu_process: asyncio.subprocess.Process | None = None
def process_message(message: str, for_ui: bool) -> str:
if message.startswith("$$DATA:DISPLAY:"):
if for_ui:
return message
else:
return f"[DISPLAY DATA]"
return message
async def broadcast_message(msg_type: str, data: str):
"""Broadcast a message to all connected WebSocket clients."""
if not connected_clients:
return
message = json.dumps({"type": msg_type, "data": data})
# Send to all clients, remove disconnected ones
disconnected = set()
for client in connected_clients:
try:
await client.send(message)
except websockets.exceptions.ConnectionClosed:
disconnected.add(client)
connected_clients.difference_update(disconnected)
async def read_stream(stream: asyncio.StreamReader, stream_type: str):
"""Read from a stream line by line and broadcast to clients."""
buffer = b""
while True:
try:
chunk = await stream.read(1024)
if not chunk:
break
buffer += chunk
# Process complete lines
while b"\n" in buffer:
line, buffer = buffer.split(b"\n", 1)
try:
decoded_line = line.decode("utf-8", errors="replace")
except Exception:
decoded_line = line.decode("latin-1", errors="replace")
# Forward to parent process
if stream_type == "stdout":
print(process_message(decoded_line, for_ui=False), flush=True)
else:
print(process_message(decoded_line, for_ui=False), file=sys.stderr, flush=True)
# Broadcast to WebSocket clients
await broadcast_message(stream_type, process_message(decoded_line, for_ui=True))
except Exception as e:
print(f"Error reading {stream_type}: {e}", file=sys.stderr)
break
# Process remaining buffer
if buffer:
try:
decoded_line = buffer.decode("utf-8", errors="replace")
except Exception:
decoded_line = buffer.decode("latin-1", errors="replace")
if stream_type == "stdout":
print(decoded_line, flush=True)
else:
print(decoded_line, file=sys.stderr, flush=True)
await broadcast_message(stream_type, decoded_line)
async def spawn_qemu():
"""Spawn the QEMU process and capture its output."""
global qemu_process
# Build the command
cmd = [
"qemu-system-riscv32",
"-nographic",
"-M", "esp32c3",
"-drive", "file=flash.bin,if=mtd,format=raw"
]
# Get working directory from environment or use /tmp
work_dir = os.getcwd()
print(f"Starting QEMU in {work_dir}...", flush=True)
print(f"Command: {' '.join(cmd)}", flush=True)
try:
qemu_process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
cwd=work_dir,
env=os.environ.copy()
)
# Read stdout and stderr concurrently
await asyncio.gather(
read_stream(qemu_process.stdout, "stdout"),
read_stream(qemu_process.stderr, "stderr")
)
# Wait for process to complete
await qemu_process.wait()
print(f"QEMU process exited with code {qemu_process.returncode}", flush=True)
except FileNotFoundError:
print("Error: qemu-system-riscv32 not found. Make sure it's in PATH.", file=sys.stderr)
await broadcast_message("stderr", "Error: qemu-system-riscv32 not found")
except Exception as e:
print(f"Error spawning QEMU: {e}", file=sys.stderr)
await broadcast_message("stderr", f"Error spawning QEMU: {e}")
async def websocket_handler(websocket: WebSocketServerProtocol):
"""Handle a WebSocket connection."""
connected_clients.add(websocket)
print(f"Client connected. Total clients: {len(connected_clients)}", flush=True)
try:
# Send a welcome message
await websocket.send(json.dumps({
"type": "info",
"data": "Connected to CrossPoint emulator"
}))
# Handle incoming messages (for stdin forwarding)
async for message in websocket:
try:
data = json.loads(message)
if data.get("type") == "stdin" and qemu_process and qemu_process.stdin:
input_data = data.get("data", "")
qemu_process.stdin.write((input_data + "\n").encode())
await qemu_process.stdin.drain()
except json.JSONDecodeError:
pass
except Exception as e:
print(f"Error handling client message: {e}", file=sys.stderr)
except websockets.exceptions.ConnectionClosed:
pass
finally:
connected_clients.discard(websocket)
print(f"Client disconnected. Total clients: {len(connected_clients)}", flush=True)
async def main():
"""Main entry point."""
host = os.environ.get("HOST", "0.0.0.0")
port = int(os.environ.get("PORT", "8090"))
print(f"Starting WebSocket server on {host}:{port}", flush=True)
# Start WebSocket server
async with websockets.serve(
websocket_handler, host, port,
process_request=process_request,
origins=None,
):
print(f"WebSocket server running on ws://{host}:{port}/ws", flush=True)
print(f"Web UI available at http://{host}:{port}/", flush=True)
# Spawn QEMU process
await spawn_qemu()
def signal_handler(signum, frame):
"""Handle shutdown signals."""
print("\nShutting down...", flush=True)
if qemu_process:
qemu_process.terminate()
sys.exit(0)
def process_request(connection, request):
"""Handle HTTP requests for serving static files."""
path = request.path
if path == "/" or path == "/web_ui.html":
# Serve the web_ui.html file
html_path = Path(__file__).parent / "web_ui.html"
try:
content = html_path.read_bytes()
return Response(
HTTPStatus.OK,
"OK",
websockets.Headers([
("Content-Type", "text/html; charset=utf-8"),
("Content-Length", str(len(content))),
]),
content
)
except FileNotFoundError:
return Response(HTTPStatus.NOT_FOUND, "Not Found", websockets.Headers(), b"web_ui.html not found")
if path == "/ws":
# Return None to continue with WebSocket handshake
return None
# Return 404 for other paths
return Response(HTTPStatus.NOT_FOUND, "Not Found", websockets.Headers(), b"Not found")
if __name__ == "__main__":
# Set up signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Run the main loop
asyncio.run(main())

View File

@ -8,13 +8,85 @@
body {
font-family: monospace;
margin: 0;
height: 100vh;
background-color: #bbb;
display: flex;
justify-content: center;
}
button {
font-size: 1.2em;
padding: 0.5em 1em;
margin: 0.2em;
}
.container {
display: flex;
flex-direction: row;
justify-content: center;
align-items: center;
width: 100%;
height: 100%;
max-width: 1200px;
}
.display, .control {
width: 50%;
height: 95vh;
display: flex;
justify-content: center;
align-items: center;
padding-left: 2em;
padding-right: 2em;
}
.display > canvas {
max-height: 100%;
}
.control {
flex-direction: column;
gap: 1em;
}
.control > .ctrl {
padding-top: 2em;
padding-bottom: 2em;
}
.control > #output {
height: 100%;
width: 100%;
padding: 10px;
overflow-y: scroll;
border: 1px solid #555;
}
</style>
</head>
<body>
<canvas id="screen" width="480" height="800" style="border:1px solid #555;"></canvas>
<div class="container">
<div class="display">
<!-- Display -->
<canvas id="screen" width="480" height="800" style="border:1px solid #555;"></canvas>
</div>
<div id="output"></div>
<div class="control">
<!-- Control + Log -->
<div class="ctrl">
<button id="btn0">Btn 0</button>
<button id="btn1">Btn 1</button>
&nbsp;&nbsp;&nbsp;
<button id="btn2">Btn 2</button>
<button id="btn3">Btn 3</button>
&nbsp;&nbsp;&nbsp;
<button id="btnU">Btn Up</button>
<button id="btnD">Btn Down</button>
&nbsp;&nbsp;&nbsp;
<button id="btnP">Btn Power</button>
</div>
<div id="output"></div>
</div>
</div>
<script>
const output = document.getElementById('output');
@ -23,12 +95,16 @@
let ws = null;
const MAX_LOG_LINES = 1000;
function appendLog(type, message) {
const line = document.createElement('div');
line.className = type;
line.textContent = message;
output.appendChild(line);
output.scrollTop = output.scrollHeight;
output.scrollTop = output.scrollHeight; // TODO: only scroll if already at bottom
while (output.childElementCount > MAX_LOG_LINES) {
output.removeChild(output.firstChild);
}
}
function drawScreen(b64Data) {
@ -92,13 +168,13 @@
ws.onmessage = (event) => {
try {
const msg = JSON.parse(event.data);
if (msg.data.startsWith('$$DATA:DISPLAY:')) {
// Parse the display data: $$DATA:DISPLAY:{"mode":X,"buffer":"..."}$$
const jsonStart = msg.data.indexOf('{');
const jsonEnd = msg.data.lastIndexOf('}');
if (jsonStart !== -1 && jsonEnd !== -1) {
const displayData = JSON.parse(msg.data.substring(jsonStart, jsonEnd + 1));
drawScreen(displayData.buffer);
if (msg.data.startsWith('$$CMD_')) {
// $$CMD_(COMMAND)[:(ARG0)][:(ARG1)][:(ARG2)]$$
const parts = msg.data.slice(2, -2).split(':');
const command = parts[0];
appendLog('cmd', `Received command: ${command}`);
if (command === 'CMD_DISPLAY') {
drawScreen(parts[1]);
}
} else {
appendLog(msg.type, msg.data);
@ -114,7 +190,13 @@
};
}
setTimeout(connect, 100);
window.onload = () => {
connect();
// fill canvas with black initially
ctx.fillStyle = 'white';
ctx.fillRect(0, 0, screen.width, screen.height);
};
</script>
</body>
</html>

View File

@ -190,6 +190,9 @@ void verifyWakeupLongPress() {
}
void waitForPowerRelease() {
if (CROSSPOINT_EMULATED) {
return;
}
inputManager.update();
while (inputManager.isPressed(InputManager::BTN_POWER)) {
delay(50);
@ -282,6 +285,9 @@ bool isUsbConnected() {
}
bool isWakeupAfterFlashing() {
if (CROSSPOINT_EMULATED) {
return true;
}
const auto wakeupCause = esp_sleep_get_wakeup_cause();
const auto resetReason = esp_reset_reason();