SensESP 3.1.1
Universal Signal K sensor toolkit ESP32
Loading...
Searching...
No Matches
task_queue_producer.h
Go to the documentation of this file.
1#ifndef SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
2#define SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
3
4#include <limits>
5#include <queue>
6
7#include "ReactESP.h"
8#include "esp_log.h"
9#include "observablevalue.h"
10#include "sensesp_base_app.h"
11
12namespace sensesp {
13
19template <typename T>
20class SafeQueue : public std::queue<T> {
21 public:
22 SafeQueue() : std::queue<T>() {
23 write_lock_ = xSemaphoreCreateMutex();
24 if (write_lock_ == nullptr) {
25 // Handle semaphore creation failure
26 ESP_LOGE("SafeQueue", "Failed to create mutex semaphore");
27 }
28 }
29
31 if (write_lock_ != nullptr) {
32 vSemaphoreDelete(write_lock_);
33 }
34 }
35
36 void push(const T& value) {
37 if (write_lock_ == nullptr) return;
38 if (xSemaphoreTake(write_lock_, portMAX_DELAY) == pdTRUE) {
39 std::queue<T>::push(value);
40 xSemaphoreGive(write_lock_);
41 }
42 }
43
44 bool pop(T& value, unsigned int max_duration_ms) {
45 if (write_lock_ == nullptr) {
46 return false;
47 }
48 bool result = false;
49 if (xSemaphoreTake(write_lock_, portMAX_DELAY) == pdTRUE) {
50 if (!std::queue<T>::empty()) {
51 value = std::queue<T>::front();
52 std::queue<T>::pop();
53 result = true;
54 }
55 xSemaphoreGive(write_lock_);
56 }
57 return result;
58 }
59
60 bool empty() {
61 if (write_lock_ == nullptr) {
62 return true;
63 }
64 bool result = true;
65 if (xSemaphoreTake(write_lock_, portMAX_DELAY) == pdTRUE) {
66 result = std::queue<T>::empty();
67 xSemaphoreGive(write_lock_);
68 }
69 return result;
70 }
71
72 size_t size() {
73 if (write_lock_ == nullptr) {
74 return 0;
75 }
76 size_t result = 0;
77 if (xSemaphoreTake(write_lock_, portMAX_DELAY) == pdTRUE) {
78 result = std::queue<T>::size();
79 xSemaphoreGive(write_lock_);
80 }
81 return result;
82 }
83
84 protected:
85 SemaphoreHandle_t write_lock_; // Lock for writing to the queue
86};
87
102template <class T>
104 public:
105 TaskQueueProducer(const T& value,
106 std::shared_ptr<reactesp::EventLoop> consumer_event_loop,
107 unsigned int poll_rate = 990)
108 : ObservableValue<T>(value) {
109 auto func = [this]() {
110 T value;
111 while (queue_.pop(value, 0)) {
112 this->emit(value);
113 }
114 };
115
116 // Create a repeat event that will poll the queue and emit the values
117 if (poll_rate == 0) {
118 consumer_event_loop->onTick(func);
119 } else {
120 consumer_event_loop->onRepeatMicros(poll_rate, func);
121 }
122 }
123
124 TaskQueueProducer(const T& value, unsigned int poll_rate = 990)
125 : TaskQueueProducer(value, event_loop(), poll_rate) {}
126
127 virtual void set(const T& value) override { queue_.push(value); }
128
141 bool wait(T& value, unsigned int max_duration_ms) {
142 T received_value;
143 bool result = queue_.pop(received_value, max_duration_ms);
144 if (result) {
145 value = received_value;
146 this->emit(value);
147 }
148 return result;
149 }
150
151 private:
152 SafeQueue<T> queue_;
153};
154
155} // namespace sensesp
156
157#endif // SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
Thread-safe queue for inter-task communication. Works like std::queue.
bool pop(T &value, unsigned int max_duration_ms)
void push(const T &value)
SemaphoreHandle_t write_lock_
TaskQueueProducer(const T &value, unsigned int poll_rate=990)
TaskQueueProducer(const T &value, std::shared_ptr< reactesp::EventLoop > consumer_event_loop, unsigned int poll_rate=990)
bool wait(T &value, unsigned int max_duration_ms)
Wait for a value to be available in the queue.
virtual void set(const T &value) override
void emit(const T &new_value)
std::shared_ptr< reactesp::EventLoop > event_loop()
Definition sensesp.cpp:9