#pragma once #ifndef _NB_DATASINK #define _NB_DATASINK #include #include "ThreadSafeQueue.hpp" namespace nb { template class DataSink { public: DataSink(const DataSink&) = delete; DataSink(DataSink&&) = delete; DataSink& operator=(const DataSink&) = delete; virtual bool isRunning() const noexcept { return _running; } virtual bool stop() noexcept = 0; virtual bool run() = 0; virtual bool in(const DataType&) = 0; protected: DataSink() = default; std::atomic _running; }; template class BufferedDataProcessor : public DataSink { using Base = DataSink; public: using Base::Base; bool stop() noexcept { return type_ptr->stop(); } bool run() { return type_ptr->run(); } bool in(const DataType& val) { return type_ptr->in(val); } protected: unsigned int count() const { return type_ptr->count(); } void push(const DataType& val) { type_ptr->push(val); } DataType pop() { return type_ptr->pop(); } void flush() { type_ptr->flush(); } void clear() { type_ptr->clear(); } bool process(const DataType& val) { return type_ptr->process(val); } using Base::_running; BufferType _buffer; private: ProcessorType* const type_ptr = static_cast(this); }; template class MultithreadedDataProcessor : public BufferedDataProcessor, ProcessorType> { using Base = BufferedDataProcessor, ProcessorType>; public: ~MultithreadedDataProcessor() { type_ptr->stop(); } bool isRunning() const noexcept override { return this->_running && (_runningThread!=nullptr); } bool run() { if (!type_ptr->isRunning()) { this->_running = true; _runningThread = std::make_shared([&]{ while(type_ptr->isRunning()) { flush(); } }); } return this->isRunning(); } bool stop() noexcept override { if (type_ptr->isRunning()) { this->_running = false; _runningThread->join(); _runningThread = nullptr; } return !type_ptr->isRunning(); } protected: using Base::Base; unsigned int count() const { return this->_buffer.size(); } void push(const DataType& val) { this->_buffer.push(val); }; DataType pop() { std::shared_ptr event; this->_buffer.pop(event); this->process(*event); return *event; } void flush() { while(type_ptr->count()) { pop(); } } void clear() { this->_buffer.empty(); } std::shared_ptr _runningThread; private: ProcessorType* type_ptr = static_cast(this); }; } // namespace nb #endif // _NB_DATASINK