5#include <ArduinoJson.h>
11#include "elapsedMillis.h"
// Value written to the "permissions" field of the JSON access request that
// is POSTed to the server's /signalk/v1/access/requests endpoint.
24static const char* kRequestPermission =
"readwrite";
27 elapsedMillis connect_loop_elapsed = 0;
28 elapsedMillis delta_loop_elapsed = 0;
33 if (connect_loop_elapsed > 2000) {
34 connect_loop_elapsed = 0;
37 if (delta_loop_elapsed > 5) {
38 delta_loop_elapsed = 0;
53static void websocket_event_handler(
void* handler_args, esp_event_base_t base,
54 int32_t event_id,
void* event_data) {
55 esp_websocket_event_data_t* data = (esp_websocket_event_data_t*)event_data;
57 case WEBSOCKET_EVENT_CONNECTED:
58 ESP_LOGD(__FILENAME__,
"WEBSOCKET_EVENT_CONNECTED");
61 case WEBSOCKET_EVENT_DISCONNECTED:
62 ESP_LOGD(__FILENAME__,
"WEBSOCKET_EVENT_DISCONNECTED");
65 case WEBSOCKET_EVENT_DATA:
67 if (data->op_code == 0x01) {
71 case WEBSOCKET_EVENT_ERROR:
78 const String& server_address, uint16_t server_port,
81 conf_server_address_{server_address},
82 conf_server_port_{server_port},
84 sk_delta_queue_{sk_delta_queue} {
104 ESP_LOGD(__FILENAME__,
"Starting SKWSClient");
107 MDNS.addService(
"signalk-sensesp",
"tcp", 80);
129 ESP_LOGW(__FILENAME__,
"Bad access token detected. Setting token to null.");
145 ESP_LOGW(__FILENAME__,
"Websocket client error.");
156 ESP_LOGI(__FILENAME__,
"Subscribing to Signal K listeners...");
167 bool output_available =
false;
168 JsonDocument subscription;
169 subscription[
"context"] =
"vessels.self";
174 if (listeners.size() > 0) {
175 output_available =
true;
176 JsonArray subscribe = subscription[
"subscribe"].to<JsonArray>();
178 for (
size_t i = 0; i < listeners.size(); i++) {
179 auto* listener = listeners.at(i);
180 String sk_path = listener->get_sk_path();
181 int listen_delay = listener->get_listen_delay();
183 JsonObject subscribe_path = subscribe.add<JsonObject>();
185 subscribe_path[
"path"] = sk_path;
186 subscribe_path[
"period"] = listen_delay;
187 ESP_LOGI(__FILENAME__,
"Adding %s subscription with listen_delay %d\n",
188 sk_path.c_str(), listen_delay);
193 if (output_available) {
196 serializeJson(subscription, json_message);
197 ESP_LOGI(__FILENAME__,
"Subscription JSON message:\n %s",
198 json_message.c_str());
199 esp_websocket_client_send_text(this->
client_, json_message.c_str(),
200 json_message.length(), portMAX_DELAY);
213 char buf[length + 1];
214 memcpy(buf, payload, length);
217#ifdef SIGNALK_PRINT_RCV_DELTA
218 ESP_LOGD(__FILENAME__,
"Websocket payload received: %s", (
char*)buf);
221 JsonDocument message;
223 auto error = deserializeJson(message, buf);
226 if (message[
"updates"].is<JsonVariant>()) {
230 if (message[
"put"].is<JsonVariant>()) {
235 if (message[
"requestId"].is<JsonVariant>() &&
236 !message[
"put"].is<JsonVariant>()) {
240 ESP_LOGE(__FILENAME__,
"deserializeJson error: %s", error.c_str());
253 JsonArray updates = message[
"updates"];
256 for (
size_t i = 0; i < updates.size(); i++) {
257 JsonObject update = updates[i];
259 JsonArray values = update[
"values"];
261 for (
size_t vi = 0; vi < values.size(); vi++) {
262 JsonDocument value_doc =
263 static_cast<JsonDocument
>(
static_cast<JsonObject
>((values[vi])));
283 const std::vector<SKPutListener*>& put_listeners =
291 const char* path = doc[
"path"];
292 JsonObject value = doc.as<JsonObject>();
294 for (
size_t i = 0; i < listeners.size(); i++) {
301 for (
size_t i = 0; i < put_listeners.size(); i++) {
324 JsonArray puts = message[
"put"];
325 size_t response_count = 0;
326 for (
size_t i = 0; i < puts.size(); i++) {
327 JsonObject value = puts[i];
328 const char* path = value[
"path"];
329 String str_val = value[
"value"].as<String>();
332 const std::vector<SKPutListener*>& listeners =
334 for (
size_t i = 0; i < listeners.size(); i++) {
346 JsonDocument put_response;
347 put_response[
"requestId"] = message[
"requestId"];
348 if (response_count == puts.size()) {
350 put_response[
"state"] =
"COMPLETED";
351 put_response[
"statusCode"] = 200;
354 put_response[
"state"] =
"FAILED";
355 put_response[
"statusCode"] = 405;
357 String response_text;
358 serializeJson(put_response, response_text);
359 esp_websocket_client_send_text(this->
client_, response_text.c_str(),
360 response_text.length(), portMAX_DELAY);
373 esp_websocket_client_send_text(this->
client_, payload.c_str(),
374 payload.length(), portMAX_DELAY);
379 uint16_t& server_port) {
381 int num = MDNS.queryService(
"signalk-ws",
"tcp");
386 server_address = MDNS.IP(0).toString();
387 server_port = MDNS.port(0);
388 ESP_LOGI(__FILENAME__,
"Found server %s (port %d)", server_address.c_str(),
398 if (!WiFi.isConnected() && WiFi.getMode() != WIFI_MODE_AP) {
401 "WiFi is disconnected. SignalK client connection will be initiated "
402 "when WiFi is connected.");
406 ESP_LOGI(__FILENAME__,
"Initiating websocket connection with server...");
411 ESP_LOGE(__FILENAME__,
412 "No Signal K server found in network when using mDNS service!");
414 ESP_LOGI(__FILENAME__,
415 "Signal K server has been found at address %s:%d by mDNS.",
424 ESP_LOGD(__FILENAME__,
425 "Websocket is connecting to Signal K server on address %s:%d",
429 ESP_LOGD(__FILENAME__,
430 "Websocket is not connecting to Signal K server because host and "
431 "port are not defined.");
445 ESP_LOGD(__FILENAME__,
"No prior authorization token present.");
455 const uint16_t server_port) {
459 String url = String(
"http://") + server_address +
":" + server_port +
460 "/signalk/v1/stream";
461 ESP_LOGD(__FILENAME__,
"Testing token with url %s", url.c_str());
463 const String full_token = String(
"Bearer ") +
auth_token_;
464 ESP_LOGD(__FILENAME__,
"Authorization: %s", full_token.c_str());
465 http.addHeader(
"Authorization", full_token.c_str());
466 int http_code = http.GET();
468 String payload = http.getString();
470 ESP_LOGD(__FILENAME__,
"Testing resulted in http status %d", http_code);
471 if (payload.length() > 0) {
472 ESP_LOGD(__FILENAME__,
473 "Returned payload (length %d) is: ", payload.length());
474 ESP_LOGD(__FILENAME__,
"%s", payload.c_str());
476 ESP_LOGD(__FILENAME__,
"Returned payload is empty");
478 if (http_code == 426) {
481 ESP_LOGD(__FILENAME__,
"Attempting to connect to Signal K Websocket...");
484 this->
connect_ws(server_address, server_port);
485 }
else if (http_code == 401) {
492 ESP_LOGE(__FILENAME__,
"GET... failed, error: %s\n",
493 http.errorToString(http_code).c_str());
499 const uint16_t server_port) {
500 ESP_LOGD(__FILENAME__,
"Preparing a new access request");
512 doc[
"permissions"] = kRequestPermission;
513 String json_req =
"";
514 serializeJson(doc, json_req);
516 ESP_LOGD(__FILENAME__,
"Access request: %s", json_req.c_str());
520 String url = String(
"http://") + server_address +
":" + server_port +
521 "/signalk/v1/access/requests";
522 ESP_LOGD(__FILENAME__,
"Access request url: %s", url.c_str());
524 http.addHeader(
"Content-Type",
"application/json");
525 int http_code = http.POST(json_req);
526 String payload = http.getString();
530 if (http_code != 202) {
531 ESP_LOGW(__FILENAME__,
"Can't handle response %d to access request.",
533 ESP_LOGD(__FILENAME__,
"%s", payload.c_str());
541 deserializeJson(doc, payload.c_str());
542 String state = doc[
"state"];
544 if (state !=
"PENDING") {
545 ESP_LOGW(__FILENAME__,
"Got unknown state: %s", state.c_str());
551 String href = doc[
"href"];
560 const uint16_t server_port,
562 ESP_LOGD(__FILENAME__,
"Polling SK Server for authentication token");
566 String url = String(
"http://") + server_address +
":" + server_port + href;
568 int http_code = http.GET();
569 if (http_code == 200 or http_code == 202) {
570 String payload = http.getString();
573 auto error = deserializeJson(doc, payload.c_str());
575 ESP_LOGW(__FILENAME__,
"WARNING: Could not deserialize http payload.");
576 ESP_LOGW(__FILENAME__,
"DeserializationError: %s", error.c_str());
579 String state = doc[
"state"];
580 ESP_LOGD(__FILENAME__,
"%s", state.c_str());
581 if (state ==
"PENDING") {
586 if (state ==
"COMPLETED") {
587 JsonObject access_req = doc[
"accessRequest"];
588 String permission = access_req[
"permission"];
593 if (permission ==
"DENIED") {
594 ESP_LOGW(__FILENAME__,
"Permission denied");
599 if (permission ==
"APPROVED") {
600 ESP_LOGI(__FILENAME__,
"Permission granted");
601 String token = access_req[
"token"];
604 this->
connect_ws(server_address, server_port);
610 if (http_code == 500) {
614 ESP_LOGD(__FILENAME__,
"Got 500, probably a non-existing request.");
621 ESP_LOGW(__FILENAME__,
622 "Can't handle response %d to pending access request.\n",
630 String path =
"/signalk/v1/stream?subscribe=none";
635 String url = String(
"ws://") + host +
":" + port + path;
637 esp_websocket_client_config_t websocket_cfg = {};
638 websocket_cfg.uri = url.c_str();
640 const String full_auth_header =
641 String(
"Authorization: Bearer ") +
auth_token_ +
"\r\n";
643 websocket_cfg.headers = full_auth_header.c_str();
645 ESP_LOGD(__FILENAME__,
"Websocket config: %s", websocket_cfg.uri);
646 ESP_LOGD(__FILENAME__,
"Initializing websocket client...");
647 this->
client_ = esp_websocket_client_init(&websocket_cfg);
648 ESP_LOGD(__FILENAME__,
"Registering websocket event handler...");
649 error = esp_websocket_register_events(this->
client_, WEBSOCKET_EVENT_ANY,
650 websocket_event_handler,
652 if (error != ESP_OK) {
653 ESP_LOGE(__FILENAME__,
"Error registering websocket event handler: %d",
656 ESP_LOGD(__FILENAME__,
"Starting websocket client...");
657 error = esp_websocket_client_start(this->client_);
658 if (error != ESP_OK) {
659 ESP_LOGE(__FILENAME__,
"Error starting websocket client: %d", error);
661 ESP_LOGD(__FILENAME__,
"Websocket client started.");
670 esp_websocket_client_close(this->
client_, portMAX_DELAY);
680 esp_websocket_client_send_text(this->
client_, output.c_str(),
681 output.length(), portMAX_DELAY);
700 if (config[
"sk_address"].is<String>()) {
703 if (config[
"sk_port"].is<int>()) {
706 if (config[
"use_mdns"].is<bool>()) {
707 this->
use_mdns_ = config[
"use_mdns"].as<
bool>();
709 if (config[
"token"].is<String>()) {
712 if (config[
"client_id"].is<String>()) {
713 this->
client_id_ = config[
"client_id"].as<String>();
715 if (config[
"polling_href"].is<String>()) {
731 return "Authorizing with SignalK";
737 return "Disconnected";
virtual bool load() override
Load and populate the object from a persistent storage.
virtual bool save() override
Save the object to a persistent storage.
virtual void set(const C &input) override final
void attach(std::function< void()> observer)
An Observable class that listens for Signal K stream deltas and notifies any observers of value change...
static bool take_semaphore(uint64_t timeout_ms=0)
static void release_semaphore()
virtual void parse_value(const JsonObject &json)
static const std::vector< SKListener * > & get_listeners()
An Observable class that listens for Signal K PUT requests coming over the websocket connection and no...
static const std::vector< SKPutListener * > & get_listeners()
virtual void parse_value(const JsonObject &put)=0
static void handle_response(JsonDocument &response)
The websocket connection to the Signal K server.
std::list< JsonDocument > received_updates_
void poll_access_request(const String host, const uint16_t port, const String href)
SKWSClient(const String &config_path, std::shared_ptr< SKDeltaQueue > sk_delta_queue, const String &server_address, uint16_t server_port, bool use_mdns=true)
void on_receive_delta(uint8_t *payload, size_t length)
Called when the websocket receives a delta.
void process_received_updates()
Loop through the received updates and process them.
void connect_ws(const String &host, const uint16_t port)
Integrator< int, int > delta_tx_count_producer_
TaskQueueProducer< SKWSConnectionState > connection_state_
SKWSConnectionState get_connection_state()
void on_receive_put(JsonDocument &message)
Called when a PUT event is received.
void release_received_updates_semaphore()
void send_access_request(const String host, const uint16_t port)
Integrator< int, int > delta_rx_count_producer_
virtual bool to_json(JsonObject &root) override final
esp_websocket_client_handle_t client_
void on_error()
Called when the websocket connection encounters an error.
void sendTXT(String &payload)
Send some processed data to the websocket.
bool get_mdns_service(String &server_address, uint16_t &server_port)
void on_disconnected()
Called when the websocket connection is disconnected.
void subscribe_listeners()
Subscribes the SK delta paths to the websocket.
void on_receive_updates(JsonDocument &message)
Called when a delta update is received.
uint16_t conf_server_port_
void test_token(const String host, const uint16_t port)
void set_connection_state(SKWSConnectionState state)
bool take_received_updates_semaphore(unsigned long int timeout_ms=0)
virtual bool from_json(const JsonObject &config) override final
String get_connection_status()
Get a String representation of the current connection state.
TaskQueueProducer< int > delta_tx_tick_producer_
Emits the number of deltas sent since last report.
std::shared_ptr< SKDeltaQueue > sk_delta_queue_
String conf_server_address_
void on_connected()
Called when the websocket connection is established.
static String get_hostname()
Get the current hostname.
virtual void set(const T &value) override
virtual const T & get() const
void emit(const SKWSConnectionState &new_value)
std::enable_if< std::is_base_of< ValueConsumer< typenameVConsumer::input_type >, VConsumer >::value &&std::is_convertible< T, typenameVConsumer::input_type >::value, std::shared_ptr< VConsumer > >::type connect_to(std::shared_ptr< VConsumer > consumer)
Connect a producer to a transform with a different input type.
std::shared_ptr< reactesp::EventLoop > event_loop()
String generate_uuid4()
Generate a random UUIDv4 string.
constexpr int kWsClientTaskStackSize
void ExecuteWebSocketTask(void *)