diff --git a/platformio.ini b/platformio.ini index 29bf9e1a..cbe47fe9 100644 --- a/platformio.ini +++ b/platformio.ini @@ -48,7 +48,6 @@ lib_deps = ArduinoJson @ 7.4.2 QRCode @ 0.0.1 links2004/WebSockets @ ^2.4.1 - mathieucarbou/AsyncTCP @ ^3.2.14 [env:default] extends = base diff --git a/src/activities/network/CalibreWirelessActivity.cpp b/src/activities/network/CalibreWirelessActivity.cpp index ff3239d0..6a701696 100644 --- a/src/activities/network/CalibreWirelessActivity.cpp +++ b/src/activities/network/CalibreWirelessActivity.cpp @@ -21,15 +21,15 @@ void CalibreWirelessActivity::displayTaskTrampoline(void* param) { static_cast(param)->displayTaskLoop(); } -void CalibreWirelessActivity::discoveryTaskTrampoline(void* param) { - static_cast(param)->discoveryTaskLoop(); +void CalibreWirelessActivity::networkTaskTrampoline(void* param) { + static_cast(param)->networkTaskLoop(); } void CalibreWirelessActivity::onEnter() { Activity::onEnter(); renderingMutex = xSemaphoreCreateMutex(); - dataMutex = xSemaphoreCreateMutex(); + stateMutex = xSemaphoreCreateMutex(); state = WirelessState::DISCOVERING; statusMessage = "Discovering Calibre..."; @@ -54,11 +54,8 @@ void CalibreWirelessActivity::onEnter() { udp.begin(LOCAL_UDP_PORT); - // Create display task xTaskCreate(&CalibreWirelessActivity::displayTaskTrampoline, "CalDisplayTask", 2048, this, 1, &displayTaskHandle); - - // Create discovery task (UDP is synchronous) - xTaskCreate(&CalibreWirelessActivity::discoveryTaskTrampoline, "CalDiscoveryTask", 4096, this, 2, &discoveryTaskHandle); + xTaskCreate(&CalibreWirelessActivity::networkTaskTrampoline, "CalNetworkTask", 12288, this, 2, &networkTaskHandle); } void CalibreWirelessActivity::onExit() { @@ -67,18 +64,14 @@ void CalibreWirelessActivity::onExit() { shouldExit = true; vTaskDelay(50 / portTICK_PERIOD_MS); - // Close async TCP client - if (tcpClient) { - tcpClient->close(true); - delete tcpClient; - tcpClient = nullptr; + if (tcpClient.connected()) { + tcpClient.stop(); } - udp.stop(); - vTaskDelay(100 / portTICK_PERIOD_MS); - // Tasks will self-delete when they see shouldExit - discoveryTaskHandle = nullptr; + vTaskDelay(250 / portTICK_PERIOD_MS); + + networkTaskHandle = nullptr; displayTaskHandle = nullptr; WiFi.mode(WIFI_OFF); @@ -97,9 +90,9 @@ void CalibreWirelessActivity::onExit() { renderingMutex = nullptr; } - if (dataMutex) { - vSemaphoreDelete(dataMutex); - dataMutex = nullptr; + if (stateMutex) { + vSemaphoreDelete(stateMutex); + stateMutex = nullptr; } } @@ -125,248 +118,136 @@ void CalibreWirelessActivity::displayTaskLoop() { vTaskDelete(nullptr); } -void CalibreWirelessActivity::discoveryTaskLoop() { - while (!shouldExit && state == WirelessState::DISCOVERING) { - // Broadcast "hello" on all UDP discovery ports - for (const uint16_t port : UDP_PORTS) { - udp.beginPacket("255.255.255.255", port); - udp.write(reinterpret_cast("hello"), 5); - udp.endPacket(); - } +void CalibreWirelessActivity::networkTaskLoop() { + while (!shouldExit) { + xSemaphoreTake(stateMutex, portMAX_DELAY); + const auto currentState = state; + xSemaphoreGive(stateMutex); - vTaskDelay(500 / portTICK_PERIOD_MS); if (shouldExit) break; - const int packetSize = udp.parsePacket(); - if (packetSize > 0) { - char buffer[256]; - const int len = udp.read(buffer, sizeof(buffer) - 1); - if (len > 0) { - buffer[len] = '\0'; - std::string response(buffer); - - // Parse Calibre response: "calibre wireless device client (on HOSTNAME);PORT,ALT_PORT" - size_t onPos = response.find("(on "); - size_t closePos = response.find(')'); - size_t semiPos = response.find(';'); - size_t commaPos = response.find(',', semiPos); - - if (semiPos != std::string::npos) { - std::string portStr; - if (commaPos != std::string::npos && commaPos > semiPos) { - portStr = response.substr(semiPos + 1, commaPos - semiPos - 1); - uint16_t altPort = 0; - for (size_t i = commaPos + 1; i < response.size(); i++) { - char c = response[i]; - if (c >= '0' && c <= '9') { - altPort = altPort * 10 + (c - '0'); - } else { - break; - } - } - calibreAltPort = altPort; - } else { - portStr = response.substr(semiPos + 1); - } - - uint16_t mainPort = 0; - for (char c : portStr) { - if (c >= '0' && c <= '9') { - mainPort = mainPort * 10 + (c - '0'); - } else if (c != ' ' && c != '\t') { - break; - } - } - calibrePort = mainPort; - - if (onPos != std::string::npos && closePos != std::string::npos && closePos > onPos + 4) { - calibreHostname = response.substr(onPos + 4, closePos - onPos - 4); - } - } - - calibreHost = udp.remoteIP().toString().c_str(); - if (calibreHostname.empty()) { - calibreHostname = calibreHost; - } - - if (calibrePort > 0) { - Serial.printf("[%lu] [CAL] Discovered Calibre at %s:%d (alt:%d)\n", millis(), calibreHost.c_str(), calibrePort, - calibreAltPort); - setState(WirelessState::CONNECTING); - setStatus("Connecting to " + calibreHostname + "..."); - connectToCalibr(); - } - } + switch (currentState) { + case WirelessState::DISCOVERING: + listenForDiscovery(); + break; + case WirelessState::CONNECTING: + case WirelessState::WAITING: + case WirelessState::RECEIVING: + handleTcpClient(); + break; + case WirelessState::COMPLETE: + case WirelessState::DISCONNECTED: + case WirelessState::ERROR: + vTaskDelay(100 / portTICK_PERIOD_MS); + break; } + + vTaskDelay(10 / portTICK_PERIOD_MS); } vTaskDelete(nullptr); } -void CalibreWirelessActivity::connectToCalibr() { - Serial.printf("[%lu] [CAL] connectToCalibr called\n", millis()); - - if (tcpClient) { - tcpClient->close(true); - delete tcpClient; - tcpClient = nullptr; +void CalibreWirelessActivity::listenForDiscovery() { + for (const uint16_t port : UDP_PORTS) { + udp.beginPacket("255.255.255.255", port); + udp.write(reinterpret_cast("hello"), 5); + udp.endPacket(); } - tcpClient = new AsyncClient(); - if (!tcpClient) { - Serial.printf("[%lu] [CAL] Failed to create AsyncClient\n", millis()); - setState(WirelessState::DISCOVERING); - return; - } + vTaskDelay(500 / portTICK_PERIOD_MS); - // Set up callbacks with lambdas that call our member functions - tcpClient->onConnect( - [](void* arg, AsyncClient* client) { - Serial.printf("[%lu] [CAL] onConnect callback fired\n", millis()); - static_cast(arg)->onTcpConnect(client); - }, - this); + const int packetSize = udp.parsePacket(); + if (packetSize > 0) { + char buffer[256]; + const int len = udp.read(buffer, sizeof(buffer) - 1); + if (len > 0) { + buffer[len] = '\0'; + std::string response(buffer); - tcpClient->onDisconnect( - [](void* arg, AsyncClient* client) { - Serial.printf("[%lu] [CAL] onDisconnect callback fired\n", millis()); - static_cast(arg)->onTcpDisconnect(client); - }, - this); + size_t onPos = response.find("(on "); + size_t closePos = response.find(')'); + size_t semiPos = response.find(';'); + size_t commaPos = response.find(',', semiPos); - tcpClient->onData( - [](void* arg, AsyncClient* client, void* data, size_t len) { - static_cast(arg)->onTcpData(client, data, len); - }, - this); + if (semiPos != std::string::npos) { + std::string portStr; + if (commaPos != std::string::npos && commaPos > semiPos) { + portStr = response.substr(semiPos + 1, commaPos - semiPos - 1); + uint16_t altPort = 0; + for (size_t i = commaPos + 1; i < response.size(); i++) { + char c = response[i]; + if (c >= '0' && c <= '9') { + altPort = altPort * 10 + (c - '0'); + } else { + break; + } + } + calibreAltPort = altPort; + } else { + portStr = response.substr(semiPos + 1); + } - tcpClient->onError( - [](void* arg, AsyncClient* client, int8_t error) { - Serial.printf("[%lu] [CAL] onError callback fired: %d\n", millis(), error); - static_cast(arg)->onTcpError(client, error); - }, - this); + uint16_t mainPort = 0; + for (char c : portStr) { + if (c >= '0' && c <= '9') { + mainPort = mainPort * 10 + (c - '0'); + } else if (c != ' ' && c != '\t') { + break; + } + } + calibrePort = mainPort; - // Use IPAddress explicitly to avoid any DNS resolution issues - IPAddress ip; - if (!ip.fromString(calibreHost.c_str())) { - Serial.printf("[%lu] [CAL] Failed to parse IP: %s\n", millis(), calibreHost.c_str()); - setState(WirelessState::DISCOVERING); - return; - } - - Serial.printf("[%lu] [CAL] Attempting connect to %s:%d\n", millis(), ip.toString().c_str(), calibrePort); - bool connectResult = tcpClient->connect(ip, calibrePort); - Serial.printf("[%lu] [CAL] connect() returned %s\n", millis(), connectResult ? "true" : "false"); - - if (!connectResult) { - // Try alternative port - if (calibreAltPort > 0) { - Serial.printf("[%lu] [CAL] Trying alt port %d\n", millis(), calibreAltPort); - connectResult = tcpClient->connect(ip, calibreAltPort); - Serial.printf("[%lu] [CAL] alt connect() returned %s\n", millis(), connectResult ? "true" : "false"); - if (!connectResult) { - setState(WirelessState::DISCOVERING); - setStatus("Discovering Calibre...\n(Connection failed, retrying)"); + if (onPos != std::string::npos && closePos != std::string::npos && closePos > onPos + 4) { + calibreHostname = response.substr(onPos + 4, closePos - onPos - 4); + } + } + + calibreHost = udp.remoteIP().toString().c_str(); + if (calibreHostname.empty()) { + calibreHostname = calibreHost; + } + + if (calibrePort > 0) { + setState(WirelessState::CONNECTING); + setStatus("Connecting to " + calibreHostname + "..."); + + vTaskDelay(100 / portTICK_PERIOD_MS); + + Serial.printf("[%lu] [CAL] Connecting to %s:%d\n", millis(), calibreHost.c_str(), calibrePort); + if (tcpClient.connect(calibreHost.c_str(), calibrePort, 5000)) { + Serial.printf("[%lu] [CAL] Connected!\n", millis()); + setState(WirelessState::WAITING); + setStatus("Connected to " + calibreHostname + "\nWaiting for commands..."); + } else if (calibreAltPort > 0 && tcpClient.connect(calibreHost.c_str(), calibreAltPort, 5000)) { + Serial.printf("[%lu] [CAL] Connected on alt port!\n", millis()); + setState(WirelessState::WAITING); + setStatus("Connected to " + calibreHostname + "\nWaiting for commands..."); + } else { + Serial.printf("[%lu] [CAL] Connection failed\n", millis()); + setState(WirelessState::DISCOVERING); + setStatus("Discovering Calibre...\n(Connection failed, retrying)"); + calibrePort = 0; + calibreAltPort = 0; + } } - } else { - setState(WirelessState::DISCOVERING); - setStatus("Discovering Calibre...\n(Connection failed, retrying)"); } } - // If connect() returned true, connection is in progress - wait for callbacks } -void CalibreWirelessActivity::onTcpConnect(AsyncClient* client) { - Serial.printf("[%lu] [CAL] Connected to Calibre\n", millis()); - setState(WirelessState::WAITING); - setStatus("Connected to " + calibreHostname + "\nWaiting for commands..."); -} - -void CalibreWirelessActivity::onTcpDisconnect(AsyncClient* client) { - Serial.printf("[%lu] [CAL] Disconnected from Calibre\n", millis()); - if (state != WirelessState::ERROR) { +void CalibreWirelessActivity::handleTcpClient() { + if (!tcpClient.connected()) { setState(WirelessState::DISCONNECTED); setStatus("Calibre disconnected"); + return; } -} - -void CalibreWirelessActivity::onTcpError(AsyncClient* client, int8_t error) { - Serial.printf("[%lu] [CAL] TCP error: %d\n", millis(), error); - setError("Connection error"); -} - -void CalibreWirelessActivity::onTcpData(AsyncClient* client, void* data, size_t len) { - // This is the key callback - data arrives here like KOReader's receiveCallback - const char* charData = static_cast(data); - - Serial.printf("[%lu] [CAL] Received %zu bytes\n", millis(), len); if (inBinaryMode) { - processBinaryData(charData, len); - } else { - // Append to buffer and process JSON messages - xSemaphoreTake(dataMutex, portMAX_DELAY); - recvBuffer.append(charData, len); - xSemaphoreGive(dataMutex); - processJsonData(); - } -} - -void CalibreWirelessActivity::processBinaryData(const char* data, size_t len) { - // Like KOReader: write only what we need, put excess in buffer - size_t toWrite = std::min(len, binaryBytesRemaining); - - if (toWrite > 0) { - currentFile.write(reinterpret_cast(data), toWrite); - bytesReceived += toWrite; - binaryBytesRemaining -= toWrite; - updateRequired = true; - - // Progress logging - static unsigned long lastLog = 0; - unsigned long now = millis(); - if (now - lastLog > 500) { - Serial.printf("[%lu] [CAL] Binary: %zu/%zu bytes (%.1f%%)\n", now, bytesReceived, currentFileSize, - currentFileSize > 0 ? (100.0 * bytesReceived / currentFileSize) : 0.0); - lastLog = now; - } + receiveBinaryData(); + return; } - // If we received more than needed, it's the next JSON message - if (len > toWrite) { - size_t excess = len - toWrite; - xSemaphoreTake(dataMutex, portMAX_DELAY); - recvBuffer.assign(data + toWrite, excess); - xSemaphoreGive(dataMutex); - Serial.printf("[%lu] [CAL] Binary complete, %zu excess bytes buffered\n", millis(), excess); - } - - // Check if binary transfer is complete - if (binaryBytesRemaining == 0) { - currentFile.flush(); - currentFile.close(); - inBinaryMode = false; - - Serial.printf("[%lu] [CAL] File complete: %zu bytes\n", millis(), bytesReceived); - setState(WirelessState::WAITING); - setStatus("Received: " + currentFilename + "\nWaiting for more..."); - - // Process any buffered JSON data - if (!recvBuffer.empty()) { - processJsonData(); - } - } -} - -void CalibreWirelessActivity::processJsonData() { - // Process JSON messages from buffer (like KOReader's onReceiveJSON) - while (true) { - std::string message; - if (!parseJsonMessage(message)) { - break; // Need more data - } - - // Parse opcode from JSON array format: [opcode, {...}] + std::string message; + if (readJsonMessage(message)) { size_t start = message.find('['); if (start != std::string::npos) { start++; @@ -384,15 +265,12 @@ void CalibreWirelessActivity::processJsonData() { if (opcodeInt >= 0 && opcodeInt <= OpCode::ERROR) { auto opcode = static_cast(opcodeInt); - - // Extract data object size_t dataStart = end + 1; size_t dataEnd = message.rfind(']'); std::string data; if (dataEnd != std::string::npos && dataEnd > dataStart) { data = message.substr(dataStart, dataEnd - dataStart); } - handleCommand(opcode, data); } } @@ -400,57 +278,74 @@ void CalibreWirelessActivity::processJsonData() { } } -bool CalibreWirelessActivity::parseJsonMessage(std::string& message) { +bool CalibreWirelessActivity::readJsonMessage(std::string& message) { constexpr size_t MAX_BUFFERED_MSG_SIZE = 32768; - xSemaphoreTake(dataMutex, portMAX_DELAY); - // Handle skip mode for large messages if (inSkipMode) { - if (recvBuffer.size() >= skipBytesRemaining) { - recvBuffer = recvBuffer.substr(skipBytesRemaining); - skipBytesRemaining = 0; - inSkipMode = false; + while (skipBytesRemaining > 0 && tcpClient.available() > 0) { + uint8_t discardBuf[1024]; + size_t toRead = std::min({static_cast(tcpClient.available()), sizeof(discardBuf), skipBytesRemaining}); + int bytesRead = tcpClient.read(discardBuf, toRead); + if (bytesRead > 0) { + skipBytesRemaining -= bytesRead; + } else { + break; + } + } + if (skipBytesRemaining == 0) { + inSkipMode = false; if (skipOpcode == OpCode::SEND_BOOK && !skipExtractedLpath.empty() && skipExtractedLength > 0) { message = "[" + std::to_string(skipOpcode) + ",{\"lpath\":\"" + skipExtractedLpath + "\",\"length\":" + std::to_string(skipExtractedLength) + "}]"; skipOpcode = -1; skipExtractedLpath.clear(); skipExtractedLength = 0; - xSemaphoreGive(dataMutex); return true; } if (skipOpcode >= 0) { message = "[" + std::to_string(skipOpcode) + ",{}]"; skipOpcode = -1; - xSemaphoreGive(dataMutex); return true; } - } else { - skipBytesRemaining -= recvBuffer.size(); - recvBuffer.clear(); - xSemaphoreGive(dataMutex); - return false; + } + return false; + } + + // Read available data into buffer + int available = tcpClient.available(); + if (available > 0) { + size_t maxBuffer = MAX_BUFFERED_MSG_SIZE + 20; + if (recvBuffer.size() < maxBuffer) { + char buf[1024]; + size_t spaceLeft = maxBuffer - recvBuffer.size(); + while (available > 0 && spaceLeft > 0) { + int toRead = std::min({available, static_cast(sizeof(buf)), static_cast(spaceLeft)}); + int bytesRead = tcpClient.read(reinterpret_cast(buf), toRead); + if (bytesRead > 0) { + recvBuffer.append(buf, bytesRead); + available -= bytesRead; + spaceLeft -= bytesRead; + } else { + break; + } + } } } if (recvBuffer.empty()) { - xSemaphoreGive(dataMutex); return false; } - // Find '[' which marks JSON start size_t bracketPos = recvBuffer.find('['); if (bracketPos == std::string::npos) { if (recvBuffer.size() > 1000) { recvBuffer.clear(); } - xSemaphoreGive(dataMutex); return false; } - // Parse length prefix size_t msgLen = 0; bool validPrefix = false; @@ -476,21 +371,18 @@ bool CalibreWirelessActivity::parseJsonMessage(std::string& message) { if (bracketPos > 0) { recvBuffer = recvBuffer.substr(bracketPos); } - xSemaphoreGive(dataMutex); return false; } if (msgLen > 10000000) { recvBuffer.clear(); - xSemaphoreGive(dataMutex); return false; } - // Handle large messages by extracting essential fields and skipping the rest + // Handle large messages if (msgLen > MAX_BUFFERED_MSG_SIZE) { Serial.printf("[%lu] [CAL] Large message (%zu bytes), streaming\n", millis(), msgLen); - // Extract opcode int opcodeInt = -1; size_t opcodeStart = bracketPos + 1; size_t commaPos = recvBuffer.find(',', opcodeStart); @@ -510,7 +402,6 @@ bool CalibreWirelessActivity::parseJsonMessage(std::string& message) { skipExtractedLpath.clear(); skipExtractedLength = 0; - // Extract lpath and length for SEND_BOOK if (opcodeInt == OpCode::SEND_BOOK) { size_t lpathPos = recvBuffer.find("\"lpath\""); if (lpathPos != std::string::npos) { @@ -526,7 +417,6 @@ bool CalibreWirelessActivity::parseJsonMessage(std::string& message) { } } - // Extract top-level length int depth = 0; const char* lengthKey = "\"length\""; for (size_t i = bracketPos; i < recvBuffer.size() && i < bracketPos + 2000; i++) { @@ -562,13 +452,11 @@ bool CalibreWirelessActivity::parseJsonMessage(std::string& message) { skipOpcode = -1; skipExtractedLpath.clear(); skipExtractedLength = 0; - xSemaphoreGive(dataMutex); return true; } if (skipOpcode >= 0) { message = "[" + std::to_string(skipOpcode) + ",{}]"; skipOpcode = -1; - xSemaphoreGive(dataMutex); return true; } } else { @@ -576,31 +464,25 @@ bool CalibreWirelessActivity::parseJsonMessage(std::string& message) { recvBuffer.clear(); inSkipMode = true; } - xSemaphoreGive(dataMutex); return false; } - // Normal message handling size_t totalNeeded = bracketPos + msgLen; if (recvBuffer.size() < totalNeeded) { - xSemaphoreGive(dataMutex); return false; } message = recvBuffer.substr(bracketPos, msgLen); recvBuffer = recvBuffer.size() > totalNeeded ? recvBuffer.substr(totalNeeded) : ""; - xSemaphoreGive(dataMutex); return true; } void CalibreWirelessActivity::sendJsonResponse(const OpCode opcode, const std::string& data) { - if (!tcpClient || !tcpClient->connected()) return; - std::string json = "[" + std::to_string(opcode) + "," + data + "]"; std::string msg = std::to_string(json.length()) + json; - - tcpClient->write(msg.c_str(), msg.length()); + tcpClient.write(reinterpret_cast(msg.c_str()), msg.length()); + tcpClient.flush(); } void CalibreWirelessActivity::handleCommand(const OpCode opcode, const std::string& data) { @@ -639,12 +521,67 @@ void CalibreWirelessActivity::handleCommand(const OpCode opcode, const std::stri sendJsonResponse(OpCode::OK, "{}"); break; default: - Serial.printf("[%lu] [CAL] Unknown opcode: %d\n", millis(), opcode); sendJsonResponse(OpCode::OK, "{}"); break; } } +void CalibreWirelessActivity::receiveBinaryData() { + // KOReader-style: read all available data, write only what we need to file, + // put excess (next JSON message) back into buffer. + + int available = tcpClient.available(); + if (available <= 0) { + // Brief wait for more data + vTaskDelay(1); + return; + } + + uint8_t buffer[4096]; + int bytesRead = tcpClient.read(buffer, std::min(sizeof(buffer), static_cast(available))); + + if (bytesRead <= 0) { + return; + } + + // Write only what we need (like KOReader's data:sub(1, to_write_bytes)) + size_t toWrite = std::min(static_cast(bytesRead), binaryBytesRemaining); + + if (toWrite > 0) { + currentFile.write(buffer, toWrite); + bytesReceived += toWrite; + binaryBytesRemaining -= toWrite; + updateRequired = true; + } + + // If we read more than needed, it's the next JSON message (like KOReader's buffer handling) + if (static_cast(bytesRead) > toWrite) { + size_t excess = bytesRead - toWrite; + recvBuffer.assign(reinterpret_cast(buffer + toWrite), excess); + Serial.printf("[%lu] [CAL] Binary done, %zu excess bytes -> buffer\n", millis(), excess); + } + + // Progress logging + static unsigned long lastLog = 0; + unsigned long now = millis(); + if (now - lastLog > 500) { + Serial.printf("[%lu] [CAL] Binary: %zu/%zu (%.1f%%)\n", now, bytesReceived, currentFileSize, + currentFileSize > 0 ? (100.0 * bytesReceived / currentFileSize) : 0.0); + lastLog = now; + } + + // Check completion + if (binaryBytesRemaining == 0) { + currentFile.flush(); + currentFile.close(); + inBinaryMode = false; + + Serial.printf("[%lu] [CAL] File complete: %zu bytes\n", millis(), bytesReceived); + setState(WirelessState::WAITING); + setStatus("Received: " + currentFilename + "\nWaiting for more..."); + } +} + void CalibreWirelessActivity::handleGetInitializationInfo(const std::string& data) { setState(WirelessState::WAITING); setStatus("Connected to " + calibreHostname + @@ -699,7 +636,7 @@ void CalibreWirelessActivity::handleGetBookCount() { } void CalibreWirelessActivity::handleSendBook(const std::string& data) { - Serial.printf("[%lu] [CAL] SEND_BOOK data (first 500 chars): %.500s\n", millis(), data.c_str()); + Serial.printf("[%lu] [CAL] SEND_BOOK (first 500): %.500s\n", millis(), data.c_str()); // Extract lpath std::string lpath; @@ -769,7 +706,8 @@ void CalibreWirelessActivity::handleSendBook(const std::string& data) { bytesReceived = 0; binaryBytesRemaining = length; - Serial.printf("[%lu] [CAL] SEND_BOOK: file='%s', length=%zu\n", millis(), currentFilename.c_str(), length); + Serial.printf("[%lu] [CAL] File: %s, size: %zu, buffer: %zu\n", millis(), currentFilename.c_str(), length, + recvBuffer.size()); setState(WirelessState::RECEIVING); setStatus("Receiving: " + filename); @@ -780,17 +718,16 @@ void CalibreWirelessActivity::handleSendBook(const std::string& data) { return; } - // Send OK - Calibre will start sending binary data + // Send OK - Calibre will start sending binary sendJsonResponse(OpCode::OK, "{}"); - // Switch to binary mode - subsequent data in onTcpData will be file content + // Switch to binary mode inBinaryMode = true; // Process any data already in buffer (like KOReader) - xSemaphoreTake(dataMutex, portMAX_DELAY); if (!recvBuffer.empty()) { size_t toWrite = std::min(recvBuffer.size(), binaryBytesRemaining); - Serial.printf("[%lu] [CAL] Writing %zu bytes from buffer\n", millis(), toWrite); + Serial.printf("[%lu] [CAL] Writing %zu from buffer\n", millis(), toWrite); currentFile.write(reinterpret_cast(recvBuffer.data()), toWrite); bytesReceived += toWrite; binaryBytesRemaining -= toWrite; @@ -811,11 +748,10 @@ void CalibreWirelessActivity::handleSendBook(const std::string& data) { setStatus("Received: " + currentFilename + "\nWaiting for more..."); } } - xSemaphoreGive(dataMutex); } void CalibreWirelessActivity::handleSendBookMetadata(const std::string& data) { - Serial.printf("[%lu] [CAL] SEND_BOOK_METADATA received\n", millis()); + Serial.printf("[%lu] [CAL] SEND_BOOK_METADATA\n", millis()); sendJsonResponse(OpCode::OK, "{}"); } @@ -886,9 +822,9 @@ std::string CalibreWirelessActivity::getDeviceUuid() const { } void CalibreWirelessActivity::setState(WirelessState newState) { - xSemaphoreTake(dataMutex, portMAX_DELAY); + xSemaphoreTake(stateMutex, portMAX_DELAY); state = newState; - xSemaphoreGive(dataMutex); + xSemaphoreGive(stateMutex); updateRequired = true; } diff --git a/src/activities/network/CalibreWirelessActivity.h b/src/activities/network/CalibreWirelessActivity.h index 505bf9cc..a84cebd8 100644 --- a/src/activities/network/CalibreWirelessActivity.h +++ b/src/activities/network/CalibreWirelessActivity.h @@ -1,6 +1,6 @@ #pragma once -#include #include +#include #include #include #include @@ -17,29 +17,18 @@ * * Protocol specification sourced from Calibre's smart device driver: * https://github.com/kovidgoyal/calibre/blob/master/src/calibre/devices/smart_device_app/driver.py - * - * Uses AsyncTCP for callback-based networking (like KOReader's StreamMessageQueue). - * - * Protocol overview: - * 1. Device broadcasts "hello" on UDP ports 54982, 48123, 39001, 44044, 59678 - * 2. Calibre responds with its TCP server address - * 3. Device connects to Calibre's TCP server - * 4. Calibre sends JSON commands with length-prefixed messages - * 5. Books are transferred as binary data after SEND_BOOK command */ class CalibreWirelessActivity final : public Activity { - // Calibre wireless device states enum class WirelessState { - DISCOVERING, // Listening for Calibre server broadcasts - CONNECTING, // Establishing TCP connection - WAITING, // Connected, waiting for commands - RECEIVING, // Receiving a book file - COMPLETE, // Transfer complete - DISCONNECTED, // Calibre disconnected - ERROR // Connection/transfer error + DISCOVERING, + CONNECTING, + WAITING, + RECEIVING, + COMPLETE, + DISCONNECTED, + ERROR }; - // Calibre protocol opcodes (from calibre/devices/smart_device_app/driver.py) enum OpCode : uint8_t { OK = 0, SET_CALIBRE_DEVICE_INFO = 1, @@ -64,70 +53,52 @@ class CalibreWirelessActivity final : public Activity { }; TaskHandle_t displayTaskHandle = nullptr; - TaskHandle_t discoveryTaskHandle = nullptr; + TaskHandle_t networkTaskHandle = nullptr; SemaphoreHandle_t renderingMutex = nullptr; - SemaphoreHandle_t dataMutex = nullptr; // Protects shared data accessed from callbacks + SemaphoreHandle_t stateMutex = nullptr; bool updateRequired = false; volatile bool shouldExit = false; WirelessState state = WirelessState::DISCOVERING; const std::function onCompleteCallback; - // UDP discovery WiFiUDP udp; - - // Async TCP connection - AsyncClient* tcpClient = nullptr; + WiFiClient tcpClient; std::string calibreHost; uint16_t calibrePort = 0; uint16_t calibreAltPort = 0; std::string calibreHostname; - // Transfer state std::string currentFilename; size_t currentFileSize = 0; size_t bytesReceived = 0; std::string statusMessage; std::string errorMessage; - // Protocol state bool inBinaryMode = false; size_t binaryBytesRemaining = 0; FsFile currentFile; - std::string recvBuffer; // Buffer for incoming data (like KOReader) + std::string recvBuffer; - // Large message skip state bool inSkipMode = false; size_t skipBytesRemaining = 0; int skipOpcode = -1; std::string skipExtractedLpath; size_t skipExtractedLength = 0; - // Display task static void displayTaskTrampoline(void* param); + static void networkTaskTrampoline(void* param); void displayTaskLoop(); + void networkTaskLoop(); void render() const; - // Discovery task (UDP is not async) - static void discoveryTaskTrampoline(void* param); - void discoveryTaskLoop(); - - // AsyncTCP callbacks - void onTcpConnect(AsyncClient* client); - void onTcpDisconnect(AsyncClient* client); - void onTcpData(AsyncClient* client, void* data, size_t len); - void onTcpError(AsyncClient* client, int8_t error); - - // Data processing (called from onTcpData callback) - void processReceivedData(); - void processBinaryData(const char* data, size_t len); - void processJsonData(); - bool parseJsonMessage(std::string& message); - + void listenForDiscovery(); + void handleTcpClient(); + bool readJsonMessage(std::string& message); void sendJsonResponse(OpCode opcode, const std::string& data); void handleCommand(OpCode opcode, const std::string& data); + void receiveBinaryData(); - // Protocol handlers void handleGetInitializationInfo(const std::string& data); void handleGetDeviceInformation(); void handleFreeSpace(); @@ -137,12 +108,10 @@ class CalibreWirelessActivity final : public Activity { void handleDisplayMessage(const std::string& data); void handleNoop(const std::string& data); - // Utility std::string getDeviceUuid() const; void setState(WirelessState newState); void setStatus(const std::string& message); void setError(const std::string& message); - void connectToCalibr(); public: explicit CalibreWirelessActivity(GfxRenderer& renderer, MappedInputManager& mappedInput,