1#ifndef SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
2#define SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
26 ESP_LOGE(
"SafeQueue",
"Failed to create mutex semaphore");
36 void push(
const T& value) {
38 if (xSemaphoreTake(
write_lock_, portMAX_DELAY) == pdTRUE) {
39 std::queue<T>::push(value);
44 bool pop(T& value,
unsigned int max_duration_ms) {
49 if (xSemaphoreTake(
write_lock_, portMAX_DELAY) == pdTRUE) {
50 if (!std::queue<T>::empty()) {
51 value = std::queue<T>::front();
65 if (xSemaphoreTake(
write_lock_, portMAX_DELAY) == pdTRUE) {
66 result = std::queue<T>::empty();
77 if (xSemaphoreTake(
write_lock_, portMAX_DELAY) == pdTRUE) {
78 result = std::queue<T>::size();
106 std::shared_ptr<reactesp::EventLoop> consumer_event_loop,
107 unsigned int poll_rate = 990)
109 auto func = [
this]() {
111 while (queue_.pop(value, 0)) {
117 if (poll_rate == 0) {
118 consumer_event_loop->onTick(func);
120 consumer_event_loop->onRepeatMicros(poll_rate, func);
127 virtual void set(
const T& value)
override { queue_.push(value); }
141 bool wait(T& value,
unsigned int max_duration_ms) {
143 bool result = queue_.pop(received_value, max_duration_ms);
145 value = received_value;
ObservableValue()=default
Thread-safe queue for inter-task communication. Works like std::queue.
bool pop(T &value, unsigned int max_duration_ms)
void push(const T &value)
SemaphoreHandle_t write_lock_
TaskQueueProducer(const T &value, unsigned int poll_rate=990)
TaskQueueProducer(const T &value, std::shared_ptr< reactesp::EventLoop > consumer_event_loop, unsigned int poll_rate=990)
bool wait(T &value, unsigned int max_duration_ms)
Wait for a value to be available in the queue.
virtual void set(const T &value) override
void emit(const T &new_value)
std::shared_ptr< reactesp::EventLoop > event_loop()