SensESP 3.0.1
Universal Signal K sensor toolkit for ESP32
Loading...
Searching...
No Matches
signalk_delta_queue.cpp
Go to the documentation of this file.
1#include "sensesp.h"
2
4
5#include "Arduino.h"
6#include "ArduinoJson.h"
7#include "sensesp_app.h"
8#include "signalk_emitter.h"
9
10namespace sensesp {
11
12SKDeltaQueue::SKDeltaQueue(unsigned int max_buffer_size)
13 : max_buffer_size{max_buffer_size}, meta_sent_{false} {
14 semaphore_ = xSemaphoreCreateRecursiveMutex();
15
16 event_loop()->onDelay(
17 0, [this]() { this->connect_emitters(); });
18}
19
20bool SKDeltaQueue::take_semaphore(uint64_t timeout_ms) {
21 if (timeout_ms == 0) {
22 return xSemaphoreTakeRecursive(semaphore_, portMAX_DELAY) == pdTRUE;
23 } else {
24 return xSemaphoreTakeRecursive(semaphore_, timeout_ms) == pdTRUE;
25 }
26}
27
28void SKDeltaQueue::release_semaphore() { xSemaphoreGiveRecursive(semaphore_); }
29
30void SKDeltaQueue::append(const String& val) {
32 if (get_buffer_size() >= max_buffer_size) {
33 buffer.pop_back();
34 }
35 buffer.push_front(val);
37}
38
40 for (auto const& sk_source : SKEmitter::get_sources()) {
41 if (sk_source->get_sk_path() != "") {
42 sk_source->attach([sk_source, this]() {
43 String output;
44 JsonDocument doc;
45 sk_source->as_signalk_json(doc);
46 serializeJson(doc, output);
47 this->append(output);
48 });
49 }
50 }
51}
52
55 bool available = buffer.size() > 0;
57 return available;
58}
59
60void SKDeltaQueue::get_delta(String& output) {
61 // estimate the size of the serialized json string
62
63 JsonDocument json_doc;
64
65 // JsonObject delta = jsonDoc.as<JsonObject>();
66 JsonArray updates = json_doc["updates"].to<JsonArray>();
67
68 if (!meta_sent_) {
69 this->add_metadata(updates);
70 }
71
72 JsonObject current = updates.add<JsonObject>();
73 JsonObject source = current["source"].to<JsonObject>();
74 source["label"] = SensESPBaseApp::get_hostname();
75 JsonArray values = current["values"].to<JsonArray>();
76
78 while (!buffer.empty()) {
79 values.add(serialized(buffer.back()));
80 buffer.pop_back();
81 }
83
84 serializeJson(json_doc, output);
85
86 ESP_LOGD(__FILENAME__, "delta: %s", output.c_str());
87}
88
89void SKDeltaQueue::add_metadata(JsonArray updates) {
90 JsonObject new_entry = updates.add<JsonObject>();
91 JsonArray meta = new_entry["meta"].to<JsonArray>();
92 for (auto const& sk_source : SKEmitter::get_sources()) {
93 sk_source->add_metadata(meta);
94 }
95 meta_sent_ = true;
96}
97
98} // namespace sensesp
SKDeltaQueue(unsigned int max_buffer_size=20)
void get_delta(String &output)
bool take_semaphore(uint64_t timeout_ms=0)
void append(const String &val)
static const std::vector< SKEmitter * > & get_sources()
static String get_hostname()
Get the current hostname.
std::shared_ptr< reactesp::EventLoop > event_loop()
Definition sensesp.cpp:9