3#include <ArduinoJson.h>
9#include "elapsedMillis.h"
22static const char* kRequestPermission =
"readwrite";
90 this->connection_state_.
attach(
91 [
this]() { this->
emit(this->connection_state_.
get()); });
94 ReactESP::app->onRepeat(1, [
this]() { this->process_received_updates(); });
105 MDNS.addService(
"signalk-sensesp",
"tcp", 80);
108void WSClient::connect_loop() {
122 server_detected_ && !token_test_success_) {
126 debugW(
"Bad access token detected. Setting token to null.");
127 auth_token_ = NULL_AUTH_TOKEN;
131 server_detected_ =
false;
142 debugW(
"Websocket client error.");
156 debugI(
"Subscribing to Signal K listeners...");
157 this->subscribe_listeners();
166void WSClient::subscribe_listeners() {
174 if (listeners.size() > 0) {
178 for (
size_t i = 0;
i < listeners.size();
i++) {
181 int listen_delay =
listener->get_listen_delay();
187 debugI(
"Adding %s subscription with listen_delay %d\n", sk_path.c_str(),
193 if (output_available) {
196 serializeJson(subscription, messageJson);
197 debugI(
"Subscription JSON message:\n %s", messageJson.c_str());
198 this->client_.sendTXT(messageJson);
210#ifdef SIGNALK_PRINT_RCV_DELTA
219 if (
message.containsKey(
"updates")) {
223 if (
message.containsKey(
"put")) {
228 if (
message.containsKey(
"requestId") && !
message.containsKey(
"put")) {
232 debugE(
"deserializeJson error: %s",
error.c_str());
247 take_received_updates_semaphore();
256 const char*
path = value[
"path"];
260 received_updates_.push_back(value);
263 release_received_updates_semaphore();
272void WSClient::process_received_updates() {
279 take_received_updates_semaphore();
280 while (!received_updates_.empty()) {
282 received_updates_.pop_front();
283 const char*
path = value[
"path"];
285 for (
size_t i = 0;
i < listeners.size();
i++) {
292 for (
size_t i = 0; i < put_listeners.size(); i++) {
293 SKPutListener* listener = put_listeners[i];
294 if (listener->get_sk_path().equals(path)) {
299 release_received_updates_semaphore();
315 for (
size_t i = 0;
i <
puts.size();
i++) {
317 const char*
path = value[
"path"];
321 const std::vector<SKPutListener*>& listeners =
323 for (
size_t i = 0;
i < listeners.size();
i++) {
326 take_received_updates_semaphore();
327 received_updates_.push_back(value);
328 release_received_updates_semaphore();
361 this->client_.sendTXT(
payload);
367 int n =
MDNS.queryService(
"signalk-ws",
"tcp");
372 server_address = MDNS.IP(0).toString();
373 server_port = MDNS.port(0);
374 debugI(
"Found server %s (port %d)", server_address.c_str(), server_port);
386 "WiFi is disconnected. SignalK client connection will connect when "
387 "WiFi is connected.");
391 debugI(
"Initiating websocket connection with server...");
394 if (this->conf_server_address_.isEmpty()) {
395 if (!get_mdns_service(this->server_address_, this->server_port_)) {
396 debugE(
"No Signal K server found in network when using mDNS service!");
398 debugI(
"Signal K server has been found at address %s:%d by mDNS.",
399 this->server_address_.c_str(),
this->server_port_);
402 this->server_address_ = this->conf_server_address_;
403 this->server_port_ = this->conf_server_port_;
406 if (!this->server_address_.isEmpty() &&
this->server_port_ > 0) {
407 debugD(
"Websocket is connecting to Signal K server on address %s:%d",
408 this->server_address_.c_str(),
this->server_port_);
415 if (this->polling_href_ !=
"") {
417 this->poll_access_request(this->server_address_, this->server_port_, this->polling_href_);
421 if (this->auth_token_ == NULL_AUTH_TOKEN) {
423 debugD(
"No prior authorization token present.");
424 this->send_access_request(this->server_address_, this->server_port_);
429 this->test_token(this->server_address_, this->server_port_);
438 "/signalk/v1/stream";
439 debugD(
"Testing token with url %s",
url.c_str());
450 debugD(
"Returned payload (length %d) is: ",
payload.length());
453 debugD(
"Returned payload is empty");
455 if (httpCode == 426) {
458 debugD(
"Attempting to connect to Signal K Websocket...");
459 server_detected_ =
true;
460 token_test_success_ =
true;
461 this->connect_ws(server_address, server_port);
462 }
else if (httpCode == 401) {
463 this->client_id_ =
"";
464 this->send_access_request(server_address, server_port);
469 debugE(
"GET... failed, error: %s\n", http.errorToString(httpCode).c_str());
474void WSClient::send_access_request(
const String server_address,
475 const uint16_t server_port) {
476 debugD(
"Preparing a new access request");
477 if (client_id_ ==
"") {
484 DynamicJsonDocument doc(1024);
485 doc[
"clientId"] = client_id_;
488 doc[
"permissions"] = kRequestPermission;
489 String json_req =
"";
490 serializeJson(doc, json_req);
492 debugD(
"Access request: %s", json_req.c_str());
496 String url = String(
"http://") + server_address +
":" + server_port +
497 "/signalk/v1/access/requests";
498 debugD(
"Access request url: %s", url.c_str());
499 http.begin(wifi_client_, url);
500 http.addHeader(
"Content-Type",
"application/json");
501 int httpCode = http.POST(json_req);
502 String payload = http.getString();
506 if (httpCode != 202) {
507 debugW(
"Can't handle response %d to access request.", httpCode);
508 debugD(
"%s", payload.c_str());
516 deserializeJson(doc, payload.c_str());
517 String state = doc[
"state"];
519 if (state !=
"PENDING") {
520 debugW(
"Got unknown state: %s", state.c_str());
526 String href = doc[
"href"];
527 polling_href_ = href;
531 this->poll_access_request(server_address, server_port, this->polling_href_);
534void WSClient::poll_access_request(
const String server_address,
535 const uint16_t server_port,
537 debugD(
"Polling SK Server for authentication token");
541 String url = String(
"http://") + server_address +
":" + server_port + href;
542 http.begin(wifi_client_, url);
543 int httpCode = http.GET();
544 if (httpCode == 200 or httpCode == 202) {
545 String payload = http.getString();
547 DynamicJsonDocument doc(1024);
548 auto error = deserializeJson(doc, payload.c_str());
550 debugW(
"WARNING: Could not deserialize http payload.");
551 debugW(
"DeserializationError: %s", error.c_str());
554 String state = doc[
"state"];
555 debugD(
"%s", state.c_str());
556 if (state ==
"PENDING") {
558 this->poll_access_request(server_address, server_port, href);
560 }
else if (state ==
"COMPLETED") {
561 JsonObject access_req = doc[
"accessRequest"];
563 access_req[
"permission"];
569 if (permission ==
"DENIED") {
570 debugW(
"Permission denied");
573 }
else if (permission ==
"APPROVED") {
574 debugI(
"Permission granted");
575 String token = access_req[
"token"];
578 this->connect_ws(server_address, server_port);
584 if (httpCode == 500) {
588 debugD(
"Got 500, probably a non-existing request.");
595 debugW(
"Can't handle response %d to pending access request.\n", httpCode);
601void WSClient::connect_ws(
const String host,
const uint16_t port) {
602 String path =
"/signalk/v1/stream?subscribe=none";
604 this->client_.begin(host, port, path);
606 String full_token = String(
"Bearer ") + auth_token_;
607 this->client_.setAuthorization(full_token.c_str());
613 this->client_.loop();
623 this->client_.disconnect();
633 this->client_.sendTXT(
output);
635 this->delta_count_producer_.
set(1);
641 root[
"sk_address"] = this->conf_server_address_;
642 root[
"sk_port"] = this->conf_server_port_;
644 root[
"token"] = this->auth_token_;
645 root[
"client_id"] = this->client_id_;
646 root[
"polling_href"] = this->polling_href_;
652 "sk_address": { "title": "Signal K server address", "type": "string" },
653 "sk_port": { "title": "Signal K server port", "type": "integer" },
654 "client_id": { "title": "Client ID (readonly)", "type": "string", "readOnly": true },
655 "token": { "title": "Server authorization token (readonly)", "type": "string", "readOnly": true },
656 "polling_href": { "title": "Server authorization polling href (readonly)", "type": "string", "readOnly": true }
663 String expected[] = {
"sk_address",
"sk_port",
"token",
"client_id"};
667 "Websocket configuration update rejected. Missing following "
674 this->conf_server_address_ =
config[
"sk_address"].as<
String>();
675 this->conf_server_port_ =
config[
"sk_port"].as<
int>();
680 this->polling_href_ =
config[
"polling_href"].as<
String>();
691 auto state = get_connection_state();
694 return "Authorizing with SignalK";
700 return "Disconnected";
An object that is capable of having configuration data that can be set remotely using a RESTful API,...
virtual void save_configuration()
virtual void load_configuration()
void attach(std::function< void()> observer)
void get_delta(String &output)
An Observable class that listens for Signal K stream deltas and notifies any observers of value change...
static void release_semaphore()
virtual void parse_value(const JsonObject &json)
static bool take_semaphore(unsigned long int timeout_ms=0)
static const std::vector< SKListener * > & get_listeners()
An Observable class that listens for Signal K PUT requests coming over the websocket connection and no...
static const std::vector< SKPutListener * > & get_listeners()
static void handle_response(DynamicJsonDocument &response)
static String get_hostname()
Get the current hostname.
Automatic calling of the start() method at startup.
virtual const T & get() const
void emit(WSConnectionState new_value)
The websocket connection to the Signal K server.
void on_disconnected()
Called when the websocket connection is disconnected.
void on_receive_delta(uint8_t *payload)
Called when the websocket receives a delta.
virtual void get_configuration(JsonObject &doc) override final
void on_error()
Called when the websocket connection encounters an error.
virtual void start() override
void on_connected(uint8_t *payload)
Called when the websocket connection is established.
void on_receive_updates(DynamicJsonDocument &message)
Called when a delta update is received.
WSClient(String config_path, SKDeltaQueue *sk_delta_queue, String server_address, uint16_t server_port)
virtual String get_config_schema() override
String get_connection_status()
Get a String representation of the current connection state.
void on_receive_put(DynamicJsonDocument &message)
Called when a PUT event is received.
void sendTXT(String &payload)
Send some processed data to the websocket.
virtual bool set_configuration(const JsonObject &config) override final
const uint8_t PAGE_css_bootstrap[] PROGMEM
String generate_uuid4()
Generate a random UUIDv4 string.
constexpr int ws_client_task_stack_size
void webSocketClientEvent(WStype_t type, uint8_t *payload, size_t length)
WebSocket event handler.
void ExecuteWebSocketTask(void *parameter)