1#ifndef SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
2#define SENSESP_SYSTEM_TASK_QUEUE_PRODUCER_H_
28 SafeQueue(
size_t max_size = 10) : max_size_(max_size) {
29 write_lock_ = xSemaphoreCreateMutexStatic(&write_lock_buffer_);
38 void push(
const T& value) {
39 if (xSemaphoreTake(write_lock_, portMAX_DELAY) == pdTRUE) {
40 if (queue_.size() >= max_size_) {
42 ESP_LOGW(
"SafeQueue",
"Queue full, dropping oldest entry");
45 xSemaphoreGive(write_lock_);
52 bool pop(T& value,
unsigned int max_duration_ms) {
54 if (xSemaphoreTake(write_lock_, portMAX_DELAY) == pdTRUE) {
55 if (!queue_.empty()) {
56 value = queue_.front();
60 xSemaphoreGive(write_lock_);
67 if (xSemaphoreTake(write_lock_, portMAX_DELAY) == pdTRUE) {
68 result = queue_.empty();
69 xSemaphoreGive(write_lock_);
76 if (xSemaphoreTake(write_lock_, portMAX_DELAY) == pdTRUE) {
77 result = queue_.size();
78 xSemaphoreGive(write_lock_);
86 StaticSemaphore_t write_lock_buffer_;
87 SemaphoreHandle_t write_lock_;
108 std::shared_ptr<reactesp::EventLoop> consumer_event_loop,
109 unsigned int poll_rate = 990)
111 auto func = [
this]() {
113 while (queue_.pop(value, 0)) {
119 if (poll_rate == 0) {
120 consumer_event_loop->onTick(func);
122 consumer_event_loop->onRepeatMicros(poll_rate, func);
129 virtual void set(
const T& value)
override { queue_.push(value); }
148 bool wait(T& value,
unsigned int max_duration_ms) {
150 bool result = queue_.pop(received_value, max_duration_ms);
152 value = received_value;
ObservableValue()=default
Thread-safe queue for inter-task communication.
SafeQueue(size_t max_size=10)
SafeQueue(SafeQueue &&)=delete
bool pop(T &value, unsigned int max_duration_ms)
void push(const T &value)
SafeQueue & operator=(SafeQueue &&)=delete
SafeQueue & operator=(const SafeQueue &)=delete
SafeQueue(const SafeQueue &)=delete
TaskQueueProducer(const T &value, unsigned int poll_rate=990)
TaskQueueProducer(const T &value, std::shared_ptr< reactesp::EventLoop > consumer_event_loop, unsigned int poll_rate=990)
bool wait(T &value, unsigned int max_duration_ms)
Wait for a value to be available in the queue.
virtual void set(const T &value) override
void emit(const T &new_value)
std::shared_ptr< reactesp::EventLoop > event_loop()