SensESP 3.0.0-beta.6
Universal Signal K sensor toolkit ESP32
Loading...
Searching...
No Matches
signalk_ws_client.cpp
Go to the documentation of this file.
1#include "sensesp.h"
2
3#include "signalk_ws_client.h"
4
5#include <ArduinoJson.h>
6#include <ESPmDNS.h>
7#include <HTTPClient.h>
8#include <WiFiClient.h>
9
10#include "Arduino.h"
11#include "elapsedMillis.h"
15#include "sensesp/system/uuid.h"
16#include "sensesp_app.h"
17
18namespace sensesp {
19
20constexpr int kWsClientTaskStackSize = 8192;
21
23
24static const char* kRequestPermission = "readwrite";
25
26void ExecuteWebSocketTask(void* /*parameter*/) {
27 elapsedMillis connect_loop_elapsed = 0;
28 elapsedMillis delta_loop_elapsed = 0;
29
31
32 while (true) {
33 if (connect_loop_elapsed > 2000) {
34 connect_loop_elapsed = 0;
36 }
37 if (delta_loop_elapsed > 5) {
38 delta_loop_elapsed = 0;
40 }
41 delay(1);
42 }
43}
44
53static void websocket_event_handler(void* handler_args, esp_event_base_t base,
54 int32_t event_id, void* event_data) {
55 esp_websocket_event_data_t* data = (esp_websocket_event_data_t*)event_data;
56 switch (event_id) {
57 case WEBSOCKET_EVENT_CONNECTED:
58 ESP_LOGD(__FILENAME__, "WEBSOCKET_EVENT_CONNECTED");
60 break;
61 case WEBSOCKET_EVENT_DISCONNECTED:
62 ESP_LOGD(__FILENAME__, "WEBSOCKET_EVENT_DISCONNECTED");
64 break;
65 case WEBSOCKET_EVENT_DATA:
66 // check if the payload is text)
67 if (data->op_code == 0x01) {
68 ws_client->on_receive_delta((uint8_t*)data->data_ptr, data->data_len);
69 }
70 break;
71 case WEBSOCKET_EVENT_ERROR:
73 break;
74 }
75}
76
77SKWSClient::SKWSClient(const String& config_path, std::shared_ptr<SKDeltaQueue> sk_delta_queue,
78 const String& server_address, uint16_t server_port,
79 bool use_mdns)
80 : FileSystemSaveable{config_path},
81 conf_server_address_{server_address},
82 conf_server_port_{server_port},
83 use_mdns_{use_mdns},
84 sk_delta_queue_{sk_delta_queue} {
85 // a SKWSClient object observes its own connection_state_ member
86 // and simply passes through any notification it emits. As a result,
87 // whenever the value of connection_state_ is updated, observers of the
88 // SKWSClient object get automatically notified.
90 [this]() { this->emit(this->connection_state_.get()); });
91
92 // process any received updates in the main task
93 event_loop()->onRepeat(1, [this]() { this->process_received_updates(); });
94
95 // set the singleton object pointer
96 ws_client = this;
97
98 load();
99
100 // Connect the counters
102
103 event_loop()->onDelay(0, [this]() {
104 ESP_LOGD(__FILENAME__, "Starting SKWSClient");
105 xTaskCreate(ExecuteWebSocketTask, "SKWSClient", kWsClientTaskStackSize,
106 this, 1, NULL);
107 MDNS.addService("signalk-sensesp", "tcp", 80);
108 });
109}
110
116
126 // Going from connecting directly to disconnect when we
127 // know we have found and talked to the server usually means
128 // the authentication token is bad.
129 ESP_LOGW(__FILENAME__, "Bad access token detected. Setting token to null.");
130 auth_token_ = NULL_AUTH_TOKEN;
131 save();
132 }
134 server_detected_ = false;
135}
136
145 ESP_LOGW(__FILENAME__, "Websocket client error.");
146}
147
155 this->sk_delta_queue_->reset_meta_send();
156 ESP_LOGI(__FILENAME__, "Subscribing to Signal K listeners...");
157 this->subscribe_listeners();
158}
159
167 bool output_available = false;
168 JsonDocument subscription;
169 subscription["context"] = "vessels.self";
170
172 const std::vector<SKListener*>& listeners = SKListener::get_listeners();
173
174 if (listeners.size() > 0) {
175 output_available = true;
176 JsonArray subscribe = subscription["subscribe"].to<JsonArray>();
177
178 for (size_t i = 0; i < listeners.size(); i++) {
179 auto* listener = listeners.at(i);
180 String sk_path = listener->get_sk_path();
181 int listen_delay = listener->get_listen_delay();
182
183 JsonObject subscribe_path = subscribe.add<JsonObject>();
184
185 subscribe_path["path"] = sk_path;
186 subscribe_path["period"] = listen_delay;
187 ESP_LOGI(__FILENAME__, "Adding %s subscription with listen_delay %d\n",
188 sk_path.c_str(), listen_delay);
189 }
190 }
192
193 if (output_available) {
194 String json_message;
195
196 serializeJson(subscription, json_message);
197 ESP_LOGI(__FILENAME__, "Subscription JSON message:\n %s",
198 json_message.c_str());
199 esp_websocket_client_send_text(this->client_, json_message.c_str(),
200 json_message.length(), portMAX_DELAY);
201 }
202}
203
211void SKWSClient::on_receive_delta(uint8_t* payload, size_t length) {
212 // Need to work on null-terminated strings
213 char buf[length + 1];
214 memcpy(buf, payload, length);
215 buf[length] = 0;
216
217#ifdef SIGNALK_PRINT_RCV_DELTA
218 ESP_LOGD(__FILENAME__, "Websocket payload received: %s", (char*)buf);
219#endif
220
221 JsonDocument message;
222 // JsonObject message = jsonDoc.as<JsonObject>();
223 auto error = deserializeJson(message, buf);
224
225 if (!error) {
226 if (message["updates"].is<JsonVariant>()) {
227 on_receive_updates(message);
228 }
229
230 if (message["put"].is<JsonVariant>()) {
231 on_receive_put(message);
232 }
233
234 // Putrequest contains also requestId Key GA
235 if (message["requestId"].is<JsonVariant>() &&
236 !message["put"].is<JsonVariant>()) {
238 }
239 } else {
240 ESP_LOGE(__FILENAME__, "deserializeJson error: %s", error.c_str());
241 }
242}
243
251void SKWSClient::on_receive_updates(JsonDocument& message) {
252 // Process updates from subscriptions...
253 JsonArray updates = message["updates"];
254
256 for (size_t i = 0; i < updates.size(); i++) {
257 JsonObject update = updates[i];
258
259 JsonArray values = update["values"];
260
261 for (size_t vi = 0; vi < values.size(); vi++) {
262 JsonDocument value_doc =
263 static_cast<JsonDocument>(static_cast<JsonObject>((values[vi])));
264
265 // push all values into a separate list for processing
266 // in the main task
267 received_updates_.push_back(value_doc);
268 }
269 }
271}
272
281
282 const std::vector<SKListener*>& listeners = SKListener::get_listeners();
283 const std::vector<SKPutListener*>& put_listeners =
285
287 int num_updates = received_updates_.size();
288 while (!received_updates_.empty()) {
289 JsonDocument& doc = received_updates_.front();
290
291 const char* path = doc["path"];
292 JsonObject value = doc.as<JsonObject>();
293
294 for (size_t i = 0; i < listeners.size(); i++) {
295 SKListener* listener = listeners[i];
296 if (listener->get_sk_path().equals(path)) {
297 listener->parse_value(value);
298 }
299 }
300 // to be able to parse values of Put Listeners GA
301 for (size_t i = 0; i < put_listeners.size(); i++) {
302 SKPutListener* listener = put_listeners[i];
303 if (listener->get_sk_path().equals(path)) {
304 listener->parse_value(value);
305 }
306 }
307 received_updates_.pop_front();
308 }
310 delta_rx_count_producer_.set(num_updates);
311
313}
314
322void SKWSClient::on_receive_put(JsonDocument& message) {
323 // Process PUT requests...
324 JsonArray puts = message["put"];
325 size_t response_count = 0;
326 for (size_t i = 0; i < puts.size(); i++) {
327 JsonObject value = puts[i];
328 const char* path = value["path"];
329 String str_val = value["value"].as<String>();
330
332 const std::vector<SKPutListener*>& listeners =
334 for (size_t i = 0; i < listeners.size(); i++) {
335 SKPutListener* listener = listeners[i];
336 if (listener->get_sk_path().equals(path)) {
338 received_updates_.push_back(value);
340 response_count++;
341 }
342 }
344
345 // Send back a request response...
346 JsonDocument put_response;
347 put_response["requestId"] = message["requestId"];
348 if (response_count == puts.size()) {
349 // We found a response for every PUT request
350 put_response["state"] = "COMPLETED";
351 put_response["statusCode"] = 200;
352 } else {
353 // One or more requests did not have a matching path
354 put_response["state"] = "FAILED";
355 put_response["statusCode"] = 405;
356 }
357 String response_text;
358 serializeJson(put_response, response_text);
359 esp_websocket_client_send_text(this->client_, response_text.c_str(),
360 response_text.length(), portMAX_DELAY);
361 }
362}
363
371void SKWSClient::sendTXT(String& payload) {
373 esp_websocket_client_send_text(this->client_, payload.c_str(),
374 payload.length(), portMAX_DELAY);
375 }
376}
377
378bool SKWSClient::get_mdns_service(String& server_address,
379 uint16_t& server_port) {
380 // get IP address using an mDNS query
381 int num = MDNS.queryService("signalk-ws", "tcp");
382 if (num == 0) {
383 // no service found
384 return false;
385 }
386 server_address = MDNS.IP(0).toString();
387 server_port = MDNS.port(0);
388 ESP_LOGI(__FILENAME__, "Found server %s (port %d)", server_address.c_str(),
389 server_port);
390 return true;
391}
392
395 return;
396 }
397
398 if (!WiFi.isConnected() && WiFi.getMode() != WIFI_MODE_AP) {
399 ESP_LOGI(
400 __FILENAME__,
401 "WiFi is disconnected. SignalK client connection will be initiated "
402 "when WiFi is connected.");
403 return;
404 }
405
406 ESP_LOGI(__FILENAME__, "Initiating websocket connection with server...");
407
409 if (use_mdns_) {
410 if (!get_mdns_service(this->server_address_, this->server_port_)) {
411 ESP_LOGE(__FILENAME__,
412 "No Signal K server found in network when using mDNS service!");
413 } else {
414 ESP_LOGI(__FILENAME__,
415 "Signal K server has been found at address %s:%d by mDNS.",
416 this->server_address_.c_str(), this->server_port_);
417 }
418 } else {
420 this->server_port_ = this->conf_server_port_;
421 }
422
423 if (!this->server_address_.isEmpty() && this->server_port_ > 0) {
424 ESP_LOGD(__FILENAME__,
425 "Websocket is connecting to Signal K server on address %s:%d",
426 this->server_address_.c_str(), this->server_port_);
427 } else {
428 // host and port not defined - don't try to connect
429 ESP_LOGD(__FILENAME__,
430 "Websocket is not connecting to Signal K server because host and "
431 "port are not defined.");
433 return;
434 }
435
436 if (this->polling_href_ != "") {
437 // existing pending request
439 this->polling_href_);
440 return;
441 }
442
443 if (this->auth_token_ == NULL_AUTH_TOKEN) {
444 // initiate HTTP authentication
445 ESP_LOGD(__FILENAME__, "No prior authorization token present.");
447 return;
448 }
449
450 // Test the validity of the authorization token
451 this->test_token(this->server_address_, this->server_port_);
452}
453
454void SKWSClient::test_token(const String server_address,
455 const uint16_t server_port) {
456 // FIXME: implement async HTTP client!
457 HTTPClient http;
458
459 String url = String("http://") + server_address + ":" + server_port +
460 "/signalk/v1/stream";
461 ESP_LOGD(__FILENAME__, "Testing token with url %s", url.c_str());
462 http.begin(wifi_client_, url);
463 const String full_token = String("Bearer ") + auth_token_;
464 ESP_LOGD(__FILENAME__, "Authorization: %s", full_token.c_str());
465 http.addHeader("Authorization", full_token.c_str());
466 int http_code = http.GET();
467 if (http_code > 0) {
468 String payload = http.getString();
469 http.end();
470 ESP_LOGD(__FILENAME__, "Testing resulted in http status %d", http_code);
471 if (payload.length() > 0) {
472 ESP_LOGD(__FILENAME__,
473 "Returned payload (length %d) is: ", payload.length());
474 ESP_LOGD(__FILENAME__, "%s", payload.c_str());
475 } else {
476 ESP_LOGD(__FILENAME__, "Returned payload is empty");
477 }
478 if (http_code == 426) {
479 // HTTP status 426 is "Upgrade Required", which is the expected
480 // response for a websocket connection.
481 ESP_LOGD(__FILENAME__, "Attempting to connect to Signal K Websocket...");
482 server_detected_ = true;
483 token_test_success_ = true;
484 this->connect_ws(server_address, server_port);
485 } else if (http_code == 401) {
486 this->client_id_ = "";
487 this->send_access_request(server_address, server_port);
488 } else {
490 }
491 } else {
492 ESP_LOGE(__FILENAME__, "GET... failed, error: %s\n",
493 http.errorToString(http_code).c_str());
495 }
496}
497
498void SKWSClient::send_access_request(const String server_address,
499 const uint16_t server_port) {
500 ESP_LOGD(__FILENAME__, "Preparing a new access request");
501 if (client_id_ == "") {
502 // generate a client ID
504 save();
505 }
506
507 // create a new access request
508 JsonDocument doc;
509 doc["clientId"] = client_id_;
510 doc["description"] =
511 String("SensESP device: ") + SensESPBaseApp::get_hostname();
512 doc["permissions"] = kRequestPermission;
513 String json_req = "";
514 serializeJson(doc, json_req);
515
516 ESP_LOGD(__FILENAME__, "Access request: %s", json_req.c_str());
517
518 HTTPClient http;
519
520 String url = String("http://") + server_address + ":" + server_port +
521 "/signalk/v1/access/requests";
522 ESP_LOGD(__FILENAME__, "Access request url: %s", url.c_str());
523 http.begin(wifi_client_, url);
524 http.addHeader("Content-Type", "application/json");
525 int http_code = http.POST(json_req);
526 String payload = http.getString();
527 http.end();
528
529 // if we get a response we can't handle, try to reconnect later
530 if (http_code != 202) {
531 ESP_LOGW(__FILENAME__, "Can't handle response %d to access request.",
532 http_code);
533 ESP_LOGD(__FILENAME__, "%s", payload.c_str());
535 client_id_ = "";
536 return;
537 }
538
539 // http status code 202
540
541 deserializeJson(doc, payload.c_str());
542 String state = doc["state"];
543
544 if (state != "PENDING") {
545 ESP_LOGW(__FILENAME__, "Got unknown state: %s", state.c_str());
547 client_id_ = "";
548 return;
549 }
550
551 String href = doc["href"];
552 polling_href_ = href;
553 save();
554
555 delay(5000);
556 this->poll_access_request(server_address, server_port, this->polling_href_);
557}
558
559void SKWSClient::poll_access_request(const String server_address,
560 const uint16_t server_port,
561 const String href) {
562 ESP_LOGD(__FILENAME__, "Polling SK Server for authentication token");
563
564 HTTPClient http;
565
566 String url = String("http://") + server_address + ":" + server_port + href;
567 http.begin(wifi_client_, url);
568 int http_code = http.GET();
569 if (http_code == 200 or http_code == 202) {
570 String payload = http.getString();
571 http.end();
572 JsonDocument doc;
573 auto error = deserializeJson(doc, payload.c_str());
574 if (error) {
575 ESP_LOGW(__FILENAME__, "WARNING: Could not deserialize http payload.");
576 ESP_LOGW(__FILENAME__, "DeserializationError: %s", error.c_str());
577 return; // TODO(mairas): return at this point, or keep going?
578 }
579 String state = doc["state"];
580 ESP_LOGD(__FILENAME__, "%s", state.c_str());
581 if (state == "PENDING") {
583 delay(5000);
584 return;
585 }
586 if (state == "COMPLETED") {
587 JsonObject access_req = doc["accessRequest"];
588 String permission = access_req["permission"];
589
590 polling_href_ = "";
591 save();
592
593 if (permission == "DENIED") {
594 ESP_LOGW(__FILENAME__, "Permission denied");
596 return;
597 }
598
599 if (permission == "APPROVED") {
600 ESP_LOGI(__FILENAME__, "Permission granted");
601 String token = access_req["token"];
602 auth_token_ = token;
603 save();
604 this->connect_ws(server_address, server_port);
605 return;
606 }
607 }
608 } else {
609 http.end();
610 if (http_code == 500) {
611 // this is probably the server barfing due to
612 // us polling a non-existing request. Just
613 // delete the polling href.
614 ESP_LOGD(__FILENAME__, "Got 500, probably a non-existing request.");
615 polling_href_ = "";
616 save();
618 return;
619 }
620 // any other HTTP status code
621 ESP_LOGW(__FILENAME__,
622 "Can't handle response %d to pending access request.\n",
623 http_code);
625 return;
626 }
627}
628
629void SKWSClient::connect_ws(const String& host, const uint16_t port) {
630 String path = "/signalk/v1/stream?subscribe=none";
632
633 esp_err_t error;
634
635 String url = String("ws://") + host + ":" + port + path;
636
637 esp_websocket_client_config_t websocket_cfg = {};
638 websocket_cfg.uri = url.c_str();
639
640 const String full_auth_header =
641 String("Authorization: Bearer ") + auth_token_ + "\r\n";
642
643 websocket_cfg.headers = full_auth_header.c_str();
644
645 ESP_LOGD(__FILENAME__, "Websocket config: %s", websocket_cfg.uri);
646 ESP_LOGD(__FILENAME__, "Initializing websocket client...");
647 this->client_ = esp_websocket_client_init(&websocket_cfg);
648 ESP_LOGD(__FILENAME__, "Registering websocket event handler...");
649 error = esp_websocket_register_events(this->client_, WEBSOCKET_EVENT_ANY,
650 websocket_event_handler,
651 (void*)this->client_);
652 if (error != ESP_OK) {
653 ESP_LOGE(__FILENAME__, "Error registering websocket event handler: %d",
654 error);
655 }
656 ESP_LOGD(__FILENAME__, "Starting websocket client...");
657 error = esp_websocket_client_start(this->client_);
658 if (error != ESP_OK) {
659 ESP_LOGE(__FILENAME__, "Error starting websocket client: %d", error);
660 }
661 ESP_LOGD(__FILENAME__, "Websocket client started.");
662}
663
667
670 esp_websocket_client_close(this->client_, portMAX_DELAY);
672 }
673}
674
676 String output;
678 if (sk_delta_queue_->data_available()) {
679 sk_delta_queue_->get_delta(output);
680 esp_websocket_client_send_text(this->client_, output.c_str(),
681 output.length(), portMAX_DELAY);
682 // This automatically notifies the observers
684 }
685 }
686}
687
688bool SKWSClient::to_json(JsonObject& root) {
689 root["sk_address"] = this->conf_server_address_;
690 root["sk_port"] = this->conf_server_port_;
691 root["use_mdns"] = this->use_mdns_;
692
693 root["token"] = this->auth_token_;
694 root["client_id"] = this->client_id_;
695 root["polling_href"] = this->polling_href_;
696 return true;
697}
698
699bool SKWSClient::from_json(const JsonObject& config) {
700 if (config["sk_address"].is<String>()) {
701 this->conf_server_address_ = config["sk_address"].as<String>();
702 }
703 if (config["sk_port"].is<int>()) {
704 this->conf_server_port_ = config["sk_port"].as<int>();
705 }
706 if (config["use_mdns"].is<bool>()) {
707 this->use_mdns_ = config["use_mdns"].as<bool>();
708 }
709 if (config["token"].is<String>()) {
710 this->auth_token_ = config["token"].as<String>();
711 }
712 if (config["client_id"].is<String>()) {
713 this->client_id_ = config["client_id"].as<String>();
714 }
715 if (config["polling_href"].is<String>()) {
716 this->polling_href_ = config["polling_href"].as<String>();
717 }
718
719 return true;
720}
721
728 auto state = get_connection_state();
729 switch (state) {
731 return "Authorizing with SignalK";
733 return "Connected";
735 return "Connecting";
737 return "Disconnected";
738 }
739
740 return "Unknown";
741}
742
743} // namespace sensesp
virtual bool load() override
Load and populate the object from a persistent storage.
Definition saveable.cpp:8
virtual bool save() override
Save the object to a persistent storage.
Definition saveable.cpp:41
virtual void set(const C &input) override final
Definition integrator.h:34
void attach(std::function< void()> observer)
Definition observable.h:29
An Observable class that listens for Signal K stream deltas and notifies any observers of value change...
static bool take_semaphore(uint64_t timeout_ms=0)
static void release_semaphore()
virtual void parse_value(const JsonObject &json)
static const std::vector< SKListener * > & get_listeners()
An Observable class that listens for Signal K PUT requests coming over the websocket connection and no...
static const std::vector< SKPutListener * > & get_listeners()
virtual void parse_value(const JsonObject &put)=0
static void handle_response(JsonDocument &response)
The websocket connection to the Signal K server.
std::list< JsonDocument > received_updates_
void poll_access_request(const String host, const uint16_t port, const String href)
SKWSClient(const String &config_path, std::shared_ptr< SKDeltaQueue > sk_delta_queue, const String &server_address, uint16_t server_port, bool use_mdns=true)
void on_receive_delta(uint8_t *payload, size_t length)
Called when the websocket receives a delta.
void process_received_updates()
Loop through the received updates and process them.
void connect_ws(const String &host, const uint16_t port)
Integrator< int, int > delta_tx_count_producer_
TaskQueueProducer< SKWSConnectionState > connection_state_
SKWSConnectionState get_connection_state()
void on_receive_put(JsonDocument &message)
Called when a PUT event is received.
void release_received_updates_semaphore()
void send_access_request(const String host, const uint16_t port)
Integrator< int, int > delta_rx_count_producer_
virtual bool to_json(JsonObject &root) override final
esp_websocket_client_handle_t client_
void on_error()
Called when the websocket connection encounters an error.
void sendTXT(String &payload)
Send some processed data to the websocket.
bool get_mdns_service(String &server_address, uint16_t &server_port)
void on_disconnected()
Called when the websocket connection is disconnected.
void subscribe_listeners()
Subscribes the SK delta paths to the websocket.
void on_receive_updates(JsonDocument &message)
Called when a delta update is received.
void test_token(const String host, const uint16_t port)
void set_connection_state(SKWSConnectionState state)
bool take_received_updates_semaphore(unsigned long int timeout_ms=0)
virtual bool from_json(const JsonObject &config) override final
String get_connection_status()
Get a String representation of the current connection state.
TaskQueueProducer< int > delta_tx_tick_producer_
Emits the number of deltas sent since last report.
std::shared_ptr< SKDeltaQueue > sk_delta_queue_
void on_connected()
Called when the websocket connection is established.
static String get_hostname()
Get the current hostname.
virtual void set(const T &value) override
virtual const T & get() const
void emit(const SKWSConnectionState &new_value)
std::enable_if< std::is_base_of< ValueConsumer< typenameVConsumer::input_type >, VConsumer >::value &&std::is_convertible< T, typenameVConsumer::input_type >::value, std::shared_ptr< VConsumer > >::type connect_to(std::shared_ptr< VConsumer > consumer)
Connect a producer to a transform with a different input type.
String generate_uuid4()
Generate a random UUIDv4 string.
Definition uuid.cpp:5
constexpr int kWsClientTaskStackSize
SKWSClient * ws_client
void ExecuteWebSocketTask(void *)
reactesp::EventLoop * event_loop()
Definition sensesp.cpp:7