Loading [MathJax]/jax/output/HTML-CSS/config.js
SensESP 3.1.0
Universal Signal K sensor toolkit ESP32
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
signalk_ws_client.cpp
Go to the documentation of this file.
1#include "sensesp.h"
2
3#include "signalk_ws_client.h"
4
5#include <ArduinoJson.h>
6#include <ESPmDNS.h>
7#include <HTTPClient.h>
8#include <WiFiClient.h>
9
10#include "Arduino.h"
11#include "elapsedMillis.h"
12#include "esp_arduino_version.h"
16#include "sensesp/system/uuid.h"
17#include "sensesp_app.h"
18
19namespace sensesp {
20
21constexpr int kWsClientTaskStackSize = 8192;
22
24
25static const char* kRequestPermission = "readwrite";
26
27void ExecuteWebSocketTask(void* /*parameter*/) {
28 elapsedMillis connect_loop_elapsed = 0;
29 elapsedMillis delta_loop_elapsed = 0;
30
32
33 while (true) {
34 if (connect_loop_elapsed > 2000) {
35 connect_loop_elapsed = 0;
37 }
38 if (delta_loop_elapsed > 5) {
39 delta_loop_elapsed = 0;
41 }
42 delay(1);
43 }
44}
45
54static void websocket_event_handler(void* handler_args, esp_event_base_t base,
55 int32_t event_id, void* event_data) {
56 esp_websocket_event_data_t* data = (esp_websocket_event_data_t*)event_data;
57 switch (event_id) {
58 case WEBSOCKET_EVENT_CONNECTED:
59 ESP_LOGD(__FILENAME__, "WEBSOCKET_EVENT_CONNECTED");
61 break;
62 case WEBSOCKET_EVENT_DISCONNECTED:
63 ESP_LOGD(__FILENAME__, "WEBSOCKET_EVENT_DISCONNECTED");
65 break;
66 case WEBSOCKET_EVENT_DATA:
67 // check if the payload is text)
68 if (data->op_code == 0x01) {
69 ws_client->on_receive_delta((uint8_t*)data->data_ptr, data->data_len);
70 }
71 break;
72 case WEBSOCKET_EVENT_ERROR:
74 break;
75 }
76}
77
78SKWSClient::SKWSClient(const String& config_path,
79 std::shared_ptr<SKDeltaQueue> sk_delta_queue,
80 const String& server_address, uint16_t server_port,
81 bool use_mdns)
82 : FileSystemSaveable{config_path},
83 conf_server_address_{server_address},
84 conf_server_port_{server_port},
85 use_mdns_{use_mdns},
86 sk_delta_queue_{sk_delta_queue} {
87 // a SKWSClient object observes its own connection_state_ member
88 // and simply passes through any notification it emits. As a result,
89 // whenever the value of connection_state_ is updated, observers of the
90 // SKWSClient object get automatically notified.
92 [this]() { this->emit(this->connection_state_.get()); });
93
94 // process any received updates in the main task
95 event_loop()->onRepeat(1, [this]() { this->process_received_updates(); });
96
97 // set the singleton object pointer
98 ws_client = this;
99
100 load();
101
102 // Connect the counters
104
105 event_loop()->onDelay(0, [this]() {
106 ESP_LOGD(__FILENAME__, "Starting SKWSClient");
107 xTaskCreate(ExecuteWebSocketTask, "SKWSClient", kWsClientTaskStackSize,
108 this, 1, NULL);
109 MDNS.addService("signalk-sensesp", "tcp", 80);
110 });
111}
112
118
128 // Going from connecting directly to disconnect when we
129 // know we have found and talked to the server usually means
130 // the authentication token is bad.
131 ESP_LOGW(__FILENAME__, "Bad access token detected. Setting token to null.");
132 auth_token_ = NULL_AUTH_TOKEN;
133 save();
134 }
136 server_detected_ = false;
137}
138
147 ESP_LOGW(__FILENAME__, "Websocket client error.");
148}
149
157 this->sk_delta_queue_->reset_meta_send();
158 ESP_LOGI(__FILENAME__, "Subscribing to Signal K listeners...");
159 this->subscribe_listeners();
160}
161
169 bool output_available = false;
170 JsonDocument subscription;
171 subscription["context"] = "vessels.self";
172
174 const std::vector<SKListener*>& listeners = SKListener::get_listeners();
175
176 if (listeners.size() > 0) {
177 output_available = true;
178 JsonArray subscribe = subscription["subscribe"].to<JsonArray>();
179
180 for (size_t i = 0; i < listeners.size(); i++) {
181 auto* listener = listeners.at(i);
182 String sk_path = listener->get_sk_path();
183 int listen_delay = listener->get_listen_delay();
184
185 JsonObject subscribe_path = subscribe.add<JsonObject>();
186
187 subscribe_path["path"] = sk_path;
188 subscribe_path["period"] = listen_delay;
189 ESP_LOGI(__FILENAME__, "Adding %s subscription with listen_delay %d\n",
190 sk_path.c_str(), listen_delay);
191 }
192 }
194
195 if (output_available) {
196 String json_message;
197
198 serializeJson(subscription, json_message);
199 ESP_LOGI(__FILENAME__, "Subscription JSON message:\n %s",
200 json_message.c_str());
201 esp_websocket_client_send_text(this->client_, json_message.c_str(),
202 json_message.length(), portMAX_DELAY);
203 }
204}
205
213void SKWSClient::on_receive_delta(uint8_t* payload, size_t length) {
214 // Need to work on null-terminated strings
215 char buf[length + 1];
216 memcpy(buf, payload, length);
217 buf[length] = 0;
218
219#ifdef SIGNALK_PRINT_RCV_DELTA
220 ESP_LOGD(__FILENAME__, "Websocket payload received: %s", (char*)buf);
221#endif
222
223 JsonDocument message;
224 // JsonObject message = jsonDoc.as<JsonObject>();
225 auto error = deserializeJson(message, buf);
226
227 if (!error) {
228 if (message["updates"].is<JsonVariant>()) {
229 on_receive_updates(message);
230 }
231
232 if (message["put"].is<JsonVariant>()) {
233 on_receive_put(message);
234 }
235
236 // Putrequest contains also requestId Key GA
237 if (message["requestId"].is<JsonVariant>() &&
238 !message["put"].is<JsonVariant>()) {
240 }
241 } else {
242 ESP_LOGE(__FILENAME__, "deserializeJson error: %s", error.c_str());
243 }
244}
245
253void SKWSClient::on_receive_updates(JsonDocument& message) {
254 // Process updates from subscriptions...
255 JsonArray updates = message["updates"];
256
258 for (size_t i = 0; i < updates.size(); i++) {
259 JsonObject update = updates[i];
260
261 JsonArray values = update["values"];
262
263 for (size_t vi = 0; vi < values.size(); vi++) {
264 JsonDocument value_doc =
265 static_cast<JsonDocument>(static_cast<JsonObject>((values[vi])));
266
267 // push all values into a separate list for processing
268 // in the main task
269 received_updates_.push_back(value_doc);
270 }
271 }
273}
274
283
284 const std::vector<SKListener*>& listeners = SKListener::get_listeners();
285 const std::vector<SKPutListener*>& put_listeners =
287
289 int num_updates = received_updates_.size();
290 while (!received_updates_.empty()) {
291 JsonDocument& doc = received_updates_.front();
292
293 const char* path = doc["path"];
294 JsonObject value = doc.as<JsonObject>();
295
296 for (size_t i = 0; i < listeners.size(); i++) {
297 SKListener* listener = listeners[i];
298 if (listener->get_sk_path().equals(path)) {
299 listener->parse_value(value);
300 }
301 }
302 // to be able to parse values of Put Listeners GA
303 for (size_t i = 0; i < put_listeners.size(); i++) {
304 SKPutListener* listener = put_listeners[i];
305 if (listener->get_sk_path().equals(path)) {
306 listener->parse_value(value);
307 }
308 }
309 received_updates_.pop_front();
310 }
312 delta_rx_count_producer_.set(num_updates);
313
315}
316
324void SKWSClient::on_receive_put(JsonDocument& message) {
325 // Process PUT requests...
326 JsonArray puts = message["put"];
327 size_t response_count = 0;
328 for (size_t i = 0; i < puts.size(); i++) {
329 JsonObject value = puts[i];
330 const char* path = value["path"];
331 String str_val = value["value"].as<String>();
332
334 const std::vector<SKPutListener*>& listeners =
336 for (size_t i = 0; i < listeners.size(); i++) {
337 SKPutListener* listener = listeners[i];
338 if (listener->get_sk_path().equals(path)) {
340 received_updates_.push_back(value);
342 response_count++;
343 }
344 }
346
347 // Send back a request response...
348 JsonDocument put_response;
349 put_response["requestId"] = message["requestId"];
350 if (response_count == puts.size()) {
351 // We found a response for every PUT request
352 put_response["state"] = "COMPLETED";
353 put_response["statusCode"] = 200;
354 } else {
355 // One or more requests did not have a matching path
356 put_response["state"] = "FAILED";
357 put_response["statusCode"] = 405;
358 }
359 String response_text;
360 serializeJson(put_response, response_text);
361 esp_websocket_client_send_text(this->client_, response_text.c_str(),
362 response_text.length(), portMAX_DELAY);
363 }
364}
365
373void SKWSClient::sendTXT(String& payload) {
375 esp_websocket_client_send_text(this->client_, payload.c_str(),
376 payload.length(), portMAX_DELAY);
377 }
378}
379
380bool SKWSClient::get_mdns_service(String& server_address,
381 uint16_t& server_port) {
382 // get IP address using an mDNS query
383 int num = MDNS.queryService("signalk-ws", "tcp");
384 if (num == 0) {
385 // no service found
386 return false;
387 }
388
389#if ESP_ARDUINO_VERSION_MAJOR < 3
390 server_address = MDNS.IP(0).toString();
391#else
392 server_address = MDNS.address(0).toString();
393#endif
394 server_port = MDNS.port(0);
395 ESP_LOGI(__FILENAME__, "Found server %s (port %d)", server_address.c_str(),
396 server_port);
397 return true;
398}
399
402 return;
403 }
404
405 if (!WiFi.isConnected() && WiFi.getMode() != WIFI_MODE_AP) {
406 ESP_LOGI(
407 __FILENAME__,
408 "WiFi is disconnected. SignalK client connection will be initiated "
409 "when WiFi is connected.");
410 return;
411 }
412
413 ESP_LOGI(__FILENAME__, "Initiating websocket connection with server...");
414
416 if (use_mdns_) {
417 if (!get_mdns_service(this->server_address_, this->server_port_)) {
418 ESP_LOGE(__FILENAME__,
419 "No Signal K server found in network when using mDNS service!");
420 } else {
421 ESP_LOGI(__FILENAME__,
422 "Signal K server has been found at address %s:%d by mDNS.",
423 this->server_address_.c_str(), this->server_port_);
424 }
425 } else {
427 this->server_port_ = this->conf_server_port_;
428 }
429
430 if (!this->server_address_.isEmpty() && this->server_port_ > 0) {
431 ESP_LOGD(__FILENAME__,
432 "Websocket is connecting to Signal K server on address %s:%d",
433 this->server_address_.c_str(), this->server_port_);
434 } else {
435 // host and port not defined - don't try to connect
436 ESP_LOGD(__FILENAME__,
437 "Websocket is not connecting to Signal K server because host and "
438 "port are not defined.");
440 return;
441 }
442
443 if (this->polling_href_ != "") {
444 // existing pending request
446 this->polling_href_);
447 return;
448 }
449
450 if (this->auth_token_ == NULL_AUTH_TOKEN) {
451 // initiate HTTP authentication
452 ESP_LOGD(__FILENAME__, "No prior authorization token present.");
454 return;
455 }
456
457 // Test the validity of the authorization token
458 this->test_token(this->server_address_, this->server_port_);
459}
460
461void SKWSClient::test_token(const String server_address,
462 const uint16_t server_port) {
463 // FIXME: implement async HTTP client!
464 HTTPClient http;
465
466 String url = String("http://") + server_address + ":" + server_port +
467 "/signalk/v1/stream";
468 ESP_LOGD(__FILENAME__, "Testing token with url %s", url.c_str());
469 http.begin(wifi_client_, url);
470 const String full_token = String("Bearer ") + auth_token_;
471 ESP_LOGD(__FILENAME__, "Authorization: %s", full_token.c_str());
472 http.addHeader("Authorization", full_token.c_str());
473 int http_code = http.GET();
474 if (http_code > 0) {
475 String payload = http.getString();
476 http.end();
477 ESP_LOGD(__FILENAME__, "Testing resulted in http status %d", http_code);
478 if (payload.length() > 0) {
479 ESP_LOGD(__FILENAME__,
480 "Returned payload (length %d) is: ", payload.length());
481 ESP_LOGD(__FILENAME__, "%s", payload.c_str());
482 } else {
483 ESP_LOGD(__FILENAME__, "Returned payload is empty");
484 }
485 if (http_code == 426) {
486 // HTTP status 426 is "Upgrade Required", which is the expected
487 // response for a websocket connection.
488 ESP_LOGD(__FILENAME__, "Attempting to connect to Signal K Websocket...");
489 server_detected_ = true;
490 token_test_success_ = true;
491 this->connect_ws(server_address, server_port);
492 } else if (http_code == 401) {
493 this->client_id_ = "";
494 this->send_access_request(server_address, server_port);
495 } else {
497 }
498 } else {
499 ESP_LOGE(__FILENAME__, "GET... failed, error: %s\n",
500 http.errorToString(http_code).c_str());
502 }
503}
504
505void SKWSClient::send_access_request(const String server_address,
506 const uint16_t server_port) {
507 ESP_LOGD(__FILENAME__, "Preparing a new access request");
508 if (client_id_ == "") {
509 // generate a client ID
511 save();
512 }
513
514 // create a new access request
515 JsonDocument doc;
516 doc["clientId"] = client_id_;
517 doc["description"] =
518 String("SensESP device: ") + SensESPBaseApp::get_hostname();
519 doc["permissions"] = kRequestPermission;
520 String json_req = "";
521 serializeJson(doc, json_req);
522
523 ESP_LOGD(__FILENAME__, "Access request: %s", json_req.c_str());
524
525 HTTPClient http;
526
527 String url = String("http://") + server_address + ":" + server_port +
528 "/signalk/v1/access/requests";
529 ESP_LOGD(__FILENAME__, "Access request url: %s", url.c_str());
530 http.begin(wifi_client_, url);
531 http.addHeader("Content-Type", "application/json");
532 int http_code = http.POST(json_req);
533 String payload = http.getString();
534 http.end();
535
536 // if we get a response we can't handle, try to reconnect later
537 if (http_code != 202) {
538 ESP_LOGW(__FILENAME__, "Can't handle response %d to access request.",
539 http_code);
540 ESP_LOGD(__FILENAME__, "%s", payload.c_str());
542 client_id_ = "";
543 return;
544 }
545
546 // http status code 202
547
548 deserializeJson(doc, payload.c_str());
549 String state = doc["state"];
550
551 if (state != "PENDING") {
552 ESP_LOGW(__FILENAME__, "Got unknown state: %s", state.c_str());
554 client_id_ = "";
555 return;
556 }
557
558 String href = doc["href"];
559 polling_href_ = href;
560 save();
561
562 delay(5000);
563 this->poll_access_request(server_address, server_port, this->polling_href_);
564}
565
566void SKWSClient::poll_access_request(const String server_address,
567 const uint16_t server_port,
568 const String href) {
569 ESP_LOGD(__FILENAME__, "Polling SK Server for authentication token");
570
571 HTTPClient http;
572
573 String url = String("http://") + server_address + ":" + server_port + href;
574 http.begin(wifi_client_, url);
575 int http_code = http.GET();
576 if (http_code == 200 or http_code == 202) {
577 String payload = http.getString();
578 http.end();
579 JsonDocument doc;
580 auto error = deserializeJson(doc, payload.c_str());
581 if (error) {
582 ESP_LOGW(__FILENAME__, "WARNING: Could not deserialize http payload.");
583 ESP_LOGW(__FILENAME__, "DeserializationError: %s", error.c_str());
584 return; // TODO(mairas): return at this point, or keep going?
585 }
586 String state = doc["state"];
587 ESP_LOGD(__FILENAME__, "%s", state.c_str());
588 if (state == "PENDING") {
590 delay(5000);
591 return;
592 }
593 if (state == "COMPLETED") {
594 JsonObject access_req = doc["accessRequest"];
595 String permission = access_req["permission"];
596
597 polling_href_ = "";
598 save();
599
600 if (permission == "DENIED") {
601 ESP_LOGW(__FILENAME__, "Permission denied");
603 return;
604 }
605
606 if (permission == "APPROVED") {
607 ESP_LOGI(__FILENAME__, "Permission granted");
608 String token = access_req["token"];
609 auth_token_ = token;
610 save();
611 this->connect_ws(server_address, server_port);
612 return;
613 }
614 }
615 } else {
616 http.end();
617 if (http_code == 500) {
618 // this is probably the server barfing due to
619 // us polling a non-existing request. Just
620 // delete the polling href.
621 ESP_LOGD(__FILENAME__, "Got 500, probably a non-existing request.");
622 polling_href_ = "";
623 save();
625 return;
626 }
627 // any other HTTP status code
628 ESP_LOGW(__FILENAME__,
629 "Can't handle response %d to pending access request.\n",
630 http_code);
632 return;
633 }
634}
635
636void SKWSClient::connect_ws(const String& host, const uint16_t port) {
637 String path = "/signalk/v1/stream?subscribe=none";
639
640 esp_err_t error;
641
642 String url = String("ws://") + host + ":" + port + path;
643
644 esp_websocket_client_config_t websocket_cfg = {};
645 websocket_cfg.uri = url.c_str();
646
647 const String full_auth_header =
648 String("Authorization: Bearer ") + auth_token_ + "\r\n";
649
650 websocket_cfg.headers = full_auth_header.c_str();
651
652 ESP_LOGD(__FILENAME__, "Websocket config: %s", websocket_cfg.uri);
653 ESP_LOGD(__FILENAME__, "Initializing websocket client...");
654 this->client_ = esp_websocket_client_init(&websocket_cfg);
655 ESP_LOGD(__FILENAME__, "Registering websocket event handler...");
656 error = esp_websocket_register_events(this->client_, WEBSOCKET_EVENT_ANY,
657 websocket_event_handler,
658 (void*)this->client_);
659 if (error != ESP_OK) {
660 ESP_LOGE(__FILENAME__, "Error registering websocket event handler: %d",
661 error);
662 }
663 ESP_LOGD(__FILENAME__, "Starting websocket client...");
664 error = esp_websocket_client_start(this->client_);
665 if (error != ESP_OK) {
666 ESP_LOGE(__FILENAME__, "Error starting websocket client: %d", error);
667 }
668 ESP_LOGD(__FILENAME__, "Websocket client started.");
669}
670
674
677 esp_websocket_client_close(this->client_, portMAX_DELAY);
679 }
680}
681
683 String output;
685 if (sk_delta_queue_->data_available()) {
686 sk_delta_queue_->get_delta(output);
687 esp_websocket_client_send_text(this->client_, output.c_str(),
688 output.length(), portMAX_DELAY);
689 // This automatically notifies the observers
691 }
692 }
693}
694
695bool SKWSClient::to_json(JsonObject& root) {
696 root["sk_address"] = this->conf_server_address_;
697 root["sk_port"] = this->conf_server_port_;
698 root["use_mdns"] = this->use_mdns_;
699
700 root["token"] = this->auth_token_;
701 root["client_id"] = this->client_id_;
702 root["polling_href"] = this->polling_href_;
703 return true;
704}
705
706bool SKWSClient::from_json(const JsonObject& config) {
707 if (config["sk_address"].is<String>()) {
708 this->conf_server_address_ = config["sk_address"].as<String>();
709 }
710 if (config["sk_port"].is<int>()) {
711 this->conf_server_port_ = config["sk_port"].as<int>();
712 }
713 if (config["use_mdns"].is<bool>()) {
714 this->use_mdns_ = config["use_mdns"].as<bool>();
715 }
716 if (config["token"].is<String>()) {
717 this->auth_token_ = config["token"].as<String>();
718 }
719 if (config["client_id"].is<String>()) {
720 this->client_id_ = config["client_id"].as<String>();
721 }
722 if (config["polling_href"].is<String>()) {
723 this->polling_href_ = config["polling_href"].as<String>();
724 }
725
726 return true;
727}
728
735 auto state = get_connection_state();
736 switch (state) {
738 return "Authorizing with SignalK";
740 return "Connected";
742 return "Connecting";
744 return "Disconnected";
745 }
746
747 return "Unknown";
748}
749
750} // namespace sensesp
virtual bool load() override
Load and populate the object from a persistent storage.
Definition saveable.cpp:8
virtual bool save() override
Save the object to a persistent storage.
Definition saveable.cpp:41
virtual void set(const C &input) override final
Definition integrator.h:34
void attach(std::function< void()> observer)
Definition observable.h:29
An Obervable class that listens for Signal K stream deltas and notifies any observers of value change...
static bool take_semaphore(uint64_t timeout_ms=0)
static void release_semaphore()
virtual void parse_value(const JsonObject &json)
static const std::vector< SKListener * > & get_listeners()
An Obervable class that listens for Signal K PUT requests coming over the websocket connection and no...
static const std::vector< SKPutListener * > & get_listeners()
virtual void parse_value(const JsonObject &put)=0
static void handle_response(JsonDocument &response)
The websocket connection to the Signal K server.
std::list< JsonDocument > received_updates_
void poll_access_request(const String host, const uint16_t port, const String href)
SKWSClient(const String &config_path, std::shared_ptr< SKDeltaQueue > sk_delta_queue, const String &server_address, uint16_t server_port, bool use_mdns=true)
void on_receive_delta(uint8_t *payload, size_t length)
Called when the websocket receives a delta.
void process_received_updates()
Loop through the received updates and process them.
void connect_ws(const String &host, const uint16_t port)
Integrator< int, int > delta_tx_count_producer_
TaskQueueProducer< SKWSConnectionState > connection_state_
SKWSConnectionState get_connection_state()
void on_receive_put(JsonDocument &message)
Called when a PUT event is received.
void release_received_updates_semaphore()
void send_access_request(const String host, const uint16_t port)
Integrator< int, int > delta_rx_count_producer_
virtual bool to_json(JsonObject &root) override final
esp_websocket_client_handle_t client_
void on_error()
Called when the websocket connection encounters an error.
void sendTXT(String &payload)
Send some processed data to the websocket.
bool get_mdns_service(String &server_address, uint16_t &server_port)
void on_disconnected()
Called when the websocket connection is disconnected.
void subscribe_listeners()
Subscribes the SK delta paths to the websocket.
void on_receive_updates(JsonDocument &message)
Called when a delta update is received.
void test_token(const String host, const uint16_t port)
void set_connection_state(SKWSConnectionState state)
bool take_received_updates_semaphore(unsigned long int timeout_ms=0)
virtual bool from_json(const JsonObject &config) override final
String get_connection_status()
Get a String representation of the current connection state.
TaskQueueProducer< int > delta_tx_tick_producer_
Emits the number of deltas sent since last report.
std::shared_ptr< SKDeltaQueue > sk_delta_queue_
void on_connected()
Called when the websocket connection is established.
static String get_hostname()
Get the current hostname.
virtual void set(const T &value) override
virtual const T & get() const
void emit(const SKWSConnectionState &new_value)
std::enable_if< std::is_base_of< ValueConsumer< typenameVConsumer::input_type >, VConsumer >::value &&std::is_convertible< T, typenameVConsumer::input_type >::value, std::shared_ptr< VConsumer > >::type connect_to(std::shared_ptr< VConsumer > consumer)
Connect a producer to a transform with a different input type.
std::shared_ptr< reactesp::EventLoop > event_loop()
Definition sensesp.cpp:9
String generate_uuid4()
Generate a random UUIDv4 string.
Definition uuid.cpp:5
constexpr int kWsClientTaskStackSize
SKWSClient * ws_client
void ExecuteWebSocketTask(void *)