5#include <ArduinoJson.h>
11#include "elapsedMillis.h"
12#include "esp_arduino_version.h"
25static const char* kRequestPermission =
"readwrite";
28 elapsedMillis connect_loop_elapsed = 0;
29 elapsedMillis delta_loop_elapsed = 0;
34 if (connect_loop_elapsed > 2000) {
35 connect_loop_elapsed = 0;
38 if (delta_loop_elapsed > 5) {
39 delta_loop_elapsed = 0;
54static void websocket_event_handler(
void* handler_args, esp_event_base_t base,
55 int32_t event_id,
void* event_data) {
56 esp_websocket_event_data_t* data = (esp_websocket_event_data_t*)event_data;
58 case WEBSOCKET_EVENT_CONNECTED:
59 ESP_LOGD(__FILENAME__,
"WEBSOCKET_EVENT_CONNECTED");
62 case WEBSOCKET_EVENT_DISCONNECTED:
63 ESP_LOGD(__FILENAME__,
"WEBSOCKET_EVENT_DISCONNECTED");
66 case WEBSOCKET_EVENT_DATA:
68 if (data->op_code == 0x01) {
72 case WEBSOCKET_EVENT_ERROR:
79 std::shared_ptr<SKDeltaQueue> sk_delta_queue,
80 const String& server_address, uint16_t server_port,
83 conf_server_address_{server_address},
84 conf_server_port_{server_port},
86 sk_delta_queue_{sk_delta_queue} {
106 ESP_LOGD(__FILENAME__,
"Starting SKWSClient");
109 MDNS.addService(
"signalk-sensesp",
"tcp", 80);
131 ESP_LOGW(__FILENAME__,
"Bad access token detected. Setting token to null.");
147 ESP_LOGW(__FILENAME__,
"Websocket client error.");
158 ESP_LOGI(__FILENAME__,
"Subscribing to Signal K listeners...");
169 bool output_available =
false;
170 JsonDocument subscription;
171 subscription[
"context"] =
"vessels.self";
176 if (listeners.size() > 0) {
177 output_available =
true;
178 JsonArray subscribe = subscription[
"subscribe"].to<JsonArray>();
180 for (
size_t i = 0; i < listeners.size(); i++) {
181 auto* listener = listeners.at(i);
182 String sk_path = listener->get_sk_path();
183 int listen_delay = listener->get_listen_delay();
185 JsonObject subscribe_path = subscribe.add<JsonObject>();
187 subscribe_path[
"path"] = sk_path;
188 subscribe_path[
"period"] = listen_delay;
189 ESP_LOGI(__FILENAME__,
"Adding %s subscription with listen_delay %d\n",
190 sk_path.c_str(), listen_delay);
195 if (output_available) {
198 serializeJson(subscription, json_message);
199 ESP_LOGI(__FILENAME__,
"Subscription JSON message:\n %s",
200 json_message.c_str());
201 esp_websocket_client_send_text(this->
client_, json_message.c_str(),
202 json_message.length(), portMAX_DELAY);
215 char buf[length + 1];
216 memcpy(buf, payload, length);
219#ifdef SIGNALK_PRINT_RCV_DELTA
220 ESP_LOGD(__FILENAME__,
"Websocket payload received: %s", (
char*)buf);
223 JsonDocument message;
225 auto error = deserializeJson(message, buf);
228 if (message[
"updates"].is<JsonVariant>()) {
232 if (message[
"put"].is<JsonVariant>()) {
237 if (message[
"requestId"].is<JsonVariant>() &&
238 !message[
"put"].is<JsonVariant>()) {
242 ESP_LOGE(__FILENAME__,
"deserializeJson error: %s", error.c_str());
255 JsonArray updates = message[
"updates"];
258 for (
size_t i = 0; i < updates.size(); i++) {
259 JsonObject update = updates[i];
261 JsonArray values = update[
"values"];
263 for (
size_t vi = 0; vi < values.size(); vi++) {
264 JsonDocument value_doc =
265 static_cast<JsonDocument
>(
static_cast<JsonObject
>((values[vi])));
285 const std::vector<SKPutListener*>& put_listeners =
293 const char* path = doc[
"path"];
294 JsonObject value = doc.as<JsonObject>();
296 for (
size_t i = 0; i < listeners.size(); i++) {
303 for (
size_t i = 0; i < put_listeners.size(); i++) {
326 JsonArray puts = message[
"put"];
327 size_t response_count = 0;
328 for (
size_t i = 0; i < puts.size(); i++) {
329 JsonObject value = puts[i];
330 const char* path = value[
"path"];
331 String str_val = value[
"value"].as<String>();
334 const std::vector<SKPutListener*>& listeners =
336 for (
size_t i = 0; i < listeners.size(); i++) {
348 JsonDocument put_response;
349 put_response[
"requestId"] = message[
"requestId"];
350 if (response_count == puts.size()) {
352 put_response[
"state"] =
"COMPLETED";
353 put_response[
"statusCode"] = 200;
356 put_response[
"state"] =
"FAILED";
357 put_response[
"statusCode"] = 405;
359 String response_text;
360 serializeJson(put_response, response_text);
361 esp_websocket_client_send_text(this->
client_, response_text.c_str(),
362 response_text.length(), portMAX_DELAY);
375 esp_websocket_client_send_text(this->
client_, payload.c_str(),
376 payload.length(), portMAX_DELAY);
381 uint16_t& server_port) {
383 int num = MDNS.queryService(
"signalk-ws",
"tcp");
389#if ESP_ARDUINO_VERSION_MAJOR < 3
390 server_address = MDNS.IP(0).toString();
392 server_address = MDNS.address(0).toString();
394 server_port = MDNS.port(0);
395 ESP_LOGI(__FILENAME__,
"Found server %s (port %d)", server_address.c_str(),
405 if (!WiFi.isConnected() && WiFi.getMode() != WIFI_MODE_AP) {
408 "WiFi is disconnected. SignalK client connection will be initiated "
409 "when WiFi is connected.");
413 ESP_LOGI(__FILENAME__,
"Initiating websocket connection with server...");
418 ESP_LOGE(__FILENAME__,
419 "No Signal K server found in network when using mDNS service!");
421 ESP_LOGI(__FILENAME__,
422 "Signal K server has been found at address %s:%d by mDNS.",
431 ESP_LOGD(__FILENAME__,
432 "Websocket is connecting to Signal K server on address %s:%d",
436 ESP_LOGD(__FILENAME__,
437 "Websocket is not connecting to Signal K server because host and "
438 "port are not defined.");
452 ESP_LOGD(__FILENAME__,
"No prior authorization token present.");
462 const uint16_t server_port) {
466 String url = String(
"http://") + server_address +
":" + server_port +
467 "/signalk/v1/stream";
468 ESP_LOGD(__FILENAME__,
"Testing token with url %s", url.c_str());
470 const String full_token = String(
"Bearer ") +
auth_token_;
471 ESP_LOGD(__FILENAME__,
"Authorization: %s", full_token.c_str());
472 http.addHeader(
"Authorization", full_token.c_str());
473 int http_code = http.GET();
475 String payload = http.getString();
477 ESP_LOGD(__FILENAME__,
"Testing resulted in http status %d", http_code);
478 if (payload.length() > 0) {
479 ESP_LOGD(__FILENAME__,
480 "Returned payload (length %d) is: ", payload.length());
481 ESP_LOGD(__FILENAME__,
"%s", payload.c_str());
483 ESP_LOGD(__FILENAME__,
"Returned payload is empty");
485 if (http_code == 426) {
488 ESP_LOGD(__FILENAME__,
"Attempting to connect to Signal K Websocket...");
491 this->
connect_ws(server_address, server_port);
492 }
else if (http_code == 401) {
499 ESP_LOGE(__FILENAME__,
"GET... failed, error: %s\n",
500 http.errorToString(http_code).c_str());
506 const uint16_t server_port) {
507 ESP_LOGD(__FILENAME__,
"Preparing a new access request");
519 doc[
"permissions"] = kRequestPermission;
520 String json_req =
"";
521 serializeJson(doc, json_req);
523 ESP_LOGD(__FILENAME__,
"Access request: %s", json_req.c_str());
527 String url = String(
"http://") + server_address +
":" + server_port +
528 "/signalk/v1/access/requests";
529 ESP_LOGD(__FILENAME__,
"Access request url: %s", url.c_str());
531 http.addHeader(
"Content-Type",
"application/json");
532 int http_code = http.POST(json_req);
533 String payload = http.getString();
537 if (http_code != 202) {
538 ESP_LOGW(__FILENAME__,
"Can't handle response %d to access request.",
540 ESP_LOGD(__FILENAME__,
"%s", payload.c_str());
548 deserializeJson(doc, payload.c_str());
549 String state = doc[
"state"];
551 if (state !=
"PENDING") {
552 ESP_LOGW(__FILENAME__,
"Got unknown state: %s", state.c_str());
558 String href = doc[
"href"];
567 const uint16_t server_port,
569 ESP_LOGD(__FILENAME__,
"Polling SK Server for authentication token");
573 String url = String(
"http://") + server_address +
":" + server_port + href;
575 int http_code = http.GET();
576 if (http_code == 200 or http_code == 202) {
577 String payload = http.getString();
580 auto error = deserializeJson(doc, payload.c_str());
582 ESP_LOGW(__FILENAME__,
"WARNING: Could not deserialize http payload.");
583 ESP_LOGW(__FILENAME__,
"DeserializationError: %s", error.c_str());
586 String state = doc[
"state"];
587 ESP_LOGD(__FILENAME__,
"%s", state.c_str());
588 if (state ==
"PENDING") {
593 if (state ==
"COMPLETED") {
594 JsonObject access_req = doc[
"accessRequest"];
595 String permission = access_req[
"permission"];
600 if (permission ==
"DENIED") {
601 ESP_LOGW(__FILENAME__,
"Permission denied");
606 if (permission ==
"APPROVED") {
607 ESP_LOGI(__FILENAME__,
"Permission granted");
608 String token = access_req[
"token"];
611 this->
connect_ws(server_address, server_port);
617 if (http_code == 500) {
621 ESP_LOGD(__FILENAME__,
"Got 500, probably a non-existing request.");
628 ESP_LOGW(__FILENAME__,
629 "Can't handle response %d to pending access request.\n",
637 String path =
"/signalk/v1/stream?subscribe=none";
642 String url = String(
"ws://") + host +
":" + port + path;
644 esp_websocket_client_config_t websocket_cfg = {};
645 websocket_cfg.uri = url.c_str();
647 const String full_auth_header =
648 String(
"Authorization: Bearer ") +
auth_token_ +
"\r\n";
650 websocket_cfg.headers = full_auth_header.c_str();
652 ESP_LOGD(__FILENAME__,
"Websocket config: %s", websocket_cfg.uri);
653 ESP_LOGD(__FILENAME__,
"Initializing websocket client...");
654 this->
client_ = esp_websocket_client_init(&websocket_cfg);
655 ESP_LOGD(__FILENAME__,
"Registering websocket event handler...");
656 error = esp_websocket_register_events(this->
client_, WEBSOCKET_EVENT_ANY,
657 websocket_event_handler,
659 if (error != ESP_OK) {
660 ESP_LOGE(__FILENAME__,
"Error registering websocket event handler: %d",
663 ESP_LOGD(__FILENAME__,
"Starting websocket client...");
664 error = esp_websocket_client_start(this->client_);
665 if (error != ESP_OK) {
666 ESP_LOGE(__FILENAME__,
"Error starting websocket client: %d", error);
668 ESP_LOGD(__FILENAME__,
"Websocket client started.");
677 esp_websocket_client_close(this->
client_, portMAX_DELAY);
687 esp_websocket_client_send_text(this->
client_, output.c_str(),
688 output.length(), portMAX_DELAY);
707 if (config[
"sk_address"].is<String>()) {
710 if (config[
"sk_port"].is<int>()) {
713 if (config[
"use_mdns"].is<bool>()) {
714 this->
use_mdns_ = config[
"use_mdns"].as<
bool>();
716 if (config[
"token"].is<String>()) {
719 if (config[
"client_id"].is<String>()) {
720 this->
client_id_ = config[
"client_id"].as<String>();
722 if (config[
"polling_href"].is<String>()) {
738 return "Authorizing with SignalK";
744 return "Disconnected";
virtual bool load() override
Load and populate the object from a persistent storage.
virtual bool save() override
Save the object to a persistent storage.
virtual void set(const C &input) override final
void attach(std::function< void()> observer)
An Observable class that listens for Signal K stream deltas and notifies any observers of value change...
static bool take_semaphore(uint64_t timeout_ms=0)
static void release_semaphore()
virtual void parse_value(const JsonObject &json)
static const std::vector< SKListener * > & get_listeners()
An Observable class that listens for Signal K PUT requests coming over the websocket connection and no...
static const std::vector< SKPutListener * > & get_listeners()
virtual void parse_value(const JsonObject &put)=0
static void handle_response(JsonDocument &response)
The websocket connection to the Signal K server.
std::list< JsonDocument > received_updates_
void poll_access_request(const String host, const uint16_t port, const String href)
SKWSClient(const String &config_path, std::shared_ptr< SKDeltaQueue > sk_delta_queue, const String &server_address, uint16_t server_port, bool use_mdns=true)
void on_receive_delta(uint8_t *payload, size_t length)
Called when the websocket receives a delta.
void process_received_updates()
Loop through the received updates and process them.
void connect_ws(const String &host, const uint16_t port)
Integrator< int, int > delta_tx_count_producer_
TaskQueueProducer< SKWSConnectionState > connection_state_
SKWSConnectionState get_connection_state()
void on_receive_put(JsonDocument &message)
Called when a PUT event is received.
void release_received_updates_semaphore()
void send_access_request(const String host, const uint16_t port)
Integrator< int, int > delta_rx_count_producer_
virtual bool to_json(JsonObject &root) override final
esp_websocket_client_handle_t client_
void on_error()
Called when the websocket connection encounters an error.
void sendTXT(String &payload)
Send some processed data to the websocket.
bool get_mdns_service(String &server_address, uint16_t &server_port)
void on_disconnected()
Called when the websocket connection is disconnected.
void subscribe_listeners()
Subscribes the SK delta paths to the websocket.
void on_receive_updates(JsonDocument &message)
Called when a delta update is received.
uint16_t conf_server_port_
void test_token(const String host, const uint16_t port)
void set_connection_state(SKWSConnectionState state)
bool take_received_updates_semaphore(unsigned long int timeout_ms=0)
virtual bool from_json(const JsonObject &config) override final
String get_connection_status()
Get a String representation of the current connection state.
TaskQueueProducer< int > delta_tx_tick_producer_
Emits the number of deltas sent since last report.
std::shared_ptr< SKDeltaQueue > sk_delta_queue_
String conf_server_address_
void on_connected()
Called when the websocket connection is established.
static String get_hostname()
Get the current hostname.
virtual void set(const T &value) override
virtual const T & get() const
void emit(const SKWSConnectionState &new_value)
std::enable_if< std::is_base_of< ValueConsumer< typenameVConsumer::input_type >, VConsumer >::value &&std::is_convertible< T, typenameVConsumer::input_type >::value, std::shared_ptr< VConsumer > >::type connect_to(std::shared_ptr< VConsumer > consumer)
Connect a producer to a transform with a different input type.
std::shared_ptr< reactesp::EventLoop > event_loop()
String generate_uuid4()
Generate a random UUIDv4 string.
constexpr int kWsClientTaskStackSize
void ExecuteWebSocketTask(void *)