NBEngine/engine/NBCore/DataSink.hpp

114 lines
2.9 KiB
C++

#pragma once
#ifndef _NB_DATASINK
#define _NB_DATASINK
#include <atomic>
#include <memory>
#include <thread>
#include "ThreadSafeQueue.hpp"
namespace nb {
/// Abstract, non-copyable and non-movable endpoint that accepts values of
/// type DataType.
///
/// Concrete sinks implement run(), stop() and in(). This base class only owns
/// the running flag reported by isRunning(); it never modifies the flag itself.
///
/// @tparam DataType type of the values handed to in().
template<typename DataType>
class DataSink {
public:
    DataSink(const DataSink&) = delete;
    DataSink(DataSink&&) = delete;
    DataSink& operator=(const DataSink&) = delete;
    DataSink& operator=(DataSink&&) = delete;

    /// Virtual so that a concrete sink can be destroyed through a DataSink
    /// pointer or reference without undefined behaviour.
    virtual ~DataSink() = default;

    /// @return the current value of the running flag.
    virtual bool isRunning() const noexcept {
        return _running;
    }

    /// Stops the sink. The meaning of the returned bool is defined by the
    /// implementing class.
    virtual bool stop() noexcept = 0;

    /// Starts the sink. The meaning of the returned bool is defined by the
    /// implementing class.
    virtual bool run() = 0;

    /// Hands one value to the sink. The meaning of the returned bool is
    /// defined by the implementing class.
    virtual bool in(const DataType&) = 0;

protected:
    DataSink() = default;

    /// Running flag. Explicitly initialised to false: before C++20 a
    /// default-constructed std::atomic<bool> holds an indeterminate value, so
    /// isRunning() could report garbage on a freshly constructed sink.
    std::atomic<bool> _running{false};
};
/// DataSink that stores incoming values in a buffer of type BufferType.
///
/// in() forwards every value to push(); how values are stored, removed and
/// processed is left to the deriving class through the pure virtual hooks
/// declared below.
///
/// @tparam DataType      type of the values accepted by in().
/// @tparam BufferType    type of the owned _buffer member.
/// @tparam ProcessorType not used by this class itself; it is only part of
///                       the template signature.
template<typename DataType, typename BufferType, typename ProcessorType>
class BufferedDataProcessor : public DataSink<DataType> {
    using Base = DataSink<DataType>;
public:
    using Base::Base;

    /// Passes val to push().
    /// @return always true.
    bool in(const DataType& val) override {
        this->push(val);
        return true;
    }

protected:
    /// @return the number of buffered values, as defined by the implementation.
    virtual unsigned int count() const = 0;
    /// Stores one value; called by in().
    virtual void push(const DataType&) = 0;
    /// Removes one value and returns it.
    virtual DataType pop() = 0;
    /// Hook for draining the buffer; exact semantics are up to the implementation.
    virtual void flush() = 0;
    /// Hook for discarding buffered values; exact semantics are up to the implementation.
    virtual void clear() = 0;
    /// Hook for handling a single value.
    virtual bool process(const DataType& val) = 0;

    using Base::_running;
    BufferType _buffer;
};
/// Buffered processor that drains a ThreadsafeQueue<DataType> on a dedicated
/// worker thread.
///
/// run() starts the worker, which keeps calling ProcessorType::flush() while
/// the running flag is set. flush() pops queued values one at a time and pop()
/// passes each value to process(). stop() clears the flag and joins the worker.
///
/// run(), stop() and isRunning() touch the non-atomic _runningThread member,
/// so they must all be called from the same controlling thread.
///
/// NOTE(review): the worker calls into ProcessorType, but the destructor
/// below only runs after ProcessorType's own destructor has finished. A
/// ProcessorType should therefore call stop() from its own destructor.
///
/// @tparam DataType      type of the queued values.
/// @tparam ProcessorType the deriving class; `this` is static_cast to it.
template<typename DataType, typename ProcessorType>
class MultithreadedDataProcessor : public BufferedDataProcessor<DataType, ThreadsafeQueue<DataType>, ProcessorType> {
    using Base = BufferedDataProcessor<DataType, ThreadsafeQueue<DataType>, ProcessorType>;
public:
    /// Stops and joins the worker thread, if there is one.
    ~MultithreadedDataProcessor() { this->stop(); }

    /// @return true when the running flag is set and a worker thread object exists.
    bool isRunning() const noexcept override {
        return this->_running && _runningThread;
    }

    /// Starts the worker thread unless the processor is already running.
    /// @return isRunning() after the attempt.
    bool run() override {
        if (!isRunning()) {
            // A previous worker can still be joinable if only the flag was
            // cleared; overwriting it un-joined would call std::terminate.
            joinWorker();
            this->_running = true;
            _runningThread = std::make_shared<std::thread>([this] {
                auto* self = static_cast<ProcessorType*>(this);
                // Only the atomic flag is read here. Going through isRunning()
                // would read _runningThread while run() is still assigning it,
                // which is a data race and could end the worker immediately.
                while (this->_running.load()) {
                    self->flush();
                    // Let other threads run instead of spinning flat out
                    // while the queue is empty.
                    std::this_thread::yield();
                }
            });
        }
        return isRunning();
    }

    /// Clears the running flag and joins the worker thread.
    /// Values still queued when the worker exits are left in the buffer.
    /// @return true when the processor is no longer running.
    bool stop() noexcept override {
        this->_running = false;
        joinWorker();
        return !isRunning();
    }

protected:
    using Base::Base;

    /// @return the value reported by the buffer's size().
    unsigned int count() const override {
        return this->_buffer.size();
    }

    /// Appends val to the buffer.
    void push(const DataType& val) override {
        this->_buffer.push(val);
    }

    /// Takes one value from the buffer, passes it to process() and returns a
    /// copy of it. The result of process() is ignored.
    DataType pop() override {
        std::shared_ptr<DataType> event;
        // NOTE(review): assumes ThreadsafeQueue::pop always fills `event`;
        // if it can leave it null, the dereference below is invalid - confirm.
        this->_buffer.pop(event);
        process(*event);
        return *event;
    }

    /// Pops values (through ProcessorType) until count() reports zero.
    void flush() override {
        auto* self = static_cast<ProcessorType*>(this);
        while (self->count()) {
            self->pop();
        }
    }

    /// Calls the buffer's empty().
    /// NOTE(review): if ThreadsafeQueue::empty() is only a query, as on the
    /// standard containers, this does not discard anything - confirm.
    void clear() override {
        this->_buffer.empty();
    }

    /// Handles a single dequeued value; implemented by the deriving class.
    bool process(const DataType&) override = 0;

    /// Worker thread object; null while no worker has been started.
    std::shared_ptr<std::thread> _runningThread;

private:
    /// Joins the worker if it is joinable, then drops the thread object.
    void joinWorker() noexcept {
        if (_runningThread) {
            if (_runningThread->joinable()) {
                _runningThread->join();
            }
            _runningThread.reset();
        }
    }
};
} // namespace nb
#endif // _NB_DATASINK