SensESP 3.2.2
Universal Signal K sensor toolkit ESP32
Loading...
Searching...
No Matches
signalk_ws_client.cpp
Go to the documentation of this file.
1#include "sensesp.h"
2
3#include "signalk_ws_client.h"
4
5#include <ArduinoJson.h>
6#include <ESPmDNS.h>
7#include <esp_http_client.h>
8
9#ifdef SENSESP_SSL_SUPPORT
10#include <mbedtls/sha256.h>
11#include <mbedtls/ssl.h>
12#include <mbedtls/x509_crt.h>
13#endif
14
15#include "Arduino.h"
16#include "elapsedMillis.h"
17#include "esp_arduino_version.h"
21#include "sensesp/system/uuid.h"
22#include "sensesp_app.h"
23
24namespace sensesp {
25
26constexpr int kWsClientTaskStackSize = 8192; // Reduced from 16KB to save heap
27
29
30static const char* kRequestPermission = "readwrite";
31
32#ifdef SENSESP_SSL_SUPPORT
33// Convert a SHA256 hash to hex string
34static void sha256_to_hex(const uint8_t* sha256, char* hex) {
35 for (int i = 0; i < 32; i++) {
36 sprintf(hex + (i * 2), "%02x", sha256[i]);
37 }
38 hex[64] = '\0';
39}
40
41// TOFU verification callback - called during SSL handshake
42// Returns 0 to allow connection, non-zero to reject
43static int tofu_verify_callback(void* ctx, mbedtls_x509_crt* crt,
44 int depth, uint32_t* flags) {
45 // Only check the server certificate (depth 0), not the CA chain
46 if (depth != 0) {
47 *flags = 0; // Clear errors for intermediate certs
48 return 0;
49 }
50
51 SKWSClient* client = static_cast<SKWSClient*>(ctx);
52 if (client == nullptr) {
53 ESP_LOGW("SKWSClient", "TOFU: No client context, allowing connection");
54 *flags = 0;
55 return 0;
56 }
57
58 // Compute SHA256 of the certificate
59 uint8_t sha256[32];
60 mbedtls_sha256_context sha256_ctx;
61 mbedtls_sha256_init(&sha256_ctx);
62 mbedtls_sha256_starts(&sha256_ctx, 0); // 0 = SHA256 (not SHA224)
63 mbedtls_sha256_update(&sha256_ctx, crt->raw.p, crt->raw.len);
64 mbedtls_sha256_finish(&sha256_ctx, sha256);
65 mbedtls_sha256_free(&sha256_ctx);
66
67 char hex[65];
68 sha256_to_hex(sha256, hex);
69 String current_fingerprint = String(hex);
70
71 ESP_LOGD("SKWSClient", "Server certificate fingerprint: %s", hex);
72
73 if (!client->is_tofu_enabled()) {
74 // TOFU disabled, allow any certificate
75 ESP_LOGD("SKWSClient", "TOFU disabled, allowing connection");
76 *flags = 0;
77 return 0;
78 }
79
80 if (!client->has_tofu_fingerprint()) {
81 // First connection - capture the fingerprint
82 ESP_LOGI("SKWSClient", "TOFU: First connection, capturing fingerprint: %s", hex);
83 client->set_tofu_fingerprint(current_fingerprint);
84 *flags = 0;
85 return 0;
86 }
87
88 // Verify against stored fingerprint
89 if (client->get_tofu_fingerprint() == current_fingerprint) {
90 ESP_LOGD("SKWSClient", "TOFU: Fingerprint verified successfully");
91 *flags = 0;
92 return 0;
93 }
94
95 // Fingerprint mismatch!
96 ESP_LOGE("SKWSClient", "TOFU: Fingerprint mismatch!");
97 ESP_LOGE("SKWSClient", " Expected: %s", client->get_tofu_fingerprint().c_str());
98 ESP_LOGE("SKWSClient", " Received: %s", hex);
99 // Return error to reject the connection
100 return MBEDTLS_ERR_X509_CERT_VERIFY_FAILED;
101}
102
103// Certificate bundle attach function for TOFU verification
104// Sets up a custom verification callback that implements Trust On First Use
105static esp_err_t tofu_crt_bundle_attach(void* conf) {
106 mbedtls_ssl_config* ssl_conf = static_cast<mbedtls_ssl_config*>(conf);
107 // Use OPTIONAL so we can handle verification ourselves
108 mbedtls_ssl_conf_authmode(ssl_conf, MBEDTLS_SSL_VERIFY_OPTIONAL);
109 mbedtls_ssl_conf_verify(ssl_conf, tofu_verify_callback, ws_client);
110 ESP_LOGD("SKWSClient", "TOFU verification callback installed");
111 return ESP_OK;
112}
113#endif // SENSESP_SSL_SUPPORT
114
115static void websocket_event_handler(void* handler_args,
116 esp_event_base_t base,
117 int32_t event_id, void* event_data) {
118 esp_websocket_event_data_t* data = (esp_websocket_event_data_t*)event_data;
119 switch (event_id) {
120 case WEBSOCKET_EVENT_CONNECTED:
121 ws_client->on_connected();
122 break;
123 case WEBSOCKET_EVENT_DISCONNECTED:
124 ws_client->on_disconnected();
125 break;
126 case WEBSOCKET_EVENT_DATA:
127 ws_client->on_receive_delta((uint8_t*)data->data_ptr, data->data_len);
128 break;
129 case WEBSOCKET_EVENT_ERROR:
130 ws_client->on_error();
131 break;
132 }
133}
134
135void ExecuteWebSocketTask(void* /*parameter*/) {
136 elapsedMillis connect_loop_elapsed = 0;
137 elapsedMillis delta_loop_elapsed = 0;
138
139 ws_client->connect();
140
141 while (true) {
142 if (connect_loop_elapsed > 2000) {
143 connect_loop_elapsed = 0;
144 ws_client->connect();
145 }
146 if (delta_loop_elapsed > 5) {
147 delta_loop_elapsed = 0;
148 ws_client->send_delta();
149 }
150 delay(20);
151 }
152}
153
154
155SKWSClient::SKWSClient(const String& config_path,
156 std::shared_ptr<SKDeltaQueue> sk_delta_queue,
157 const String& server_address, uint16_t server_port,
158 bool use_mdns)
159 : FileSystemSaveable{config_path},
160 conf_server_address_{server_address},
161 conf_server_port_{server_port},
162 use_mdns_{use_mdns},
163 sk_delta_queue_{sk_delta_queue} {
164 // a SKWSClient object observes its own connection_state_ member
165 // and simply passes through any notification it emits. As a result,
166 // whenever the value of connection_state_ is updated, observers of the
167 // SKWSClient object get automatically notified.
168 this->connection_state_.attach(
169 [this]() { this->emit(this->connection_state_.get()); });
170
171 // process any received updates in the main task
172 event_loop()->onRepeat(1, [this]() { this->process_received_updates(); });
173
174 // set the singleton object pointer
175 ws_client = this;
176
177 load();
178
179 // Connect the counters
181
182 event_loop()->onDelay(0, [this]() {
183 ESP_LOGD(__FILENAME__, "Starting SKWSClient");
184 xTaskCreate(ExecuteWebSocketTask, "SKWSClient", kWsClientTaskStackSize,
185 this, 1, NULL);
186 MDNS.addService("signalk-sensesp", "tcp", 80);
187 });
188}
189
195
205 // Going from connecting directly to disconnect when we
206 // know we have found and talked to the server usually means
207 // the authentication token is bad.
208 ESP_LOGW(__FILENAME__, "Bad access token detected. Setting token to null.");
209 auth_token_ = NULL_AUTH_TOKEN;
210 save();
211 }
213 server_detected_ = false;
214}
215
224 ESP_LOGW(__FILENAME__, "Websocket client error.");
225}
226
234 this->sk_delta_queue_->reset_meta_send();
235 ESP_LOGI(__FILENAME__, "Subscribing to Signal K listeners...");
236 this->subscribe_listeners();
237}
238
246 bool output_available = false;
247 JsonDocument subscription;
248 subscription["context"] = "vessels.self";
249
251 const std::vector<SKListener*>& listeners = SKListener::get_listeners();
252
253 if (listeners.size() > 0) {
254 output_available = true;
255 JsonArray subscribe = subscription["subscribe"].to<JsonArray>();
256
257 for (size_t i = 0; i < listeners.size(); i++) {
258 auto* listener = listeners.at(i);
259 String sk_path = listener->get_sk_path();
260 int listen_delay = listener->get_listen_delay();
261
262 JsonObject subscribe_path = subscribe.add<JsonObject>();
263
264 subscribe_path["path"] = sk_path;
265 subscribe_path["period"] = listen_delay;
266 ESP_LOGI(__FILENAME__, "Adding %s subscription with listen_delay %d\n",
267 sk_path.c_str(), listen_delay);
268 }
269 }
271
272 if (output_available) {
273 String json_message;
274
275 serializeJson(subscription, json_message);
276 ESP_LOGI(__FILENAME__, "Subscription JSON message:\n %s",
277 json_message.c_str());
278 esp_websocket_client_send_text(client_, json_message.c_str(),
279 json_message.length(), portMAX_DELAY);
280 }
281}
282
290void SKWSClient::on_receive_delta(uint8_t* payload, size_t length) {
291 // Need to work on null-terminated strings
292 char buf[length + 1];
293 memcpy(buf, payload, length);
294 buf[length] = 0;
295
296#ifdef SIGNALK_PRINT_RCV_DELTA
297 ESP_LOGD(__FILENAME__, "Websocket payload received: %s", (char*)buf);
298#endif
299
300 JsonDocument message;
301 // JsonObject message = jsonDoc.as<JsonObject>();
302 auto error = deserializeJson(message, buf);
303
304 if (!error) {
305 if (message["updates"].is<JsonVariant>()) {
306 on_receive_updates(message);
307 }
308
309 if (message["put"].is<JsonVariant>()) {
310 on_receive_put(message);
311 }
312
313 // Putrequest contains also requestId Key GA
314 if (message["requestId"].is<JsonVariant>() &&
315 !message["put"].is<JsonVariant>()) {
317 }
318 } else {
319 ESP_LOGE(__FILENAME__, "deserializeJson error: %s", error.c_str());
320 }
321}
322
330void SKWSClient::on_receive_updates(JsonDocument& message) {
331 // Process updates from subscriptions...
332 JsonArray updates = message["updates"];
333
335 for (size_t i = 0; i < updates.size(); i++) {
336 JsonObject update = updates[i];
337
338 JsonArray values = update["values"];
339
340 for (size_t vi = 0; vi < values.size(); vi++) {
341 JsonDocument value_doc =
342 static_cast<JsonDocument>(static_cast<JsonObject>((values[vi])));
343
344 // push all values into a separate list for processing
345 // in the main task
346 received_updates_.push_back(value_doc);
347 }
348 }
350}
351
360
361 const std::vector<SKListener*>& listeners = SKListener::get_listeners();
362 const std::vector<SKPutListener*>& put_listeners =
364
366 int num_updates = received_updates_.size();
367 while (!received_updates_.empty()) {
368 JsonDocument& doc = received_updates_.front();
369
370 const char* path = doc["path"];
371 JsonObject value = doc.as<JsonObject>();
372
373 for (size_t i = 0; i < listeners.size(); i++) {
374 SKListener* listener = listeners[i];
375 if (listener->get_sk_path().equals(path)) {
376 listener->parse_value(value);
377 }
378 }
379 // to be able to parse values of Put Listeners GA
380 for (size_t i = 0; i < put_listeners.size(); i++) {
381 SKPutListener* listener = put_listeners[i];
382 if (listener->get_sk_path().equals(path)) {
383 listener->parse_value(value);
384 }
385 }
386 received_updates_.pop_front();
387 }
389 delta_rx_count_producer_.set(num_updates);
390
392}
393
401void SKWSClient::on_receive_put(JsonDocument& message) {
402 // Process PUT requests...
403 JsonArray puts = message["put"];
404 size_t response_count = 0;
405 for (size_t i = 0; i < puts.size(); i++) {
406 JsonObject value = puts[i];
407 const char* path = value["path"];
408 String str_val = value["value"].as<String>();
409
411 const std::vector<SKPutListener*>& listeners =
413 for (size_t i = 0; i < listeners.size(); i++) {
414 SKPutListener* listener = listeners[i];
415 if (listener->get_sk_path().equals(path)) {
417 received_updates_.push_back(value);
419 response_count++;
420 }
421 }
423
424 // Send back a request response...
425 JsonDocument put_response;
426 put_response["requestId"] = message["requestId"];
427 if (response_count == puts.size()) {
428 // We found a response for every PUT request
429 put_response["state"] = "COMPLETED";
430 put_response["statusCode"] = 200;
431 } else {
432 // One or more requests did not have a matching path
433 put_response["state"] = "FAILED";
434 put_response["statusCode"] = 405;
435 }
436 String response_text;
437 serializeJson(put_response, response_text);
438 esp_websocket_client_send_text(client_, response_text.c_str(),
439 response_text.length(), portMAX_DELAY);
440 }
441}
442
450void SKWSClient::sendTXT(String& payload) {
452 esp_websocket_client_send_text(client_, payload.c_str(), payload.length(),
453 portMAX_DELAY);
454 }
455}
456
457bool SKWSClient::get_mdns_service(String& server_address,
458 uint16_t& server_port) {
459 // get IP address using an mDNS query
460 // Try SSL service first, then fall back to non-SSL
461 int num = MDNS.queryService("signalk-wss", "tcp");
462 if (num > 0) {
463 // Found SSL-enabled server
464 ssl_enabled_ = true;
465 ESP_LOGI(__FILENAME__, "Found Signal K server via mDNS (signalk-wss)");
466 } else {
467 // Try non-SSL service
468 num = MDNS.queryService("signalk-ws", "tcp");
469 if (num == 0) {
470 // no service found
471 return false;
472 }
473 // Found non-SSL server, disable SSL
474 ssl_enabled_ = false;
475 ESP_LOGI(__FILENAME__, "Found Signal K server via mDNS (signalk-ws)");
476 }
477
478#if ESP_ARDUINO_VERSION_MAJOR < 3
479 server_address = MDNS.IP(0).toString();
480#else
481 server_address = MDNS.address(0).toString();
482#endif
483 server_port = MDNS.port(0);
484 ESP_LOGI(__FILENAME__, "Found server %s (port %d)", server_address.c_str(),
485 server_port);
486 return true;
487}
488
490 // Try to detect if the server requires SSL by checking for HTTP->HTTPS
491 // redirects
492 String url =
493 String("http://") + server_address_ + ":" + server_port_ + "/signalk";
494
495 ESP_LOGD(__FILENAME__, "Probing for SSL redirect at %s", url.c_str());
496
497 esp_http_client_config_t config = {};
498 config.url = url.c_str();
499 config.disable_auto_redirect = true;
500 config.timeout_ms = 10000;
501
502 esp_http_client_handle_t client = esp_http_client_init(&config);
503 if (client == nullptr) {
504 ESP_LOGE(__FILENAME__, "Failed to initialize HTTP client");
505 return false;
506 }
507
508 esp_err_t err = esp_http_client_perform(client);
509 if (err != ESP_OK) {
510 ESP_LOGD(__FILENAME__, "HTTP request failed: %s", esp_err_to_name(err));
511 esp_http_client_cleanup(client);
512 return false;
513 }
514
515 int http_code = esp_http_client_get_status_code(client);
516
517 if (http_code == 301 || http_code == 302 || http_code == 307 ||
518 http_code == 308) {
519 // Check Location header for HTTPS redirect
520 char* location = nullptr;
521 esp_http_client_get_header(client, "Location", &location);
522 esp_http_client_cleanup(client);
523
524 if (location != nullptr && strncmp(location, "https://", 8) == 0) {
525 ESP_LOGI(__FILENAME__, "SSL redirect detected to %s, enabling HTTPS/WSS",
526 location);
527 ssl_enabled_ = true;
528 save();
529 return true;
530 }
531 }
532
533 esp_http_client_cleanup(client);
534 return false;
535}
536
537
540 return;
541 }
542
543 if (!WiFi.isConnected() && WiFi.getMode() != WIFI_MODE_AP) {
544 ESP_LOGI(
545 __FILENAME__,
546 "WiFi is disconnected. SignalK client connection will be initiated "
547 "when WiFi is connected.");
548 return;
549 }
550
551 ESP_LOGI(__FILENAME__, "Initiating websocket connection with server...");
552
554 if (use_mdns_) {
555 if (!get_mdns_service(this->server_address_, this->server_port_)) {
556 ESP_LOGE(__FILENAME__,
557 "No Signal K server found in network when using mDNS service!");
558 } else {
559 ESP_LOGI(__FILENAME__,
560 "Signal K server has been found at address %s:%d by mDNS.",
561 this->server_address_.c_str(), this->server_port_);
562 }
563 } else {
565 this->server_port_ = this->conf_server_port_;
566 }
567
568 if (!this->server_address_.isEmpty() && this->server_port_ > 0) {
569 ESP_LOGD(__FILENAME__,
570 "Websocket is connecting to Signal K server on address %s:%d",
571 this->server_address_.c_str(), this->server_port_);
572
573 // Detect if server requires SSL (check for HTTP->HTTPS redirects)
574 if (!ssl_enabled_) {
575 detect_ssl();
576 }
577 } else {
578 // host and port not defined - don't try to connect
579 ESP_LOGD(__FILENAME__,
580 "Websocket is not connecting to Signal K server because host and "
581 "port are not defined.");
583 return;
584 }
585
586 if (this->polling_href_.length() > 0 && this->polling_href_.startsWith("/")) {
587 // existing pending request
589 this->polling_href_);
590 return;
591 }
592
593 if (this->auth_token_ == NULL_AUTH_TOKEN) {
594 // initiate HTTP authentication
595 ESP_LOGD(__FILENAME__, "No prior authorization token present.");
597 return;
598 }
599
600 // Test the validity of the authorization token
601 this->test_token(this->server_address_, this->server_port_);
602}
603
605 // No-op: esp_websocket_client handles data via event callbacks
606}
607
608void SKWSClient::test_token(const String server_address,
609 const uint16_t server_port) {
610 String protocol = ssl_enabled_ ? "https://" : "http://";
611 String url = protocol + server_address + ":" + server_port +
612 "/signalk/v1/stream";
613 ESP_LOGD(__FILENAME__, "Testing token with url %s", url.c_str());
614
615 const String full_token = String("Bearer ") + auth_token_;
616 ESP_LOGD(__FILENAME__, "Authorization: %s", full_token.c_str());
617
618 esp_http_client_config_t config = {};
619 config.url = url.c_str();
620 config.timeout_ms = 10000;
621#ifdef SENSESP_SSL_SUPPORT
622 if (ssl_enabled_) {
623 config.crt_bundle_attach = tofu_crt_bundle_attach;
624 config.skip_cert_common_name_check = true;
625 }
626#endif
627
628 esp_http_client_handle_t client = esp_http_client_init(&config);
629 if (client == nullptr) {
630 ESP_LOGE(__FILENAME__, "Failed to initialize HTTP client");
632 return;
633 }
634
635 esp_http_client_set_header(client, "Authorization", full_token.c_str());
636
637 // Use streaming API for GET request
638 esp_err_t err = esp_http_client_open(client, 0);
639 if (err != ESP_OK) {
640 ESP_LOGE(__FILENAME__, "Failed to open HTTP connection: %s", esp_err_to_name(err));
641 esp_http_client_cleanup(client);
643 return;
644 }
645
646 int content_length = esp_http_client_fetch_headers(client);
647 int http_code = esp_http_client_get_status_code(client);
648
649 ESP_LOGD(__FILENAME__, "Testing resulted in http status %d", http_code);
650
651 // Read response body
652 String payload;
653 if (content_length > 0 && content_length < 4096) {
654 char* buffer = new char[content_length + 1];
655 int read_len = esp_http_client_read(client, buffer, content_length);
656 buffer[read_len > 0 ? read_len : 0] = '\0';
657 payload = String(buffer);
658 delete[] buffer;
659 } else {
660 // Chunked encoding or unknown/large content length - read in chunks
661 char buffer[512];
662 int read_len;
663 while ((read_len = esp_http_client_read(client, buffer, sizeof(buffer) - 1)) > 0) {
664 buffer[read_len] = '\0';
665 payload += String(buffer);
666 if (payload.length() > 4096) break;
667 }
668 }
669
670 esp_http_client_close(client);
671 esp_http_client_cleanup(client);
672
673 if (payload.length() > 0) {
674 ESP_LOGD(__FILENAME__, "Returned payload (%d bytes): %s",
675 payload.length(), payload.c_str());
676 }
677
678 if (http_code == 426) {
679 // HTTP status 426 is "Upgrade Required", which is the expected
680 // response for a websocket connection.
681 ESP_LOGD(__FILENAME__, "Attempting to connect to Signal K Websocket...");
682 server_detected_ = true;
683 token_test_success_ = true;
684 this->connect_ws(server_address, server_port);
685 } else if (http_code == 401) {
686 // Token is invalid/expired - clear it and request new access
687 // Keep client_id_ so we reuse the same device identity
688 ESP_LOGW(__FILENAME__, "Token rejected (401), requesting new access");
689 this->auth_token_ = NULL_AUTH_TOKEN;
690 this->save();
691 this->send_access_request(server_address, server_port);
692 } else if (http_code > 0) {
694 } else {
695 ESP_LOGE(__FILENAME__, "HTTP request failed with code %d", http_code);
697 }
698}
699
700void SKWSClient::send_access_request(const String server_address,
701 const uint16_t server_port) {
702 ESP_LOGD(__FILENAME__, "Sending access request (client_id=%s, ssl=%d)",
703 client_id_.c_str(), ssl_enabled_);
704 if (client_id_ == "") {
705 // generate a client ID
707 save();
708 }
709
710 // create a new access request
711 JsonDocument doc;
712 doc["clientId"] = client_id_;
713 doc["description"] =
714 String("SensESP device: ") + SensESPBaseApp::get_hostname();
715 doc["permissions"] = kRequestPermission;
716 String json_req = "";
717 serializeJson(doc, json_req);
718
719 ESP_LOGD(__FILENAME__, "Access request: %s", json_req.c_str());
720
721 String protocol = ssl_enabled_ ? "https://" : "http://";
722 String url = protocol + server_address + ":" + server_port +
723 "/signalk/v1/access/requests";
724 ESP_LOGD(__FILENAME__, "Access request url: %s", url.c_str());
725
726 esp_http_client_config_t config = {};
727 config.url = url.c_str();
728 config.method = HTTP_METHOD_POST;
729 config.timeout_ms = 10000;
730#ifdef SENSESP_SSL_SUPPORT
731 if (ssl_enabled_) {
732 config.crt_bundle_attach = tofu_crt_bundle_attach;
733 config.skip_cert_common_name_check = true;
734 }
735#endif
736
737 esp_http_client_handle_t client = esp_http_client_init(&config);
738 if (client == nullptr) {
739 ESP_LOGE(__FILENAME__, "Failed to initialize HTTP client");
741 // Don't clear client_id_ - keep device identity for retry
742 return;
743 }
744
745 esp_http_client_set_header(client, "Content-Type", "application/json");
746
747 // Use streaming API: open -> write request -> fetch headers -> read response
748 esp_err_t err = esp_http_client_open(client, json_req.length());
749 if (err != ESP_OK) {
750 ESP_LOGE(__FILENAME__, "Failed to open HTTP connection: %s", esp_err_to_name(err));
751 esp_http_client_cleanup(client);
753 return;
754 }
755
756 int write_len = esp_http_client_write(client, json_req.c_str(), json_req.length());
757 if (write_len < 0 || write_len != (int)json_req.length()) {
758 ESP_LOGE(__FILENAME__, "Failed to write request body (wrote %d of %d bytes)",
759 write_len, json_req.length());
760 esp_http_client_close(client);
761 esp_http_client_cleanup(client);
763 return;
764 }
765
766 int content_length = esp_http_client_fetch_headers(client);
767 int http_code = esp_http_client_get_status_code(client);
768
769 ESP_LOGD(__FILENAME__, "HTTP response: code=%d, content_length=%d", http_code, content_length);
770
771 // Read response body
772 String payload;
773 char buffer[512];
774 int read_len;
775 while ((read_len = esp_http_client_read(client, buffer, sizeof(buffer) - 1)) > 0) {
776 buffer[read_len] = '\0';
777 payload += String(buffer);
778 if (payload.length() > 4096) break;
779 }
780 ESP_LOGD(__FILENAME__, "Response payload (%d bytes): %s",
781 payload.length(), payload.c_str());
782
783 esp_http_client_close(client);
784 esp_http_client_cleanup(client);
785
786 // Parse JSON response for both 202 and 400 status codes
787 deserializeJson(doc, payload.c_str());
788 String state = doc["state"].is<const char*>() ? doc["state"].as<String>() : "";
789 String href = doc["href"].is<const char*>() ? doc["href"].as<String>() : "";
790 String message = doc["message"].is<const char*>() ? doc["message"].as<String>() : "";
791
792 ESP_LOGD(__FILENAME__, "Access request response: http=%d, state=%s, href=%s",
793 http_code, state.c_str(), href.c_str());
794 if (message.length() > 0) {
795 ESP_LOGI(__FILENAME__, "Server message: %s", message.c_str());
796 }
797
798 // HTTP 400 with href means "already requested" - we can poll the existing request
799 if (http_code == 400 && href.length() > 0 && href.startsWith("/")) {
800 ESP_LOGI(__FILENAME__, "Existing request found, polling href: %s", href.c_str());
801 polling_href_ = href;
802 save();
803 delay(5000);
804 this->poll_access_request(server_address, server_port, this->polling_href_);
805 return;
806 }
807
808 // HTTP 202 with href means new request pending
809 if (http_code == 202 && href.length() > 0 && href.startsWith("/")) {
810 polling_href_ = href;
811 save();
812 delay(5000);
813 this->poll_access_request(server_address, server_port, this->polling_href_);
814 return;
815 }
816
817 // Can't proceed - disconnect and retry later
818 ESP_LOGW(__FILENAME__, "Cannot handle response: http=%d, state=%s", http_code, state.c_str());
820}
821
822void SKWSClient::poll_access_request(const String server_address,
823 const uint16_t server_port,
824 const String href) {
825 ESP_LOGD(__FILENAME__, "Polling SK Server for authentication token");
826
827 String protocol = ssl_enabled_ ? "https://" : "http://";
828 String url = protocol + server_address + ":" + server_port + href;
829
830 esp_http_client_config_t config = {};
831 config.url = url.c_str();
832 config.timeout_ms = 10000;
833#ifdef SENSESP_SSL_SUPPORT
834 if (ssl_enabled_) {
835 config.crt_bundle_attach = tofu_crt_bundle_attach;
836 config.skip_cert_common_name_check = true;
837 }
838#endif
839
840 esp_http_client_handle_t client = esp_http_client_init(&config);
841 if (client == nullptr) {
842 ESP_LOGE(__FILENAME__, "Failed to initialize HTTP client");
844 return;
845 }
846
847 // Use streaming API for GET request
848 esp_err_t err = esp_http_client_open(client, 0);
849 if (err != ESP_OK) {
850 ESP_LOGE(__FILENAME__, "Failed to open HTTP connection: %s", esp_err_to_name(err));
851 esp_http_client_cleanup(client);
853 return;
854 }
855
856 int content_length = esp_http_client_fetch_headers(client);
857 int http_code = esp_http_client_get_status_code(client);
858
859 // Read response body
860 String payload;
861 if (content_length > 0 && content_length < 4096) {
862 char* buffer = new char[content_length + 1];
863 int read_len = esp_http_client_read(client, buffer, content_length);
864 buffer[read_len > 0 ? read_len : 0] = '\0';
865 payload = String(buffer);
866 delete[] buffer;
867 } else {
868 // Chunked encoding or unknown/large content length - read in chunks
869 char buffer[512];
870 int read_len;
871 while ((read_len = esp_http_client_read(client, buffer, sizeof(buffer) - 1)) > 0) {
872 buffer[read_len] = '\0';
873 payload += String(buffer);
874 if (payload.length() > 4096) break;
875 }
876 }
877
878 ESP_LOGD(__FILENAME__, "Poll response: http=%d, payload=%s", http_code, payload.c_str());
879
880 esp_http_client_close(client);
881 esp_http_client_cleanup(client);
882
883 if (http_code == 200 || http_code == 202) {
884 JsonDocument doc;
885 auto error = deserializeJson(doc, payload.c_str());
886 if (error) {
887 ESP_LOGW(__FILENAME__, "WARNING: Could not deserialize http payload.");
888 ESP_LOGW(__FILENAME__, "DeserializationError: %s", error.c_str());
889 return;
890 }
891 String state = doc["state"];
892 ESP_LOGD(__FILENAME__, "%s", state.c_str());
893 if (state == "PENDING") {
895 delay(5000);
896 return;
897 }
898 if (state == "COMPLETED") {
899 JsonObject access_req = doc["accessRequest"];
900 String permission = access_req["permission"];
901
902 polling_href_ = "";
903 save();
904
905 if (permission == "DENIED") {
906 ESP_LOGW(__FILENAME__, "Permission denied");
908 return;
909 }
910
911 if (permission == "APPROVED") {
912 ESP_LOGI(__FILENAME__, "Permission granted");
913 String token = access_req["token"];
914 auth_token_ = token;
915 save();
916 this->connect_ws(server_address, server_port);
917 return;
918 }
919 }
920 } else {
921 if (http_code == 500) {
922 // this is probably the server barfing due to
923 // us polling a non-existing request. Just
924 // delete the polling href.
925 ESP_LOGD(__FILENAME__, "Got 500, probably a non-existing request.");
926 polling_href_ = "";
927 save();
929 return;
930 }
931 // any other HTTP status code
932 ESP_LOGW(__FILENAME__,
933 "Can't handle response %d to pending access request.\n",
934 http_code);
936 return;
937 }
938}
939
940void SKWSClient::connect_ws(const String& host, const uint16_t port) {
942
943 String protocol = ssl_enabled_ ? "wss" : "ws";
944 String path = "/signalk/v1/stream?subscribe=none";
945 String url = protocol + "://" + host + ":" + String(port) + path;
946
947 ESP_LOGD(__FILENAME__, "Connecting WebSocket to %s", url.c_str());
948
949 // Build authorization header string (must persist through init call)
950 String auth_header;
951 if (auth_token_ != NULL_AUTH_TOKEN) {
952 auth_header = String("Authorization: Bearer ") + auth_token_ + "\r\n";
953 }
954
955 // Configure WebSocket client
956 esp_websocket_client_config_t config = {};
957 config.uri = url.c_str();
958 config.task_stack = 8192;
959 config.buffer_size = 1024;
960 if (auth_header.length() > 0) {
961 config.headers = auth_header.c_str();
962 }
963
964#ifdef SENSESP_SSL_SUPPORT
965 if (ssl_enabled_) {
966 // Use custom crt_bundle_attach to disable SSL verification
967 // This directly configures mbedTLS to skip certificate verification
968 config.crt_bundle_attach = tofu_crt_bundle_attach;
969 config.skip_cert_common_name_check = true;
970 }
971#endif
972
973 // Destroy any existing client
974 if (client_ != nullptr) {
975 esp_websocket_client_stop(client_);
976 esp_websocket_client_destroy(client_);
977 client_ = nullptr;
978 }
979
980 client_ = esp_websocket_client_init(&config);
981 if (client_ == nullptr) {
982 ESP_LOGE(__FILENAME__, "Failed to initialize WebSocket client");
984 return;
985 }
986
987 // Register event handler
988 esp_websocket_register_events(client_, WEBSOCKET_EVENT_ANY,
989 websocket_event_handler, nullptr);
990
991 // Start the client
992 esp_err_t err = esp_websocket_client_start(client_);
993 if (err != ESP_OK) {
994 ESP_LOGE(__FILENAME__, "Failed to start WebSocket client: %s",
995 esp_err_to_name(err));
996 esp_websocket_client_destroy(client_);
997 client_ = nullptr;
999 return;
1000 }
1001
1002 ESP_LOGD(__FILENAME__, "WebSocket client started, waiting for connection...");
1003}
1004
1008
1010 if (client_ != nullptr) {
1011 esp_websocket_client_stop(client_);
1012 esp_websocket_client_destroy(client_);
1013 client_ = nullptr;
1014 }
1016}
1017
1019 String output;
1021 if (sk_delta_queue_->data_available()) {
1022 sk_delta_queue_->get_delta(output);
1023 esp_websocket_client_send_text(client_, output.c_str(), output.length(),
1024 portMAX_DELAY);
1025 // This automatically notifies the observers
1026 this->delta_tx_tick_producer_.set(1);
1027 }
1028 }
1029}
1030
1031bool SKWSClient::to_json(JsonObject& root) {
1032 root["sk_address"] = this->conf_server_address_;
1033 root["sk_port"] = this->conf_server_port_;
1034 root["use_mdns"] = this->use_mdns_;
1035
1036 root["token"] = this->auth_token_;
1037 root["client_id"] = this->client_id_;
1038 root["polling_href"] = this->polling_href_;
1039
1040 root["ssl_enabled"] = this->ssl_enabled_;
1041 root["tofu_enabled"] = this->tofu_enabled_;
1042 root["tofu_fingerprint"] = this->tofu_fingerprint_;
1043 return true;
1044}
1045
1046bool SKWSClient::from_json(const JsonObject& config) {
1047 if (config["sk_address"].is<String>()) {
1048 this->conf_server_address_ = config["sk_address"].as<String>();
1049 }
1050 if (config["sk_port"].is<int>()) {
1051 this->conf_server_port_ = config["sk_port"].as<int>();
1052 }
1053 if (config["use_mdns"].is<bool>()) {
1054 this->use_mdns_ = config["use_mdns"].as<bool>();
1055 }
1056 if (config["token"].is<String>()) {
1057 this->auth_token_ = config["token"].as<String>();
1058 }
1059 if (config["client_id"].is<String>()) {
1060 this->client_id_ = config["client_id"].as<String>();
1061 }
1062 if (config["polling_href"].is<String>()) {
1063 String href = config["polling_href"].as<String>();
1064 // Only accept valid hrefs (must start with /)
1065 this->polling_href_ = href.startsWith("/") ? href : "";
1066 }
1067
1068 if (config["ssl_enabled"].is<bool>()) {
1069 this->ssl_enabled_ = config["ssl_enabled"].as<bool>();
1070 }
1071 if (config["tofu_enabled"].is<bool>()) {
1072 this->tofu_enabled_ = config["tofu_enabled"].as<bool>();
1073 }
1074 if (config["tofu_fingerprint"].is<String>()) {
1075 this->tofu_fingerprint_ = config["tofu_fingerprint"].as<String>();
1076 }
1077
1078 return true;
1079}
1080
1087 auto state = get_connection_state();
1088 switch (state) {
1090 return "Authorizing with SignalK";
1092 return "Connected";
1094 return "Connecting";
1096 return "Disconnected";
1097 }
1098
1099 return "Unknown";
1100}
1101
1102} // namespace sensesp
virtual bool load() override
Load and populate the object from a persistent storage.
Definition saveable.cpp:8
virtual bool save() override
Save the object to a persistent storage.
Definition saveable.cpp:41
FileSystemSaveable(const String &config_path)
Definition saveable.h:63
An Obervable class that listens for Signal K stream deltas and notifies any observers of value change...
static bool take_semaphore(uint64_t timeout_ms=0)
static void release_semaphore()
virtual void parse_value(const JsonObject &json)
static const std::vector< SKListener * > & get_listeners()
An Obervable class that listens for Signal K PUT requests coming over the websocket connection and no...
static const std::vector< SKPutListener * > & get_listeners()
virtual void parse_value(const JsonObject &put)=0
static void handle_response(JsonDocument &response)
The websocket connection to the Signal K server.
std::list< JsonDocument > received_updates_
void poll_access_request(const String host, const uint16_t port, const String href)
SKWSClient(const String &config_path, std::shared_ptr< SKDeltaQueue > sk_delta_queue, const String &server_address, uint16_t server_port, bool use_mdns=true)
void on_receive_delta(uint8_t *payload, size_t length)
Called when the websocket receives a delta.
void process_received_updates()
Loop through the received updates and process them.
void connect_ws(const String &host, const uint16_t port)
Integrator< int, int > delta_tx_count_producer_
TaskQueueProducer< SKWSConnectionState > connection_state_
SKWSConnectionState get_connection_state()
void on_receive_put(JsonDocument &message)
Called when a PUT event is received.
void release_received_updates_semaphore()
void send_access_request(const String host, const uint16_t port)
Integrator< int, int > delta_rx_count_producer_
virtual bool to_json(JsonObject &root) override final
esp_websocket_client_handle_t client_
void on_error()
Called when the websocket connection encounters an error.
void sendTXT(String &payload)
Send some processed data to the websocket.
bool get_mdns_service(String &server_address, uint16_t &server_port)
void on_disconnected()
Called when the websocket connection is disconnected.
void subscribe_listeners()
Subscribes the SK delta paths to the websocket.
void on_receive_updates(JsonDocument &message)
Called when a delta update is received.
void test_token(const String host, const uint16_t port)
void set_connection_state(SKWSConnectionState state)
bool take_received_updates_semaphore(unsigned long int timeout_ms=0)
virtual bool from_json(const JsonObject &config) override final
String get_connection_status()
Get a String representation of the current connection state.
TaskQueueProducer< int > delta_tx_tick_producer_
Emits the number of deltas sent since last report.
std::shared_ptr< SKDeltaQueue > sk_delta_queue_
void on_connected()
Called when the websocket connection is established.
static String get_hostname()
Get the current hostname.
void emit(const SKWSConnectionState &new_value)
std::shared_ptr< reactesp::EventLoop > event_loop()
Definition sensesp.cpp:9
String generate_uuid4()
Generate a random UUIDv4 string.
Definition uuid.cpp:5
constexpr int kWsClientTaskStackSize
SKWSClient * ws_client
void ExecuteWebSocketTask(void *)