SensESP 3.3.0
Universal Signal K sensor toolkit ESP32
Loading...
Searching...
No Matches
signalk_ws_client.cpp
Go to the documentation of this file.
1#include "sensesp.h"
2
3#include "signalk_ws_client.h"
4
5#include <ArduinoJson.h>
6#include <ESPmDNS.h>
7#include <esp_http_client.h>
8
9#ifdef SENSESP_SSL_SUPPORT
10#include <mbedtls/sha256.h>
11#include <mbedtls/ssl.h>
12#include <mbedtls/x509_crt.h>
13#endif
14
15#include <memory>
16
17#include "Arduino.h"
18#include "elapsedMillis.h"
19#include "esp_arduino_version.h"
23#include "sensesp/system/uuid.h"
24#include "sensesp_app.h"
25
26namespace sensesp {
27
28constexpr int kWsClientTaskStackSize = 8192; // Stack for ExecuteWebSocketTask
29constexpr int kWsTransportTaskStackSize = 6144; // Stack for esp_websocket_client internal task
30constexpr TickType_t kWsSendTimeoutTicks = pdMS_TO_TICKS(5000);
31
33
// Value sent in the "permissions" field of a Signal K access request.
static const char* kRequestPermission{"readwrite"};
35
36#ifdef SENSESP_SSL_SUPPORT
// Render a 32-byte SHA256 digest as a lowercase hexadecimal string.
//
// sha256: pointer to exactly 32 digest bytes.
// hex:    output buffer of at least 65 chars; receives 64 hex digits
//         followed by a terminating NUL.
static void sha256_to_hex(const uint8_t* sha256, char* hex) {
  constexpr char kHexDigits[] = "0123456789abcdef";
  char* out = hex;
  for (const uint8_t* in = sha256; in != sha256 + 32; ++in) {
    *out++ = kHexDigits[(*in >> 4) & 0x0F];  // high nibble first
    *out++ = kHexDigits[*in & 0x0F];
  }
  *out = '\0';
}
44
45// TOFU verification callback - called during SSL handshake
46// Returns 0 to allow connection, non-zero to reject
47static int tofu_verify_callback(void* ctx, mbedtls_x509_crt* crt,
48 int depth, uint32_t* flags) {
49 // Only check the server certificate (depth 0), not the CA chain
50 if (depth != 0) {
51 *flags = 0; // Clear errors for intermediate certs
52 return 0;
53 }
54
55 SKWSClient* client = static_cast<SKWSClient*>(ctx);
56 if (client == nullptr) {
57 ESP_LOGW("SKWSClient", "TOFU: No client context, allowing connection");
58 *flags = 0;
59 return 0;
60 }
61
62 // Compute SHA256 of the certificate
63 uint8_t sha256[32];
64 mbedtls_sha256_context sha256_ctx;
65 mbedtls_sha256_init(&sha256_ctx);
66 mbedtls_sha256_starts(&sha256_ctx, 0); // 0 = SHA256 (not SHA224)
67 mbedtls_sha256_update(&sha256_ctx, crt->raw.p, crt->raw.len);
68 mbedtls_sha256_finish(&sha256_ctx, sha256);
69 mbedtls_sha256_free(&sha256_ctx);
70
71 char hex[65];
72 sha256_to_hex(sha256, hex);
73 String current_fingerprint = String(hex);
74
75 ESP_LOGD("SKWSClient", "Server certificate fingerprint: %s", hex);
76
77 if (!client->is_tofu_enabled()) {
78 // TOFU disabled, allow any certificate
79 ESP_LOGD("SKWSClient", "TOFU disabled, allowing connection");
80 *flags = 0;
81 return 0;
82 }
83
84 if (!client->has_tofu_fingerprint()) {
85 // First connection - capture the fingerprint
86 ESP_LOGI("SKWSClient", "TOFU: First connection, capturing fingerprint: %s", hex);
87 client->set_tofu_fingerprint(current_fingerprint);
88 *flags = 0;
89 return 0;
90 }
91
92 // Verify against stored fingerprint
93 if (client->get_tofu_fingerprint() == current_fingerprint) {
94 ESP_LOGD("SKWSClient", "TOFU: Fingerprint verified successfully");
95 *flags = 0;
96 return 0;
97 }
98
99 // Fingerprint mismatch!
100 ESP_LOGE("SKWSClient", "TOFU: Fingerprint mismatch!");
101 ESP_LOGE("SKWSClient", " Expected: %s", client->get_tofu_fingerprint().c_str());
102 ESP_LOGE("SKWSClient", " Received: %s", hex);
103 // Return error to reject the connection
104 return MBEDTLS_ERR_X509_CERT_VERIFY_FAILED;
105}
106
107// Certificate bundle attach function for TOFU verification
108// Sets up a custom verification callback that implements Trust On First Use
109static esp_err_t tofu_crt_bundle_attach(void* conf) {
110 mbedtls_ssl_config* ssl_conf = static_cast<mbedtls_ssl_config*>(conf);
111 // Use OPTIONAL so we can handle verification ourselves
112 mbedtls_ssl_conf_authmode(ssl_conf, MBEDTLS_SSL_VERIFY_OPTIONAL);
113 mbedtls_ssl_conf_verify(ssl_conf, tofu_verify_callback, ws_client);
114 ESP_LOGD("SKWSClient", "TOFU verification callback installed");
115 return ESP_OK;
116}
117#endif // SENSESP_SSL_SUPPORT
118
119static void websocket_event_handler(void* handler_args,
120 esp_event_base_t base,
121 int32_t event_id, void* event_data) {
122 esp_websocket_event_data_t* data = (esp_websocket_event_data_t*)event_data;
123 switch (event_id) {
124 case WEBSOCKET_EVENT_CONNECTED:
125 ws_client->on_connected();
126 break;
127 case WEBSOCKET_EVENT_DISCONNECTED:
128 ws_client->on_disconnected();
129 break;
130 case WEBSOCKET_EVENT_DATA:
131 // Only process text frames (opcode 0x1) and continuation frames (0x0).
132 // Control frames (ping/pong/close: 0x8-0xA) have no JSON payload.
133 if (data->op_code <= 0x2) {
134 ws_client->on_receive_delta((uint8_t*)data->data_ptr, data->data_len);
135 }
136 break;
137 case WEBSOCKET_EVENT_ERROR:
138 ws_client->on_error();
139 break;
140 }
141}
142
143void ExecuteWebSocketTask(void* /*parameter*/) {
144 elapsedMillis delta_loop_elapsed = 0;
145
146 ws_client->connect();
147
148 while (true) {
149 if (ws_client->is_connect_due()) {
150 ws_client->connect();
151 }
152 if (delta_loop_elapsed > 5) {
153 delta_loop_elapsed = 0;
154 ws_client->send_delta();
155 }
156 vTaskDelay(pdMS_TO_TICKS(100));
157 }
158}
159
160
161SKWSClient::SKWSClient(const String& config_path,
162 std::shared_ptr<SKDeltaQueue> sk_delta_queue,
163 const String& server_address, uint16_t server_port,
164 bool use_mdns)
165 : FileSystemSaveable{config_path},
166 conf_server_address_{server_address},
167 conf_server_port_{server_port},
168 use_mdns_{use_mdns},
169 sk_delta_queue_{sk_delta_queue} {
170 // a SKWSClient object observes its own connection_state_ member
171 // and simply passes through any notification it emits. As a result,
172 // whenever the value of connection_state_ is updated, observers of the
173 // SKWSClient object get automatically notified.
174 this->connection_state_.attach(
175 [this]() { this->emit(this->connection_state_.get()); });
176
177 // process any received updates in the main task
178 event_loop()->onRepeat(1, [this]() { this->process_received_updates(); });
179
180 // set the singleton object pointer
181 ws_client = this;
182
183 load();
184
185 // Connect the counters
187
188 event_loop()->onDelay(0, [this]() {
189 ESP_LOGD(__FILENAME__, "Starting SKWSClient");
190 xTaskCreate(ExecuteWebSocketTask, "SKWSClient", kWsClientTaskStackSize,
191 this, 1, NULL);
192 MDNS.addService("signalk-sensesp", "tcp", 80);
193 });
194}
195
205 // Going from connecting directly to disconnect when we
206 // know we have found and talked to the server usually means
207 // the authentication token is bad.
208 ESP_LOGW(__FILENAME__, "Bad access token detected. Setting token to null.");
209 auth_token_ = NULL_AUTH_TOKEN;
210 save();
211 }
213 server_detected_ = false;
214}
215
224 ESP_LOGW(__FILENAME__, "Websocket client error.");
225}
226
235 this->sk_delta_queue_->reset_meta_send();
236 ESP_LOGI(__FILENAME__, "Subscribing to Signal K listeners...");
237 this->subscribe_listeners();
238}
239
247 bool output_available = false;
248 JsonDocument subscription;
249 subscription["context"] = "vessels.self";
250
252 const std::vector<SKListener*>& listeners = SKListener::get_listeners();
253
254 if (listeners.size() > 0) {
255 output_available = true;
256 JsonArray subscribe = subscription["subscribe"].to<JsonArray>();
257
258 for (size_t i = 0; i < listeners.size(); i++) {
259 auto* listener = listeners.at(i);
260 String sk_path = listener->get_sk_path();
261 int listen_delay = listener->get_listen_delay();
262
263 JsonObject subscribe_path = subscribe.add<JsonObject>();
264
265 subscribe_path["path"] = sk_path;
266 subscribe_path["period"] = listen_delay;
267 ESP_LOGI(__FILENAME__, "Adding %s subscription with listen_delay %d\n",
268 sk_path.c_str(), listen_delay);
269 }
270 }
272
273 if (output_available &&
275 String json_message;
276
277 serializeJson(subscription, json_message);
278 ESP_LOGI(__FILENAME__, "Subscription JSON message:\n %s",
279 json_message.c_str());
280 int result = esp_websocket_client_send_text(
281 client_, json_message.c_str(), json_message.length(),
283 if (result < 0) {
284 ESP_LOGE(__FILENAME__, "Subscription send failed (result=%d)", result);
285 }
286 }
287}
288
296void SKWSClient::on_receive_delta(uint8_t* payload, size_t length) {
297 // Need to work on null-terminated strings
298 constexpr size_t kMaxWsMessageSize = 4096;
299 if (length > kMaxWsMessageSize) {
300 ESP_LOGW(__FILENAME__, "WebSocket message too large (%u bytes), dropping",
301 (unsigned)length);
302 return;
303 }
304 std::unique_ptr<char[]> buf(new char[length + 1]);
305 memcpy(buf.get(), payload, length);
306 buf[length] = 0;
307
308#ifdef SIGNALK_PRINT_RCV_DELTA
309 ESP_LOGD(__FILENAME__, "Websocket payload received: %s", buf.get());
310#endif
311
312 JsonDocument message;
313 auto error = deserializeJson(message, buf.get());
314
315 if (!error) {
316 if (message["updates"].is<JsonVariant>()) {
317 on_receive_updates(message);
318 }
319
320 if (message["put"].is<JsonVariant>()) {
321 on_receive_put(message);
322 }
323
324 // Putrequest contains also requestId Key GA
325 if (message["requestId"].is<JsonVariant>() &&
326 !message["put"].is<JsonVariant>()) {
328 }
329 } else {
330 ESP_LOGE(__FILENAME__, "deserializeJson error: %s", error.c_str());
331 }
332}
333
341void SKWSClient::on_receive_updates(JsonDocument& message) {
342 // Process updates from subscriptions...
343 JsonArray updates = message["updates"];
344
346 for (size_t i = 0; i < updates.size(); i++) {
347 JsonObject update = updates[i];
348
349 JsonArray values = update["values"];
350
351 for (size_t vi = 0; vi < values.size(); vi++) {
352 JsonDocument value_doc =
353 static_cast<JsonDocument>(static_cast<JsonObject>((values[vi])));
354
355 // push all values into a separate list for processing
356 // in the main task
357 constexpr size_t kMaxReceivedUpdates = 20;
358 while (received_updates_.size() >= kMaxReceivedUpdates) {
359 received_updates_.pop_front();
360 ESP_LOGW(__FILENAME__,
361 "Dropping oldest received update (queue full)");
362 }
363 received_updates_.push_back(value_doc);
364 }
365 }
367}
368
377
378 const std::vector<SKListener*>& listeners = SKListener::get_listeners();
379 const std::vector<SKPutListener*>& put_listeners =
381
383 int num_updates = received_updates_.size();
384 while (!received_updates_.empty()) {
385 JsonDocument& doc = received_updates_.front();
386
387 const char* path = doc["path"];
388 JsonObject value = doc.as<JsonObject>();
389
390 for (size_t i = 0; i < listeners.size(); i++) {
391 SKListener* listener = listeners[i];
392 if (listener->get_sk_path().equals(path)) {
393 listener->parse_value(value);
394 }
395 }
396 // to be able to parse values of Put Listeners GA
397 for (size_t i = 0; i < put_listeners.size(); i++) {
398 SKPutListener* listener = put_listeners[i];
399 if (listener->get_sk_path().equals(path)) {
400 listener->parse_value(value);
401 }
402 }
403 received_updates_.pop_front();
404 }
406 delta_rx_count_producer_.set(num_updates);
407
409}
410
418void SKWSClient::on_receive_put(JsonDocument& message) {
419 // Process PUT requests...
420 JsonArray puts = message["put"];
421 bool all_matched = true;
422 for (size_t i = 0; i < puts.size(); i++) {
423 JsonObject value = puts[i];
424 const char* path = value["path"];
425 bool matched = false;
426
428 const std::vector<SKPutListener*>& listeners =
430 for (size_t j = 0; j < listeners.size(); j++) {
431 SKPutListener* listener = listeners[j];
432 if (listener->get_sk_path().equals(path)) {
434 constexpr size_t kMaxReceivedUpdates = 20;
435 while (received_updates_.size() >= kMaxReceivedUpdates) {
436 received_updates_.pop_front();
437 ESP_LOGW(__FILENAME__,
438 "Dropping oldest received update (queue full)");
439 }
440 received_updates_.push_back(value);
442 matched = true;
443 }
444 }
446
447 if (!matched) {
448 all_matched = false;
449 }
450 }
451
452 // Send back a single request response if still connected
454 JsonDocument put_response;
455 put_response["requestId"] = message["requestId"];
456 if (all_matched) {
457 put_response["state"] = "COMPLETED";
458 put_response["statusCode"] = 200;
459 } else {
460 put_response["state"] = "FAILED";
461 put_response["statusCode"] = 405;
462 }
463 String response_text;
464 serializeJson(put_response, response_text);
465 int result = esp_websocket_client_send_text(
466 client_, response_text.c_str(), response_text.length(),
468 if (result < 0) {
469 ESP_LOGE(__FILENAME__, "PUT response send failed (result=%d)", result);
470 }
471 }
472}
473
481void SKWSClient::sendTXT(String& payload) {
483 int result = esp_websocket_client_send_text(
484 client_, payload.c_str(), payload.length(), kWsSendTimeoutTicks);
485 if (result < 0) {
486 ESP_LOGE(__FILENAME__, "sendTXT failed (result=%d)", result);
487 }
488 }
489}
490
491bool SKWSClient::get_mdns_service(String& server_address,
492 uint16_t& server_port) {
493 // get IP address using an mDNS query
494 // Try SSL service first, then fall back to non-SSL
495 int num = MDNS.queryService("signalk-wss", "tcp");
496 if (num > 0) {
497 // Found SSL-enabled server
498 ssl_enabled_ = true;
499 ESP_LOGI(__FILENAME__, "Found Signal K server via mDNS (signalk-wss)");
500 } else {
501 // Try non-SSL service
502 num = MDNS.queryService("signalk-ws", "tcp");
503 if (num == 0) {
504 // no service found
505 return false;
506 }
507 // Found non-SSL server, disable SSL
508 ssl_enabled_ = false;
509 ESP_LOGI(__FILENAME__, "Found Signal K server via mDNS (signalk-ws)");
510 }
511
512#if ESP_ARDUINO_VERSION_MAJOR < 3
513 server_address = MDNS.IP(0).toString();
514#else
515 server_address = MDNS.address(0).toString();
516#endif
517 server_port = MDNS.port(0);
518 ESP_LOGI(__FILENAME__, "Found server %s (port %d)", server_address.c_str(),
519 server_port);
520 return true;
521}
522
523// Event handler for detect_ssl() to capture the Location response header
524static esp_err_t detect_ssl_event_handler(esp_http_client_event_t* evt) {
525 if (evt->event_id == HTTP_EVENT_ON_HEADER) {
526 // Check for Location header (case-insensitive)
527 if (strcasecmp(evt->header_key, "Location") == 0) {
528 String* location = static_cast<String*>(evt->user_data);
529 *location = evt->header_value;
530 }
531 }
532 return ESP_OK;
533}
534
536 // Try to detect if the server requires SSL by checking for HTTP->HTTPS
537 // redirects
538 String url =
539 String("http://") + server_address_ + ":" + server_port_ + "/signalk";
540
541 ESP_LOGD(__FILENAME__, "Probing for SSL redirect at %s", url.c_str());
542
543 String location;
544
545 esp_http_client_config_t config = {};
546 config.url = url.c_str();
547 config.disable_auto_redirect = true;
548 config.timeout_ms = 10000;
549 config.event_handler = detect_ssl_event_handler;
550 config.user_data = &location;
551
552 esp_http_client_handle_t client = esp_http_client_init(&config);
553 if (client == nullptr) {
554 ESP_LOGE(__FILENAME__, "Failed to initialize HTTP client");
555 return false;
556 }
557
558 esp_err_t err = esp_http_client_perform(client);
559 int status_code = esp_http_client_get_status_code(client);
560 esp_http_client_cleanup(client);
561
562 if (err != ESP_OK) {
563 ESP_LOGD(__FILENAME__, "HTTP request failed: %s", esp_err_to_name(err));
564 return false;
565 }
566
567 if ((status_code == 301 || status_code == 302 ||
568 status_code == 307 || status_code == 308) &&
569 location.startsWith("https://")) {
570 ESP_LOGI(__FILENAME__, "SSL redirect detected, enabling HTTPS/WSS");
571 ssl_enabled_ = true;
572 save();
573 return true;
574 }
575
576 return false;
577}
578
579
582 return;
583 }
584
585 // Schedule next attempt with backoff in case this one fails.
586 // Will be reset on successful connection.
588
589 // Wait for the active network provisioner (WiFi, Ethernet, …) to be
590 // up before initiating the WS connection. The provisioner abstracts
591 // away whether we're on WiFi, Ethernet, or some other transport.
592 auto provisioner = SensESPApp::get()->get_network_provisioner();
593 if (!provisioner || !provisioner->is_connected()) {
594 ESP_LOGI(__FILENAME__,
595 "Network is not yet up. SignalK client connection will be "
596 "initiated when the link comes up.");
597 return;
598 }
599
600 ESP_LOGI(__FILENAME__, "Initiating websocket connection with server...");
601
603 if (use_mdns_) {
604 if (!get_mdns_service(this->server_address_, this->server_port_)) {
605 ESP_LOGE(__FILENAME__,
606 "No Signal K server found in network when using mDNS service!");
607 } else {
608 ESP_LOGI(__FILENAME__,
609 "Signal K server has been found at address %s:%d by mDNS.",
610 this->server_address_.c_str(), this->server_port_);
611 }
612 } else {
614 this->server_port_ = this->conf_server_port_;
615 }
616
617 if (!this->server_address_.isEmpty() && this->server_port_ > 0) {
618 ESP_LOGD(__FILENAME__,
619 "Websocket is connecting to Signal K server on address %s:%d",
620 this->server_address_.c_str(), this->server_port_);
621
622 // Detect if server requires SSL (check for HTTP->HTTPS redirects)
623 if (!ssl_enabled_) {
624 detect_ssl();
625 }
626 } else {
627 // host and port not defined - don't try to connect
628 ESP_LOGD(__FILENAME__,
629 "Websocket is not connecting to Signal K server because host and "
630 "port are not defined.");
632 return;
633 }
634
635 if (this->polling_href_.length() > 0 && this->polling_href_.startsWith("/")) {
636 // existing pending request
638 this->polling_href_);
639 return;
640 }
641
642 if (this->auth_token_ == NULL_AUTH_TOKEN) {
643 // initiate HTTP authentication
644 ESP_LOGD(__FILENAME__, "No prior authorization token present.");
646 return;
647 }
648
649 // Test the validity of the authorization token
650 this->test_token(this->server_address_, this->server_port_);
651}
652
654 // No-op: esp_websocket_client handles data via event callbacks
655}
656
657void SKWSClient::test_token(const String server_address,
658 const uint16_t server_port) {
659 String protocol = ssl_enabled_ ? "https://" : "http://";
660 String url = protocol + server_address + ":" + server_port +
661 "/signalk/v1/stream";
662 ESP_LOGD(__FILENAME__, "Testing token with url %s", url.c_str());
663
664 const String full_token = String("Bearer ") + auth_token_;
665 ESP_LOGD(__FILENAME__, "Authorization: %.8s...[redacted]", full_token.c_str());
666
667 esp_http_client_config_t config = {};
668 config.url = url.c_str();
669 config.timeout_ms = 10000;
670#ifdef SENSESP_SSL_SUPPORT
671 if (ssl_enabled_) {
672 config.crt_bundle_attach = tofu_crt_bundle_attach;
673 config.skip_cert_common_name_check = true;
674 }
675#endif
676
677 esp_http_client_handle_t client = esp_http_client_init(&config);
678 if (client == nullptr) {
679 ESP_LOGE(__FILENAME__, "Failed to initialize HTTP client");
681 return;
682 }
683
684 esp_http_client_set_header(client, "Authorization", full_token.c_str());
685
686 // Use streaming API for GET request
687 esp_err_t err = esp_http_client_open(client, 0);
688 if (err != ESP_OK) {
689 ESP_LOGE(__FILENAME__, "Failed to open HTTP connection: %s", esp_err_to_name(err));
690 esp_http_client_cleanup(client);
692 return;
693 }
694
695 int content_length = esp_http_client_fetch_headers(client);
696 int http_code = esp_http_client_get_status_code(client);
697
698 ESP_LOGD(__FILENAME__, "Testing resulted in http status %d", http_code);
699
700 // Read response body
701 String payload;
702 if (content_length > 0 && content_length < 4096) {
703 char* buffer = new char[content_length + 1];
704 int read_len = esp_http_client_read(client, buffer, content_length);
705 buffer[read_len > 0 ? read_len : 0] = '\0';
706 payload = String(buffer);
707 delete[] buffer;
708 } else {
709 // Chunked encoding or unknown/large content length - read in chunks
710 char buffer[512];
711 int read_len;
712 while ((read_len = esp_http_client_read(client, buffer, sizeof(buffer) - 1)) > 0) {
713 buffer[read_len] = '\0';
714 payload += String(buffer);
715 if (payload.length() > 4096) break;
716 }
717 }
718
719 esp_http_client_close(client);
720 esp_http_client_cleanup(client);
721
722 if (payload.length() > 0) {
723 ESP_LOGD(__FILENAME__, "Returned payload (%d bytes): %s",
724 payload.length(), payload.c_str());
725 }
726
727 if (http_code == 426) {
728 // HTTP status 426 is "Upgrade Required", which is the expected
729 // response for a websocket connection.
730 ESP_LOGD(__FILENAME__, "Attempting to connect to Signal K Websocket...");
731 server_detected_ = true;
732 token_test_success_ = true;
733 this->connect_ws(server_address, server_port);
734 } else if (http_code == 401) {
735 // Token is invalid/expired - clear it and request new access
736 // Keep client_id_ so we reuse the same device identity
737 ESP_LOGW(__FILENAME__, "Token rejected (401), requesting new access");
738 this->auth_token_ = NULL_AUTH_TOKEN;
739 this->save();
740 this->send_access_request(server_address, server_port);
741 } else if (http_code > 0) {
743 } else {
744 ESP_LOGE(__FILENAME__, "HTTP request failed with code %d", http_code);
746 }
747}
748
749void SKWSClient::send_access_request(const String server_address,
750 const uint16_t server_port) {
751 ESP_LOGD(__FILENAME__, "Sending access request (client_id=%s, ssl=%d)",
752 client_id_.c_str(), ssl_enabled_);
753 if (client_id_ == "") {
754 // generate a client ID
756 save();
757 }
758
759 // create a new access request
760 JsonDocument doc;
761 doc["clientId"] = client_id_;
762 doc["description"] =
763 String("SensESP device: ") + SensESPBaseApp::get_hostname();
764 doc["permissions"] = kRequestPermission;
765 String json_req = "";
766 serializeJson(doc, json_req);
767
768 ESP_LOGD(__FILENAME__, "Access request: %s", json_req.c_str());
769
770 String protocol = ssl_enabled_ ? "https://" : "http://";
771 String url = protocol + server_address + ":" + server_port +
772 "/signalk/v1/access/requests";
773 ESP_LOGD(__FILENAME__, "Access request url: %s", url.c_str());
774
775 esp_http_client_config_t config = {};
776 config.url = url.c_str();
777 config.method = HTTP_METHOD_POST;
778 config.timeout_ms = 10000;
779#ifdef SENSESP_SSL_SUPPORT
780 if (ssl_enabled_) {
781 config.crt_bundle_attach = tofu_crt_bundle_attach;
782 config.skip_cert_common_name_check = true;
783 }
784#endif
785
786 esp_http_client_handle_t client = esp_http_client_init(&config);
787 if (client == nullptr) {
788 ESP_LOGE(__FILENAME__, "Failed to initialize HTTP client");
790 // Don't clear client_id_ - keep device identity for retry
791 return;
792 }
793
794 esp_http_client_set_header(client, "Content-Type", "application/json");
795
796 // Use streaming API: open -> write request -> fetch headers -> read response
797 esp_err_t err = esp_http_client_open(client, json_req.length());
798 if (err != ESP_OK) {
799 ESP_LOGE(__FILENAME__, "Failed to open HTTP connection: %s", esp_err_to_name(err));
800 esp_http_client_cleanup(client);
802 return;
803 }
804
805 int write_len = esp_http_client_write(client, json_req.c_str(), json_req.length());
806 if (write_len < 0 || write_len != (int)json_req.length()) {
807 ESP_LOGE(__FILENAME__, "Failed to write request body (wrote %d of %d bytes)",
808 write_len, json_req.length());
809 esp_http_client_close(client);
810 esp_http_client_cleanup(client);
812 return;
813 }
814
815 int content_length = esp_http_client_fetch_headers(client);
816 int http_code = esp_http_client_get_status_code(client);
817
818 ESP_LOGD(__FILENAME__, "HTTP response: code=%d, content_length=%d", http_code, content_length);
819
820 // Read response body
821 String payload;
822 char buffer[512];
823 int read_len;
824 while ((read_len = esp_http_client_read(client, buffer, sizeof(buffer) - 1)) > 0) {
825 buffer[read_len] = '\0';
826 payload += String(buffer);
827 if (payload.length() > 4096) break;
828 }
829 ESP_LOGD(__FILENAME__, "Response payload (%d bytes): %s",
830 payload.length(), payload.c_str());
831
832 esp_http_client_close(client);
833 esp_http_client_cleanup(client);
834
835 // Parse JSON response for both 202 and 400 status codes
836 deserializeJson(doc, payload.c_str());
837 String state = doc["state"].is<const char*>() ? doc["state"].as<String>() : "";
838 String href = doc["href"].is<const char*>() ? doc["href"].as<String>() : "";
839 String message = doc["message"].is<const char*>() ? doc["message"].as<String>() : "";
840
841 ESP_LOGD(__FILENAME__, "Access request response: http=%d, state=%s, href=%s",
842 http_code, state.c_str(), href.c_str());
843 if (message.length() > 0) {
844 ESP_LOGI(__FILENAME__, "Server message: %s", message.c_str());
845 }
846
847 // HTTP 400 with href means "already requested" - save href for polling on
848 // next connect() cycle (after backoff)
849 if (http_code == 400 && href.length() > 0 && href.startsWith("/")) {
850 ESP_LOGI(__FILENAME__, "Existing request found, will poll href: %s", href.c_str());
851 polling_href_ = href;
852 save();
854 return;
855 }
856
857 // HTTP 202 with href means new request pending - save href for polling on
858 // next connect() cycle (after backoff)
859 if (http_code == 202 && href.length() > 0 && href.startsWith("/")) {
860 polling_href_ = href;
861 save();
863 return;
864 }
865
866 // HTTP 404 means the server has no security enabled — access requests are
867 // not available. Connect without a token.
868 if (http_code == 404) {
869 ESP_LOGI(__FILENAME__,
870 "Server security disabled (404 on access request) — connecting "
871 "without token");
872 auth_token_ = NULL_AUTH_TOKEN;
873 this->connect_ws(server_address, server_port);
874 return;
875 }
876
877 // Can't proceed - disconnect and retry later
878 ESP_LOGW(__FILENAME__, "Cannot handle response: http=%d, state=%s", http_code, state.c_str());
880}
881
882void SKWSClient::poll_access_request(const String server_address,
883 const uint16_t server_port,
884 const String href) {
885 ESP_LOGD(__FILENAME__, "Polling SK Server for authentication token");
886
887 String protocol = ssl_enabled_ ? "https://" : "http://";
888 String url = protocol + server_address + ":" + server_port + href;
889
890 esp_http_client_config_t config = {};
891 config.url = url.c_str();
892 config.timeout_ms = 10000;
893#ifdef SENSESP_SSL_SUPPORT
894 if (ssl_enabled_) {
895 config.crt_bundle_attach = tofu_crt_bundle_attach;
896 config.skip_cert_common_name_check = true;
897 }
898#endif
899
900 esp_http_client_handle_t client = esp_http_client_init(&config);
901 if (client == nullptr) {
902 ESP_LOGE(__FILENAME__, "Failed to initialize HTTP client");
904 return;
905 }
906
907 // Use streaming API for GET request
908 esp_err_t err = esp_http_client_open(client, 0);
909 if (err != ESP_OK) {
910 ESP_LOGE(__FILENAME__, "Failed to open HTTP connection: %s", esp_err_to_name(err));
911 esp_http_client_cleanup(client);
913 return;
914 }
915
916 int content_length = esp_http_client_fetch_headers(client);
917 int http_code = esp_http_client_get_status_code(client);
918
919 // Read response body
920 String payload;
921 if (content_length > 0 && content_length < 4096) {
922 char* buffer = new char[content_length + 1];
923 int read_len = esp_http_client_read(client, buffer, content_length);
924 buffer[read_len > 0 ? read_len : 0] = '\0';
925 payload = String(buffer);
926 delete[] buffer;
927 } else {
928 // Chunked encoding or unknown/large content length - read in chunks
929 char buffer[512];
930 int read_len;
931 while ((read_len = esp_http_client_read(client, buffer, sizeof(buffer) - 1)) > 0) {
932 buffer[read_len] = '\0';
933 payload += String(buffer);
934 if (payload.length() > 4096) break;
935 }
936 }
937
938 ESP_LOGD(__FILENAME__, "Poll response: http=%d, payload=%s", http_code, payload.c_str());
939
940 esp_http_client_close(client);
941 esp_http_client_cleanup(client);
942
943 if (http_code == 200 || http_code == 202) {
944 JsonDocument doc;
945 auto error = deserializeJson(doc, payload.c_str());
946 if (error) {
947 ESP_LOGW(__FILENAME__, "WARNING: Could not deserialize http payload.");
948 ESP_LOGW(__FILENAME__, "DeserializationError: %s", error.c_str());
949 return;
950 }
951 String state = doc["state"];
952 ESP_LOGD(__FILENAME__, "%s", state.c_str());
953 if (state == "PENDING") {
955 return;
956 }
957 if (state == "COMPLETED") {
958 JsonObject access_req = doc["accessRequest"];
959 String permission = access_req["permission"];
960
961 polling_href_ = "";
962 save();
963
964 if (permission == "DENIED") {
965 ESP_LOGW(__FILENAME__, "Permission denied");
967 return;
968 }
969
970 if (permission == "APPROVED") {
971 ESP_LOGI(__FILENAME__, "Permission granted");
972 String token = access_req["token"];
973 auth_token_ = token;
974 save();
975 this->connect_ws(server_address, server_port);
976 return;
977 }
978 }
979 } else {
980 if (http_code == 404 || http_code == 500) {
981 // Server doesn't recognize this request (stale href after
982 // server restart, different server, or security disabled).
983 // Clear the polling href so the next connect cycle starts
984 // a fresh access-request flow.
985 ESP_LOGD(__FILENAME__,
986 "Got %d polling access request — clearing stale href.",
987 http_code);
988 polling_href_ = "";
989 save();
991 return;
992 }
993 // any other HTTP status code
994 ESP_LOGW(__FILENAME__,
995 "Can't handle response %d to pending access request.\n",
996 http_code);
998 return;
999 }
1000}
1001
1002void SKWSClient::connect_ws(const String& host, const uint16_t port) {
1004
1005 String protocol = ssl_enabled_ ? "wss" : "ws";
1006 String path = "/signalk/v1/stream?subscribe=none";
1007 String url = protocol + "://" + host + ":" + String(port) + path;
1008
1009 ESP_LOGD(__FILENAME__, "Connecting WebSocket to %s", url.c_str());
1010
1011 // Build authorization header string (must persist through init call)
1012 String auth_header;
1013 if (auth_token_ != NULL_AUTH_TOKEN) {
1014 auth_header = String("Authorization: Bearer ") + auth_token_ + "\r\n";
1015 }
1016
1017 // Configure WebSocket client
1018 esp_websocket_client_config_t config = {};
1019 config.uri = url.c_str();
1020 config.task_stack = kWsTransportTaskStackSize;
1021 config.buffer_size = 1024;
1022 if (auth_header.length() > 0) {
1023 config.headers = auth_header.c_str();
1024 }
1025
1026#ifdef SENSESP_SSL_SUPPORT
1027 if (ssl_enabled_) {
1028 // Use custom crt_bundle_attach to disable SSL verification
1029 // This directly configures mbedTLS to skip certificate verification
1030 config.crt_bundle_attach = tofu_crt_bundle_attach;
1031 config.skip_cert_common_name_check = true;
1032 }
1033#endif
1034
1035 // Destroy any existing client
1036 if (client_ != nullptr) {
1037 esp_websocket_client_stop(client_);
1038 esp_websocket_client_destroy(client_);
1039 client_ = nullptr;
1040 }
1041
1042 client_ = esp_websocket_client_init(&config);
1043 if (client_ == nullptr) {
1044 ESP_LOGE(__FILENAME__, "Failed to initialize WebSocket client");
1046 return;
1047 }
1048
1049 // Register event handler
1050 esp_websocket_register_events(client_, WEBSOCKET_EVENT_ANY,
1051 websocket_event_handler, nullptr);
1052
1053 // Start the client
1054 esp_err_t err = esp_websocket_client_start(client_);
1055 if (err != ESP_OK) {
1056 ESP_LOGE(__FILENAME__, "Failed to start WebSocket client: %s",
1057 esp_err_to_name(err));
1058 esp_websocket_client_destroy(client_);
1059 client_ = nullptr;
1061 return;
1062 }
1063
1064 ESP_LOGD(__FILENAME__, "WebSocket client started, waiting for connection...");
1065}
1066
1070
1072 // Set state first so event handler callbacks and send callsites see the
1073 // disconnected state and skip operations on the client being destroyed.
1075 if (client_ != nullptr) {
1076 esp_websocket_client_stop(client_);
1077 esp_websocket_client_destroy(client_);
1078 client_ = nullptr;
1079 }
1080}
1081
1084 if (sk_delta_queue_->data_available()) {
1085 std::vector<String> deltas;
1086 sk_delta_queue_->get_deltas(deltas);
1087 for (const auto& delta : deltas) {
1088 int send_result = esp_websocket_client_send_text(
1089 client_, delta.c_str(), delta.length(), kWsSendTimeoutTicks);
1090 if (send_result < 0) {
1091 ESP_LOGE(__FILENAME__,
1092 "WebSocket send failed (result=%d), restarting",
1093 send_result);
1094 this->restart();
1095 return;
1096 }
1097 this->delta_tx_tick_producer_.set(1);
1098 }
1099 }
1100 }
1101}
1102
1103bool SKWSClient::to_json(JsonObject& root) {
1104 root["sk_address"] = this->conf_server_address_;
1105 root["sk_port"] = this->conf_server_port_;
1106 root["use_mdns"] = this->use_mdns_;
1107
1108 root["token"] = this->auth_token_;
1109 root["client_id"] = this->client_id_;
1110 root["polling_href"] = this->polling_href_;
1111
1112 root["ssl_enabled"] = this->ssl_enabled_;
1113 root["tofu_enabled"] = this->tofu_enabled_;
1114 root["tofu_fingerprint"] = this->tofu_fingerprint_;
1115 return true;
1116}
1117
1118bool SKWSClient::from_json(const JsonObject& config) {
1119 if (config["sk_address"].is<String>()) {
1120 this->conf_server_address_ = config["sk_address"].as<String>();
1121 }
1122 if (config["sk_port"].is<int>()) {
1123 this->conf_server_port_ = config["sk_port"].as<int>();
1124 }
1125 if (config["use_mdns"].is<bool>()) {
1126 this->use_mdns_ = config["use_mdns"].as<bool>();
1127 }
1128 if (config["token"].is<String>()) {
1129 this->auth_token_ = config["token"].as<String>();
1130 }
1131 if (config["client_id"].is<String>()) {
1132 this->client_id_ = config["client_id"].as<String>();
1133 }
1134 if (config["polling_href"].is<String>()) {
1135 String href = config["polling_href"].as<String>();
1136 // Only accept valid hrefs (must start with /)
1137 this->polling_href_ = href.startsWith("/") ? href : "";
1138 }
1139
1140 if (config["ssl_enabled"].is<bool>()) {
1141 this->ssl_enabled_ = config["ssl_enabled"].as<bool>();
1142 }
1143 if (config["tofu_enabled"].is<bool>()) {
1144 this->tofu_enabled_ = config["tofu_enabled"].as<bool>();
1145 }
1146 if (config["tofu_fingerprint"].is<String>()) {
1147 this->tofu_fingerprint_ = config["tofu_fingerprint"].as<String>();
1148 }
1149
1150 return true;
1151}
1152
1159 auto state = get_connection_state();
1160 switch (state) {
1162 return "Authorizing with SignalK";
1164 return "Connected";
1166 return "Connecting";
1168 return "Disconnected";
1169 }
1170
1171 return "Unknown";
1172}
1173
1174} // namespace sensesp
virtual bool load() override
Load and populate the object from a persistent storage.
Definition saveable.cpp:8
virtual bool save() override
Save the object to a persistent storage.
Definition saveable.cpp:40
FileSystemSaveable(const String &config_path)
Definition saveable.h:63
An Observable class that listens for Signal K stream deltas and notifies any observers of value change...
static bool take_semaphore(uint64_t timeout_ms=0)
static void release_semaphore()
virtual void parse_value(const JsonObject &json)
static const std::vector< SKListener * > & get_listeners()
An Observable class that listens for Signal K PUT requests coming over the websocket connection and no...
static const std::vector< SKPutListener * > & get_listeners()
virtual void parse_value(const JsonObject &put)=0
static void handle_response(JsonDocument &response)
The websocket connection to the Signal K server.
std::list< JsonDocument > received_updates_
void poll_access_request(const String host, const uint16_t port, const String href)
SKWSClient(const String &config_path, std::shared_ptr< SKDeltaQueue > sk_delta_queue, const String &server_address, uint16_t server_port, bool use_mdns=true)
void on_receive_delta(uint8_t *payload, size_t length)
Called when the websocket receives a delta.
void process_received_updates()
Loop through the received updates and process them.
void connect_ws(const String &host, const uint16_t port)
Integrator< int, int > delta_tx_count_producer_
TaskQueueProducer< SKWSConnectionState > connection_state_
SKWSConnectionState get_connection_state()
void on_receive_put(JsonDocument &message)
Called when a PUT event is received.
void release_received_updates_semaphore()
void send_access_request(const String host, const uint16_t port)
Integrator< int, int > delta_rx_count_producer_
virtual bool to_json(JsonObject &root) override final
esp_websocket_client_handle_t client_
void on_error()
Called when the websocket connection encounters an error.
void sendTXT(String &payload)
Send some processed data to the websocket.
bool get_mdns_service(String &server_address, uint16_t &server_port)
void on_disconnected()
Called when the websocket connection is disconnected.
void subscribe_listeners()
Subscribes the SK delta paths to the websocket.
void on_receive_updates(JsonDocument &message)
Called when a delta update is received.
void test_token(const String host, const uint16_t port)
void set_connection_state(SKWSConnectionState state)
bool take_received_updates_semaphore(unsigned long int timeout_ms=0)
virtual bool from_json(const JsonObject &config) override final
String get_connection_status()
Get a String representation of the current connection state.
TaskQueueProducer< int > delta_tx_tick_producer_
Emits the number of deltas sent since last report.
std::shared_ptr< SKDeltaQueue > sk_delta_queue_
void on_connected()
Called when the websocket connection is established.
static std::shared_ptr< SensESPApp > get()
Get the singleton instance of the SensESPApp.
Definition sensesp_app.h:54
static String get_hostname()
Get the current hostname.
void emit(const SKWSConnectionState &new_value)
std::shared_ptr< reactesp::EventLoop > event_loop()
Definition sensesp.cpp:9
String generate_uuid4()
Generate a random UUIDv4 string.
Definition uuid.cpp:5
constexpr int kWsClientTaskStackSize
SKWSClient * ws_client
constexpr TickType_t kWsSendTimeoutTicks
void ExecuteWebSocketTask(void *)
constexpr int kWsTransportTaskStackSize