SensESP 3.0.1
Universal Signal K sensor toolkit ESP32
Loading...
Searching...
No Matches
task_queue_producer.h
Go to the documentation of this file.
1#ifndef SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
2#define SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
3
4#include <limits>
5#include <queue>
6
7#include "ReactESP.h"
8#include "observablevalue.h"
9#include "sensesp_base_app.h"
10
11namespace sensesp {
12
18template <typename T>
19class SafeQueue : public std::queue<T> {
20 public:
21 SafeQueue() : std::queue<T>() {
23 xSemaphoreCreateCounting(std::numeric_limits<int>::max(), 0);
24 write_lock_ = xSemaphoreCreateMutex();
25 }
26
27 void push(const T& value) {
28 xSemaphoreTake(write_lock_, portMAX_DELAY);
29 std::queue<T>::push(value);
30 xSemaphoreGive(queue_semaphore_);
31 xSemaphoreGive(write_lock_);
32 }
33
34 bool pop(T& value, unsigned int max_duration_ms) {
35 if (xSemaphoreTake(queue_semaphore_,
36 max_duration_ms / portTICK_PERIOD_MS) == pdTRUE) {
37 xSemaphoreTake(write_lock_, portMAX_DELAY);
38 value = std::queue<T>::front();
39 std::queue<T>::pop();
40 xSemaphoreGive(write_lock_);
41 return true;
42 }
43 return false;
44 }
45
46 bool empty() {
47 xSemaphoreTake(write_lock_, portMAX_DELAY);
48 bool result = std::queue<T>::empty();
49 xSemaphoreGive(write_lock_);
50 return result;
51 }
52
53 size_t size() {
54 xSemaphoreTake(write_lock_, portMAX_DELAY);
55 size_t result = std::queue<T>::size();
56 xSemaphoreGive(write_lock_);
57 return result;
58 }
59
60 protected:
61 SemaphoreHandle_t queue_semaphore_; // Mirrors the items in the queue
62 SemaphoreHandle_t write_lock_; // Lock for writing to the queue
63};
64
79template <class T>
81 public:
82 TaskQueueProducer(const T& value,
83 std::shared_ptr<reactesp::EventLoop> consumer_event_loop,
84 unsigned int poll_rate = 990)
85 : ObservableValue<T>(value) {
86 auto func = [this]() {
87 T value;
88 while (queue_.pop(value, 0)) {
89 this->emit(value);
90 }
91 };
92
93 // Create a repeat event that will poll the queue and emit the values
94 if (poll_rate == 0) {
95 consumer_event_loop->onTick(func);
96 } else {
97 consumer_event_loop->onRepeatMicros(poll_rate, func);
98 }
99 }
100
101 TaskQueueProducer(const T& value, unsigned int poll_rate = 990)
102 : TaskQueueProducer(value, event_loop(), poll_rate) {}
103
104 virtual void set(const T& value) override { queue_.push(value); }
105
118 bool wait(T& value, unsigned int max_duration_ms) {
119 T received_value;
120 bool result = queue_.pop(received_value, max_duration_ms);
121 if (result) {
122 value = received_value;
123 this->emit(value);
124 }
125 return result;
126 }
127
128 private:
129 SafeQueue<T> queue_;
130};
131
132} // namespace sensesp
133
134#endif // SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
A value container that notifies its observers if it gets changed.
Thread-safe queue for inter-task communication. Works like std::queue.
bool pop(T &value, unsigned int max_duration_ms)
void push(const T &value)
SemaphoreHandle_t queue_semaphore_
SemaphoreHandle_t write_lock_
Producer class that works across task boundaries.
TaskQueueProducer(const T &value, unsigned int poll_rate=990)
TaskQueueProducer(const T &value, std::shared_ptr< reactesp::EventLoop > consumer_event_loop, unsigned int poll_rate=990)
bool wait(T &value, unsigned int max_duration_ms)
Wait for a value to be available in the queue.
virtual void set(const T &value) override
void emit(const T &new_value)
std::shared_ptr< reactesp::EventLoop > event_loop()
Definition sensesp.cpp:9