From 3b59df593aea030ac452c580c9f1499c514180fc Mon Sep 17 00:00:00 2001 From: Dr John Vidler Date: Sat, 13 May 2023 00:39:14 +0100 Subject: [PATCH] Updated DataStream to support decoupling the input fiber from the output to enable async stream data --- inc/streams/DataStream.h | 66 ++------ source/streams/DataStream.cpp | 306 +++++++--------------------------- 2 files changed, 80 insertions(+), 292 deletions(-) diff --git a/inc/streams/DataStream.h b/inc/streams/DataStream.h index 3348c786..027a3a59 100644 --- a/inc/streams/DataStream.h +++ b/inc/streams/DataStream.h @@ -80,15 +80,12 @@ namespace codal */ class DataStream : public DataSource, public DataSink { - ManagedBuffer stream[DATASTREAM_MAXIMUM_BUFFERS]; - //int bufferCount; - //int bufferLength; - //int preferredBufferSize; - //int writers; - //uint16_t spaceAvailableEventCode; - //uint16_t pullRequestEventCode; - ManagedBuffer * nextBuffer; + uint16_t pullRequestEventCode; + uint16_t flowEventCode; + ManagedBuffer nextBuffer; bool isBlocking; + unsigned int missedBuffers; + int downstreamReturn; DataSink *downStream; DataSource *upStream; @@ -109,28 +106,7 @@ namespace codal */ ~DataStream(); - /** - * Determines the value of the given byte in the buffer. - * - * @param position The index of the byte to read. - * @return The value of the byte at the given position, or DEVICE_INVALID_PARAMETER. - */ - //int get(int position); - - /** - * Sets the byte at the given index to value provided. - * @param position The index of the byte to change. - * @param value The new value of the byte (0-255). - * @return DEVICE_OK, or DEVICE_INVALID_PARAMETER. - * - */ - //int set(int position, uint8_t value); - - /** - * Gets number of bytes that are ready to be consumed in this data stream. - * @return The size in bytes. - */ - //int length(); + uint16_t emitFlowEvents( uint16_t id = 0 ); /** * Determines if any of the data currently flowing through this stream is held in non-volatile (FLASH) memory. @@ -138,6 +114,15 @@ namespace codal */ bool isReadOnly(); + /** + * Attempts to determine if another component downstream of this one is _actually_ pulling data, and thus, data + * is flowing. + * + * @return true If there is a count-match between `pullRequest` and `pull` calls. + * @return false If `pullRequest` calls are not currently being matched by `pull` calls. + */ + bool isFlowing(); + /** * Define a downstream component for data stream. * @@ -157,18 +142,6 @@ namespace codal */ virtual int getFormat() override; - /** - * Determine the number of bytes that are currnetly buffered before blocking subsequent push() operations. - * @return the current preferred buffer size for this DataStream - */ - //int getPreferredBufferSize(); - - /** - * Define the number of bytes that should be buffered before blocking subsequent push() operations. - * @param size The number of bytes to buffer. - */ - //void setPreferredBufferSize(int size); - /** * Determines if this stream acts in a synchronous, blocking mode or asynchronous mode. In blocking mode, writes to a full buffer * will result int he calling fiber being blocked until space is available. Downstream DataSinks will also attempt to process data @@ -185,13 +158,6 @@ namespace codal */ bool canPull(int size = 0); - /** - * Determines if the DataStream can accept any more data. - * - * @return true if there if the buffer is ful, and can accept no more data at this time. False otherwise. - */ - ///bool full(); - /** * Provide the next available ManagedBuffer to our downstream caller, if available. */ @@ -210,7 +176,7 @@ namespace codal /** * Issue a deferred pull request to our downstream component, if one has been registered. */ - //void onDeferredPullRequest(Event); + void onDeferredPullRequest(Event); }; } diff --git a/source/streams/DataStream.cpp b/source/streams/DataStream.cpp index b3cd123e..789726b9 100644 --- a/source/streams/DataStream.cpp +++ b/source/streams/DataStream.cpp @@ -24,9 +24,13 @@ DEALINGS IN THE SOFTWARE. #include "DataStream.h" #include "CodalComponent.h" +#include "ManagedBuffer.h" +#include "Event.h" #include "CodalFiber.h" #include "ErrorNo.h" -#include "CodalDmesg.h" + +// Do not set this to less than 2, otherwise events will spuriously trigger +#define HIGH_WATER_MARK 32 using namespace codal; @@ -70,320 +74,138 @@ int DataSink::pullRequest() return DEVICE_NOT_SUPPORTED; } - -/** - * Class definition for DataStream. - * A Datastream holds a number of ManagedBuffer references, provides basic flow control through a push/pull mechanism - * and byte level access to the datastream, even if it spans different buffers. - */ DataStream::DataStream(DataSource &upstream) { - //this->bufferCount = 0; - //this->bufferLength = 0; - //this->preferredBufferSize = 0; - //this->pullRequestEventCode = 0; - //this->spaceAvailableEventCode = allocateNotifyEvent(); - this->nextBuffer = NULL; + this->pullRequestEventCode = 0; this->isBlocking = true; - //this->writers = 0; + this->missedBuffers = HIGH_WATER_MARK; + this->downstreamReturn = DEVICE_OK; + this->flowEventCode = 0; this->downStream = NULL; this->upStream = &upstream; } -/** - * Destructor. - * Removes all resources held by the instance. - */ DataStream::~DataStream() { } -/** - * Determines the value of the given byte in the buffer. - * - * @param position The index of the byte to read. - * @return The value of the byte at the given position, or DEVICE_INVALID_PARAMETER. - */ -/*int DataStream::get(int position) -{ - for (int i = 0; i < bufferCount; i++) - { - if (position < stream[i].length()) - return stream[i].getByte(position); - - position = position - stream[i].length(); - } - - return DEVICE_INVALID_PARAMETER; -}*/ - -/** - * Sets the byte at the given index to value provided. - * @param position The index of the byte to change. - * @param value The new value of the byte (0-255). - * @return DEVICE_OK, or DEVICE_INVALID_PARAMETER. - * - */ -/*int DataStream::set(int position, uint8_t value) -{ - for (int i = 0; i < bufferCount; i++) - { - if (position < stream[i].length()) - { - stream[i].setByte(position, value); - return DEVICE_OK; - } - - position = position - stream[i].length(); - } - - return DEVICE_INVALID_PARAMETER; -}*/ - -/** - * Gets number of bytes that are ready to be consumed in this data stream. - * @return The size in bytes. - */ -/*int DataStream::length() +uint16_t DataStream::emitFlowEvents( uint16_t id ) { - return this->bufferLength; -}*/ + if( this->flowEventCode == 0 ) { + if( id == 0 ) + this->flowEventCode = allocateNotifyEvent(); + else + this->flowEventCode = id; + } + return this->flowEventCode; +} -/** - * Determines if any of the data currently flowing through this stream is held in non-volatile (FLASH) memory. - * @return true if one or more of the ManagedBuffers in this stream reside in FLASH memory, false otherwise. - */ bool DataStream::isReadOnly() { - /*bool r = true; - - for (int i=0; inextBuffer != NULL ) - return nextBuffer->isReadOnly(); + if( this->nextBuffer.length() != 0 ) + return this->nextBuffer.isReadOnly(); return true; } -/** - * Define a downstream component for data stream. - * - * @sink The component that data will be delivered to, when it is available - */ +bool DataStream::isFlowing() +{ + return this->missedBuffers < HIGH_WATER_MARK; +} + void DataStream::connect(DataSink &sink) { this->downStream = &sink; this->upStream->connect(*this); } -/** - * Determine the data format of the buffers streamed out of this component. - */ int DataStream::getFormat() { return upStream->getFormat(); } -/** - * Define a downstream component for data stream. - * - * @sink The component that data will be delivered to, when it is available - */ void DataStream::disconnect() { this->downStream = NULL; } -/** - * Determine the number of bytes that are currnetly buffered before blocking subsequent push() operations. - * @return the current preferred buffer size for this DataStream - */ -/*int DataStream::getPreferredBufferSize() -{ - return preferredBufferSize; -}*/ - -/** - * Define the number of bytes that should be buffered before blocking subsequent push() operations. - * @param size The number of bytes to buffer. - */ -/*void DataStream::setPreferredBufferSize(int size) -{ - this->preferredBufferSize = size; -}*/ - -/** - * Determines if this stream acts in a synchronous, blocking mode or asynchronous mode. In blocking mode, writes to a full buffer - * will result in the calling fiber being blocked until space is available. Downstream DataSinks will also attempt to process data - * immediately as it becomes available. In non-blocking asynchronous mode, writes to a full buffer are dropped and processing of - * downstream Datasinks will be deferred. - */ void DataStream::setBlocking(bool isBlocking) { this->isBlocking = isBlocking; // If this is the first time async mode has been used on this stream, allocate the necessary resources. - /*if (!isBlocking && this->pullRequestEventCode == 0) + if (!this->isBlocking && this->pullRequestEventCode == 0) { this->pullRequestEventCode = allocateNotifyEvent(); if(EventModel::defaultEventBus) - EventModel::defaultEventBus->listen(DEVICE_ID_NOTIFY, pullRequestEventCode, this, &DataStream::onDeferredPullRequest); - }*/ + EventModel::defaultEventBus->listen(DEVICE_ID_NOTIFY, this->pullRequestEventCode, this, &DataStream::onDeferredPullRequest); + } } -/** - * Provide the next available ManagedBuffer to our downstream caller, if available. - */ ManagedBuffer DataStream::pull() { - /*ManagedBuffer out = stream[0]; - - // - // A simplistic FIFO for now. Copy cost is actually pretty low because ManagedBuffer is a managed type, - // so we're just moving a few references here. - // - if (bufferCount > 0) - { - for (int i = 0; i < bufferCount-1; i++) - stream[i] = stream[i + 1]; - - stream[bufferCount-1] = ManagedBuffer(); - - bufferCount--; - bufferLength = bufferLength - out.length(); - } - - Event(DEVICE_ID_NOTIFY_ONE, spaceAvailableEventCode); - - return out;*/ - - /*if( this->nextBuffer != NULL ) - return *this->nextBuffer; - - return ManagedBuffer();*/ - - return this->upStream->pull(); + // 1, as we will normally be at '1' waiting buffer here if we're in-sync with the source + if( this->missedBuffers > 1 ) + Event evt( DEVICE_ID_NOTIFY, this->flowEventCode ); + + this->missedBuffers = 0; + // Are we running in sync (blocking) mode? + if( this->isBlocking ) + return this->upStream->pull(); + + ManagedBuffer tmp = this->nextBuffer; + this->nextBuffer = ManagedBuffer(); + return tmp; } -/** - * Issue a pull request to our downstream component, if one has been registered. - */ -/*void DataStream::onDeferredPullRequest(Event) +void DataStream::onDeferredPullRequest(Event) { + this->downstreamReturn = DEVICE_OK; // The default state + if (downStream != NULL) - downStream->pullRequest(); -}*/ + this->downstreamReturn = downStream->pullRequest(); +} -/** - * Determines if a buffer of the given size can be added to the buffer. - * - * @param size The number of bytes to add to the buffer. - * @return true if there is space for "size" bytes in the buffer. false otherwise. - */ bool DataStream::canPull(int size) { - DMESG( "DataStream::canPull()" ); - /*if(bufferCount + writers >= DATASTREAM_MAXIMUM_BUFFERS) - return false; - - if(preferredBufferSize > 0 && (bufferLength + size > preferredBufferSize)) - return false; - - return true;*/ - - return this->nextBuffer != NULL; + // We only buffer '1' ahead at most, so if we have one already, refuse more + return this->nextBuffer.length() != 0; } -/** - * Determines if the DataStream can accept any more data. - * - * @return true if there if the buffer is ful, and can accept no more data at this time. False otherwise. - */ -/*bool DataStream::full() -{ - return !canPull(); -}*/ - -/** - * Store the given buffer in our stream, possibly also causing a push operation on our downstream component. - */ int DataStream::pullRequest() { - /* - // If we're defined as non-blocking and no space is available, then there's nothing we can do. - if (full() && this->isBlocking == false) - return DEVICE_NO_RESOURCES; - - // As there is either space available in the buffer or we want to block, pull the upstream buffer to release resources there. - ManagedBuffer buffer = upStream->pull(); - - // If pull is called multiple times in a row (yielding nothing after the first time) - // several streams might be woken up, despite the fact that there is no space for them. - do { - // If the buffer is full or we're behind another fiber, then wait for space to become available. - if (full() || writers) - fiber_wake_on_event(DEVICE_ID_NOTIFY, spaceAvailableEventCode); - - if (full() || writers) - { - writers++; - schedule(); - writers--; + // _Technically_ not a missed buffer... yet. But we can only check later. + if( this->missedBuffers < HIGH_WATER_MARK ) + if( ++this->missedBuffers == HIGH_WATER_MARK ) + if( this->flowEventCode != 0 ) + Event evt( DEVICE_ID_NOTIFY, this->flowEventCode ); + + // Are we running in async (non-blocking) mode? + if( !this->isBlocking ) { + if( this->nextBuffer.length() != 0 && this->downstreamReturn != DEVICE_OK ) { + Event evt( DEVICE_ID_NOTIFY, this->pullRequestEventCode ); + return this->downstreamReturn; } - } while (bufferCount >= DATASTREAM_MAXIMUM_BUFFERS); - stream[bufferCount] = buffer; - bufferLength = bufferLength + buffer.length(); - bufferCount++; + this->nextBuffer = this->upStream->pull(); - if (downStream != NULL) - { - DMESG( "DS(blocking = %d) -> PR?", this->isBlocking ); - if (this->isBlocking) { - DMESG( "DS -> Direct PR" ); - downStream->pullRequest(); - } - else { - DMESG( "DS -> Defer PR" ); - Event(DEVICE_ID_NOTIFY, pullRequestEventCode); - } - + Event evt( DEVICE_ID_NOTIFY, this->pullRequestEventCode ); + return this->downstreamReturn; } - return DEVICE_OK;*/ + if( this->downStream != NULL ) + return this->downStream->pullRequest(); - if( this->downStream != NULL ) { - this->downStream->pullRequest(); - } - - return DEVICE_OK; + return DEVICE_BUSY; } -/** - * Gets the sample rate for the stream that this component is part of. - * - * This will query 'up stream' towards the stream source to determine the current sample rate. - * - * @return float The sample rate, in samples-per-second, or DATASTREAM_SAMPLE_RATE_UNKNOWN if not known or unconfigured - */ float DataStream::getSampleRate() { if( this->upStream != NULL ) return this->upStream->getSampleRate(); return DATASTREAM_SAMPLE_RATE_UNKNOWN; } -/** - * Sents a request 'up stream' towards the stream source for a higher sample rate, if possible. - * - * @note Components are not required to respect this request, and this mechanism is best-effort, so if the caller needs a specific rate, it should check via getSampleRate after making this request. - * - * @param sampleRate The sample rate requestd, in samples-per-second - */ float DataStream::requestSampleRate(float sampleRate) { if( this->upStream != NULL ) return this->upStream->requestSampleRate( sampleRate );