SensESP 2.7.2
Universal Signal K sensor toolkit ESP32
Loading...
Searching...
No Matches
ws_client.cpp
Go to the documentation of this file.
1#include "ws_client.h"
2
3#include <ArduinoJson.h>
4#include <ESPmDNS.h>
5#include <HTTPClient.h>
6#include <WiFiClient.h>
7
8#include "Arduino.h"
9#include "elapsedMillis.h"
13#include "sensesp/system/uuid.h"
14#include "sensesp_app.h"
15
16namespace sensesp {
17
18constexpr int ws_client_task_stack_size = 8192;
19
21
22static const char* kRequestPermission = "readwrite";
23
28
30
31 while (true) {
32 if (connect_loop_elapsed > 2000) {
35 }
36 if (delta_loop_elapsed > 5) {
39 }
40 if (ws_client_loop_elapsed > 20) {
42 ws_client->loop();
43 }
44 delay(1);
45 }
46}
47
58 switch (type) {
61 break;
62 case WStype_ERROR:
64 break;
67 break;
68 case WStype_TEXT:
70 break;
71 default:
72 // Do nothing for other types
73 break;
74 }
75}
76
79 : Configurable{config_path, "Signal K Server Settings", 200},
80 Startable(60) {
81 this->sk_delta_queue_ = sk_delta_queue;
82
83 conf_server_address_ = server_address;
84 conf_server_port_ = server_port;
85
86 // a WSClient object observes its own connection_state_ member
87 // and simply passes through any notification it emits. As a result,
88 // whenever the value of connection_state_ is updated, observers of the
89 // WSClient object get automatically notified.
90 this->connection_state_.attach(
91 [this]() { this->emit(this->connection_state_.get()); });
92
93 // process any received updates in the main task
94 ReactESP::app->onRepeat(1, [this]() { this->process_received_updates(); });
95
96 // set the singleton object pointer
97 ws_client = this;
98
100}
101
104 1, NULL);
105 MDNS.addService("signalk-sensesp", "tcp", 80);
106}
107
108void WSClient::connect_loop() {
109 if (this->get_connection_state() == WSConnectionState::kWSDisconnected) {
110 this->connect();
111 }
112}
113
121 if (this->get_connection_state() == WSConnectionState::kWSConnecting &&
122 server_detected_ && !token_test_success_) {
123 // Going from connecting directly to disconnect when we
124 // know we have found and talked to the server usually means
125 // the authentication token is bad.
126 debugW("Bad access token detected. Setting token to null.");
127 auth_token_ = NULL_AUTH_TOKEN;
129 }
130 this->set_connection_state(WSConnectionState::kWSDisconnected);
131 server_detected_ = false;
132}
133
141 this->set_connection_state(WSConnectionState::kWSDisconnected);
142 debugW("Websocket client error.");
143}
144
153 this->set_connection_state(WSConnectionState::kWSConnected);
154 this->sk_delta_queue_->reset_meta_send();
155 debugI("Websocket client connected to URL: %s\n", payload);
156 debugI("Subscribing to Signal K listeners...");
157 this->subscribe_listeners();
158}
159
166void WSClient::subscribe_listeners() {
167 bool output_available = false;
169 subscription["context"] = "vessels.self";
170
172 const std::vector<SKListener*>& listeners = SKListener::get_listeners();
173
174 if (listeners.size() > 0) {
175 output_available = true;
176 JsonArray subscribe = subscription.createNestedArray("subscribe");
177
178 for (size_t i = 0; i < listeners.size(); i++) {
179 auto* listener = listeners.at(i);
180 String sk_path = listener->get_sk_path();
181 int listen_delay = listener->get_listen_delay();
182
183 JsonObject subscribePath = subscribe.createNestedObject();
184
185 subscribePath["path"] = sk_path;
186 subscribePath["period"] = listen_delay;
187 debugI("Adding %s subscription with listen_delay %d\n", sk_path.c_str(),
188 listen_delay);
189 }
190 }
192
193 if (output_available) {
194 String messageJson;
195
196 serializeJson(subscription, messageJson);
197 debugI("Subscription JSON message:\n %s", messageJson.c_str());
198 this->client_.sendTXT(messageJson);
199 }
200}
201
210#ifdef SIGNALK_PRINT_RCV_DELTA
211 debugD("Websocket payload received: %s", (char*)payload);
212#endif
213
215 // JsonObject message = jsonDoc.as<JsonObject>();
217
218 if (!error) {
219 if (message.containsKey("updates")) {
221 }
222
223 if (message.containsKey("put")) {
225 }
226
227 // Putrequest contains also requestId Key GA
228 if (message.containsKey("requestId") && !message.containsKey("put")) {
230 }
231 } else {
232 debugE("deserializeJson error: %s", error.c_str());
233 }
234}
235
244 // Process updates from subscriptions...
245 JsonArray updates = message["updates"];
246
247 take_received_updates_semaphore();
248 for (size_t i = 0; i < updates.size(); i++) {
249 JsonObject update = updates[i];
250
251 JsonArray values = update["values"];
252
253 for (size_t vi = 0; vi < values.size(); vi++) {
254 JsonObject value = values[vi];
255
256 const char* path = value["path"];
257
258 // push all values into a separate list for processing
259 // in the main task
260 received_updates_.push_back(value);
261 }
262 }
263 release_received_updates_semaphore();
264}
265
272void WSClient::process_received_updates() {
274
275 const std::vector<SKListener*>& listeners = SKListener::get_listeners();
276 const std::vector<SKPutListener*>& put_listeners =
278
279 take_received_updates_semaphore();
280 while (!received_updates_.empty()) {
281 JsonObject value = received_updates_.front();
282 received_updates_.pop_front();
283 const char* path = value["path"];
284
285 for (size_t i = 0; i < listeners.size(); i++) {
286 SKListener* listener = listeners[i];
287 if (listener->get_sk_path().equals(path)) {
288 listener->parse_value(value);
289 }
290 }
291 // to be able to parse values of Put Listeners GA
292 for (size_t i = 0; i < put_listeners.size(); i++) {
293 SKPutListener* listener = put_listeners[i];
294 if (listener->get_sk_path().equals(path)) {
295 listener->parse_value(value);
296 }
297 }
298 }
299 release_received_updates_semaphore();
300
302}
303
312 // Process PUT requests...
313 JsonArray puts = message["put"];
314 size_t response_count = 0;
315 for (size_t i = 0; i < puts.size(); i++) {
316 JsonObject value = puts[i];
317 const char* path = value["path"];
318 String strVal = value["value"].as<String>();
319
321 const std::vector<SKPutListener*>& listeners =
323 for (size_t i = 0; i < listeners.size(); i++) {
324 SKPutListener* listener = listeners[i];
325 if (listener->get_sk_path().equals(path)) {
326 take_received_updates_semaphore();
327 received_updates_.push_back(value);
328 release_received_updates_semaphore();
330 }
331 }
333
334 // Send back a request response...
336 put_response["requestId"] = message["requestId"];
337 if (response_count == puts.size()) {
338 // We found a response for every PUT request
339 put_response["state"] = "COMPLETED";
340 put_response["statusCode"] = 200;
341 } else {
342 // One or more requests did not have a matching path
343 put_response["state"] = "FAILED";
344 put_response["statusCode"] = 405;
345 }
348 this->client_.sendTXT(response_text);
349 }
350}
351
360 if (get_connection_state() == WSConnectionState::kWSConnected) {
361 this->client_.sendTXT(payload);
362 }
363}
364
365bool WSClient::get_mdns_service(String& server_address, uint16_t& server_port) {
366 // get IP address using an mDNS query
367 int n = MDNS.queryService("signalk-ws", "tcp");
368 if (n == 0) {
369 // no service found
370 return false;
371 } else {
372 server_address = MDNS.IP(0).toString();
373 server_port = MDNS.port(0);
374 debugI("Found server %s (port %d)", server_address.c_str(), server_port);
375 return true;
376 }
377}
378
380 if (get_connection_state() != WSConnectionState::kWSDisconnected) {
381 return;
382 }
383
384 if (!WiFi.isConnected() && WiFi.getMode() != WIFI_MODE_AP) {
385 debugI(
386 "WiFi is disconnected. SignalK client connection will connect when "
387 "WiFi is connected.");
388 return;
389 }
390
391 debugI("Initiating websocket connection with server...");
392
393 set_connection_state(WSConnectionState::kWSAuthorizing);
394 if (this->conf_server_address_.isEmpty()) {
395 if (!get_mdns_service(this->server_address_, this->server_port_)) {
396 debugE("No Signal K server found in network when using mDNS service!");
397 } else {
398 debugI("Signal K server has been found at address %s:%d by mDNS.",
399 this->server_address_.c_str(), this->server_port_);
400 }
401 } else {
402 this->server_address_ = this->conf_server_address_;
403 this->server_port_ = this->conf_server_port_;
404 }
405
406 if (!this->server_address_.isEmpty() && this->server_port_ > 0) {
407 debugD("Websocket is connecting to Signal K server on address %s:%d",
408 this->server_address_.c_str(), this->server_port_);
409 } else {
410 // host and port not defined - wait for mDNS
411 set_connection_state(WSConnectionState::kWSDisconnected);
412 return;
413 }
414
415 if (this->polling_href_ != "") {
416 // existing pending request
417 this->poll_access_request(this->server_address_, this->server_port_, this->polling_href_);
418 return;
419 }
420
421 if (this->auth_token_ == NULL_AUTH_TOKEN) {
422 // initiate HTTP authentication
423 debugD("No prior authorization token present.");
424 this->send_access_request(this->server_address_, this->server_port_);
425 return;
426 }
427
428 // Test the validity of the authorization token
429 this->test_token(this->server_address_, this->server_port_);
430}
431
432void WSClient::test_token(const String server_address,
433 const uint16_t server_port) {
434 // FIXME: implement async HTTP client!
436
437 String url = String("http://") + server_address + ":" + server_port +
438 "/signalk/v1/stream";
439 debugD("Testing token with url %s", url.c_str());
440 http.begin(wifi_client_, url);
441 String full_token = String("Bearer ") + auth_token_;
442 debugD("Authorization: %s", full_token.c_str());
443 http.addHeader("Authorization", full_token.c_str());
444 int httpCode = http.GET();
445 if (httpCode > 0) {
446 String payload = http.getString();
447 http.end();
448 debugD("Testing resulted in http status %d", httpCode);
449 if (payload.length() > 0) {
450 debugD("Returned payload (length %d) is: ", payload.length());
451 debugD("%s", payload.c_str());
452 } else {
453 debugD("Returned payload is empty");
454 }
455 if (httpCode == 426) {
456 // HTTP status 426 is "Upgrade Required", which is the expected
457 // response for a websocket connection.
458 debugD("Attempting to connect to Signal K Websocket...");
459 server_detected_ = true;
460 token_test_success_ = true;
461 this->connect_ws(server_address, server_port);
462 } else if (httpCode == 401) {
463 this->client_id_ = "";
464 this->send_access_request(server_address, server_port);
465 } else {
466 set_connection_state(WSConnectionState::kWSDisconnected);
467 }
468 } else {
469 debugE("GET... failed, error: %s\n", http.errorToString(httpCode).c_str());
470 set_connection_state(WSConnectionState::kWSDisconnected);
471 }
472}
473
474void WSClient::send_access_request(const String server_address,
475 const uint16_t server_port) {
476 debugD("Preparing a new access request");
477 if (client_id_ == "") {
478 // generate a client ID
479 client_id_ = generate_uuid4();
481 }
482
483 // create a new access request
484 DynamicJsonDocument doc(1024);
485 doc["clientId"] = client_id_;
486 doc["description"] =
487 String("SensESP device: ") + SensESPBaseApp::get_hostname();
488 doc["permissions"] = kRequestPermission;
489 String json_req = "";
490 serializeJson(doc, json_req);
491
492 debugD("Access request: %s", json_req.c_str());
493
494 HTTPClient http;
495
496 String url = String("http://") + server_address + ":" + server_port +
497 "/signalk/v1/access/requests";
498 debugD("Access request url: %s", url.c_str());
499 http.begin(wifi_client_, url);
500 http.addHeader("Content-Type", "application/json");
501 int httpCode = http.POST(json_req);
502 String payload = http.getString();
503 http.end();
504
505 // if we get a response we can't handle, try to reconnect later
506 if (httpCode != 202) {
507 debugW("Can't handle response %d to access request.", httpCode);
508 debugD("%s", payload.c_str());
509 set_connection_state(WSConnectionState::kWSDisconnected);
510 client_id_ = "";
511 return;
512 }
513
514 // http status code 202
515
516 deserializeJson(doc, payload.c_str());
517 String state = doc["state"];
518
519 if (state != "PENDING") {
520 debugW("Got unknown state: %s", state.c_str());
521 set_connection_state(WSConnectionState::kWSDisconnected);
522 client_id_ = "";
523 return;
524 }
525
526 String href = doc["href"];
527 polling_href_ = href;
529
530 delay(5000);
531 this->poll_access_request(server_address, server_port, this->polling_href_);
532}
533
534void WSClient::poll_access_request(const String server_address,
535 const uint16_t server_port,
536 const String href) {
537 debugD("Polling SK Server for authentication token");
538
539 HTTPClient http;
540
541 String url = String("http://") + server_address + ":" + server_port + href;
542 http.begin(wifi_client_, url);
543 int httpCode = http.GET();
544 if (httpCode == 200 or httpCode == 202) {
545 String payload = http.getString();
546 http.end();
547 DynamicJsonDocument doc(1024);
548 auto error = deserializeJson(doc, payload.c_str());
549 if (error) {
550 debugW("WARNING: Could not deserialize http payload.");
551 debugW("DeserializationError: %s", error.c_str());
552 return; // TODO: return at this point, or keep going?
553 }
554 String state = doc["state"];
555 debugD("%s", state.c_str());
556 if (state == "PENDING") {
557 delay(5000);
558 this->poll_access_request(server_address, server_port, href);
559 return;
560 } else if (state == "COMPLETED") {
561 JsonObject access_req = doc["accessRequest"];
562 String permission =
563 access_req["permission"]; // TODO: like this in ArdJson 6? String
564 // permission =
565 // resp["accessRequest"]["permission"];
566 polling_href_ = "";
568
569 if (permission == "DENIED") {
570 debugW("Permission denied");
571 set_connection_state(WSConnectionState::kWSDisconnected);
572 return;
573 } else if (permission == "APPROVED") {
574 debugI("Permission granted");
575 String token = access_req["token"];
576 auth_token_ = token;
578 this->connect_ws(server_address, server_port);
579 return;
580 }
581 }
582 } else {
583 http.end();
584 if (httpCode == 500) {
585 // this is probably the server barfing due to
586 // us polling a non-existing request. Just
587 // delete the polling href.
588 debugD("Got 500, probably a non-existing request.");
589 polling_href_ = "";
591 set_connection_state(WSConnectionState::kWSDisconnected);
592 return;
593 }
594 // any other HTTP status code
595 debugW("Can't handle response %d to pending access request.\n", httpCode);
596 set_connection_state(WSConnectionState::kWSDisconnected);
597 return;
598 }
599}
600
601void WSClient::connect_ws(const String host, const uint16_t port) {
602 String path = "/signalk/v1/stream?subscribe=none";
603 set_connection_state(WSConnectionState::kWSConnecting);
604 this->client_.begin(host, port, path);
605 this->client_.onEvent(webSocketClientEvent);
606 String full_token = String("Bearer ") + auth_token_;
607 this->client_.setAuthorization(full_token.c_str());
608}
609
611 if (get_connection_state() == WSConnectionState::kWSConnecting ||
612 get_connection_state() == WSConnectionState::kWSConnected) {
613 this->client_.loop();
614 }
615}
616
618 return get_connection_state() == WSConnectionState::kWSConnected;
619}
620
622 if (get_connection_state() == WSConnectionState::kWSConnected) {
623 this->client_.disconnect();
624 set_connection_state(WSConnectionState::kWSDisconnected);
625 }
626}
627
630 if (get_connection_state() == WSConnectionState::kWSConnected) {
631 if (sk_delta_queue_->data_available()) {
632 sk_delta_queue_->get_delta(output);
633 this->client_.sendTXT(output);
634 // This automatically notifies the observers
635 this->delta_count_producer_.set(1);
636 }
637 }
638}
639
641 root["sk_address"] = this->conf_server_address_;
642 root["sk_port"] = this->conf_server_port_;
643
644 root["token"] = this->auth_token_;
645 root["client_id"] = this->client_id_;
646 root["polling_href"] = this->polling_href_;
647}
648
649static const char SCHEMA[] PROGMEM = R"~({
650 "type": "object",
651 "properties": {
652 "sk_address": { "title": "Signal K server address", "type": "string" },
653 "sk_port": { "title": "Signal K server port", "type": "integer" },
654 "client_id": { "title": "Client ID (readonly)", "type": "string", "readOnly": true },
655 "token": { "title": "Server authorization token (readonly)", "type": "string", "readOnly": true },
656 "polling_href": { "title": "Server authorization polling href (readonly)", "type": "string", "readOnly": true }
657 }
658 })~";
659
661
663 String expected[] = {"sk_address", "sk_port", "token", "client_id"};
664 for (auto str : expected) {
665 if (!config.containsKey(str)) {
666 debugI(
667 "Websocket configuration update rejected. Missing following "
668 "parameter: %s",
669 str.c_str());
670 return false;
671 }
672 }
673
674 this->conf_server_address_ = config["sk_address"].as<String>();
675 this->conf_server_port_ = config["sk_port"].as<int>();
676
677 // FIXME: setting the token should not be allowed via the REST API.
678 this->auth_token_ = config["token"].as<String>();
679 this->client_id_ = config["client_id"].as<String>();
680 this->polling_href_ = config["polling_href"].as<String>();
681
682 return true;
683}
684
691 auto state = get_connection_state();
692 switch (state) {
694 return "Authorizing with SignalK";
696 return "Connected";
698 return "Connecting";
700 return "Disconnected";
701 }
702
703 return "Unknown";
704}
705
706} // namespace sensesp
An object that is capable of having configuration data that can be set remotely using a RESTful API,...
virtual void save_configuration()
virtual void load_configuration()
Construct a new transform based on a single function.
void attach(std::function< void()> observer)
Signal K delta queue.
void get_delta(String &output)
An Obervable class that listens for Signal K stream deltas and notifies any observers of value change...
static void release_semaphore()
virtual void parse_value(const JsonObject &json)
static bool take_semaphore(unsigned long int timeout_ms=0)
static const std::vector< SKListener * > & get_listeners()
An Obervable class that listens for Signal K PUT requests coming over the websocket connection and no...
static const std::vector< SKPutListener * > & get_listeners()
static void handle_response(DynamicJsonDocument &response)
static String get_hostname()
Get the current hostname.
Automatic calling of the start() method at startup.
Definition startable.h:20
virtual const T & get() const
void emit(WSConnectionState new_value)
The websocket connection to the Signal K server.
Definition ws_client.h:33
void on_disconnected()
Called when the websocket connection is disconnected.
void on_receive_delta(uint8_t *payload)
Called when the websocket receives a delta.
virtual void get_configuration(JsonObject &doc) override final
void on_error()
Called when the websocket connection encounters an error.
virtual void start() override
void on_connected(uint8_t *payload)
Called when the websocket connection is established.
void on_receive_updates(DynamicJsonDocument &message)
Called when a delta update is received.
WSClient(String config_path, SKDeltaQueue *sk_delta_queue, String server_address, uint16_t server_port)
Definition ws_client.cpp:77
virtual String get_config_schema() override
String get_connection_status()
Get a String representation of the current connection state.
void on_receive_put(DynamicJsonDocument &message)
Called when a PUT event is received.
void sendTXT(String &payload)
Send some processed data to the websocket.
virtual bool set_configuration(const JsonObject &config) override final
const uint8_t PAGE_css_bootstrap[] PROGMEM
#define debugI(fmt,...)
Definition local_debug.h:48
#define debugE(fmt,...)
Definition local_debug.h:50
#define debugD(fmt,...)
Definition local_debug.h:47
#define debugW(fmt,...)
Definition local_debug.h:49
WSClient * ws_client
Definition ws_client.cpp:20
String generate_uuid4()
Generate a random UUIDv4 string.
Definition uuid.cpp:5
constexpr int ws_client_task_stack_size
Definition ws_client.cpp:18
void webSocketClientEvent(WStype_t type, uint8_t *payload, size_t length)
WebSocket event handler.
Definition ws_client.cpp:57
void ExecuteWebSocketTask(void *parameter)
Definition ws_client.cpp:24