1 /** 2 * Threaded stream decoder. 3 * 4 * Copyright: Copyright Guillaume Piolat 2021. 5 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) 6 */ 7 module gamemixer.bufferedstream; 8 9 import core.atomic; 10 import core.stdc..string: memcpy; 11 12 import dplug.core; 13 import gamemixer.delayline; 14 import audioformats; 15 16 package: 17 18 // Warning: the sequence of buffering, sample rate change, threading... is pretty complicated to follow. 19 20 // A `BufferedStream` has optional threaded decoding, activated for streams that perform file IO. 21 class BufferedStream 22 { 23 public: 24 @nogc: 25 26 enum streamingDecodingIncrement = 0.1f; // No more than 100ms in decoded at once. TODO tune 27 enum streamingBufferDuration = 1.0f; // Seems to be the default buffer size in foobar TODO tune 28 29 this(const(char)[] path) 30 { 31 _bufMutex = makeMutex(); 32 _stream.openFromFile(path); 33 _channels = _stream.getNumChannels(); 34 startDecodingThreadIfNeeded(); 35 } 36 37 this(const(ubyte)[] data) 38 { 39 _bufMutex = makeMutex(); 40 _stream.openFromMemory(data); 41 _channels = _stream.getNumChannels(); 42 startDecodingThreadIfNeeded(); 43 } 44 45 ~this() 46 { 47 if (_threaded) 48 { 49 atomicStore(_decodeThreadShouldDie, true); 50 _decodeThread.join(); 51 _decodeBuffer.reallocBuffer(0); 52 } 53 } 54 55 int getNumChannels() nothrow 56 { 57 return _stream.getNumChannels(); 58 } 59 60 float getSamplerate() nothrow 61 { 62 return _stream.getSamplerate(); 63 } 64 65 int readSamplesFloat(float* outData, int frames) 66 { 67 if (!_threaded) 68 { 69 // Non-threaded version 70 return _stream.readSamplesFloat(outData, frames); 71 } 72 73 // <CONSUMER> 74 int decodedFrames = 0; 75 76 while(true) 77 { 78 if (decodedFrames == frames) 79 break; 80 81 assert(decodedFrames < frames); 82 83 // Get number of frames in ring buffer. 84 _bufMutex.lock(); 85 86 loop: 87 88 int bufFrames = _bufferLength / _channels; 89 if (bufFrames == 0) 90 { 91 if (atomicLoad(_streamIsFinished)) 92 { 93 _bufMutex.unlock(); 94 break; 95 } 96 _bufferIsEmpty.wait(&_bufMutex); 97 goto loop; // maybe it is filled now,w ait for data 98 } 99 int framesNeeded = frames - decodedFrames; 100 if (bufFrames > framesNeeded) 101 bufFrames = framesNeeded; 102 103 // PERF: why not read newbufFrames instead of bufFrames ? 104 int newbufFrames = _bufferLength / _channels; // re-read length to get oldest frame location 105 const(float)* readPointerOldest = _streamingBuffer.readPointer() - (_bufferLength - 1); 106 size_t bytes = float.sizeof * bufFrames * _channels; 107 memcpy(&outData[decodedFrames * _channels], readPointerOldest, bytes); 108 _bufferLength -= bufFrames * _channels; 109 assert(_bufferLength >= 0); 110 111 decodedFrames += bufFrames; 112 113 _bufMutex.unlock(); 114 _bufferIsFull.notifyOne(); // Buffer is probably not full anymore. 115 116 // stream buffer is probably not full anymore 117 } 118 119 return decodedFrames; 120 121 // </CONSUMER> 122 } 123 124 private: 125 Thread _decodeThread; 126 AudioStream _stream; 127 bool _threaded = false; 128 int _channels; 129 shared(bool) _decodeThreadShouldDie = false; 130 shared(bool) _streamIsFinished = false; 131 132 133 UncheckedMutex _bufMutex; 134 ConditionVariable _bufferIsFull; 135 ConditionVariable _bufferIsEmpty; 136 137 // all protected by _bufMutex too 138 int _bufferLength; // is counted in individual samples, not frames 139 int _bufferCapacity; // is counted in individual samples, not frames 140 Delayline!float _streamingBuffer; 141 142 int _decodeIncrement; // max number of samples to decode at once, to avoid longer mutex hold 143 float[] _decodeBuffer; // producer-only buffer before-pushgin 144 145 void startDecodingThreadIfNeeded() 146 { 147 if (!_stream.realtimeSafe()) 148 { 149 _threaded = true; 150 151 _bufferIsEmpty = makeConditionVariable(); 152 _bufferIsFull = makeConditionVariable(); 153 154 // compute amount of buffer we want 155 int streamingBufferSamples = cast(int)(streamingBufferDuration * _stream.getSamplerate() * _channels); 156 157 _streamingBuffer.initialize(streamingBufferSamples); 158 _bufferLength = 0; 159 _bufferCapacity = streamingBufferSamples; 160 161 _decodeIncrement = cast(int)(streamingDecodingIncrement * _stream.getSamplerate()); 162 _decodeBuffer.reallocBuffer(_decodeIncrement * _channels); 163 164 // start event thread 165 _decodeThread = makeThread(&decodeStream); 166 _decodeThread.start(); 167 } 168 } 169 170 void decodeStream() nothrow 171 { 172 // <PRODUCER> 173 174 loop: 175 while(!atomicLoad(_decodeThreadShouldDie)) 176 { 177 // Get available room in the delayline. 178 _bufMutex.lock(); 179 180 // How much room there is in the streaming buffer? 181 int roomFrames = ( _bufferCapacity - _bufferLength) / _stream.getNumChannels(); 182 183 assert(roomFrames >= 0); 184 if (roomFrames > _decodeIncrement) 185 roomFrames = _decodeIncrement; 186 187 if (roomFrames == 0) 188 { 189 // buffer is full, wait on condition 190 _bufferIsFull.wait(&_bufMutex); 191 _bufMutex.unlock(); 192 goto loop; 193 } 194 _bufMutex.unlock(); 195 196 assert(roomFrames != 0); 197 198 // Decode that much frames, but without holding the mutex. 199 int framesRead; 200 try 201 { 202 framesRead = _stream.readSamplesFloat(_decodeBuffer.ptr, roomFrames); 203 } 204 catch(Exception e) 205 { 206 // decode error, stop decoding 207 framesRead = 0; 208 } 209 210 bool streamIsFinished = (framesRead != roomFrames); 211 if (streamIsFinished) 212 { 213 atomicStore(_streamIsFinished, true); 214 } 215 216 if (framesRead) 217 { 218 // Re-lock the mutex in order to fill the buffer 219 _bufMutex.lock(); 220 int samples = framesRead * _channels; 221 _streamingBuffer.feedBuffer( _decodeBuffer[0..samples] ); 222 _bufferLength += samples; 223 assert(_bufferLength <= _bufferCapacity); 224 _bufMutex.unlock(); 225 _bufferIsEmpty.notifyOne(); // stream buffer is probably not empty anymore 226 } 227 228 if (streamIsFinished) 229 return; 230 } 231 // <PRODUCER> 232 } 233 }