SensESP 3.4.1-alpha
Universal Signal K sensor toolkit ESP32
Loading...
Searching...
No Matches
signalk_ws_client.h
Go to the documentation of this file.
1#ifndef SENSESP_SRC_SENSESP_SIGNALK_SIGNALK_WS_CLIENT_H_
2#define SENSESP_SRC_SENSESP_SIGNALK_SIGNALK_WS_CLIENT_H_
3
4#include "sensesp.h"
5
6#include <algorithm>
7
8#include <ArduinoJson.h>
9#include <esp_websocket_client.h>
10#include <atomic>
11#include <functional>
12#include <list>
13#include <set>
14
20#include "sensesp_base_app.h"
21
22// Maximum number of received value/put deltas buffered for processing on the
23// main task. Oldest entries are dropped when the buffer is full.
24#ifndef SENSESP_MAX_RECEIVED_VALUE_UPDATES
25#define SENSESP_MAX_RECEIVED_VALUE_UPDATES 20
26#endif
27
28// Maximum number of received meta deltas buffered for processing on the main
29// task, budgeted independently of value deltas so a metadata burst (~one entry
30// per subscribed path at subscribe time when sendMeta=all is enabled) cannot
31// evict pending values, and vice versa. Override per board via build_flags if
32// you subscribe many paths and need the full burst delivered (more paths =>
33// more buffered meta => more RAM).
34#ifndef SENSESP_MAX_RECEIVED_META_UPDATES
35#define SENSESP_MAX_RECEIVED_META_UPDATES 20
36#endif
37
38// Signal K WebSocket buffer size in bytes, allocated by esp_websocket_client for
39// both the tx and rx buffer. send_delta drops any delta longer than this rather
40// than let esp_websocket_client split it across non-blocking writes and abort
41// the connection (see signalk_ws_delta_size.h). Override per board via
42// build_flags to hold the largest delta a device sends (e.g. a GNSS receiver's
43// full-sky satellitesInView); kept small by default for memory-constrained
44// boards.
45#ifndef SENSESP_SK_WS_BUFFER_SIZE
46#define SENSESP_SK_WS_BUFFER_SIZE 1024
47#endif
48
49namespace sensesp {
50
// Sentinel for "no auth token": the empty string. Declared constexpr so the
// pointer itself is immutable; a plain `static const char*` in a header gives
// every translation unit its own reassignable pointer.
static constexpr const char* NULL_AUTH_TOKEN = "";

// HTTP status returned on a WebSocket upgrade when the Signal K server rejects
// the auth token.
static constexpr int kHttpUnauthorized = 401;
56
64bool should_clear_token_on_status(bool ssl_enabled, int handshake_status);
65
71 // TLS certificate pinning rejected the server certificate. Distinct from a
72 // plain disconnect so the UI can tell a cert problem from a network drop.
74};
75
81 virtual public ValueProducer<SKWSConnectionState> {
82 public:
84 // main task methods
85
86 SKWSClient(const String& config_path,
87 std::shared_ptr<SKDeltaQueue> sk_delta_queue,
88 const String& server_address, uint16_t server_port,
89 bool use_mdns = true);
90
91 const String get_server_address() const { return server_address_; }
92 uint16_t get_server_port() const { return server_port_; }
93
94 virtual bool to_json(JsonObject& root) override final;
95 virtual bool from_json(const JsonObject& config) override final;
96
104
113
114 String get_connection_status();
115
117 // SK websocket connection methods
118
119 void on_disconnected();
120 // handshake_status is the HTTP status of a failed WebSocket upgrade (0 if not
121 // applicable); a 401 means the auth token was rejected.
122 void on_error(int handshake_status);
123 void on_connected();
124 void on_receive_delta(uint8_t* payload, size_t length);
125 void on_receive_updates(JsonDocument& message);
126 void on_receive_put(JsonDocument& message);
127 void connect();
128 void loop();
129 bool is_connected();
138 void restart();
139 bool is_connect_due() const { return millis() >= next_attempt_ms_; }
140 void send_delta();
141
147 void sendTXT(String& payload);
148
152 bool is_ssl_enabled() const { return ssl_enabled_; }
153
170 String get_auth_token() const { return auth_token_; }
171
178 void set_ssl_enabled(bool enabled) {
179 ssl_enabled_ = enabled;
180 save();
181 }
182
197 void set_send_meta_enabled(bool enabled) {
198 send_meta_enabled_ = enabled;
199 save();
200 }
201
205 bool is_tofu_enabled() const { return tofu_enabled_; }
206
214 void set_tofu_enabled(bool enabled) {
215 tofu_enabled_ = enabled;
216 save();
217 }
218
220 bool has_tofu_anchor() const {
221 return !tofu_ca_pem_.isEmpty() || !tofu_fingerprint_.isEmpty();
222 }
223
225 bool has_tofu_ca() const { return !tofu_ca_pem_.isEmpty(); }
226 const String& get_tofu_ca() const { return tofu_ca_pem_; }
227
229 bool has_tofu_fingerprint() const { return !tofu_fingerprint_.isEmpty(); }
230 const String& get_tofu_fingerprint() const { return tofu_fingerprint_; }
231
233 const String& get_tofu_pin_cn() const { return tofu_pin_cn_; }
234 bool is_tofu_pin_ca() const { return tofu_pin_is_ca_; }
235
240 const String& get_tofu_san() const { return tofu_san_; }
241
249 void reset_tofu() {
250 tofu_ca_pem_ = "";
252 tofu_pin_cn_ = "";
253 tofu_pin_is_ca_ = false;
254 tofu_san_ = "";
255 save();
256 }
257
261 void stash_pending_ca(const String& ca_pem, const String& cn) {
262 pending_ca_pem_ = ca_pem;
264 pending_cn_ = cn;
265 pending_is_ca_ = true;
266 pending_valid_ = true;
267 }
268 void stash_pending_leaf(const String& fingerprint, const String& cn) {
269 pending_ca_pem_ = "";
270 pending_fingerprint_ = fingerprint;
271 pending_cn_ = cn;
272 pending_is_ca_ = false;
273 pending_valid_ = true;
274 }
277 void set_pending_san(const String& san) { pending_san_ = san; }
279 pending_ca_pem_ = "";
281 pending_cn_ = "";
282 pending_san_ = "";
283 pending_is_ca_ = false;
284 pending_valid_ = false;
285 }
287 bool has_pending_ca() const { return pending_valid_ && pending_is_ca_; }
291 if (!pending_valid_) {
292 return;
293 }
296 if (pending_is_ca_) {
299 tofu_san_ = pending_san_; // bind the leaf identity in CA-anchor mode
300 } else {
302 tofu_ca_pem_ = "";
303 tofu_san_ = ""; // leaf mode: the fingerprint binds identity
304 }
306 save();
307 }
308
311 void flag_cert_error() { cert_error_.store(true); }
312
316 uint32_t client_generation() const { return client_generation_.load(); }
317
318 protected:
319 // these are the actually used values
320 String server_address_ = "";
321 uint16_t server_port_ = 80;
322 // these are the hardcoded and/or conf file values
324 uint16_t conf_server_port_ = 0;
325 bool use_mdns_ = true;
326
327 String client_id_ = "";
328 String polling_href_ = "";
329 String auth_token_ = NULL_AUTH_TOKEN;
330
331 unsigned long next_attempt_ms_ = 0;
332 unsigned long connect_interval_ms_ = 2000;
333
334 // SSL/TLS configuration
335 bool ssl_enabled_ = false;
336 bool tofu_enabled_ = true; // TOFU enabled by default
337
338 // Subscribe with ?sendMeta=all so metadata (zones, units, etc) is
339 // pushed in-stream rather than requiring REST /meta polls.
341
342 // TOFU trust anchor. At most one of these is non-empty once captured:
343 // tofu_ca_pem_ — PEM of the pinned issuing CA (CA-anchor mode)
344 // tofu_fingerprint_ — SHA256 hex of the pinned leaf (leaf-fingerprint mode,
345 // and the legacy on-disk format the migration reads)
346 String tofu_fingerprint_ = ""; // SHA256 fingerprint in hex (64 chars)
347 String tofu_ca_pem_ = ""; // PEM of pinned CA (CA-anchor mode)
348 // TOFU'd leaf identity (normalized DNS SAN set) bound in CA-anchor mode, so a
349 // reconnecting leaf must present the same identity AND chain to the pinned CA.
350 // This is what makes CA pinning safe against a public CA (e.g. Let's Encrypt):
351 // a valid leaf for a different name from the same CA fails the identity check.
352 // Empty in leaf-fingerprint mode.
353 String tofu_san_ = "";
354 // Display-only identity of the pinned cert, captured at pin time (X.509
355 // parsing is only available during the handshake). The CN is attacker-
356 // controlled at capture and is bounded + sanitized before storage.
357 String tofu_pin_cn_ = "";
358 bool tofu_pin_is_ca_ = false;
359
360 // Candidate anchor seen during the current connection attempt, persisted only
361 // after the connection succeeds (commit_pending_tofu) so an unauthenticated
362 // MITM handshake cannot plant a trust anchor.
363 String pending_ca_pem_ = "";
365 String pending_cn_ = "";
366 String pending_san_ = "";
367 bool pending_is_ca_ = false;
368 bool pending_valid_ = false;
369
370 // Set by the verify callback (transport task) on certificate rejection; read
371 // by set_connection_state to surface kSKWSCertificateError. Atomic because the
372 // callback and state changes may run on different cores.
373 std::atomic<bool> cert_error_{false};
374
377
382
383 // Atomic for pointer-atomicity: the connect worker builds/stores it; the
384 // SK/event-loop context loads it to send and reaps it via exchange(nullptr)
385 // (single check-and-null, so no double-free). Writers are serialized
386 // single-owner-at-a-time by auth_job_running_ / teardown_in_progress_.
387 // NOTE: the atomic confers pointer-atomicity, not lifetime safety. send_delta
388 // and sendTXT run on the event loop, serialized with detach_teardown (same
389 // context), so they cannot hold a handle across its reap. Transport-task
390 // senders (on_connected -> subscribe_listeners, on_receive_put's response)
391 // are made safe by esp_websocket_client_stop() joining the transport task
392 // before destroy() frees the struct. See #1033.
393 std::atomic<esp_websocket_client_handle_t> client_{nullptr};
394 // Bumped on every teardown so the (singleton) event handler can drop late
395 // callbacks from a client that has been handed off for destruction. The
396 // handler is registered with the generation current at build time; an event
397 // whose generation != client_generation_ comes from a torn-down client.
398 std::atomic<uint32_t> client_generation_{0};
399 // True while a detached task is stopping+destroying a previous client.
400 // Bring-up is deferred until it clears, so at most one client ever exists.
401 std::atomic<bool> teardown_in_progress_{false};
402 // True while a one-shot worker is running a (blocking) connect attempt
403 // (mDNS / SSL-detect / access-request / poll legs). The dispatcher skips
404 // while it is set, so at most one attempt runs at a time.
405 std::atomic<bool> auth_job_running_{false};
406 // Holds a handle whose detached reaper task failed to spawn (OOM); the event
407 // loop retries the spawn each tick rather than reaping it synchronously (which
408 // would block the loop). At most one at a time (teardown_in_progress_ defers
409 // bring-up until it is reaped).
410 std::atomic<esp_websocket_client_handle_t> pending_teardown_{nullptr};
411 std::shared_ptr<SKDeltaQueue> sk_delta_queue_;
416
421
429 bool is_meta = false;
430 JsonDocument doc;
431 };
432
434 SemaphoreHandle_t received_updates_semaphore_ =
435 xSemaphoreCreateRecursiveMutexStatic(&received_updates_semaphore_buffer_);
436 std::list<ReceivedUpdate> received_updates_{};
437
439 // methods for all tasks
440
441 bool take_received_updates_semaphore(unsigned long int timeout_ms = 0) {
442 if (timeout_ms == 0) {
443 return xSemaphoreTakeRecursive(received_updates_semaphore_,
444 portMAX_DELAY) == pdTRUE;
445 } else {
446 return xSemaphoreTakeRecursive(received_updates_semaphore_,
447 timeout_ms) == pdTRUE;
448 }
449 }
451 xSemaphoreGiveRecursive(received_updates_semaphore_);
452 }
453
458 void enqueue_received_update(ReceivedUpdate&& update);
459
461 // main task methods
462
464
466 // SK websocket connection methods
467
468#ifndef SENSESP_SSL_SUPPORT
469 // Validate the auth token over plain HTTP before opening the WebSocket
470 // stream. Only used on non-SSL builds; SSL builds validate it on the upgrade
471 // itself (see connect()) to avoid a second TLS handshake that fragments the
472 // heap.
473 void test_token(const String host, const uint16_t port);
474#endif
475 void send_access_request(const String host, const uint16_t port);
476 void poll_access_request(const String host, const uint16_t port,
477 const String href);
478 void connect_ws(const String& host, const uint16_t port);
485 void detach_teardown();
489 void reap_async(esp_websocket_client_handle_t old);
491 static void teardown_task(void* arg);
495 void run_connect_attempt();
496 static void connect_worker(void* arg);
497 void subscribe_listeners();
498 bool get_mdns_service(String& server_address, uint16_t& server_port);
499 bool detect_ssl();
500
502 // A certificate rejection during this attempt (flagged by the verify
503 // callback) is surfaced distinctly instead of as a generic disconnect.
504 // This is the single chokepoint, so it covers all four TLS paths — the
505 // three esp_http_client requests and the websocket — without touching each
506 // call site. A fresh attempt (authorizing/connecting) or success clears the
507 // flag.
510 } else if (state != SKWSConnectionState::kSKWSDisconnected &&
512 cert_error_.store(false);
513 }
515 connection_state_.set(state);
516 }
518
521 (esp_random() % (connect_interval_ms_ / 4 + 1));
523 std::min(connect_interval_ms_ * 2, (unsigned long)60000);
524 }
525
529};
530
531inline const String ConfigSchema(const SKWSClient& obj) {
532 return "{\"type\":\"object\",\"properties\":{"
533 "\"ssl_enabled\":{\"title\":\"SSL/TLS Enabled\",\"type\":\"boolean\"},"
534 "\"tofu_enabled\":{\"title\":\"TOFU Verification\",\"type\":\"boolean\"},"
535 "\"tofu_pin_cn\":{\"title\":\"Pinned Certificate\",\"type\":\"string\",\"readOnly\":true},"
536 "\"tofu_pin_is_ca\":{\"title\":\"Pinned as CA\",\"type\":\"boolean\",\"readOnly\":true},"
537 "\"send_meta_enabled\":{\"title\":\"Subscribe with sendMeta=all\","
538 "\"description\":\"Request metadata deltas (units, zones, displayName, displayUnits) over the WS stream. Disable only for constrained clients that ignore them.\","
539 "\"type\":\"boolean\"}"
540 "}}";
541}
542
543inline bool ConfigRequiresRestart(const SKWSClient& obj) { return true; }
544
545} // namespace sensesp
546
547#endif
virtual bool save() override
Save the object to a persistent storage.
Definition saveable.cpp:40
Integrator integrates (accumulates) the incoming values.
Definition integrator.h:19
The websocket connection to the Signal K server.
void commit_pending_tofu()
Persist a stashed anchor after a successful (authenticated) connection. Called from on_connected().
void poll_access_request(const String host, const uint16_t port, const String href)
void on_receive_delta(uint8_t *payload, size_t length)
Called when the websocket receives a delta.
SemaphoreHandle_t received_updates_semaphore_
unsigned long connect_interval_ms_
void process_received_updates()
Loop through the received updates and process them.
void set_ssl_enabled(bool enabled)
Enable or disable SSL/TLS manually.
const String & get_tofu_ca() const
std::atomic< esp_websocket_client_handle_t > pending_teardown_
std::atomic< bool > auth_job_running_
void connect_ws(const String &host, const uint16_t port)
void stash_pending_leaf(const String &fingerprint, const String &cn)
unsigned long next_attempt_ms_
Integrator< int, int > delta_tx_count_producer_
TaskQueueProducer< SKWSConnectionState > connection_state_
const String & get_tofu_pin_cn() const
Identity (CN) of whatever is pinned, and whether it is a CA.
SKWSConnectionState get_connection_state()
std::atomic< bool > teardown_in_progress_
void on_receive_put(JsonDocument &message)
Called when a PUT event is received.
uint32_t client_generation() const
Generation tag of the currently-valid client. The event handler compares the generation it was regist...
void release_received_updates_semaphore()
void run_connect_attempt()
The (blocking) connect attempt — mDNS resolve, SSL detect, and the access-request / poll / connect_ws...
SKWSConnectionState task_connection_state_
void send_access_request(const String host, const uint16_t port)
std::atomic< uint32_t > client_generation_
void on_error(int handshake_status)
Integrator< int, int > delta_rx_count_producer_
std::list< ReceivedUpdate > received_updates_
bool is_send_meta_enabled() const
Whether the WS subscribes with sendMeta=all.
virtual bool to_json(JsonObject &root) override final
uint16_t get_server_port() const
void detach_teardown()
Hand client_ to a detached one-shot task that stops+destroys it, so the blocking teardown never runs ...
void enqueue_received_update(ReceivedUpdate &&update)
Push a received delta entry onto the queue, enforcing a per-kind budget so a metadata burst (one meta...
void sendTXT(String &payload)
Send some processed data to the websocket.
uint32_t last_oversize_log_ms_
millis() timestamp of the last oversize-delta-drop warning, used to rate-limit it when a device keeps...
bool get_mdns_service(String &server_address, uint16_t &server_port)
std::atomic< esp_websocket_client_handle_t > client_
const String & get_tofu_san() const
TOFU'd leaf identity (normalized DNS SAN set) bound in CA-anchor mode. A reconnecting leaf must chain...
void on_disconnected()
Called when the websocket connection is disconnected.
ValueProducer< int > & get_delta_tx_count_producer()
void reap_async(esp_websocket_client_handle_t old)
Spawn the detached reaper for old; on spawn failure (OOM) stash it in pending_teardown_ for a later r...
void subscribe_listeners()
Subscribes the SK delta paths to the websocket.
void on_receive_updates(JsonDocument &message)
Called when a delta update is received.
void test_token(const String host, const uint16_t port)
bool has_tofu_ca() const
True if a pinned CA certificate is stored (CA-anchor mode).
bool has_tofu_fingerprint() const
True if a pinned leaf fingerprint is stored (leaf-fingerprint mode).
void set_connection_state(SKWSConnectionState state)
ValueProducer< int > & get_delta_rx_count_producer()
Get the delta rx count producer object.
void stash_pending_ca(const String &ca_pem, const String &cn)
Stash a candidate anchor seen during a handshake. Persisted only after the connection succeeds (commi...
bool take_received_updates_semaphore(unsigned long int timeout_ms=0)
StaticSemaphore_t received_updates_semaphore_buffer_
virtual bool from_json(const JsonObject &config) override final
void set_tofu_enabled(bool enabled)
Enable or disable TOFU certificate verification.
std::atomic< bool > cert_error_
void set_pending_san(const String &san)
Record the leaf's identity (SAN set) for the pending CA anchor, captured at depth 0 and committed alo...
String get_connection_status()
Get a String representation of the current connection state.
static void connect_worker(void *arg)
void set_send_meta_enabled(bool enabled)
const String get_server_address() const
TaskQueueProducer< int > delta_tx_tick_producer_
Emits the number of deltas sent since last report.
void reset_tofu()
Reset the stored trust anchor (CA or leaf fingerprint).
static void teardown_task(void *arg)
Body of the detached teardown task (stop+destroy+self-delete).
bool has_tofu_anchor() const
True if any trust anchor (pinned CA or leaf fingerprint) is stored.
const String & get_tofu_fingerprint() const
bool is_ssl_enabled() const
Check if SSL/TLS is enabled.
std::shared_ptr< SKDeltaQueue > sk_delta_queue_
void restart()
Drop the current connection (detached, non-blocking teardown) and let the reconnect path rebuild it.
void on_connected()
Called when the websocket connection is established.
bool is_tofu_enabled() const
Check if TOFU certificate verification is enabled.
String get_auth_token() const
Current Signal K access token, or an empty string if none.
bool has_pending_ca() const
True if a candidate CA has already been stashed this handshake.
void flag_cert_error()
Flag a certificate rejection from the verify callback so the next disconnect surfaces as kSKWSCertifi...
Producer class that works across task boundaries.
virtual void set(const T &value) override
A base class for any sensor or piece of code that outputs a value for consumption elsewhere.
const String ConfigSchema(const SmartSwitchController &obj)
std::shared_ptr< reactesp::EventLoop > event_loop()
Definition sensesp.cpp:9
bool should_clear_token_on_status(bool ssl_enabled, int handshake_status)
Called when the websocket connection encounters an error.
bool ConfigRequiresRestart(const HTTPServer &obj)
A single received delta entry awaiting dispatch on the main task.