fixes crash on connection

This commit is contained in:
Justin Mitchell 2026-01-14 13:13:57 -05:00
parent 56ec3dfb6d
commit 0fa57a6bb4
2 changed files with 345 additions and 112 deletions

View File

@ -45,6 +45,11 @@ void CalibreWirelessActivity::onEnter() {
bytesReceived = 0;
inBinaryMode = false;
recvBuffer.clear();
inSkipMode = false;
skipBytesRemaining = 0;
skipOpcode = -1;
skipExtractedLpath.clear();
skipExtractedLength = 0;
updateRequired = true;
@ -61,6 +66,20 @@ void CalibreWirelessActivity::onEnter() {
void CalibreWirelessActivity::onExit() {
Activity::onExit();
// Stop network task FIRST before touching any shared state
// This prevents the task from accessing members while we clean up
if (networkTaskHandle) {
vTaskDelete(networkTaskHandle);
networkTaskHandle = nullptr;
}
// Stop display task
if (displayTaskHandle) {
vTaskDelete(displayTaskHandle);
displayTaskHandle = nullptr;
}
// Now safe to clean up - tasks are stopped
// Turn off WiFi when exiting
WiFi.mode(WIFI_OFF);
@ -77,25 +96,22 @@ void CalibreWirelessActivity::onExit() {
currentFile.close();
}
// Acquire stateMutex before deleting network task to avoid race condition
xSemaphoreTake(stateMutex, portMAX_DELAY);
if (networkTaskHandle) {
vTaskDelete(networkTaskHandle);
networkTaskHandle = nullptr;
}
xSemaphoreGive(stateMutex);
// Clear string buffers to free memory
recvBuffer.clear();
recvBuffer.shrink_to_fit();
skipExtractedLpath.clear();
skipExtractedLpath.shrink_to_fit();
// Acquire renderingMutex before deleting display task
xSemaphoreTake(renderingMutex, portMAX_DELAY);
if (displayTaskHandle) {
vTaskDelete(displayTaskHandle);
displayTaskHandle = nullptr;
// Delete mutexes last
if (renderingMutex) {
vSemaphoreDelete(renderingMutex);
renderingMutex = nullptr;
}
vSemaphoreDelete(renderingMutex);
renderingMutex = nullptr;
vSemaphoreDelete(stateMutex);
stateMutex = nullptr;
if (stateMutex) {
vSemaphoreDelete(stateMutex);
stateMutex = nullptr;
}
}
void CalibreWirelessActivity::loop() {
@ -182,28 +198,32 @@ void CalibreWirelessActivity::listenForDiscovery() {
std::string portStr;
if (commaPos != std::string::npos && commaPos > semiPos) {
portStr = response.substr(semiPos + 1, commaPos - semiPos - 1);
// Get alternative port after comma
std::string altPortStr = response.substr(commaPos + 1);
// Trim whitespace and non-digits from alt port
size_t altEnd = 0;
while (altEnd < altPortStr.size() && altPortStr[altEnd] >= '0' && altPortStr[altEnd] <= '9') {
altEnd++;
}
if (altEnd > 0) {
calibreAltPort = static_cast<uint16_t>(std::stoi(altPortStr.substr(0, altEnd)));
// Get alternative port after comma - parse safely
uint16_t altPort = 0;
for (size_t i = commaPos + 1; i < response.size(); i++) {
char c = response[i];
if (c >= '0' && c <= '9') {
altPort = altPort * 10 + (c - '0');
} else {
break;
}
}
calibreAltPort = altPort;
} else {
portStr = response.substr(semiPos + 1);
}
// Trim whitespace from main port
while (!portStr.empty() && (portStr[0] == ' ' || portStr[0] == '\t')) {
portStr = portStr.substr(1);
}
if (!portStr.empty()) {
calibrePort = static_cast<uint16_t>(std::stoi(portStr));
// Parse main port safely
uint16_t mainPort = 0;
for (size_t i = 0; i < portStr.size(); i++) {
char c = portStr[i];
if (c >= '0' && c <= '9') {
mainPort = mainPort * 10 + (c - '0');
} else if (c != ' ' && c != '\t') {
break;
}
}
calibrePort = mainPort;
// Get hostname if present, otherwise use sender IP
if (onPos != std::string::npos && closePos != std::string::npos && closePos > onPos + 4) {
@ -276,7 +296,16 @@ void CalibreWirelessActivity::handleTcpClient() {
start++;
size_t end = message.find(',', start);
if (end != std::string::npos) {
const int opcodeInt = std::stoi(message.substr(start, end - start));
// Parse opcode safely without exceptions
int opcodeInt = 0;
for (size_t i = start; i < end; i++) {
char c = message[i];
if (c >= '0' && c <= '9') {
opcodeInt = opcodeInt * 10 + (c - '0');
} else if (c != ' ' && c != '\t') {
break; // Invalid character
}
}
if (opcodeInt < 0 || opcodeInt >= OpCode::ERROR) {
Serial.printf("[%lu] [CAL] Invalid opcode: %d\n", millis(), opcodeInt);
sendJsonResponse(OpCode::OK, "{}");
@ -288,8 +317,11 @@ void CalibreWirelessActivity::handleTcpClient() {
size_t dataStart = end + 1;
size_t dataEnd = message.rfind(']');
std::string data = "";
if (dataEnd != std::string::npos && dataEnd > dataStart) {
data = message.substr(dataStart, dataEnd - dataStart);
if (dataEnd != std::string::npos && dataEnd > dataStart && dataStart < message.size()) {
size_t len = dataEnd - dataStart;
if (dataStart + len <= message.size()) {
data = message.substr(dataStart, len);
}
}
handleCommand(opcode, data);
@ -299,26 +331,71 @@ void CalibreWirelessActivity::handleTcpClient() {
}
bool CalibreWirelessActivity::readJsonMessage(std::string& message) {
// Read available data into buffer
int available = tcpClient.available();
if (available > 0) {
// Limit buffer growth to prevent memory issues
if (recvBuffer.size() > 100000) {
recvBuffer.clear();
return false;
}
// Read in chunks
char buf[1024];
while (available > 0) {
int toRead = std::min(available, static_cast<int>(sizeof(buf)));
int bytesRead = tcpClient.read(reinterpret_cast<uint8_t*>(buf), toRead);
// Maximum message size we'll buffer in memory
// Messages larger than this (typically due to base64 covers) are streamed through
constexpr size_t MAX_BUFFERED_MSG_SIZE = 32768;
// If in skip mode, consume bytes until we've skipped the full message
if (inSkipMode) {
while (skipBytesRemaining > 0) {
int available = tcpClient.available();
if (available <= 0) {
return false; // Need more data
}
// Read and discard in chunks
uint8_t discardBuf[1024];
size_t toRead = std::min({static_cast<size_t>(available), sizeof(discardBuf), skipBytesRemaining});
int bytesRead = tcpClient.read(discardBuf, toRead);
if (bytesRead > 0) {
recvBuffer.append(buf, bytesRead);
available -= bytesRead;
skipBytesRemaining -= bytesRead;
} else {
break;
}
}
if (skipBytesRemaining == 0) {
// Skip complete - if this was SEND_BOOK, construct minimal message
inSkipMode = false;
if (skipOpcode == OpCode::SEND_BOOK && !skipExtractedLpath.empty() && skipExtractedLength > 0) {
// Build minimal JSON that handleSendBook can parse
message = "[" + std::to_string(skipOpcode) + ",{\"lpath\":\"" + skipExtractedLpath +
"\",\"length\":" + std::to_string(skipExtractedLength) + "}]";
skipOpcode = -1;
skipExtractedLpath.clear();
skipExtractedLength = 0;
return true;
}
// For other opcodes, just acknowledge
if (skipOpcode >= 0) {
message = "[" + std::to_string(skipOpcode) + ",{}]";
skipOpcode = -1;
return true;
}
}
return false;
}
// Read available data into buffer (limited to prevent memory issues)
int available = tcpClient.available();
if (available > 0) {
// Only buffer up to a reasonable amount while looking for length prefix
size_t maxBuffer = MAX_BUFFERED_MSG_SIZE + 20; // +20 for length prefix digits
if (recvBuffer.size() < maxBuffer) {
char buf[1024];
size_t spaceLeft = maxBuffer - recvBuffer.size();
while (available > 0 && spaceLeft > 0) {
int toRead = std::min({available, static_cast<int>(sizeof(buf)), static_cast<int>(spaceLeft)});
int bytesRead = tcpClient.read(reinterpret_cast<uint8_t*>(buf), toRead);
if (bytesRead > 0) {
recvBuffer.append(buf, bytesRead);
available -= bytesRead;
spaceLeft -= bytesRead;
} else {
break;
}
}
}
}
if (recvBuffer.empty()) {
@ -328,61 +405,174 @@ bool CalibreWirelessActivity::readJsonMessage(std::string& message) {
// Find '[' which marks the start of JSON
size_t bracketPos = recvBuffer.find('[');
if (bracketPos == std::string::npos) {
// No '[' found - if buffer is getting large, something is wrong
if (recvBuffer.size() > 1000) {
recvBuffer.clear();
}
return false;
}
// Try to extract length from digits before '['
// Calibre ALWAYS sends a length prefix, so if it's not valid digits, it's garbage
// Parse length prefix (digits before '[')
size_t msgLen = 0;
bool validPrefix = false;
if (bracketPos > 0 && bracketPos <= 12) {
// Check if prefix is all digits
bool allDigits = true;
size_t parsedLen = 0;
for (size_t i = 0; i < bracketPos; i++) {
char c = recvBuffer[i];
if (c < '0' || c > '9') {
if (c >= '0' && c <= '9') {
parsedLen = parsedLen * 10 + (c - '0');
} else {
allDigits = false;
break;
}
}
if (allDigits) {
msgLen = std::stoul(recvBuffer.substr(0, bracketPos));
msgLen = parsedLen;
validPrefix = true;
}
}
if (!validPrefix) {
// Not a valid length prefix - discard everything up to '[' and treat '[' as start
if (bracketPos > 0) {
if (bracketPos > 0 && bracketPos < recvBuffer.size()) {
recvBuffer = recvBuffer.substr(bracketPos);
}
// Without length prefix, we can't reliably parse - wait for more data
// that hopefully starts with a proper length prefix
return false;
}
// Sanity check the message length
if (msgLen > 1000000) {
recvBuffer = recvBuffer.substr(bracketPos + 1); // Skip past this '[' and try again
// Sanity check - reject absurdly large messages
if (msgLen > 10000000) {
Serial.printf("[%lu] [CAL] Rejecting message with length %zu (too large)\n", millis(), msgLen);
recvBuffer.clear();
return false;
}
// Check if we have the complete message
// For large messages, extract essential fields then skip the rest
if (msgLen > MAX_BUFFERED_MSG_SIZE) {
Serial.printf("[%lu] [CAL] Large message detected (%zu bytes), streaming\n", millis(), msgLen);
// We need to extract: opcode, and for SEND_BOOK: lpath and length
// These fields appear early in the JSON before the large cover data
// Parse opcode from what we have buffered
int opcodeInt = -1;
size_t opcodeStart = bracketPos + 1;
size_t commaPos = recvBuffer.find(',', opcodeStart);
if (commaPos != std::string::npos && commaPos < recvBuffer.size()) {
opcodeInt = 0;
for (size_t i = opcodeStart; i < commaPos; i++) {
char c = recvBuffer[i];
if (c >= '0' && c <= '9') {
opcodeInt = opcodeInt * 10 + (c - '0');
} else if (c != ' ' && c != '\t') {
break;
}
}
}
skipOpcode = opcodeInt;
skipExtractedLpath.clear();
skipExtractedLength = 0;
// For SEND_BOOK, try to extract lpath and length from buffered data
if (opcodeInt == OpCode::SEND_BOOK) {
// Extract lpath
size_t lpathPos = recvBuffer.find("\"lpath\"");
if (lpathPos != std::string::npos && lpathPos + 7 < recvBuffer.size()) {
size_t colonPos = recvBuffer.find(':', lpathPos + 7);
if (colonPos != std::string::npos && colonPos + 1 < recvBuffer.size()) {
size_t quoteStart = recvBuffer.find('"', colonPos + 1);
if (quoteStart != std::string::npos && quoteStart + 1 < recvBuffer.size()) {
size_t quoteEnd = recvBuffer.find('"', quoteStart + 1);
if (quoteEnd != std::string::npos && quoteEnd > quoteStart + 1) {
skipExtractedLpath = recvBuffer.substr(quoteStart + 1, quoteEnd - quoteStart - 1);
}
}
}
}
// Extract top-level length (track depth to skip nested length fields in cover metadata)
// Message format is [opcode, {data}], so depth 2 = top level of data object
int depth = 0;
const char* lengthKey = "\"length\"";
const size_t keyLen = 8;
for (size_t i = bracketPos; i < recvBuffer.size() && i < bracketPos + 2000; i++) {
char c = recvBuffer[i];
if (c == '{' || c == '[') {
depth++;
} else if (c == '}' || c == ']') {
depth--;
} else if (depth == 2 && c == '"' && i + keyLen <= recvBuffer.size()) {
bool match = true;
for (size_t j = 0; j < keyLen && match; j++) {
if (recvBuffer[i + j] != lengthKey[j]) match = false;
}
if (match) {
size_t numStart = i + keyLen;
while (numStart < recvBuffer.size() && (recvBuffer[numStart] == ':' || recvBuffer[numStart] == ' ')) {
numStart++;
}
while (numStart < recvBuffer.size() && recvBuffer[numStart] >= '0' && recvBuffer[numStart] <= '9') {
skipExtractedLength = skipExtractedLength * 10 + (recvBuffer[numStart] - '0');
numStart++;
}
break;
}
}
}
}
// Calculate how many bytes we still need to skip
size_t totalMsgBytes = bracketPos + msgLen;
size_t alreadyBuffered = recvBuffer.size();
if (alreadyBuffered >= totalMsgBytes) {
// Entire message is already buffered - just discard it
recvBuffer = recvBuffer.substr(totalMsgBytes);
skipBytesRemaining = 0;
} else {
// Need to skip remaining bytes from network
skipBytesRemaining = totalMsgBytes - alreadyBuffered;
recvBuffer.clear();
}
inSkipMode = true;
// If skip is already complete, return immediately
if (skipBytesRemaining == 0) {
inSkipMode = false;
if (skipOpcode == OpCode::SEND_BOOK && !skipExtractedLpath.empty() && skipExtractedLength > 0) {
message = "[" + std::to_string(skipOpcode) + ",{\"lpath\":\"" + skipExtractedLpath +
"\",\"length\":" + std::to_string(skipExtractedLength) + "}]";
skipOpcode = -1;
skipExtractedLpath.clear();
skipExtractedLength = 0;
return true;
}
if (skipOpcode >= 0) {
message = "[" + std::to_string(skipOpcode) + ",{}]";
skipOpcode = -1;
return true;
}
return false;
}
return false; // Continue skipping in next iteration
}
// Normal path for small messages
size_t totalNeeded = bracketPos + msgLen;
if (recvBuffer.size() < totalNeeded) {
// Not enough data yet - wait for more
return false;
return false; // Wait for more data
}
// Extract the message
message = recvBuffer.substr(bracketPos, msgLen);
if (bracketPos < recvBuffer.size() && bracketPos + msgLen <= recvBuffer.size()) {
message = recvBuffer.substr(bracketPos, msgLen);
} else {
recvBuffer.clear();
return false;
}
// Keep the rest in buffer (may contain binary data or next message)
// Keep remainder in buffer
if (recvBuffer.size() > totalNeeded) {
recvBuffer = recvBuffer.substr(totalNeeded);
} else {
@ -403,6 +593,8 @@ void CalibreWirelessActivity::sendJsonResponse(const OpCode opcode, const std::s
}
void CalibreWirelessActivity::handleCommand(const OpCode opcode, const std::string& data) {
Serial.printf("[%lu] [CAL] Received opcode: %d, data size: %zu\n", millis(), opcode, data.size());
switch (opcode) {
case OpCode::GET_INITIALIZATION_INFO:
handleGetInitializationInfo(data);
@ -475,8 +667,9 @@ void CalibreWirelessActivity::handleGetInitializationInfo(const std::string& dat
// ccVersionNumber: Calibre Companion protocol version. 212 matches CC 5.4.20+.
// Using a known version ensures compatibility with Calibre's feature detection.
response += "\"ccVersionNumber\":212,";
// coverHeight: Max cover image height. We don't process covers, so this is informational only.
response += "\"coverHeight\":800,";
// coverHeight: Max cover image height. Set to 0 to prevent Calibre from embedding
// large base64-encoded covers in SEND_BOOK metadata, which would bloat the JSON.
response += "\"coverHeight\":0,";
response += "\"deviceKind\":\"CrossPoint\",";
response += "\"deviceName\":\"CrossPoint\",";
response += "\"extensionPathLengths\":{\"epub\":37},";
@ -522,14 +715,19 @@ void CalibreWirelessActivity::handleSendBook(const std::string& data) {
// Extract "lpath" field - format: "lpath": "value"
std::string lpath;
size_t lpathPos = data.find("\"lpath\"");
if (lpathPos != std::string::npos) {
if (lpathPos != std::string::npos && lpathPos + 7 < data.size()) {
size_t colonPos = data.find(':', lpathPos + 7);
if (colonPos != std::string::npos) {
if (colonPos != std::string::npos && colonPos + 1 < data.size()) {
size_t quoteStart = data.find('"', colonPos + 1);
if (quoteStart != std::string::npos) {
if (quoteStart != std::string::npos && quoteStart + 1 < data.size()) {
size_t quoteEnd = data.find('"', quoteStart + 1);
if (quoteEnd != std::string::npos) {
lpath = data.substr(quoteStart + 1, quoteEnd - quoteStart - 1);
if (quoteEnd != std::string::npos && quoteEnd > quoteStart + 1) {
// Safe bounds check before substr
size_t start = quoteStart + 1;
size_t len = quoteEnd - quoteStart - 1;
if (start < data.size() && start + len <= data.size()) {
lpath = data.substr(start, len);
}
}
}
}
@ -539,6 +737,9 @@ void CalibreWirelessActivity::handleSendBook(const std::string& data) {
// The metadata contains nested "length" fields (e.g., cover image length)
size_t length = 0;
int depth = 0;
const char* lengthKey = "\"length\"";
const size_t keyLen = 8;
for (size_t i = 0; i < data.size(); i++) {
char c = data[i];
if (c == '{' || c == '[') {
@ -546,22 +747,35 @@ void CalibreWirelessActivity::handleSendBook(const std::string& data) {
} else if (c == '}' || c == ']') {
depth--;
} else if (depth == 1 && c == '"') {
// At top level, check if this is "length"
if (i + 9 < data.size() && data.substr(i, 8) == "\"length\"") {
// Found top-level "length" - extract the number after ':'
size_t colonPos = data.find(':', i + 8);
if (colonPos != std::string::npos) {
size_t numStart = colonPos + 1;
while (numStart < data.size() && (data[numStart] == ' ' || data[numStart] == '\t')) {
numStart++;
// At top level, check if this is "length" by comparing directly
if (i + keyLen <= data.size()) {
bool match = true;
for (size_t j = 0; j < keyLen && match; j++) {
if (data[i + j] != lengthKey[j]) {
match = false;
}
size_t numEnd = numStart;
while (numEnd < data.size() && data[numEnd] >= '0' && data[numEnd] <= '9') {
numEnd++;
}
if (match) {
// Found top-level "length" - extract the number after ':'
size_t colonPos = i + keyLen;
while (colonPos < data.size() && data[colonPos] != ':') {
colonPos++;
}
if (numEnd > numStart) {
length = std::stoul(data.substr(numStart, numEnd - numStart));
break;
if (colonPos < data.size()) {
size_t numStart = colonPos + 1;
while (numStart < data.size() && (data[numStart] == ' ' || data[numStart] == '\t')) {
numStart++;
}
// Parse number safely without exceptions
size_t parsedLen = 0;
while (numStart < data.size() && data[numStart] >= '0' && data[numStart] <= '9') {
parsedLen = parsedLen * 10 + (data[numStart] - '0');
numStart++;
}
if (parsedLen > 0) {
length = parsedLen;
break;
}
}
}
}
@ -588,6 +802,8 @@ void CalibreWirelessActivity::handleSendBook(const std::string& data) {
currentFileSize = length;
bytesReceived = 0;
Serial.printf("[%lu] [CAL] SEND_BOOK: lpath='%s', length=%zu\n", millis(), lpath.c_str(), length);
setState(WirelessState::RECEIVING);
setStatus("Receiving: " + filename);
@ -640,39 +856,49 @@ void CalibreWirelessActivity::handleNoop(const std::string& data) {
}
void CalibreWirelessActivity::receiveBinaryData() {
const int available = tcpClient.available();
if (available == 0) {
// Check if connection is still alive
if (!tcpClient.connected()) {
currentFile.close();
inBinaryMode = false;
setError("Transfer interrupted");
// Read all available data in a loop to prevent TCP backpressure
// This is important because Calibre sends data continuously
while (binaryBytesRemaining > 0) {
const int available = tcpClient.available();
if (available == 0) {
// Check if connection is still alive
if (!tcpClient.connected()) {
Serial.printf("[%lu] [CAL] Connection lost during binary transfer. Received %zu/%zu bytes\n",
millis(), bytesReceived, currentFileSize);
currentFile.close();
inBinaryMode = false;
setError("Transfer interrupted");
}
return; // No data available right now, will continue next iteration
}
return;
}
uint8_t buffer[1024];
const size_t toRead = std::min(sizeof(buffer), binaryBytesRemaining);
const size_t bytesRead = tcpClient.read(buffer, toRead);
uint8_t buffer[4096]; // Larger buffer for faster transfer
const size_t toRead = std::min({sizeof(buffer), binaryBytesRemaining, static_cast<size_t>(available)});
const size_t bytesRead = tcpClient.read(buffer, toRead);
if (bytesRead == 0) {
break; // No more data to read right now
}
if (bytesRead > 0) {
currentFile.write(buffer, bytesRead);
bytesReceived += bytesRead;
binaryBytesRemaining -= bytesRead;
updateRequired = true;
}
if (binaryBytesRemaining == 0) {
// Transfer complete
currentFile.flush();
currentFile.close();
inBinaryMode = false;
if (binaryBytesRemaining == 0) {
// Transfer complete - switch back to JSON mode
// Note: Do NOT send OK here. KOReader doesn't, and sending an extra OK
// could be misinterpreted as a response to SEND_BOOK_METADATA before
// we've received it, causing protocol desync.
currentFile.flush();
currentFile.close();
inBinaryMode = false;
setState(WirelessState::WAITING);
setStatus("Received: " + currentFilename + "\nWaiting for more...");
Serial.printf("[%lu] [CAL] Binary transfer complete: %zu bytes received\n", millis(), bytesReceived);
// Send OK to acknowledge completion
sendJsonResponse(OpCode::OK, "{}");
}
setState(WirelessState::WAITING);
setStatus("Received: " + currentFilename + "\nWaiting for more...");
}
}

View File

@ -93,6 +93,13 @@ class CalibreWirelessActivity final : public Activity {
FsFile currentFile;
std::string recvBuffer; // Buffer for incoming data (like KOReader)
// Large message skip state - for streaming past oversized JSON (e.g., large covers)
bool inSkipMode = false;
size_t skipBytesRemaining = 0;
int skipOpcode = -1; // Opcode of message being skipped
std::string skipExtractedLpath;
size_t skipExtractedLength = 0;
static void displayTaskTrampoline(void* param);
static void networkTaskTrampoline(void* param);
[[noreturn]] void displayTaskLoop();