1#ifndef SENSESP_SRC_SENSESP_SIGNALK_SIGNALK_WS_CLIENT_H_
2#define SENSESP_SRC_SENSESP_SIGNALK_SIGNALK_WS_CLIENT_H_
8#include <ArduinoJson.h>
9#include <esp_websocket_client.h>
24#ifndef SENSESP_MAX_RECEIVED_VALUE_UPDATES
25#define SENSESP_MAX_RECEIVED_VALUE_UPDATES 20
34#ifndef SENSESP_MAX_RECEIVED_META_UPDATES
35#define SENSESP_MAX_RECEIVED_META_UPDATES 20
45#ifndef SENSESP_SK_WS_BUFFER_SIZE
46#define SENSESP_SK_WS_BUFFER_SIZE 1024
51static const char* NULL_AUTH_TOKEN =
"";
55static constexpr int kHttpUnauthorized = 401;
87 std::shared_ptr<SKDeltaQueue> sk_delta_queue,
88 const String& server_address, uint16_t server_port,
89 bool use_mdns =
true);
94 virtual bool to_json(JsonObject& root)
override final;
95 virtual bool from_json(
const JsonObject& config)
override final;
122 void on_error(
int handshake_status);
393 std::atomic<esp_websocket_client_handle_t>
client_{
nullptr};
442 if (timeout_ms == 0) {
444 portMAX_DELAY) == pdTRUE;
447 timeout_ms) == pdTRUE;
468#ifndef SENSESP_SSL_SUPPORT
473 void test_token(
const String host,
const uint16_t port);
478 void connect_ws(
const String& host,
const uint16_t port);
489 void reap_async(esp_websocket_client_handle_t old);
532 return "{\"type\":\"object\",\"properties\":{"
533 "\"ssl_enabled\":{\"title\":\"SSL/TLS Enabled\",\"type\":\"boolean\"},"
534 "\"tofu_enabled\":{\"title\":\"TOFU Verification\",\"type\":\"boolean\"},"
535 "\"tofu_pin_cn\":{\"title\":\"Pinned Certificate\",\"type\":\"string\",\"readOnly\":true},"
536 "\"tofu_pin_is_ca\":{\"title\":\"Pinned as CA\",\"type\":\"boolean\",\"readOnly\":true},"
537 "\"send_meta_enabled\":{\"title\":\"Subscribe with sendMeta=all\","
538 "\"description\":\"Request metadata deltas (units, zones, displayName, displayUnits) over the WS stream. Disable only for constrained clients that ignore them.\","
539 "\"type\":\"boolean\"}"
virtual bool save() override
Save the object to persistent storage.
Integrator integrates (accumulates) the incoming values.
The websocket connection to the Signal K server.
void commit_pending_tofu()
Persist a stashed anchor after a successful (authenticated) connection. Called from on_connected().
void poll_access_request(const String host, const uint16_t port, const String href)
void schedule_reconnect()
void on_receive_delta(uint8_t *payload, size_t length)
Called when the websocket receives a delta.
String pending_fingerprint_
SemaphoreHandle_t received_updates_semaphore_
unsigned long connect_interval_ms_
void process_received_updates()
Loop through the received updates and process them.
void set_ssl_enabled(bool enabled)
Enable or disable SSL/TLS manually.
const String & get_tofu_ca() const
std::atomic< esp_websocket_client_handle_t > pending_teardown_
bool is_connect_due() const
std::atomic< bool > auth_job_running_
void connect_ws(const String &host, const uint16_t port)
void stash_pending_leaf(const String &fingerprint, const String &cn)
unsigned long next_attempt_ms_
Integrator< int, int > delta_tx_count_producer_
TaskQueueProducer< SKWSConnectionState > connection_state_
const String & get_tofu_pin_cn() const
Identity (CN) of whatever is pinned, and whether it is a CA.
SKWSConnectionState get_connection_state()
std::atomic< bool > teardown_in_progress_
void on_receive_put(JsonDocument &message)
Called when a PUT event is received.
uint32_t client_generation() const
Generation tag of the currently-valid client. The event handler compares the generation it was regist...
void release_received_updates_semaphore()
void run_connect_attempt()
The (blocking) connect attempt — mDNS resolve, SSL detect, and the access-request / poll / connect_ws...
SKWSConnectionState task_connection_state_
void send_access_request(const String host, const uint16_t port)
std::atomic< uint32_t > client_generation_
void on_error(int handshake_status)
Integrator< int, int > delta_rx_count_producer_
void clear_pending_tofu()
std::list< ReceivedUpdate > received_updates_
bool is_send_meta_enabled() const
Whether the WS subscribes with sendMeta=all.
virtual bool to_json(JsonObject &root) override final
uint16_t get_server_port() const
void detach_teardown()
Hand client_ to a detached one-shot task that stops+destroys it, so the blocking teardown never runs ...
void enqueue_received_update(ReceivedUpdate &&update)
Push a received delta entry onto the queue, enforcing a per-kind budget so a metadata burst (one meta...
void sendTXT(String &payload)
Send some processed data to the websocket.
uint32_t last_oversize_log_ms_
millis() timestamp of the last oversize-delta-drop warning, used to rate-limit it when a device keeps...
bool get_mdns_service(String &server_address, uint16_t &server_port)
std::atomic< esp_websocket_client_handle_t > client_
const String & get_tofu_san() const
TOFU'd leaf identity (normalized DNS SAN set) bound in CA-anchor mode. A reconnecting leaf must chain...
void on_disconnected()
Called when the websocket connection is disconnected.
ValueProducer< int > & get_delta_tx_count_producer()
void reap_async(esp_websocket_client_handle_t old)
Spawn the detached reaper for old; on spawn failure (OOM) stash it in pending_teardown_ for a later r...
void subscribe_listeners()
Subscribes the SK delta paths to the websocket.
void on_receive_updates(JsonDocument &message)
Called when a delta update is received.
uint16_t conf_server_port_
void test_token(const String host, const uint16_t port)
bool has_tofu_ca() const
True if a pinned CA certificate is stored (CA-anchor mode).
bool has_tofu_fingerprint() const
True if a pinned leaf fingerprint is stored (leaf-fingerprint mode).
void set_connection_state(SKWSConnectionState state)
ValueProducer< int > & get_delta_rx_count_producer()
Get the delta rx count producer object.
void stash_pending_ca(const String &ca_pem, const String &cn)
Stash a candidate anchor seen during a handshake. Persisted only after the connection succeeds (commi...
bool is_tofu_pin_ca() const
bool take_received_updates_semaphore(unsigned long int timeout_ms=0)
StaticSemaphore_t received_updates_semaphore_buffer_
virtual bool from_json(const JsonObject &config) override final
void set_tofu_enabled(bool enabled)
Enable or disable TOFU certificate verification.
std::atomic< bool > cert_error_
void set_pending_san(const String &san)
Record the leaf's identity (SAN set) for the pending CA anchor, captured at depth 0 and committed alo...
String get_connection_status()
Get a String representation of the current connection state.
static void connect_worker(void *arg)
void set_send_meta_enabled(bool enabled)
const String get_server_address() const
TaskQueueProducer< int > delta_tx_tick_producer_
Emits the number of deltas sent since the last report.
void reset_tofu()
Reset the stored trust anchor (CA or leaf fingerprint).
static void teardown_task(void *arg)
Body of the detached teardown task (stop+destroy+self-delete).
void reset_reconnect_interval()
bool has_tofu_anchor() const
True if any trust anchor (pinned CA or leaf fingerprint) is stored.
const String & get_tofu_fingerprint() const
bool is_ssl_enabled() const
Check if SSL/TLS is enabled.
std::shared_ptr< SKDeltaQueue > sk_delta_queue_
String conf_server_address_
void restart()
Drop the current connection (detached, non-blocking teardown) and let the reconnect path rebuild it.
void on_connected()
Called when the websocket connection is established.
bool is_tofu_enabled() const
Check if TOFU certificate verification is enabled.
String get_auth_token() const
Current Signal K access token, or an empty string if none.
bool has_pending_ca() const
True if a candidate CA has already been stashed this handshake.
void flag_cert_error()
Flag a certificate rejection from the verify callback so the next disconnect surfaces as kSKWSCertifi...
Producer class that works across task boundaries.
virtual void set(const T &value) override
A base class for any sensor or piece of code that outputs a value for consumption elsewhere.
const String ConfigSchema(const SmartSwitchController &obj)
std::shared_ptr< reactesp::EventLoop > event_loop()
bool should_clear_token_on_status(bool ssl_enabled, int handshake_status)
Called when the websocket connection encounters an error.
bool ConfigRequiresRestart(const HTTPServer &obj)
A single received delta entry awaiting dispatch on the main task.