refactor original method as the prior commit broke connecting

This commit is contained in:
Justin Mitchell 2026-01-14 14:28:19 -05:00
parent dce9f7dd4d
commit 53592e58b2
3 changed files with 236 additions and 332 deletions

View File

@ -48,7 +48,6 @@ lib_deps =
ArduinoJson @ 7.4.2 ArduinoJson @ 7.4.2
QRCode @ 0.0.1 QRCode @ 0.0.1
links2004/WebSockets @ ^2.4.1 links2004/WebSockets @ ^2.4.1
mathieucarbou/AsyncTCP @ ^3.2.14
[env:default] [env:default]
extends = base extends = base

View File

@ -21,15 +21,15 @@ void CalibreWirelessActivity::displayTaskTrampoline(void* param) {
static_cast<CalibreWirelessActivity*>(param)->displayTaskLoop(); static_cast<CalibreWirelessActivity*>(param)->displayTaskLoop();
} }
void CalibreWirelessActivity::discoveryTaskTrampoline(void* param) { void CalibreWirelessActivity::networkTaskTrampoline(void* param) {
static_cast<CalibreWirelessActivity*>(param)->discoveryTaskLoop(); static_cast<CalibreWirelessActivity*>(param)->networkTaskLoop();
} }
void CalibreWirelessActivity::onEnter() { void CalibreWirelessActivity::onEnter() {
Activity::onEnter(); Activity::onEnter();
renderingMutex = xSemaphoreCreateMutex(); renderingMutex = xSemaphoreCreateMutex();
dataMutex = xSemaphoreCreateMutex(); stateMutex = xSemaphoreCreateMutex();
state = WirelessState::DISCOVERING; state = WirelessState::DISCOVERING;
statusMessage = "Discovering Calibre..."; statusMessage = "Discovering Calibre...";
@ -54,11 +54,8 @@ void CalibreWirelessActivity::onEnter() {
udp.begin(LOCAL_UDP_PORT); udp.begin(LOCAL_UDP_PORT);
// Create display task
xTaskCreate(&CalibreWirelessActivity::displayTaskTrampoline, "CalDisplayTask", 2048, this, 1, &displayTaskHandle); xTaskCreate(&CalibreWirelessActivity::displayTaskTrampoline, "CalDisplayTask", 2048, this, 1, &displayTaskHandle);
xTaskCreate(&CalibreWirelessActivity::networkTaskTrampoline, "CalNetworkTask", 12288, this, 2, &networkTaskHandle);
// Create discovery task (UDP is synchronous)
xTaskCreate(&CalibreWirelessActivity::discoveryTaskTrampoline, "CalDiscoveryTask", 4096, this, 2, &discoveryTaskHandle);
} }
void CalibreWirelessActivity::onExit() { void CalibreWirelessActivity::onExit() {
@ -67,18 +64,14 @@ void CalibreWirelessActivity::onExit() {
shouldExit = true; shouldExit = true;
vTaskDelay(50 / portTICK_PERIOD_MS); vTaskDelay(50 / portTICK_PERIOD_MS);
// Close async TCP client if (tcpClient.connected()) {
if (tcpClient) { tcpClient.stop();
tcpClient->close(true);
delete tcpClient;
tcpClient = nullptr;
} }
udp.stop(); udp.stop();
vTaskDelay(100 / portTICK_PERIOD_MS);
// Tasks will self-delete when they see shouldExit vTaskDelay(250 / portTICK_PERIOD_MS);
discoveryTaskHandle = nullptr;
networkTaskHandle = nullptr;
displayTaskHandle = nullptr; displayTaskHandle = nullptr;
WiFi.mode(WIFI_OFF); WiFi.mode(WIFI_OFF);
@ -97,9 +90,9 @@ void CalibreWirelessActivity::onExit() {
renderingMutex = nullptr; renderingMutex = nullptr;
} }
if (dataMutex) { if (stateMutex) {
vSemaphoreDelete(dataMutex); vSemaphoreDelete(stateMutex);
dataMutex = nullptr; stateMutex = nullptr;
} }
} }
@ -125,248 +118,136 @@ void CalibreWirelessActivity::displayTaskLoop() {
vTaskDelete(nullptr); vTaskDelete(nullptr);
} }
void CalibreWirelessActivity::discoveryTaskLoop() { void CalibreWirelessActivity::networkTaskLoop() {
while (!shouldExit && state == WirelessState::DISCOVERING) { while (!shouldExit) {
// Broadcast "hello" on all UDP discovery ports xSemaphoreTake(stateMutex, portMAX_DELAY);
for (const uint16_t port : UDP_PORTS) { const auto currentState = state;
udp.beginPacket("255.255.255.255", port); xSemaphoreGive(stateMutex);
udp.write(reinterpret_cast<const uint8_t*>("hello"), 5);
udp.endPacket();
}
vTaskDelay(500 / portTICK_PERIOD_MS);
if (shouldExit) break; if (shouldExit) break;
const int packetSize = udp.parsePacket(); switch (currentState) {
if (packetSize > 0) { case WirelessState::DISCOVERING:
char buffer[256]; listenForDiscovery();
const int len = udp.read(buffer, sizeof(buffer) - 1); break;
if (len > 0) { case WirelessState::CONNECTING:
buffer[len] = '\0'; case WirelessState::WAITING:
std::string response(buffer); case WirelessState::RECEIVING:
handleTcpClient();
// Parse Calibre response: "calibre wireless device client (on HOSTNAME);PORT,ALT_PORT" break;
size_t onPos = response.find("(on "); case WirelessState::COMPLETE:
size_t closePos = response.find(')'); case WirelessState::DISCONNECTED:
size_t semiPos = response.find(';'); case WirelessState::ERROR:
size_t commaPos = response.find(',', semiPos); vTaskDelay(100 / portTICK_PERIOD_MS);
break;
if (semiPos != std::string::npos) {
std::string portStr;
if (commaPos != std::string::npos && commaPos > semiPos) {
portStr = response.substr(semiPos + 1, commaPos - semiPos - 1);
uint16_t altPort = 0;
for (size_t i = commaPos + 1; i < response.size(); i++) {
char c = response[i];
if (c >= '0' && c <= '9') {
altPort = altPort * 10 + (c - '0');
} else {
break;
}
}
calibreAltPort = altPort;
} else {
portStr = response.substr(semiPos + 1);
}
uint16_t mainPort = 0;
for (char c : portStr) {
if (c >= '0' && c <= '9') {
mainPort = mainPort * 10 + (c - '0');
} else if (c != ' ' && c != '\t') {
break;
}
}
calibrePort = mainPort;
if (onPos != std::string::npos && closePos != std::string::npos && closePos > onPos + 4) {
calibreHostname = response.substr(onPos + 4, closePos - onPos - 4);
}
}
calibreHost = udp.remoteIP().toString().c_str();
if (calibreHostname.empty()) {
calibreHostname = calibreHost;
}
if (calibrePort > 0) {
Serial.printf("[%lu] [CAL] Discovered Calibre at %s:%d (alt:%d)\n", millis(), calibreHost.c_str(), calibrePort,
calibreAltPort);
setState(WirelessState::CONNECTING);
setStatus("Connecting to " + calibreHostname + "...");
connectToCalibr();
}
}
} }
vTaskDelay(10 / portTICK_PERIOD_MS);
} }
vTaskDelete(nullptr); vTaskDelete(nullptr);
} }
void CalibreWirelessActivity::connectToCalibr() { void CalibreWirelessActivity::listenForDiscovery() {
Serial.printf("[%lu] [CAL] connectToCalibr called\n", millis()); for (const uint16_t port : UDP_PORTS) {
udp.beginPacket("255.255.255.255", port);
if (tcpClient) { udp.write(reinterpret_cast<const uint8_t*>("hello"), 5);
tcpClient->close(true); udp.endPacket();
delete tcpClient;
tcpClient = nullptr;
} }
tcpClient = new AsyncClient(); vTaskDelay(500 / portTICK_PERIOD_MS);
if (!tcpClient) {
Serial.printf("[%lu] [CAL] Failed to create AsyncClient\n", millis());
setState(WirelessState::DISCOVERING);
return;
}
// Set up callbacks with lambdas that call our member functions const int packetSize = udp.parsePacket();
tcpClient->onConnect( if (packetSize > 0) {
[](void* arg, AsyncClient* client) { char buffer[256];
Serial.printf("[%lu] [CAL] onConnect callback fired\n", millis()); const int len = udp.read(buffer, sizeof(buffer) - 1);
static_cast<CalibreWirelessActivity*>(arg)->onTcpConnect(client); if (len > 0) {
}, buffer[len] = '\0';
this); std::string response(buffer);
tcpClient->onDisconnect( size_t onPos = response.find("(on ");
[](void* arg, AsyncClient* client) { size_t closePos = response.find(')');
Serial.printf("[%lu] [CAL] onDisconnect callback fired\n", millis()); size_t semiPos = response.find(';');
static_cast<CalibreWirelessActivity*>(arg)->onTcpDisconnect(client); size_t commaPos = response.find(',', semiPos);
},
this);
tcpClient->onData( if (semiPos != std::string::npos) {
[](void* arg, AsyncClient* client, void* data, size_t len) { std::string portStr;
static_cast<CalibreWirelessActivity*>(arg)->onTcpData(client, data, len); if (commaPos != std::string::npos && commaPos > semiPos) {
}, portStr = response.substr(semiPos + 1, commaPos - semiPos - 1);
this); uint16_t altPort = 0;
for (size_t i = commaPos + 1; i < response.size(); i++) {
char c = response[i];
if (c >= '0' && c <= '9') {
altPort = altPort * 10 + (c - '0');
} else {
break;
}
}
calibreAltPort = altPort;
} else {
portStr = response.substr(semiPos + 1);
}
tcpClient->onError( uint16_t mainPort = 0;
[](void* arg, AsyncClient* client, int8_t error) { for (char c : portStr) {
Serial.printf("[%lu] [CAL] onError callback fired: %d\n", millis(), error); if (c >= '0' && c <= '9') {
static_cast<CalibreWirelessActivity*>(arg)->onTcpError(client, error); mainPort = mainPort * 10 + (c - '0');
}, } else if (c != ' ' && c != '\t') {
this); break;
}
}
calibrePort = mainPort;
// Use IPAddress explicitly to avoid any DNS resolution issues if (onPos != std::string::npos && closePos != std::string::npos && closePos > onPos + 4) {
IPAddress ip; calibreHostname = response.substr(onPos + 4, closePos - onPos - 4);
if (!ip.fromString(calibreHost.c_str())) { }
Serial.printf("[%lu] [CAL] Failed to parse IP: %s\n", millis(), calibreHost.c_str()); }
setState(WirelessState::DISCOVERING);
return; calibreHost = udp.remoteIP().toString().c_str();
} if (calibreHostname.empty()) {
calibreHostname = calibreHost;
Serial.printf("[%lu] [CAL] Attempting connect to %s:%d\n", millis(), ip.toString().c_str(), calibrePort); }
bool connectResult = tcpClient->connect(ip, calibrePort);
Serial.printf("[%lu] [CAL] connect() returned %s\n", millis(), connectResult ? "true" : "false"); if (calibrePort > 0) {
setState(WirelessState::CONNECTING);
if (!connectResult) { setStatus("Connecting to " + calibreHostname + "...");
// Try alternative port
if (calibreAltPort > 0) { vTaskDelay(100 / portTICK_PERIOD_MS);
Serial.printf("[%lu] [CAL] Trying alt port %d\n", millis(), calibreAltPort);
connectResult = tcpClient->connect(ip, calibreAltPort); Serial.printf("[%lu] [CAL] Connecting to %s:%d\n", millis(), calibreHost.c_str(), calibrePort);
Serial.printf("[%lu] [CAL] alt connect() returned %s\n", millis(), connectResult ? "true" : "false"); if (tcpClient.connect(calibreHost.c_str(), calibrePort, 5000)) {
if (!connectResult) { Serial.printf("[%lu] [CAL] Connected!\n", millis());
setState(WirelessState::DISCOVERING); setState(WirelessState::WAITING);
setStatus("Discovering Calibre...\n(Connection failed, retrying)"); setStatus("Connected to " + calibreHostname + "\nWaiting for commands...");
} else if (calibreAltPort > 0 && tcpClient.connect(calibreHost.c_str(), calibreAltPort, 5000)) {
Serial.printf("[%lu] [CAL] Connected on alt port!\n", millis());
setState(WirelessState::WAITING);
setStatus("Connected to " + calibreHostname + "\nWaiting for commands...");
} else {
Serial.printf("[%lu] [CAL] Connection failed\n", millis());
setState(WirelessState::DISCOVERING);
setStatus("Discovering Calibre...\n(Connection failed, retrying)");
calibrePort = 0;
calibreAltPort = 0;
}
} }
} else {
setState(WirelessState::DISCOVERING);
setStatus("Discovering Calibre...\n(Connection failed, retrying)");
} }
} }
// If connect() returned true, connection is in progress - wait for callbacks
} }
void CalibreWirelessActivity::onTcpConnect(AsyncClient* client) { void CalibreWirelessActivity::handleTcpClient() {
Serial.printf("[%lu] [CAL] Connected to Calibre\n", millis()); if (!tcpClient.connected()) {
setState(WirelessState::WAITING);
setStatus("Connected to " + calibreHostname + "\nWaiting for commands...");
}
void CalibreWirelessActivity::onTcpDisconnect(AsyncClient* client) {
Serial.printf("[%lu] [CAL] Disconnected from Calibre\n", millis());
if (state != WirelessState::ERROR) {
setState(WirelessState::DISCONNECTED); setState(WirelessState::DISCONNECTED);
setStatus("Calibre disconnected"); setStatus("Calibre disconnected");
return;
} }
}
void CalibreWirelessActivity::onTcpError(AsyncClient* client, int8_t error) {
Serial.printf("[%lu] [CAL] TCP error: %d\n", millis(), error);
setError("Connection error");
}
void CalibreWirelessActivity::onTcpData(AsyncClient* client, void* data, size_t len) {
// This is the key callback - data arrives here like KOReader's receiveCallback
const char* charData = static_cast<const char*>(data);
Serial.printf("[%lu] [CAL] Received %zu bytes\n", millis(), len);
if (inBinaryMode) { if (inBinaryMode) {
processBinaryData(charData, len); receiveBinaryData();
} else { return;
// Append to buffer and process JSON messages
xSemaphoreTake(dataMutex, portMAX_DELAY);
recvBuffer.append(charData, len);
xSemaphoreGive(dataMutex);
processJsonData();
}
}
void CalibreWirelessActivity::processBinaryData(const char* data, size_t len) {
// Like KOReader: write only what we need, put excess in buffer
size_t toWrite = std::min(len, binaryBytesRemaining);
if (toWrite > 0) {
currentFile.write(reinterpret_cast<const uint8_t*>(data), toWrite);
bytesReceived += toWrite;
binaryBytesRemaining -= toWrite;
updateRequired = true;
// Progress logging
static unsigned long lastLog = 0;
unsigned long now = millis();
if (now - lastLog > 500) {
Serial.printf("[%lu] [CAL] Binary: %zu/%zu bytes (%.1f%%)\n", now, bytesReceived, currentFileSize,
currentFileSize > 0 ? (100.0 * bytesReceived / currentFileSize) : 0.0);
lastLog = now;
}
} }
// If we received more than needed, it's the next JSON message std::string message;
if (len > toWrite) { if (readJsonMessage(message)) {
size_t excess = len - toWrite;
xSemaphoreTake(dataMutex, portMAX_DELAY);
recvBuffer.assign(data + toWrite, excess);
xSemaphoreGive(dataMutex);
Serial.printf("[%lu] [CAL] Binary complete, %zu excess bytes buffered\n", millis(), excess);
}
// Check if binary transfer is complete
if (binaryBytesRemaining == 0) {
currentFile.flush();
currentFile.close();
inBinaryMode = false;
Serial.printf("[%lu] [CAL] File complete: %zu bytes\n", millis(), bytesReceived);
setState(WirelessState::WAITING);
setStatus("Received: " + currentFilename + "\nWaiting for more...");
// Process any buffered JSON data
if (!recvBuffer.empty()) {
processJsonData();
}
}
}
void CalibreWirelessActivity::processJsonData() {
// Process JSON messages from buffer (like KOReader's onReceiveJSON)
while (true) {
std::string message;
if (!parseJsonMessage(message)) {
break; // Need more data
}
// Parse opcode from JSON array format: [opcode, {...}]
size_t start = message.find('['); size_t start = message.find('[');
if (start != std::string::npos) { if (start != std::string::npos) {
start++; start++;
@ -384,15 +265,12 @@ void CalibreWirelessActivity::processJsonData() {
if (opcodeInt >= 0 && opcodeInt <= OpCode::ERROR) { if (opcodeInt >= 0 && opcodeInt <= OpCode::ERROR) {
auto opcode = static_cast<OpCode>(opcodeInt); auto opcode = static_cast<OpCode>(opcodeInt);
// Extract data object
size_t dataStart = end + 1; size_t dataStart = end + 1;
size_t dataEnd = message.rfind(']'); size_t dataEnd = message.rfind(']');
std::string data; std::string data;
if (dataEnd != std::string::npos && dataEnd > dataStart) { if (dataEnd != std::string::npos && dataEnd > dataStart) {
data = message.substr(dataStart, dataEnd - dataStart); data = message.substr(dataStart, dataEnd - dataStart);
} }
handleCommand(opcode, data); handleCommand(opcode, data);
} }
} }
@ -400,57 +278,74 @@ void CalibreWirelessActivity::processJsonData() {
} }
} }
bool CalibreWirelessActivity::parseJsonMessage(std::string& message) { bool CalibreWirelessActivity::readJsonMessage(std::string& message) {
constexpr size_t MAX_BUFFERED_MSG_SIZE = 32768; constexpr size_t MAX_BUFFERED_MSG_SIZE = 32768;
xSemaphoreTake(dataMutex, portMAX_DELAY);
// Handle skip mode for large messages // Handle skip mode for large messages
if (inSkipMode) { if (inSkipMode) {
if (recvBuffer.size() >= skipBytesRemaining) { while (skipBytesRemaining > 0 && tcpClient.available() > 0) {
recvBuffer = recvBuffer.substr(skipBytesRemaining); uint8_t discardBuf[1024];
skipBytesRemaining = 0; size_t toRead = std::min({static_cast<size_t>(tcpClient.available()), sizeof(discardBuf), skipBytesRemaining});
inSkipMode = false; int bytesRead = tcpClient.read(discardBuf, toRead);
if (bytesRead > 0) {
skipBytesRemaining -= bytesRead;
} else {
break;
}
}
if (skipBytesRemaining == 0) {
inSkipMode = false;
if (skipOpcode == OpCode::SEND_BOOK && !skipExtractedLpath.empty() && skipExtractedLength > 0) { if (skipOpcode == OpCode::SEND_BOOK && !skipExtractedLpath.empty() && skipExtractedLength > 0) {
message = "[" + std::to_string(skipOpcode) + ",{\"lpath\":\"" + skipExtractedLpath + message = "[" + std::to_string(skipOpcode) + ",{\"lpath\":\"" + skipExtractedLpath +
"\",\"length\":" + std::to_string(skipExtractedLength) + "}]"; "\",\"length\":" + std::to_string(skipExtractedLength) + "}]";
skipOpcode = -1; skipOpcode = -1;
skipExtractedLpath.clear(); skipExtractedLpath.clear();
skipExtractedLength = 0; skipExtractedLength = 0;
xSemaphoreGive(dataMutex);
return true; return true;
} }
if (skipOpcode >= 0) { if (skipOpcode >= 0) {
message = "[" + std::to_string(skipOpcode) + ",{}]"; message = "[" + std::to_string(skipOpcode) + ",{}]";
skipOpcode = -1; skipOpcode = -1;
xSemaphoreGive(dataMutex);
return true; return true;
} }
} else { }
skipBytesRemaining -= recvBuffer.size(); return false;
recvBuffer.clear(); }
xSemaphoreGive(dataMutex);
return false; // Read available data into buffer
int available = tcpClient.available();
if (available > 0) {
size_t maxBuffer = MAX_BUFFERED_MSG_SIZE + 20;
if (recvBuffer.size() < maxBuffer) {
char buf[1024];
size_t spaceLeft = maxBuffer - recvBuffer.size();
while (available > 0 && spaceLeft > 0) {
int toRead = std::min({available, static_cast<int>(sizeof(buf)), static_cast<int>(spaceLeft)});
int bytesRead = tcpClient.read(reinterpret_cast<uint8_t*>(buf), toRead);
if (bytesRead > 0) {
recvBuffer.append(buf, bytesRead);
available -= bytesRead;
spaceLeft -= bytesRead;
} else {
break;
}
}
} }
} }
if (recvBuffer.empty()) { if (recvBuffer.empty()) {
xSemaphoreGive(dataMutex);
return false; return false;
} }
// Find '[' which marks JSON start
size_t bracketPos = recvBuffer.find('['); size_t bracketPos = recvBuffer.find('[');
if (bracketPos == std::string::npos) { if (bracketPos == std::string::npos) {
if (recvBuffer.size() > 1000) { if (recvBuffer.size() > 1000) {
recvBuffer.clear(); recvBuffer.clear();
} }
xSemaphoreGive(dataMutex);
return false; return false;
} }
// Parse length prefix
size_t msgLen = 0; size_t msgLen = 0;
bool validPrefix = false; bool validPrefix = false;
@ -476,21 +371,18 @@ bool CalibreWirelessActivity::parseJsonMessage(std::string& message) {
if (bracketPos > 0) { if (bracketPos > 0) {
recvBuffer = recvBuffer.substr(bracketPos); recvBuffer = recvBuffer.substr(bracketPos);
} }
xSemaphoreGive(dataMutex);
return false; return false;
} }
if (msgLen > 10000000) { if (msgLen > 10000000) {
recvBuffer.clear(); recvBuffer.clear();
xSemaphoreGive(dataMutex);
return false; return false;
} }
// Handle large messages by extracting essential fields and skipping the rest // Handle large messages
if (msgLen > MAX_BUFFERED_MSG_SIZE) { if (msgLen > MAX_BUFFERED_MSG_SIZE) {
Serial.printf("[%lu] [CAL] Large message (%zu bytes), streaming\n", millis(), msgLen); Serial.printf("[%lu] [CAL] Large message (%zu bytes), streaming\n", millis(), msgLen);
// Extract opcode
int opcodeInt = -1; int opcodeInt = -1;
size_t opcodeStart = bracketPos + 1; size_t opcodeStart = bracketPos + 1;
size_t commaPos = recvBuffer.find(',', opcodeStart); size_t commaPos = recvBuffer.find(',', opcodeStart);
@ -510,7 +402,6 @@ bool CalibreWirelessActivity::parseJsonMessage(std::string& message) {
skipExtractedLpath.clear(); skipExtractedLpath.clear();
skipExtractedLength = 0; skipExtractedLength = 0;
// Extract lpath and length for SEND_BOOK
if (opcodeInt == OpCode::SEND_BOOK) { if (opcodeInt == OpCode::SEND_BOOK) {
size_t lpathPos = recvBuffer.find("\"lpath\""); size_t lpathPos = recvBuffer.find("\"lpath\"");
if (lpathPos != std::string::npos) { if (lpathPos != std::string::npos) {
@ -526,7 +417,6 @@ bool CalibreWirelessActivity::parseJsonMessage(std::string& message) {
} }
} }
// Extract top-level length
int depth = 0; int depth = 0;
const char* lengthKey = "\"length\""; const char* lengthKey = "\"length\"";
for (size_t i = bracketPos; i < recvBuffer.size() && i < bracketPos + 2000; i++) { for (size_t i = bracketPos; i < recvBuffer.size() && i < bracketPos + 2000; i++) {
@ -562,13 +452,11 @@ bool CalibreWirelessActivity::parseJsonMessage(std::string& message) {
skipOpcode = -1; skipOpcode = -1;
skipExtractedLpath.clear(); skipExtractedLpath.clear();
skipExtractedLength = 0; skipExtractedLength = 0;
xSemaphoreGive(dataMutex);
return true; return true;
} }
if (skipOpcode >= 0) { if (skipOpcode >= 0) {
message = "[" + std::to_string(skipOpcode) + ",{}]"; message = "[" + std::to_string(skipOpcode) + ",{}]";
skipOpcode = -1; skipOpcode = -1;
xSemaphoreGive(dataMutex);
return true; return true;
} }
} else { } else {
@ -576,31 +464,25 @@ bool CalibreWirelessActivity::parseJsonMessage(std::string& message) {
recvBuffer.clear(); recvBuffer.clear();
inSkipMode = true; inSkipMode = true;
} }
xSemaphoreGive(dataMutex);
return false; return false;
} }
// Normal message handling
size_t totalNeeded = bracketPos + msgLen; size_t totalNeeded = bracketPos + msgLen;
if (recvBuffer.size() < totalNeeded) { if (recvBuffer.size() < totalNeeded) {
xSemaphoreGive(dataMutex);
return false; return false;
} }
message = recvBuffer.substr(bracketPos, msgLen); message = recvBuffer.substr(bracketPos, msgLen);
recvBuffer = recvBuffer.size() > totalNeeded ? recvBuffer.substr(totalNeeded) : ""; recvBuffer = recvBuffer.size() > totalNeeded ? recvBuffer.substr(totalNeeded) : "";
xSemaphoreGive(dataMutex);
return true; return true;
} }
void CalibreWirelessActivity::sendJsonResponse(const OpCode opcode, const std::string& data) { void CalibreWirelessActivity::sendJsonResponse(const OpCode opcode, const std::string& data) {
if (!tcpClient || !tcpClient->connected()) return;
std::string json = "[" + std::to_string(opcode) + "," + data + "]"; std::string json = "[" + std::to_string(opcode) + "," + data + "]";
std::string msg = std::to_string(json.length()) + json; std::string msg = std::to_string(json.length()) + json;
tcpClient.write(reinterpret_cast<const uint8_t*>(msg.c_str()), msg.length());
tcpClient->write(msg.c_str(), msg.length()); tcpClient.flush();
} }
void CalibreWirelessActivity::handleCommand(const OpCode opcode, const std::string& data) { void CalibreWirelessActivity::handleCommand(const OpCode opcode, const std::string& data) {
@ -639,12 +521,67 @@ void CalibreWirelessActivity::handleCommand(const OpCode opcode, const std::stri
sendJsonResponse(OpCode::OK, "{}"); sendJsonResponse(OpCode::OK, "{}");
break; break;
default: default:
Serial.printf("[%lu] [CAL] Unknown opcode: %d\n", millis(), opcode);
sendJsonResponse(OpCode::OK, "{}"); sendJsonResponse(OpCode::OK, "{}");
break; break;
} }
} }
void CalibreWirelessActivity::receiveBinaryData() {
// KOReader-style: read all available data, write only what we need to file,
// put excess (next JSON message) back into buffer.
int available = tcpClient.available();
if (available <= 0) {
// Brief wait for more data
vTaskDelay(1);
return;
}
uint8_t buffer[4096];
int bytesRead = tcpClient.read(buffer, std::min(sizeof(buffer), static_cast<size_t>(available)));
if (bytesRead <= 0) {
return;
}
// Write only what we need (like KOReader's data:sub(1, to_write_bytes))
size_t toWrite = std::min(static_cast<size_t>(bytesRead), binaryBytesRemaining);
if (toWrite > 0) {
currentFile.write(buffer, toWrite);
bytesReceived += toWrite;
binaryBytesRemaining -= toWrite;
updateRequired = true;
}
// If we read more than needed, it's the next JSON message (like KOReader's buffer handling)
if (static_cast<size_t>(bytesRead) > toWrite) {
size_t excess = bytesRead - toWrite;
recvBuffer.assign(reinterpret_cast<char*>(buffer + toWrite), excess);
Serial.printf("[%lu] [CAL] Binary done, %zu excess bytes -> buffer\n", millis(), excess);
}
// Progress logging
static unsigned long lastLog = 0;
unsigned long now = millis();
if (now - lastLog > 500) {
Serial.printf("[%lu] [CAL] Binary: %zu/%zu (%.1f%%)\n", now, bytesReceived, currentFileSize,
currentFileSize > 0 ? (100.0 * bytesReceived / currentFileSize) : 0.0);
lastLog = now;
}
// Check completion
if (binaryBytesRemaining == 0) {
currentFile.flush();
currentFile.close();
inBinaryMode = false;
Serial.printf("[%lu] [CAL] File complete: %zu bytes\n", millis(), bytesReceived);
setState(WirelessState::WAITING);
setStatus("Received: " + currentFilename + "\nWaiting for more...");
}
}
void CalibreWirelessActivity::handleGetInitializationInfo(const std::string& data) { void CalibreWirelessActivity::handleGetInitializationInfo(const std::string& data) {
setState(WirelessState::WAITING); setState(WirelessState::WAITING);
setStatus("Connected to " + calibreHostname + setStatus("Connected to " + calibreHostname +
@ -699,7 +636,7 @@ void CalibreWirelessActivity::handleGetBookCount() {
} }
void CalibreWirelessActivity::handleSendBook(const std::string& data) { void CalibreWirelessActivity::handleSendBook(const std::string& data) {
Serial.printf("[%lu] [CAL] SEND_BOOK data (first 500 chars): %.500s\n", millis(), data.c_str()); Serial.printf("[%lu] [CAL] SEND_BOOK (first 500): %.500s\n", millis(), data.c_str());
// Extract lpath // Extract lpath
std::string lpath; std::string lpath;
@ -769,7 +706,8 @@ void CalibreWirelessActivity::handleSendBook(const std::string& data) {
bytesReceived = 0; bytesReceived = 0;
binaryBytesRemaining = length; binaryBytesRemaining = length;
Serial.printf("[%lu] [CAL] SEND_BOOK: file='%s', length=%zu\n", millis(), currentFilename.c_str(), length); Serial.printf("[%lu] [CAL] File: %s, size: %zu, buffer: %zu\n", millis(), currentFilename.c_str(), length,
recvBuffer.size());
setState(WirelessState::RECEIVING); setState(WirelessState::RECEIVING);
setStatus("Receiving: " + filename); setStatus("Receiving: " + filename);
@ -780,17 +718,16 @@ void CalibreWirelessActivity::handleSendBook(const std::string& data) {
return; return;
} }
// Send OK - Calibre will start sending binary data // Send OK - Calibre will start sending binary
sendJsonResponse(OpCode::OK, "{}"); sendJsonResponse(OpCode::OK, "{}");
// Switch to binary mode - subsequent data in onTcpData will be file content // Switch to binary mode
inBinaryMode = true; inBinaryMode = true;
// Process any data already in buffer (like KOReader) // Process any data already in buffer (like KOReader)
xSemaphoreTake(dataMutex, portMAX_DELAY);
if (!recvBuffer.empty()) { if (!recvBuffer.empty()) {
size_t toWrite = std::min(recvBuffer.size(), binaryBytesRemaining); size_t toWrite = std::min(recvBuffer.size(), binaryBytesRemaining);
Serial.printf("[%lu] [CAL] Writing %zu bytes from buffer\n", millis(), toWrite); Serial.printf("[%lu] [CAL] Writing %zu from buffer\n", millis(), toWrite);
currentFile.write(reinterpret_cast<const uint8_t*>(recvBuffer.data()), toWrite); currentFile.write(reinterpret_cast<const uint8_t*>(recvBuffer.data()), toWrite);
bytesReceived += toWrite; bytesReceived += toWrite;
binaryBytesRemaining -= toWrite; binaryBytesRemaining -= toWrite;
@ -811,11 +748,10 @@ void CalibreWirelessActivity::handleSendBook(const std::string& data) {
setStatus("Received: " + currentFilename + "\nWaiting for more..."); setStatus("Received: " + currentFilename + "\nWaiting for more...");
} }
} }
xSemaphoreGive(dataMutex);
} }
void CalibreWirelessActivity::handleSendBookMetadata(const std::string& data) { void CalibreWirelessActivity::handleSendBookMetadata(const std::string& data) {
Serial.printf("[%lu] [CAL] SEND_BOOK_METADATA received\n", millis()); Serial.printf("[%lu] [CAL] SEND_BOOK_METADATA\n", millis());
sendJsonResponse(OpCode::OK, "{}"); sendJsonResponse(OpCode::OK, "{}");
} }
@ -886,9 +822,9 @@ std::string CalibreWirelessActivity::getDeviceUuid() const {
} }
void CalibreWirelessActivity::setState(WirelessState newState) { void CalibreWirelessActivity::setState(WirelessState newState) {
xSemaphoreTake(dataMutex, portMAX_DELAY); xSemaphoreTake(stateMutex, portMAX_DELAY);
state = newState; state = newState;
xSemaphoreGive(dataMutex); xSemaphoreGive(stateMutex);
updateRequired = true; updateRequired = true;
} }

View File

@ -1,6 +1,6 @@
#pragma once #pragma once
#include <AsyncTCP.h>
#include <SDCardManager.h> #include <SDCardManager.h>
#include <WiFiClient.h>
#include <WiFiUdp.h> #include <WiFiUdp.h>
#include <freertos/FreeRTOS.h> #include <freertos/FreeRTOS.h>
#include <freertos/semphr.h> #include <freertos/semphr.h>
@ -17,29 +17,18 @@
* *
* Protocol specification sourced from Calibre's smart device driver: * Protocol specification sourced from Calibre's smart device driver:
* https://github.com/kovidgoyal/calibre/blob/master/src/calibre/devices/smart_device_app/driver.py * https://github.com/kovidgoyal/calibre/blob/master/src/calibre/devices/smart_device_app/driver.py
*
* Uses AsyncTCP for callback-based networking (like KOReader's StreamMessageQueue).
*
* Protocol overview:
* 1. Device broadcasts "hello" on UDP ports 54982, 48123, 39001, 44044, 59678
* 2. Calibre responds with its TCP server address
* 3. Device connects to Calibre's TCP server
* 4. Calibre sends JSON commands with length-prefixed messages
* 5. Books are transferred as binary data after SEND_BOOK command
*/ */
class CalibreWirelessActivity final : public Activity { class CalibreWirelessActivity final : public Activity {
// Calibre wireless device states
enum class WirelessState { enum class WirelessState {
DISCOVERING, // Listening for Calibre server broadcasts DISCOVERING,
CONNECTING, // Establishing TCP connection CONNECTING,
WAITING, // Connected, waiting for commands WAITING,
RECEIVING, // Receiving a book file RECEIVING,
COMPLETE, // Transfer complete COMPLETE,
DISCONNECTED, // Calibre disconnected DISCONNECTED,
ERROR // Connection/transfer error ERROR
}; };
// Calibre protocol opcodes (from calibre/devices/smart_device_app/driver.py)
enum OpCode : uint8_t { enum OpCode : uint8_t {
OK = 0, OK = 0,
SET_CALIBRE_DEVICE_INFO = 1, SET_CALIBRE_DEVICE_INFO = 1,
@ -64,70 +53,52 @@ class CalibreWirelessActivity final : public Activity {
}; };
TaskHandle_t displayTaskHandle = nullptr; TaskHandle_t displayTaskHandle = nullptr;
TaskHandle_t discoveryTaskHandle = nullptr; TaskHandle_t networkTaskHandle = nullptr;
SemaphoreHandle_t renderingMutex = nullptr; SemaphoreHandle_t renderingMutex = nullptr;
SemaphoreHandle_t dataMutex = nullptr; // Protects shared data accessed from callbacks SemaphoreHandle_t stateMutex = nullptr;
bool updateRequired = false; bool updateRequired = false;
volatile bool shouldExit = false; volatile bool shouldExit = false;
WirelessState state = WirelessState::DISCOVERING; WirelessState state = WirelessState::DISCOVERING;
const std::function<void()> onCompleteCallback; const std::function<void()> onCompleteCallback;
// UDP discovery
WiFiUDP udp; WiFiUDP udp;
WiFiClient tcpClient;
// Async TCP connection
AsyncClient* tcpClient = nullptr;
std::string calibreHost; std::string calibreHost;
uint16_t calibrePort = 0; uint16_t calibrePort = 0;
uint16_t calibreAltPort = 0; uint16_t calibreAltPort = 0;
std::string calibreHostname; std::string calibreHostname;
// Transfer state
std::string currentFilename; std::string currentFilename;
size_t currentFileSize = 0; size_t currentFileSize = 0;
size_t bytesReceived = 0; size_t bytesReceived = 0;
std::string statusMessage; std::string statusMessage;
std::string errorMessage; std::string errorMessage;
// Protocol state
bool inBinaryMode = false; bool inBinaryMode = false;
size_t binaryBytesRemaining = 0; size_t binaryBytesRemaining = 0;
FsFile currentFile; FsFile currentFile;
std::string recvBuffer; // Buffer for incoming data (like KOReader) std::string recvBuffer;
// Large message skip state
bool inSkipMode = false; bool inSkipMode = false;
size_t skipBytesRemaining = 0; size_t skipBytesRemaining = 0;
int skipOpcode = -1; int skipOpcode = -1;
std::string skipExtractedLpath; std::string skipExtractedLpath;
size_t skipExtractedLength = 0; size_t skipExtractedLength = 0;
// Display task
static void displayTaskTrampoline(void* param); static void displayTaskTrampoline(void* param);
static void networkTaskTrampoline(void* param);
void displayTaskLoop(); void displayTaskLoop();
void networkTaskLoop();
void render() const; void render() const;
// Discovery task (UDP is not async) void listenForDiscovery();
static void discoveryTaskTrampoline(void* param); void handleTcpClient();
void discoveryTaskLoop(); bool readJsonMessage(std::string& message);
// AsyncTCP callbacks
void onTcpConnect(AsyncClient* client);
void onTcpDisconnect(AsyncClient* client);
void onTcpData(AsyncClient* client, void* data, size_t len);
void onTcpError(AsyncClient* client, int8_t error);
// Data processing (called from onTcpData callback)
void processReceivedData();
void processBinaryData(const char* data, size_t len);
void processJsonData();
bool parseJsonMessage(std::string& message);
void sendJsonResponse(OpCode opcode, const std::string& data); void sendJsonResponse(OpCode opcode, const std::string& data);
void handleCommand(OpCode opcode, const std::string& data); void handleCommand(OpCode opcode, const std::string& data);
void receiveBinaryData();
// Protocol handlers
void handleGetInitializationInfo(const std::string& data); void handleGetInitializationInfo(const std::string& data);
void handleGetDeviceInformation(); void handleGetDeviceInformation();
void handleFreeSpace(); void handleFreeSpace();
@ -137,12 +108,10 @@ class CalibreWirelessActivity final : public Activity {
void handleDisplayMessage(const std::string& data); void handleDisplayMessage(const std::string& data);
void handleNoop(const std::string& data); void handleNoop(const std::string& data);
// Utility
std::string getDeviceUuid() const; std::string getDeviceUuid() const;
void setState(WirelessState newState); void setState(WirelessState newState);
void setStatus(const std::string& message); void setStatus(const std::string& message);
void setError(const std::string& message); void setError(const std::string& message);
void connectToCalibr();
public: public:
explicit CalibreWirelessActivity(GfxRenderer& renderer, MappedInputManager& mappedInput, explicit CalibreWirelessActivity(GfxRenderer& renderer, MappedInputManager& mappedInput,