SensESP 3.3.0
Universal Signal K sensor toolkit ESP32
Loading...
Searching...
No Matches
signalk_delta_queue.cpp
Go to the documentation of this file.
1#include "sensesp.h"
2
4
5#include "Arduino.h"
6#include "ArduinoJson.h"
7#include "sensesp_app.h"
8#include "signalk_emitter.h"
9
10namespace sensesp {
11
12SKDeltaQueue::SKDeltaQueue(unsigned int max_buffer_size)
13 : max_buffer_size{max_buffer_size}, meta_sent_{false} {
14 semaphore_ = xSemaphoreCreateRecursiveMutexStatic(&semaphore_buffer_);
15
16 event_loop()->onDelay(
17 0, [this]() { this->connect_emitters(); });
18}
19
20bool SKDeltaQueue::take_semaphore(uint64_t timeout_ms) {
21 if (timeout_ms == 0) {
22 return xSemaphoreTakeRecursive(semaphore_, portMAX_DELAY) == pdTRUE;
23 } else {
24 return xSemaphoreTakeRecursive(semaphore_, timeout_ms) == pdTRUE;
25 }
26}
27
28void SKDeltaQueue::release_semaphore() { xSemaphoreGiveRecursive(semaphore_); }
29
30void SKDeltaQueue::append(const String& val) {
32 if (get_buffer_size() >= max_buffer_size) {
33 buffer.pop_back();
34 }
35 buffer.push_front(val);
37}
38
40 for (auto const& sk_source : SKEmitter::get_sources()) {
41 if (sk_source->get_sk_path() != "") {
42 sk_source->attach([sk_source, this]() {
43 String output;
44 JsonDocument doc;
45 sk_source->as_signalk_json(doc);
46 serializeJson(doc, output);
47 this->append(output);
48 });
49 }
50 }
51}
52
55 bool available = buffer.size() > 0;
57 return available;
58}
59
60void SKDeltaQueue::get_delta(String& output) {
61 std::vector<String> deltas;
62 get_deltas(deltas);
63 output = "";
64 for (size_t i = 0; i < deltas.size(); ++i) {
65 if (i > 0) {
66 output += "\n";
67 }
68 output += deltas[i];
69 }
70}
71
72void SKDeltaQueue::get_deltas(std::vector<String>& output) {
73 // Drain the buffer under the semaphore
74 std::list<String> items;
76 items.swap(buffer);
78
79 // Fast path: if no item contains a context key, skip the grouping logic.
80 // This avoids deserialize/reserialize overhead for the common case.
81 bool has_contextual = false;
82 for (const auto& item : items) {
83 if (item.indexOf("\"context\"") >= 0) {
84 has_contextual = true;
85 break;
86 }
87 }
88
89 if (!has_contextual) {
90 // All items are self-context: build a single delta (original behavior)
91 JsonDocument json_doc;
92 JsonArray updates = json_doc["updates"].to<JsonArray>();
93
94 if (!meta_sent_) {
95 this->add_metadata(updates);
96 }
97
98 if (!items.empty()) {
99 JsonObject current = updates.add<JsonObject>();
100 JsonObject source = current["source"].to<JsonObject>();
101 source["label"] = SensESPBaseApp::get_hostname();
102 JsonArray values = current["values"].to<JsonArray>();
103
104 // Reverse iteration: buffer is push_front/pop_back (LIFO),
105 // so rbegin gives oldest-first ordering.
106 for (auto it = items.rbegin(); it != items.rend(); ++it) {
107 values.add(serialized(*it));
108 }
109 }
110
111 String delta;
112 serializeJson(json_doc, delta);
113 output.push_back(std::move(delta));
114 ESP_LOGD(__FILENAME__, "delta: %s", output.back().c_str());
115 return;
116 }
117
118 // Slow path: separate items by context
119 std::list<String> self_items;
120 std::map<String, std::list<String>> contextual_items;
121
122 // Reverse iteration: buffer is push_front/pop_back (LIFO),
123 // so rbegin gives oldest-first ordering.
124 for (auto it = items.rbegin(); it != items.rend(); ++it) {
125 JsonDocument item_doc;
126 DeserializationError err = deserializeJson(item_doc, *it);
127 if (err) {
128 ESP_LOGE(__FILENAME__, "Failed to parse buffered item: %s", err.c_str());
129 continue;
130 }
131 if (item_doc["context"].is<const char*>()) {
132 String context = item_doc["context"].as<String>();
133 // Re-serialize without the context key for the values array
134 item_doc.remove("context");
135 String value_json;
136 serializeJson(item_doc, value_json);
137 contextual_items[context].push_back(value_json);
138 } else {
139 self_items.push_back(*it);
140 }
141 }
142
143 // Default (self) delta
144 if (!self_items.empty() || !meta_sent_) {
145 JsonDocument json_doc;
146 JsonArray updates = json_doc["updates"].to<JsonArray>();
147
148 if (!meta_sent_) {
149 this->add_metadata(updates);
150 }
151
152 if (!self_items.empty()) {
153 JsonObject current = updates.add<JsonObject>();
154 JsonObject source = current["source"].to<JsonObject>();
155 source["label"] = SensESPBaseApp::get_hostname();
156 JsonArray values = current["values"].to<JsonArray>();
157
158 for (const auto& item : self_items) {
159 values.add(serialized(item));
160 }
161 }
162
163 String delta;
164 serializeJson(json_doc, delta);
165 ESP_LOGD(__FILENAME__, "delta: %s", delta.c_str());
166 output.push_back(std::move(delta));
167 }
168
169 // Contextual deltas — one per context
170 for (const auto& [context, ctx_items] : contextual_items) {
171 JsonDocument json_doc;
172 json_doc["context"] = context;
173 JsonArray updates = json_doc["updates"].to<JsonArray>();
174 JsonObject current = updates.add<JsonObject>();
175 JsonObject source = current["source"].to<JsonObject>();
176 source["label"] = SensESPBaseApp::get_hostname();
177 JsonArray values = current["values"].to<JsonArray>();
178
179 for (const auto& item : ctx_items) {
180 values.add(serialized(item));
181 }
182
183 String delta;
184 serializeJson(json_doc, delta);
185 ESP_LOGD(__FILENAME__, "delta: %s", delta.c_str());
186 output.push_back(std::move(delta));
187 }
188}
189
190void SKDeltaQueue::add_metadata(JsonArray updates) {
191 JsonObject new_entry = updates.add<JsonObject>();
192 JsonArray meta = new_entry["meta"].to<JsonArray>();
193 for (auto const& sk_source : SKEmitter::get_sources()) {
194 sk_source->add_metadata(meta);
195 }
196 meta_sent_ = true;
197}
198
199} // namespace sensesp
SKDeltaQueue(unsigned int max_buffer_size=20)
void get_delta(String &output)
void get_deltas(std::vector< String > &output)
bool take_semaphore(uint64_t timeout_ms=0)
void append(const String &val)
static const std::vector< SKEmitter * > & get_sources()
static String get_hostname()
Get the current hostname.
std::shared_ptr< reactesp::EventLoop > event_loop()
Definition sensesp.cpp:9