5#include <ArduinoJson.h>
7#include <esp_http_client.h>
9#ifdef SENSESP_SSL_SUPPORT
10#include <mbedtls/sha256.h>
11#include <mbedtls/ssl.h>
12#include <mbedtls/x509_crt.h>
16#include "elapsedMillis.h"
17#include "esp_arduino_version.h"
30static const char* kRequestPermission =
"readwrite";
32#ifdef SENSESP_SSL_SUPPORT
34static void sha256_to_hex(
const uint8_t* sha256,
char* hex) {
35 for (
int i = 0; i < 32; i++) {
36 sprintf(hex + (i * 2),
"%02x", sha256[i]);
43static int tofu_verify_callback(
void* ctx, mbedtls_x509_crt* crt,
44 int depth, uint32_t* flags) {
52 if (client ==
nullptr) {
53 ESP_LOGW(
"SKWSClient",
"TOFU: No client context, allowing connection");
60 mbedtls_sha256_context sha256_ctx;
61 mbedtls_sha256_init(&sha256_ctx);
62 mbedtls_sha256_starts(&sha256_ctx, 0);
63 mbedtls_sha256_update(&sha256_ctx, crt->raw.p, crt->raw.len);
64 mbedtls_sha256_finish(&sha256_ctx, sha256);
65 mbedtls_sha256_free(&sha256_ctx);
68 sha256_to_hex(sha256, hex);
69 String current_fingerprint = String(hex);
71 ESP_LOGD(
"SKWSClient",
"Server certificate fingerprint: %s", hex);
73 if (!client->is_tofu_enabled()) {
75 ESP_LOGD(
"SKWSClient",
"TOFU disabled, allowing connection");
80 if (!client->has_tofu_fingerprint()) {
82 ESP_LOGI(
"SKWSClient",
"TOFU: First connection, capturing fingerprint: %s", hex);
83 client->set_tofu_fingerprint(current_fingerprint);
89 if (client->get_tofu_fingerprint() == current_fingerprint) {
90 ESP_LOGD(
"SKWSClient",
"TOFU: Fingerprint verified successfully");
96 ESP_LOGE(
"SKWSClient",
"TOFU: Fingerprint mismatch!");
97 ESP_LOGE(
"SKWSClient",
" Expected: %s", client->get_tofu_fingerprint().c_str());
98 ESP_LOGE(
"SKWSClient",
" Received: %s", hex);
100 return MBEDTLS_ERR_X509_CERT_VERIFY_FAILED;
105static esp_err_t tofu_crt_bundle_attach(
void* conf) {
106 mbedtls_ssl_config* ssl_conf =
static_cast<mbedtls_ssl_config*
>(conf);
108 mbedtls_ssl_conf_authmode(ssl_conf, MBEDTLS_SSL_VERIFY_OPTIONAL);
109 mbedtls_ssl_conf_verify(ssl_conf, tofu_verify_callback,
ws_client);
110 ESP_LOGD(
"SKWSClient",
"TOFU verification callback installed");
115static void websocket_event_handler(
void* handler_args,
116 esp_event_base_t base,
117 int32_t event_id,
void* event_data) {
118 esp_websocket_event_data_t* data = (esp_websocket_event_data_t*)event_data;
120 case WEBSOCKET_EVENT_CONNECTED:
123 case WEBSOCKET_EVENT_DISCONNECTED:
126 case WEBSOCKET_EVENT_DATA:
127 ws_client->on_receive_delta((uint8_t*)data->data_ptr, data->data_len);
129 case WEBSOCKET_EVENT_ERROR:
136 elapsedMillis connect_loop_elapsed = 0;
137 elapsedMillis delta_loop_elapsed = 0;
142 if (connect_loop_elapsed > 2000) {
143 connect_loop_elapsed = 0;
146 if (delta_loop_elapsed > 5) {
147 delta_loop_elapsed = 0;
156 std::shared_ptr<SKDeltaQueue> sk_delta_queue,
157 const String& server_address, uint16_t server_port,
183 ESP_LOGD(__FILENAME__,
"Starting SKWSClient");
186 MDNS.addService(
"signalk-sensesp",
"tcp", 80);
208 ESP_LOGW(__FILENAME__,
"Bad access token detected. Setting token to null.");
224 ESP_LOGW(__FILENAME__,
"Websocket client error.");
235 ESP_LOGI(__FILENAME__,
"Subscribing to Signal K listeners...");
246 bool output_available =
false;
247 JsonDocument subscription;
248 subscription[
"context"] =
"vessels.self";
253 if (listeners.size() > 0) {
254 output_available =
true;
255 JsonArray subscribe = subscription[
"subscribe"].to<JsonArray>();
257 for (
size_t i = 0; i < listeners.size(); i++) {
258 auto* listener = listeners.at(i);
259 String sk_path = listener->get_sk_path();
260 int listen_delay = listener->get_listen_delay();
262 JsonObject subscribe_path = subscribe.add<JsonObject>();
264 subscribe_path[
"path"] = sk_path;
265 subscribe_path[
"period"] = listen_delay;
266 ESP_LOGI(__FILENAME__,
"Adding %s subscription with listen_delay %d\n",
267 sk_path.c_str(), listen_delay);
272 if (output_available) {
275 serializeJson(subscription, json_message);
276 ESP_LOGI(__FILENAME__,
"Subscription JSON message:\n %s",
277 json_message.c_str());
278 esp_websocket_client_send_text(
client_, json_message.c_str(),
279 json_message.length(), portMAX_DELAY);
292 char buf[length + 1];
293 memcpy(buf, payload, length);
296#ifdef SIGNALK_PRINT_RCV_DELTA
297 ESP_LOGD(__FILENAME__,
"Websocket payload received: %s", (
char*)buf);
300 JsonDocument message;
302 auto error = deserializeJson(message, buf);
305 if (message[
"updates"].is<JsonVariant>()) {
309 if (message[
"put"].is<JsonVariant>()) {
314 if (message[
"requestId"].is<JsonVariant>() &&
315 !message[
"put"].is<JsonVariant>()) {
319 ESP_LOGE(__FILENAME__,
"deserializeJson error: %s", error.c_str());
332 JsonArray updates = message[
"updates"];
335 for (
size_t i = 0; i < updates.size(); i++) {
336 JsonObject update = updates[i];
338 JsonArray values = update[
"values"];
340 for (
size_t vi = 0; vi < values.size(); vi++) {
341 JsonDocument value_doc =
342 static_cast<JsonDocument
>(
static_cast<JsonObject
>((values[vi])));
362 const std::vector<SKPutListener*>& put_listeners =
370 const char* path = doc[
"path"];
371 JsonObject value = doc.as<JsonObject>();
373 for (
size_t i = 0; i < listeners.size(); i++) {
380 for (
size_t i = 0; i < put_listeners.size(); i++) {
403 JsonArray puts = message[
"put"];
404 size_t response_count = 0;
405 for (
size_t i = 0; i < puts.size(); i++) {
406 JsonObject value = puts[i];
407 const char* path = value[
"path"];
408 String str_val = value[
"value"].as<String>();
411 const std::vector<SKPutListener*>& listeners =
413 for (
size_t i = 0; i < listeners.size(); i++) {
425 JsonDocument put_response;
426 put_response[
"requestId"] = message[
"requestId"];
427 if (response_count == puts.size()) {
429 put_response[
"state"] =
"COMPLETED";
430 put_response[
"statusCode"] = 200;
433 put_response[
"state"] =
"FAILED";
434 put_response[
"statusCode"] = 405;
436 String response_text;
437 serializeJson(put_response, response_text);
438 esp_websocket_client_send_text(
client_, response_text.c_str(),
439 response_text.length(), portMAX_DELAY);
452 esp_websocket_client_send_text(
client_, payload.c_str(), payload.length(),
458 uint16_t& server_port) {
461 int num = MDNS.queryService(
"signalk-wss",
"tcp");
465 ESP_LOGI(__FILENAME__,
"Found Signal K server via mDNS (signalk-wss)");
468 num = MDNS.queryService(
"signalk-ws",
"tcp");
475 ESP_LOGI(__FILENAME__,
"Found Signal K server via mDNS (signalk-ws)");
478#if ESP_ARDUINO_VERSION_MAJOR < 3
479 server_address = MDNS.IP(0).toString();
481 server_address = MDNS.address(0).toString();
483 server_port = MDNS.port(0);
484 ESP_LOGI(__FILENAME__,
"Found server %s (port %d)", server_address.c_str(),
495 ESP_LOGD(__FILENAME__,
"Probing for SSL redirect at %s", url.c_str());
497 esp_http_client_config_t config = {};
498 config.url = url.c_str();
499 config.disable_auto_redirect =
true;
500 config.timeout_ms = 10000;
502 esp_http_client_handle_t client = esp_http_client_init(&config);
503 if (client ==
nullptr) {
504 ESP_LOGE(__FILENAME__,
"Failed to initialize HTTP client");
508 esp_err_t err = esp_http_client_perform(client);
510 ESP_LOGD(__FILENAME__,
"HTTP request failed: %s", esp_err_to_name(err));
511 esp_http_client_cleanup(client);
515 int http_code = esp_http_client_get_status_code(client);
517 if (http_code == 301 || http_code == 302 || http_code == 307 ||
520 char* location =
nullptr;
521 esp_http_client_get_header(client,
"Location", &location);
522 esp_http_client_cleanup(client);
524 if (location !=
nullptr && strncmp(location,
"https://", 8) == 0) {
525 ESP_LOGI(__FILENAME__,
"SSL redirect detected to %s, enabling HTTPS/WSS",
533 esp_http_client_cleanup(client);
543 if (!WiFi.isConnected() && WiFi.getMode() != WIFI_MODE_AP) {
546 "WiFi is disconnected. SignalK client connection will be initiated "
547 "when WiFi is connected.");
551 ESP_LOGI(__FILENAME__,
"Initiating websocket connection with server...");
556 ESP_LOGE(__FILENAME__,
557 "No Signal K server found in network when using mDNS service!");
559 ESP_LOGI(__FILENAME__,
560 "Signal K server has been found at address %s:%d by mDNS.",
569 ESP_LOGD(__FILENAME__,
570 "Websocket is connecting to Signal K server on address %s:%d",
579 ESP_LOGD(__FILENAME__,
580 "Websocket is not connecting to Signal K server because host and "
581 "port are not defined.");
586 if (this->
polling_href_.length() > 0 && this->polling_href_.startsWith(
"/")) {
595 ESP_LOGD(__FILENAME__,
"No prior authorization token present.");
609 const uint16_t server_port) {
610 String protocol =
ssl_enabled_ ?
"https://" :
"http://";
611 String url = protocol + server_address +
":" + server_port +
612 "/signalk/v1/stream";
613 ESP_LOGD(__FILENAME__,
"Testing token with url %s", url.c_str());
615 const String full_token = String(
"Bearer ") +
auth_token_;
616 ESP_LOGD(__FILENAME__,
"Authorization: %s", full_token.c_str());
618 esp_http_client_config_t config = {};
619 config.url = url.c_str();
620 config.timeout_ms = 10000;
621#ifdef SENSESP_SSL_SUPPORT
623 config.crt_bundle_attach = tofu_crt_bundle_attach;
624 config.skip_cert_common_name_check =
true;
628 esp_http_client_handle_t client = esp_http_client_init(&config);
629 if (client ==
nullptr) {
630 ESP_LOGE(__FILENAME__,
"Failed to initialize HTTP client");
635 esp_http_client_set_header(client,
"Authorization", full_token.c_str());
638 esp_err_t err = esp_http_client_open(client, 0);
640 ESP_LOGE(__FILENAME__,
"Failed to open HTTP connection: %s", esp_err_to_name(err));
641 esp_http_client_cleanup(client);
646 int content_length = esp_http_client_fetch_headers(client);
647 int http_code = esp_http_client_get_status_code(client);
649 ESP_LOGD(__FILENAME__,
"Testing resulted in http status %d", http_code);
653 if (content_length > 0 && content_length < 4096) {
654 char* buffer =
new char[content_length + 1];
655 int read_len = esp_http_client_read(client, buffer, content_length);
656 buffer[read_len > 0 ? read_len : 0] =
'\0';
657 payload = String(buffer);
663 while ((read_len = esp_http_client_read(client, buffer,
sizeof(buffer) - 1)) > 0) {
664 buffer[read_len] =
'\0';
665 payload += String(buffer);
666 if (payload.length() > 4096)
break;
670 esp_http_client_close(client);
671 esp_http_client_cleanup(client);
673 if (payload.length() > 0) {
674 ESP_LOGD(__FILENAME__,
"Returned payload (%d bytes): %s",
675 payload.length(), payload.c_str());
678 if (http_code == 426) {
681 ESP_LOGD(__FILENAME__,
"Attempting to connect to Signal K Websocket...");
684 this->
connect_ws(server_address, server_port);
685 }
else if (http_code == 401) {
688 ESP_LOGW(__FILENAME__,
"Token rejected (401), requesting new access");
692 }
else if (http_code > 0) {
695 ESP_LOGE(__FILENAME__,
"HTTP request failed with code %d", http_code);
701 const uint16_t server_port) {
702 ESP_LOGD(__FILENAME__,
"Sending access request (client_id=%s, ssl=%d)",
715 doc[
"permissions"] = kRequestPermission;
716 String json_req =
"";
717 serializeJson(doc, json_req);
719 ESP_LOGD(__FILENAME__,
"Access request: %s", json_req.c_str());
721 String protocol =
ssl_enabled_ ?
"https://" :
"http://";
722 String url = protocol + server_address +
":" + server_port +
723 "/signalk/v1/access/requests";
724 ESP_LOGD(__FILENAME__,
"Access request url: %s", url.c_str());
726 esp_http_client_config_t config = {};
727 config.url = url.c_str();
728 config.method = HTTP_METHOD_POST;
729 config.timeout_ms = 10000;
730#ifdef SENSESP_SSL_SUPPORT
732 config.crt_bundle_attach = tofu_crt_bundle_attach;
733 config.skip_cert_common_name_check =
true;
737 esp_http_client_handle_t client = esp_http_client_init(&config);
738 if (client ==
nullptr) {
739 ESP_LOGE(__FILENAME__,
"Failed to initialize HTTP client");
745 esp_http_client_set_header(client,
"Content-Type",
"application/json");
748 esp_err_t err = esp_http_client_open(client, json_req.length());
750 ESP_LOGE(__FILENAME__,
"Failed to open HTTP connection: %s", esp_err_to_name(err));
751 esp_http_client_cleanup(client);
756 int write_len = esp_http_client_write(client, json_req.c_str(), json_req.length());
757 if (write_len < 0 || write_len != (
int)json_req.length()) {
758 ESP_LOGE(__FILENAME__,
"Failed to write request body (wrote %d of %d bytes)",
759 write_len, json_req.length());
760 esp_http_client_close(client);
761 esp_http_client_cleanup(client);
766 int content_length = esp_http_client_fetch_headers(client);
767 int http_code = esp_http_client_get_status_code(client);
769 ESP_LOGD(__FILENAME__,
"HTTP response: code=%d, content_length=%d", http_code, content_length);
775 while ((read_len = esp_http_client_read(client, buffer,
sizeof(buffer) - 1)) > 0) {
776 buffer[read_len] =
'\0';
777 payload += String(buffer);
778 if (payload.length() > 4096)
break;
780 ESP_LOGD(__FILENAME__,
"Response payload (%d bytes): %s",
781 payload.length(), payload.c_str());
783 esp_http_client_close(client);
784 esp_http_client_cleanup(client);
787 deserializeJson(doc, payload.c_str());
788 String state = doc[
"state"].is<
const char*>() ? doc[
"state"].as<String>() :
"";
789 String href = doc[
"href"].is<
const char*>() ? doc[
"href"].as<String>() :
"";
790 String message = doc[
"message"].is<
const char*>() ? doc[
"message"].as<String>() :
"";
792 ESP_LOGD(__FILENAME__,
"Access request response: http=%d, state=%s, href=%s",
793 http_code, state.c_str(), href.c_str());
794 if (message.length() > 0) {
795 ESP_LOGI(__FILENAME__,
"Server message: %s", message.c_str());
799 if (http_code == 400 && href.length() > 0 && href.startsWith(
"/")) {
800 ESP_LOGI(__FILENAME__,
"Existing request found, polling href: %s", href.c_str());
809 if (http_code == 202 && href.length() > 0 && href.startsWith(
"/")) {
818 ESP_LOGW(__FILENAME__,
"Cannot handle response: http=%d, state=%s", http_code, state.c_str());
823 const uint16_t server_port,
825 ESP_LOGD(__FILENAME__,
"Polling SK Server for authentication token");
827 String protocol =
ssl_enabled_ ?
"https://" :
"http://";
828 String url = protocol + server_address +
":" + server_port + href;
830 esp_http_client_config_t config = {};
831 config.url = url.c_str();
832 config.timeout_ms = 10000;
833#ifdef SENSESP_SSL_SUPPORT
835 config.crt_bundle_attach = tofu_crt_bundle_attach;
836 config.skip_cert_common_name_check =
true;
840 esp_http_client_handle_t client = esp_http_client_init(&config);
841 if (client ==
nullptr) {
842 ESP_LOGE(__FILENAME__,
"Failed to initialize HTTP client");
848 esp_err_t err = esp_http_client_open(client, 0);
850 ESP_LOGE(__FILENAME__,
"Failed to open HTTP connection: %s", esp_err_to_name(err));
851 esp_http_client_cleanup(client);
856 int content_length = esp_http_client_fetch_headers(client);
857 int http_code = esp_http_client_get_status_code(client);
861 if (content_length > 0 && content_length < 4096) {
862 char* buffer =
new char[content_length + 1];
863 int read_len = esp_http_client_read(client, buffer, content_length);
864 buffer[read_len > 0 ? read_len : 0] =
'\0';
865 payload = String(buffer);
871 while ((read_len = esp_http_client_read(client, buffer,
sizeof(buffer) - 1)) > 0) {
872 buffer[read_len] =
'\0';
873 payload += String(buffer);
874 if (payload.length() > 4096)
break;
878 ESP_LOGD(__FILENAME__,
"Poll response: http=%d, payload=%s", http_code, payload.c_str());
880 esp_http_client_close(client);
881 esp_http_client_cleanup(client);
883 if (http_code == 200 || http_code == 202) {
885 auto error = deserializeJson(doc, payload.c_str());
887 ESP_LOGW(__FILENAME__,
"WARNING: Could not deserialize http payload.");
888 ESP_LOGW(__FILENAME__,
"DeserializationError: %s", error.c_str());
891 String state = doc[
"state"];
892 ESP_LOGD(__FILENAME__,
"%s", state.c_str());
893 if (state ==
"PENDING") {
898 if (state ==
"COMPLETED") {
899 JsonObject access_req = doc[
"accessRequest"];
900 String permission = access_req[
"permission"];
905 if (permission ==
"DENIED") {
906 ESP_LOGW(__FILENAME__,
"Permission denied");
911 if (permission ==
"APPROVED") {
912 ESP_LOGI(__FILENAME__,
"Permission granted");
913 String token = access_req[
"token"];
916 this->
connect_ws(server_address, server_port);
921 if (http_code == 500) {
925 ESP_LOGD(__FILENAME__,
"Got 500, probably a non-existing request.");
932 ESP_LOGW(__FILENAME__,
933 "Can't handle response %d to pending access request.\n",
944 String path =
"/signalk/v1/stream?subscribe=none";
945 String url = protocol +
"://" + host +
":" + String(port) + path;
947 ESP_LOGD(__FILENAME__,
"Connecting WebSocket to %s", url.c_str());
952 auth_header = String(
"Authorization: Bearer ") +
auth_token_ +
"\r\n";
956 esp_websocket_client_config_t config = {};
957 config.uri = url.c_str();
958 config.task_stack = 8192;
959 config.buffer_size = 1024;
960 if (auth_header.length() > 0) {
961 config.headers = auth_header.c_str();
964#ifdef SENSESP_SSL_SUPPORT
968 config.crt_bundle_attach = tofu_crt_bundle_attach;
969 config.skip_cert_common_name_check =
true;
975 esp_websocket_client_stop(
client_);
976 esp_websocket_client_destroy(
client_);
980 client_ = esp_websocket_client_init(&config);
982 ESP_LOGE(__FILENAME__,
"Failed to initialize WebSocket client");
988 esp_websocket_register_events(
client_, WEBSOCKET_EVENT_ANY,
989 websocket_event_handler,
nullptr);
992 esp_err_t err = esp_websocket_client_start(
client_);
994 ESP_LOGE(__FILENAME__,
"Failed to start WebSocket client: %s",
995 esp_err_to_name(err));
996 esp_websocket_client_destroy(
client_);
1002 ESP_LOGD(__FILENAME__,
"WebSocket client started, waiting for connection...");
1011 esp_websocket_client_stop(
client_);
1012 esp_websocket_client_destroy(
client_);
1023 esp_websocket_client_send_text(
client_, output.c_str(), output.length(),
1047 if (config[
"sk_address"].is<String>()) {
1050 if (config[
"sk_port"].is<int>()) {
1053 if (config[
"use_mdns"].is<bool>()) {
1054 this->
use_mdns_ = config[
"use_mdns"].as<
bool>();
1056 if (config[
"token"].is<String>()) {
1059 if (config[
"client_id"].is<String>()) {
1060 this->
client_id_ = config[
"client_id"].as<String>();
1062 if (config[
"polling_href"].is<String>()) {
1063 String href = config[
"polling_href"].as<String>();
1068 if (config[
"ssl_enabled"].is<bool>()) {
1071 if (config[
"tofu_enabled"].is<bool>()) {
1074 if (config[
"tofu_fingerprint"].is<String>()) {
1090 return "Authorizing with SignalK";
1094 return "Connecting";
1096 return "Disconnected";
virtual bool load() override
Load and populate the object from a persistent storage.
virtual bool save() override
Save the object to a persistent storage.
FileSystemSaveable(const String &config_path)
An Obervable class that listens for Signal K stream deltas and notifies any observers of value change...
static bool take_semaphore(uint64_t timeout_ms=0)
static void release_semaphore()
virtual void parse_value(const JsonObject &json)
static const std::vector< SKListener * > & get_listeners()
An Obervable class that listens for Signal K PUT requests coming over the websocket connection and no...
static const std::vector< SKPutListener * > & get_listeners()
virtual void parse_value(const JsonObject &put)=0
static void handle_response(JsonDocument &response)
The websocket connection to the Signal K server.
std::list< JsonDocument > received_updates_
void poll_access_request(const String host, const uint16_t port, const String href)
SKWSClient(const String &config_path, std::shared_ptr< SKDeltaQueue > sk_delta_queue, const String &server_address, uint16_t server_port, bool use_mdns=true)
void on_receive_delta(uint8_t *payload, size_t length)
Called when the websocket receives a delta.
void process_received_updates()
Loop through the received updates and process them.
void connect_ws(const String &host, const uint16_t port)
Integrator< int, int > delta_tx_count_producer_
TaskQueueProducer< SKWSConnectionState > connection_state_
SKWSConnectionState get_connection_state()
void on_receive_put(JsonDocument &message)
Called when a PUT event is received.
void release_received_updates_semaphore()
void send_access_request(const String host, const uint16_t port)
Integrator< int, int > delta_rx_count_producer_
virtual bool to_json(JsonObject &root) override final
esp_websocket_client_handle_t client_
void on_error()
Called when the websocket connection encounters an error.
void sendTXT(String &payload)
Send some processed data to the websocket.
bool get_mdns_service(String &server_address, uint16_t &server_port)
void on_disconnected()
Called when the websocket connection is disconnected.
void subscribe_listeners()
Subscribes the SK delta paths to the websocket.
void on_receive_updates(JsonDocument &message)
Called when a delta update is received.
uint16_t conf_server_port_
void test_token(const String host, const uint16_t port)
void set_connection_state(SKWSConnectionState state)
bool take_received_updates_semaphore(unsigned long int timeout_ms=0)
virtual bool from_json(const JsonObject &config) override final
String get_connection_status()
Get a String representation of the current connection state.
TaskQueueProducer< int > delta_tx_tick_producer_
Emits the number of deltas sent since last report.
std::shared_ptr< SKDeltaQueue > sk_delta_queue_
String conf_server_address_
void on_connected()
Called when the websocket connection is established.
static String get_hostname()
Get the current hostname.
void emit(const SKWSConnectionState &new_value)
std::shared_ptr< reactesp::EventLoop > event_loop()
String generate_uuid4()
Generate a random UUIDv4 string.
constexpr int kWsClientTaskStackSize
void ExecuteWebSocketTask(void *)