From c918ffb562722c5892e600a1c750c78e59a322a3 Mon Sep 17 00:00:00 2001 From: Dr John Vidler Date: Thu, 16 Mar 2023 09:25:14 +0000 Subject: [PATCH] Reworked and updated the streams API; still a fair amount of debug code in here that will need cleaning out before tag/release --- .gitignore | 1 + inc/streams/DataStream.h | 27 +++---- inc/streams/LevelDetector.h | 3 + inc/streams/LevelDetectorSPL.h | 3 + inc/streams/StreamNormalizer.h | 4 +- inc/streams/StreamRecording.h | 4 +- inc/streams/StreamSplitter.h | 17 +++-- source/driver-models/Timer.cpp | 1 + source/streams/DataStream.cpp | 90 ++++++++++++++-------- source/streams/LevelDetector.cpp | 24 ++++-- source/streams/LevelDetectorSPL.cpp | 28 +++++-- source/streams/LowPassFilter.cpp | 1 + source/streams/StreamNormalizer.cpp | 24 +++--- source/streams/StreamRecording.cpp | 28 +++++-- source/streams/StreamSplitter.cpp | 111 ++++++++++++++++++---------- 15 files changed, 240 insertions(+), 126 deletions(-) diff --git a/.gitignore b/.gitignore index 654d1b9b..39fd005f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ *~ *.swp .vscode +.DS_Store diff --git a/inc/streams/DataStream.h b/inc/streams/DataStream.h index 5560a72b..3348c786 100644 --- a/inc/streams/DataStream.h +++ b/inc/streams/DataStream.h @@ -81,12 +81,13 @@ namespace codal class DataStream : public DataSource, public DataSink { ManagedBuffer stream[DATASTREAM_MAXIMUM_BUFFERS]; - int bufferCount; - int bufferLength; - int preferredBufferSize; - int writers; - uint16_t spaceAvailableEventCode; - uint16_t pullRequestEventCode; + //int bufferCount; + //int bufferLength; + //int preferredBufferSize; + //int writers; + //uint16_t spaceAvailableEventCode; + //uint16_t pullRequestEventCode; + ManagedBuffer * nextBuffer; bool isBlocking; DataSink *downStream; @@ -114,7 +115,7 @@ namespace codal * @param position The index of the byte to read. * @return The value of the byte at the given position, or DEVICE_INVALID_PARAMETER. */ - int get(int position); + //int get(int position); /** * Sets the byte at the given index to value provided. @@ -123,13 +124,13 @@ namespace codal * @return DEVICE_OK, or DEVICE_INVALID_PARAMETER. * */ - int set(int position, uint8_t value); + //int set(int position, uint8_t value); /** * Gets number of bytes that are ready to be consumed in this data stream. * @return The size in bytes. */ - int length(); + //int length(); /** * Determines if any of the data currently flowing through this stream is held in non-volatile (FLASH) memory. @@ -160,13 +161,13 @@ namespace codal * Determine the number of bytes that are currnetly buffered before blocking subsequent push() operations. * @return the current preferred buffer size for this DataStream */ - int getPreferredBufferSize(); + //int getPreferredBufferSize(); /** * Define the number of bytes that should be buffered before blocking subsequent push() operations. * @param size The number of bytes to buffer. */ - void setPreferredBufferSize(int size); + //void setPreferredBufferSize(int size); /** * Determines if this stream acts in a synchronous, blocking mode or asynchronous mode. In blocking mode, writes to a full buffer @@ -189,7 +190,7 @@ namespace codal * * @return true if there if the buffer is ful, and can accept no more data at this time. False otherwise. */ - bool full(); + ///bool full(); /** * Provide the next available ManagedBuffer to our downstream caller, if available. @@ -209,7 +210,7 @@ namespace codal /** * Issue a deferred pull request to our downstream component, if one has been registered. */ - void onDeferredPullRequest(Event); + //void onDeferredPullRequest(Event); }; } diff --git a/inc/streams/LevelDetector.h b/inc/streams/LevelDetector.h index 98be461e..49de01e1 100644 --- a/inc/streams/LevelDetector.h +++ b/inc/streams/LevelDetector.h @@ -61,6 +61,7 @@ namespace codal{ int level; // The current, instantaneous level. int sigma; // Running total of the samples in the current window. bool activated; // Has this component been connected yet. + int ttl; /** @@ -86,6 +87,8 @@ namespace codal{ */ int getValue(); + void activateForEvents( bool state ); + /** * Set threshold to the given value. Events will be generated when these thresholds are crossed. * diff --git a/inc/streams/LevelDetectorSPL.h b/inc/streams/LevelDetectorSPL.h index d5f7cda4..0d0efd76 100644 --- a/inc/streams/LevelDetectorSPL.h +++ b/inc/streams/LevelDetectorSPL.h @@ -84,6 +84,7 @@ namespace codal{ bool activated; // Has this component been connected yet bool enabled; // Is the component currently running int unit; // The units to be returned from this level detector (e.g. dB or linear 8bit) + uint64_t timeout; /** * Creates a component capable of measuring and thresholding stream data @@ -111,6 +112,8 @@ namespace codal{ */ float getValue(); + void activateForEvents( bool state ); + /** * Disable component */ diff --git a/inc/streams/StreamNormalizer.h b/inc/streams/StreamNormalizer.h index adb66ddc..259153be 100644 --- a/inc/streams/StreamNormalizer.h +++ b/inc/streams/StreamNormalizer.h @@ -51,10 +51,10 @@ namespace codal{ uint32_t orMask; // post processing step - or'd with each sample. bool normalize; // If set, will recalculate a zero offset. bool zeroOffsetValid; // Set to true after the first buffer has been processed. - bool outputEnabled; // When set any buffer processed will be forwarded downstream. + bool outputEnabled; // When set any bxuffer processed will be forwarded downstream. DataSource &upstream; // The upstream component of this StreamNormalizer. DataStream output; // The downstream output stream of this StreamNormalizer. - ManagedBuffer buffer; // The buffer being processed. + //ManagedBuffer buffer; // The buffer being processed. static SampleReadFn readSample[9]; static SampleWriteFn writeSample[9]; diff --git a/inc/streams/StreamRecording.h b/inc/streams/StreamRecording.h index c87b9ada..ec9fc3c9 100644 --- a/inc/streams/StreamRecording.h +++ b/inc/streams/StreamRecording.h @@ -5,7 +5,7 @@ #include "DataStream.h" // Pretty much the largest sensible number we can have on a v2 -#define REC_MAX_BUFFERS 256 +#define REC_MAX_BUFFERS 200 #define REC_STATE_STOPPED 0 #define REC_STATE_PLAYING 1 @@ -149,6 +149,8 @@ namespace codal */ bool isStopped(); + virtual float getSampleRate(); + }; } diff --git a/inc/streams/StreamSplitter.h b/inc/streams/StreamSplitter.h index bfcb669c..90c3cb25 100644 --- a/inc/streams/StreamSplitter.h +++ b/inc/streams/StreamSplitter.h @@ -43,6 +43,8 @@ DEALINGS IN THE SOFTWARE. #define SPLITTER_DEACTIVATE_CHANNEL 2 #define SPLITTER_CHANNEL_CONNECT 3 #define SPLITTER_CHANNEL_DISCONNECT 4 +#define SPLITTER_ACTIVATE 5 +#define SPLITTER_DEACTIVATE 6 /** @@ -88,11 +90,12 @@ namespace codal{ { private: ManagedBuffer lastBuffer; // Buffer being processed + uint64_t __cycle; - public: - int numberChannels; // Current number of channels Splitter is serving - int numberActiveChannels; // Current number of /active/ channels this Splitter is serving - int processed; // How many downstream components have responded to pull request + public: + bool isActive; // Track if we need to emit activate/deactivate messages + int channels; // Current number of channels Splitter is serving + volatile int activeChannels; // Current number of /active/ channels this Splitter is serving DataSource &upstream; // The upstream component of this Splitter SplitterChannel * outputChannels[CONFIG_MAX_CHANNELS]; // Array of SplitterChannels the Splitter is serving @@ -103,6 +106,8 @@ namespace codal{ */ StreamSplitter(DataSource &source, uint16_t id = CodalComponent::generateDynamicID()); + void periodicCallback(); + /** * Callback provided when data is ready. */ @@ -116,7 +121,9 @@ namespace codal{ /** * Destructor. */ - ~StreamSplitter(); + virtual ~StreamSplitter(); + + friend SplitterChannel; }; } diff --git a/source/driver-models/Timer.cpp b/source/driver-models/Timer.cpp index 2cef543f..23500ebc 100644 --- a/source/driver-models/Timer.cpp +++ b/source/driver-models/Timer.cpp @@ -100,6 +100,7 @@ Timer::Timer(LowLevelTimer& t, uint8_t ccPeriodChannel, uint8_t ccEventChannel) { // Register ourselves as the defualt timer - most recent timer wins. system_timer = this; + this->ccPeriodChannel = ccPeriodChannel; this->ccEventChannel = ccEventChannel; diff --git a/source/streams/DataStream.cpp b/source/streams/DataStream.cpp index e8fe408e..b3cd123e 100644 --- a/source/streams/DataStream.cpp +++ b/source/streams/DataStream.cpp @@ -78,17 +78,17 @@ int DataSink::pullRequest() */ DataStream::DataStream(DataSource &upstream) { - this->bufferCount = 0; - this->bufferLength = 0; - this->preferredBufferSize = 0; - this->pullRequestEventCode = 0; - this->spaceAvailableEventCode = allocateNotifyEvent(); + //this->bufferCount = 0; + //this->bufferLength = 0; + //this->preferredBufferSize = 0; + //this->pullRequestEventCode = 0; + //this->spaceAvailableEventCode = allocateNotifyEvent(); + this->nextBuffer = NULL; this->isBlocking = true; - this->writers = 0; + //this->writers = 0; this->downStream = NULL; this->upStream = &upstream; - } /** @@ -105,7 +105,7 @@ DataStream::~DataStream() * @param position The index of the byte to read. * @return The value of the byte at the given position, or DEVICE_INVALID_PARAMETER. */ -int DataStream::get(int position) +/*int DataStream::get(int position) { for (int i = 0; i < bufferCount; i++) { @@ -116,7 +116,7 @@ int DataStream::get(int position) } return DEVICE_INVALID_PARAMETER; -} +}*/ /** * Sets the byte at the given index to value provided. @@ -125,7 +125,7 @@ int DataStream::get(int position) * @return DEVICE_OK, or DEVICE_INVALID_PARAMETER. * */ -int DataStream::set(int position, uint8_t value) +/*int DataStream::set(int position, uint8_t value) { for (int i = 0; i < bufferCount; i++) { @@ -139,16 +139,16 @@ int DataStream::set(int position, uint8_t value) } return DEVICE_INVALID_PARAMETER; -} +}*/ /** * Gets number of bytes that are ready to be consumed in this data stream. * @return The size in bytes. */ -int DataStream::length() +/*int DataStream::length() { return this->bufferLength; -} +}*/ /** * Determines if any of the data currently flowing through this stream is held in non-volatile (FLASH) memory. @@ -156,13 +156,17 @@ int DataStream::length() */ bool DataStream::isReadOnly() { - bool r = true; + /*bool r = true; for (int i=0; inextBuffer != NULL ) + return nextBuffer->isReadOnly(); + return true; } /** @@ -198,19 +202,19 @@ void DataStream::disconnect() * Determine the number of bytes that are currnetly buffered before blocking subsequent push() operations. * @return the current preferred buffer size for this DataStream */ -int DataStream::getPreferredBufferSize() +/*int DataStream::getPreferredBufferSize() { return preferredBufferSize; -} +}*/ /** * Define the number of bytes that should be buffered before blocking subsequent push() operations. * @param size The number of bytes to buffer. */ -void DataStream::setPreferredBufferSize(int size) +/*void DataStream::setPreferredBufferSize(int size) { this->preferredBufferSize = size; -} +}*/ /** * Determines if this stream acts in a synchronous, blocking mode or asynchronous mode. In blocking mode, writes to a full buffer @@ -223,13 +227,13 @@ void DataStream::setBlocking(bool isBlocking) this->isBlocking = isBlocking; // If this is the first time async mode has been used on this stream, allocate the necessary resources. - if (!isBlocking && this->pullRequestEventCode == 0) + /*if (!isBlocking && this->pullRequestEventCode == 0) { this->pullRequestEventCode = allocateNotifyEvent(); if(EventModel::defaultEventBus) EventModel::defaultEventBus->listen(DEVICE_ID_NOTIFY, pullRequestEventCode, this, &DataStream::onDeferredPullRequest); - } + }*/ } /** @@ -237,7 +241,7 @@ void DataStream::setBlocking(bool isBlocking) */ ManagedBuffer DataStream::pull() { - ManagedBuffer out = stream[0]; + /*ManagedBuffer out = stream[0]; // // A simplistic FIFO for now. Copy cost is actually pretty low because ManagedBuffer is a managed type, @@ -256,17 +260,24 @@ ManagedBuffer DataStream::pull() Event(DEVICE_ID_NOTIFY_ONE, spaceAvailableEventCode); - return out; + return out;*/ + + /*if( this->nextBuffer != NULL ) + return *this->nextBuffer; + + return ManagedBuffer();*/ + + return this->upStream->pull(); } /** * Issue a pull request to our downstream component, if one has been registered. */ -void DataStream::onDeferredPullRequest(Event) +/*void DataStream::onDeferredPullRequest(Event) { if (downStream != NULL) downStream->pullRequest(); -} +}*/ /** * Determines if a buffer of the given size can be added to the buffer. @@ -276,13 +287,16 @@ void DataStream::onDeferredPullRequest(Event) */ bool DataStream::canPull(int size) { - if(bufferCount + writers >= DATASTREAM_MAXIMUM_BUFFERS) + DMESG( "DataStream::canPull()" ); + /*if(bufferCount + writers >= DATASTREAM_MAXIMUM_BUFFERS) return false; if(preferredBufferSize > 0 && (bufferLength + size > preferredBufferSize)) return false; - return true; + return true;*/ + + return this->nextBuffer != NULL; } /** @@ -290,16 +304,17 @@ bool DataStream::canPull(int size) * * @return true if there if the buffer is ful, and can accept no more data at this time. False otherwise. */ -bool DataStream::full() +/*bool DataStream::full() { return !canPull(); -} +}*/ /** * Store the given buffer in our stream, possibly also causing a push operation on our downstream component. */ int DataStream::pullRequest() { + /* // If we're defined as non-blocking and no space is available, then there's nothing we can do. if (full() && this->isBlocking == false) return DEVICE_NO_RESOURCES; @@ -328,14 +343,25 @@ int DataStream::pullRequest() if (downStream != NULL) { - if (this->isBlocking) + DMESG( "DS(blocking = %d) -> PR?", this->isBlocking ); + if (this->isBlocking) { + DMESG( "DS -> Direct PR" ); downStream->pullRequest(); - else + } + else { + DMESG( "DS -> Defer PR" ); Event(DEVICE_ID_NOTIFY, pullRequestEventCode); + } } - return DEVICE_OK; + return DEVICE_OK;*/ + + if( this->downStream != NULL ) { + this->downStream->pullRequest(); + } + + return DEVICE_OK; } /** diff --git a/source/streams/LevelDetector.cpp b/source/streams/LevelDetector.cpp index 7c50883a..b5b0e127 100644 --- a/source/streams/LevelDetector.cpp +++ b/source/streams/LevelDetector.cpp @@ -45,7 +45,6 @@ LevelDetector::LevelDetector(DataSource &source, int highThreshold, int lowThres this->activated = false; if(connectImmediately){ upstream.connect(*this); - activated = true; } } @@ -54,6 +53,9 @@ LevelDetector::LevelDetector(DataSource &source, int highThreshold, int lowThres */ int LevelDetector::pullRequest() { + if( ttl < 1 && !activated ) + return DEVICE_OK; + ManagedBuffer b = upstream.pull(); int16_t *data = (int16_t *) &b[0]; @@ -100,6 +102,10 @@ int LevelDetector::pullRequest() data++; } + // Disconnect when our TTL is less than 1, if we're not set to always active + if( !activated && --ttl < 1 ) + upstream.disconnect(); + return DEVICE_OK; } @@ -110,15 +116,19 @@ int LevelDetector::pullRequest() */ int LevelDetector::getValue() { - if(!activated){ - // Register with our upstream component: on demand activated - DMESG("activating LD"); - upstream.connect(*this); - activated = true; - } + this->ttl = 100; // In buffers + upstream.connect(*this); + target_wait( 100 ); return level; } +void LevelDetector::activateForEvents( bool state ) +{ + this->activated = state; + if( this->activated ) + upstream.connect(*this); +} + /** * Set threshold to the given value. Events will be generated when these thresholds are crossed. * diff --git a/source/streams/LevelDetectorSPL.cpp b/source/streams/LevelDetectorSPL.cpp index b0cc545c..fbe2817d 100644 --- a/source/streams/LevelDetectorSPL.cpp +++ b/source/streams/LevelDetectorSPL.cpp @@ -32,6 +32,9 @@ DEALINGS IN THE SOFTWARE. #include "StreamNormalizer.h" #include "CodalDmesg.h" +#define CODAL_STREAM_IDLE_TIMEOUT_MS 250 +#define CODAL_STREAM_MIC_STABLE_MS 5 + using namespace codal; LevelDetectorSPL::LevelDetectorSPL(DataSource &source, float highThreshold, float lowThreshold, float gain, float minValue, uint16_t id, bool connectImmediately) : upstream(source) @@ -60,6 +63,11 @@ LevelDetectorSPL::LevelDetectorSPL(DataSource &source, float highThreshold, floa */ int LevelDetectorSPL::pullRequest() { + if( this->timeout - system_timer_current_time() > CODAL_STREAM_IDLE_TIMEOUT_MS && !activated ) { + upstream.disconnect(); + return DEVICE_OK; + } + ManagedBuffer b = upstream.pull(); uint8_t *data = &b[0]; @@ -135,7 +143,7 @@ int LevelDetectorSPL::pullRequest() status |= LEVEL_DETECTOR_SPL_LOW_THRESHOLD_PASSED; status &= ~LEVEL_DETECTOR_SPL_HIGH_THRESHOLD_PASSED; } - } + } return DEVICE_OK; } @@ -147,15 +155,21 @@ int LevelDetectorSPL::pullRequest() */ float LevelDetectorSPL::getValue() { - if(!activated){ - // Register with our upstream component: on demand activated - upstream.connect(*this); - activated = true; - } - + bool wasAwake = this->activated || system_timer_current_time() - this->timeout ; + this->timeout = system_timer_current_time() + CODAL_STREAM_IDLE_TIMEOUT_MS; + upstream.connect(*this); + if( !wasAwake ) + target_wait( CODAL_STREAM_MIC_STABLE_MS ); return splToUnit(level); } +void LevelDetectorSPL::activateForEvents( bool state ) +{ + this->activated = state; + if( this->activated ) + upstream.connect(*this); +} + /* * Disable / turn off this level detector * diff --git a/source/streams/LowPassFilter.cpp b/source/streams/LowPassFilter.cpp index e92558f3..a68fe052 100644 --- a/source/streams/LowPassFilter.cpp +++ b/source/streams/LowPassFilter.cpp @@ -1,4 +1,5 @@ #include "LowPassFilter.h" +#include "CodalDmesg.h" using namespace codal; diff --git a/source/streams/StreamNormalizer.cpp b/source/streams/StreamNormalizer.cpp index 6d73fae8..05e881aa 100644 --- a/source/streams/StreamNormalizer.cpp +++ b/source/streams/StreamNormalizer.cpp @@ -24,7 +24,6 @@ DEALINGS IN THE SOFTWARE. #include "StreamNormalizer.h" #include "ErrorNo.h" -#include "CodalDmesg.h" using namespace codal; @@ -144,14 +143,6 @@ StreamNormalizer::StreamNormalizer(DataSource &source, float gain, bool normaliz * Provide the next available ManagedBuffer to our downstream caller, if available. */ ManagedBuffer StreamNormalizer::pull() -{ - return buffer; -} - -/** - * Callback provided when data is ready. - */ -int StreamNormalizer::pullRequest() { int samples; // Number of samples in the input buffer. int s; // The sample being processed, encpasulated inside a 32 bit number. @@ -161,7 +152,8 @@ int StreamNormalizer::pullRequest() int bytesPerSampleIn; // number of bit per sample of the input buffer. int bytesPerSampleOut; // number of bit per sample of the input buffer. int z = 0; // normalized zero point calculated from this buffer. - int zo = (int) zeroOffset; // Snapshot of our previously calculate zero point + int zo = (int) zeroOffset; // Snapshot of our previously calculate zero point. + ManagedBuffer buffer; // The buffer being processed. // Determine the input format. inputFormat = upstream.getFormat(); @@ -226,11 +218,15 @@ int StreamNormalizer::pullRequest() // Ensure output buffer is the correct size; buffer.truncate(samples * bytesPerSampleOut); - // Signal downstream component that a buffer is ready. - if (outputEnabled) - output.pullRequest(); + return buffer; +} - return DEVICE_OK; +/** + * Callback provided when data is ready. + */ +int StreamNormalizer::pullRequest() +{ + return output.pullRequest(); } /** diff --git a/source/streams/StreamRecording.cpp b/source/streams/StreamRecording.cpp index 4a9a7686..d85d92da 100644 --- a/source/streams/StreamRecording.cpp +++ b/source/streams/StreamRecording.cpp @@ -9,12 +9,13 @@ using namespace codal; StreamRecording::StreamRecording( DataSource &source ) : upStream( source ) { + this->state = REC_STATE_STOPPED; this->bufferLength = 0; this->lastBuffer = 0; this->readWriteHead = 0; this->downStream = NULL; - source.connect( *this ); + //upStream.connect( *this ); } StreamRecording::~StreamRecording() @@ -38,6 +39,8 @@ ManagedBuffer StreamRecording::pull() stop(); return ManagedBuffer(); } + + //DMESGF( "Output: %d of %d", this->readWriteHead, REC_MAX_BUFFERS ); // Grab the next block ManagedBuffer out = this->buffer[this->readWriteHead++]; @@ -77,15 +80,17 @@ void StreamRecording::dumpState() int StreamRecording::pullRequest() { - ManagedBuffer data = this->upStream.pull(); - // Are we recording? if( this->state != REC_STATE_RECORDING ) return DEVICE_OK; + + ManagedBuffer data = this->upStream.pull(); + DMESGN( "Input: %d of %d (%d B)\r", this->readWriteHead, REC_MAX_BUFFERS, this->bufferLength ); + // Are we getting empty buffers (probably because we're out of RAM!) if( data.length() == 0 ) - return DEVICE_NO_RESOURCES; + return DEVICE_NO_DATA; // Can we record any more? if( this->readWriteHead < REC_MAX_BUFFERS ) @@ -130,9 +135,10 @@ bool StreamRecording::record() erase(); bool changed = this->state != REC_STATE_RECORDING; + if( changed ) + upStream.connect( *this ); this->state = REC_STATE_RECORDING; - this->upStream.pull(); // Kickstart the upstream?? return changed; } @@ -155,13 +161,18 @@ bool StreamRecording::play() bool changed = this->state != REC_STATE_PLAYING; this->state = REC_STATE_PLAYING; - this->downStream->pullRequest(); + if( this->downStream != NULL ) + this->downStream->pullRequest(); return changed; } void StreamRecording::stop() { + bool changed = this->state != REC_STATE_STOPPED; + if( changed ) + upStream.disconnect(); + this->state = REC_STATE_STOPPED; this->readWriteHead = 0; // Snap to the start } @@ -179,4 +190,9 @@ bool StreamRecording::isRecording() bool StreamRecording::isStopped() { return this->state == REC_STATE_STOPPED; +} + +float StreamRecording::getSampleRate() +{ + return this->upStream.getSampleRate(); } \ No newline at end of file diff --git a/source/streams/StreamSplitter.cpp b/source/streams/StreamSplitter.cpp index f9d2ef3e..9a72852d 100644 --- a/source/streams/StreamSplitter.cpp +++ b/source/streams/StreamSplitter.cpp @@ -26,10 +26,8 @@ DEALINGS IN THE SOFTWARE. #include "StreamSplitter.h" #include "StreamNormalizer.h" #include "ErrorNo.h" -#include "CodalDmesg.h" #include "Event.h" - - +#include "CodalDmesg.h" using namespace codal; @@ -54,14 +52,15 @@ int SplitterChannel::pullRequest() { ManagedBuffer SplitterChannel::pull() { - pullAttempts--; - // Shortcut! - if( sampleRate == DATASTREAM_SAMPLE_RATE_UNKNOWN ) - return parent->getBuffer(); + pullAttempts = 0; + ManagedBuffer inData = parent->getBuffer(); + + // Shortcuts - we can't fabricate samples, so just pass on what we can if we don't know or can't keep up. + if( sampleRate == DATASTREAM_SAMPLE_RATE_UNKNOWN || sampleRate >= parent->upstream.getSampleRate() ) + return inData; // Going the long way around - drop any extra samples... float inRate = parent->upstream.getSampleRate(); - ManagedBuffer inData = parent->getBuffer(); int inFmt = parent->upstream.getFormat(); int bytesPerSample = DATASTREAM_FORMAT_BYTES_PER_SAMPLE( inFmt ); int inSamples = inData.length() / bytesPerSample; @@ -71,7 +70,8 @@ ManagedBuffer SplitterChannel::pull() uint8_t *inPtr = &inData[0]; uint8_t *outPtr = &outData[0]; - for( int i=0; inumberActiveChannels++; - Event e( parent->id, SPLITTER_CHANNEL_CONNECT ); + // Prevent repeated events on calling connect multiple times (so consumers can blindly connect!) + if( output != &sink ) { + output = &sink; + parent->activeChannels++; // Notify that we have might _at least_ one sink available + Event e( parent->id, SPLITTER_CHANNEL_CONNECT ); + } } void SplitterChannel::disconnect() { - output = NULL; - parent->numberActiveChannels--; - Event e( parent->id, SPLITTER_CHANNEL_DISCONNECT ); + // Prevent repeated events on calling disconnect multiple times (so consumers can blindly disconnect!) + if( output != NULL ) { + output = NULL; + Event e( parent->id, SPLITTER_CHANNEL_DISCONNECT ); + } } int SplitterChannel::getFormat() @@ -115,7 +120,7 @@ float SplitterChannel::getSampleRate() float SplitterChannel::requestSampleRate( float sampleRate ) { - sampleRate = sampleRate; + this->sampleRate = sampleRate; // Do we need to request a higher rate upstream? if( parent->upstream.getSampleRate() < sampleRate ) { @@ -141,12 +146,15 @@ float SplitterChannel::requestSampleRate( float sampleRate ) StreamSplitter::StreamSplitter(DataSource &source, uint16_t id) : upstream(source) { this->id = id; - this->numberChannels = 0; + this->channels = 0; // init array to NULL. for (int i = 0; i < CONFIG_MAX_CHANNELS; i++) outputChannels[i] = NULL; source.connect(*this); + + this->__cycle = 0; + //this->status |= DEVICE_COMPONENT_STATUS_SYSTEM_TICK; } StreamSplitter::~StreamSplitter() @@ -156,29 +164,58 @@ StreamSplitter::~StreamSplitter() ManagedBuffer StreamSplitter::getBuffer() { - processed++; + activeChannels++; return lastBuffer; } +void StreamSplitter::periodicCallback() { + if( this->__cycle++ % 50 != 0 ) + return; + + if( this->id == 64000 ) { + char const CLEAR_SRC[] = {0x1B, 0x5B, 0x32, 0x4A }; + DMESG( CLEAR_SRC ); + } + + DMESG( "%d - Active Channels: %d (active = %d, sampleRate = %d)", this->id, this->activeChannels, this->isActive, (int)this->upstream.getSampleRate() ); + for( int i=0; ioutputChannels[i] != NULL ) { + if( this->outputChannels[i]->output != NULL ) + DMESG( "\t- %d [CONN] failed = %d (sampleRate = %d)", i, this->outputChannels[i]->pullAttempts, (int)this->outputChannels[i]->getSampleRate() ); + else + DMESG( "\t- %d [DISC] failed = %d (sampleRate = %d)", i, this->outputChannels[i]->pullAttempts, (int)this->outputChannels[i]->getSampleRate() ); + } else + DMESG( "\t- %d [----]", i ); + } +} + /** * Callback provided when data is ready. */ int StreamSplitter::pullRequest() { - if( processed >= numberChannels ) + if( activeChannels > 0 ) { - processed = 0; + if( !isActive ) + Event e( id, SPLITTER_ACTIVATE ); + isActive = true; lastBuffer = upstream.pull(); - - // For each downstream channel that exists in array outputChannels - make a pullRequest - for (int i = 0; i < CONFIG_MAX_CHANNELS; i++) - if (outputChannels[i] != NULL) - outputChannels[i]->pullRequest(); } - else { - // Unfortunately we have to drop a buffer, otherwise we might stall the pipeline! - upstream.pull(); - processed = CONFIG_MAX_CHANNELS + 1; + else + { + if( isActive ) + Event e( id, SPLITTER_DEACTIVATE ); + isActive = false; + lastBuffer = ManagedBuffer(); + } + + activeChannels = 0; + + // For each downstream channel that exists in array outputChannels - make a pullRequest + for (int i = 0; i < CONFIG_MAX_CHANNELS; i++) + { + if (outputChannels[i] != NULL) + outputChannels[i]->pullRequest(); } return DEVICE_OK; @@ -197,14 +234,11 @@ SplitterChannel * StreamSplitter::createChannel() break; } } - if(placed != -1) - numberChannels++; - - if(numberChannels > 0) - Event e( id, SPLITTER_ACTIVATE_CHANNEL ); //Activate ADC - - if( placed != -1 ) + if(placed != -1) { + channels++; + Event e( id, SPLITTER_ACTIVATE_CHANNEL ); return outputChannels[placed]; + } return NULL; } @@ -213,14 +247,13 @@ bool StreamSplitter::destroyChannel( SplitterChannel * channel ) { for( int i=0; i The SplitterChannel supplied did not exist on this splitter, refusing to destroy it." ); return false; }