SensESP 3.3.0
Universal Signal K sensor toolkit ESP32
Loading...
Searching...
No Matches
task_queue_producer.h
Go to the documentation of this file.
1#ifndef SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
2#define SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
3
4#include <limits>
5#include <queue>
6
7#include "ReactESP.h"
8#include "esp_log.h"
9#include "observablevalue.h"
10#include "sensesp_base_app.h"
11
12namespace sensesp {
13
25template <typename T>
26class SafeQueue {
27 public:
28 SafeQueue(size_t max_size = 10) : max_size_(max_size) {
29 write_lock_ = xSemaphoreCreateMutexStatic(&write_lock_buffer_);
30 }
31
32 // Non-copyable, non-movable — the semaphore handle is not transferable.
33 SafeQueue(const SafeQueue&) = delete;
34 SafeQueue& operator=(const SafeQueue&) = delete;
35 SafeQueue(SafeQueue&&) = delete;
37
38 void push(const T& value) {
39 if (xSemaphoreTake(write_lock_, portMAX_DELAY) == pdTRUE) {
40 if (queue_.size() >= max_size_) {
41 queue_.pop();
42 ESP_LOGW("SafeQueue", "Queue full, dropping oldest entry");
43 }
44 queue_.push(value);
45 xSemaphoreGive(write_lock_);
46 }
47 }
48
49 // NOTE: max_duration_ms is currently unused but retained for API
50 // compatibility. The pop is non-blocking; it returns immediately if
51 // the queue is empty.
52 bool pop(T& value, unsigned int max_duration_ms) {
53 bool result = false;
54 if (xSemaphoreTake(write_lock_, portMAX_DELAY) == pdTRUE) {
55 if (!queue_.empty()) {
56 value = queue_.front();
57 queue_.pop();
58 result = true;
59 }
60 xSemaphoreGive(write_lock_);
61 }
62 return result;
63 }
64
65 bool empty() {
66 bool result = true;
67 if (xSemaphoreTake(write_lock_, portMAX_DELAY) == pdTRUE) {
68 result = queue_.empty();
69 xSemaphoreGive(write_lock_);
70 }
71 return result;
72 }
73
74 size_t size() {
75 size_t result = 0;
76 if (xSemaphoreTake(write_lock_, portMAX_DELAY) == pdTRUE) {
77 result = queue_.size();
78 xSemaphoreGive(write_lock_);
79 }
80 return result;
81 }
82
83 private:
84 std::queue<T> queue_;
85 size_t max_size_;
86 StaticSemaphore_t write_lock_buffer_;
87 SemaphoreHandle_t write_lock_;
88};
89
104template <class T>
106 public:
107 TaskQueueProducer(const T& value,
108 std::shared_ptr<reactesp::EventLoop> consumer_event_loop,
109 unsigned int poll_rate = 990)
110 : ObservableValue<T>(value) {
111 auto func = [this]() {
112 T value;
113 while (queue_.pop(value, 0)) {
114 this->emit(value);
115 }
116 };
117
118 // Create a repeat event that will poll the queue and emit the values
119 if (poll_rate == 0) {
120 consumer_event_loop->onTick(func);
121 } else {
122 consumer_event_loop->onRepeatMicros(poll_rate, func);
123 }
124 }
125
126 TaskQueueProducer(const T& value, unsigned int poll_rate = 990)
127 : TaskQueueProducer(value, event_loop(), poll_rate) {}
128
129 virtual void set(const T& value) override { queue_.push(value); }
130
148 bool wait(T& value, unsigned int max_duration_ms) {
149 T received_value;
150 bool result = queue_.pop(received_value, max_duration_ms);
151 if (result) {
152 value = received_value;
153 this->emit(value);
154 }
155 return result;
156 }
157
158 private:
159 SafeQueue<T> queue_;
160};
161
162} // namespace sensesp
163
164#endif // SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
Thread-safe queue for inter-task communication.
SafeQueue(size_t max_size=10)
SafeQueue(SafeQueue &&)=delete
bool pop(T &value, unsigned int max_duration_ms)
void push(const T &value)
SafeQueue & operator=(SafeQueue &&)=delete
SafeQueue & operator=(const SafeQueue &)=delete
SafeQueue(const SafeQueue &)=delete
TaskQueueProducer(const T &value, unsigned int poll_rate=990)
TaskQueueProducer(const T &value, std::shared_ptr< reactesp::EventLoop > consumer_event_loop, unsigned int poll_rate=990)
bool wait(T &value, unsigned int max_duration_ms)
Wait for a value to be available in the queue.
virtual void set(const T &value) override
void emit(const T &new_value)
std::shared_ptr< reactesp::EventLoop > event_loop()
Definition sensesp.cpp:9