1 /**
2 * Threaded stream decoder.
3 *
4 * Copyright: Copyright Guillaume Piolat 2021.
5 * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
6 */
7 module gamemixer.bufferedstream;
8 
9 import core.atomic;
10 import core.stdc..string: memcpy;
11 
12 import dplug.core;
13 import gamemixer.delayline;
14 import audioformats;
15 
16 package:
17 
18 // Warning: the sequence of buffering, sample rate change, threading... is pretty complicated to follow.
19 
20 // A `BufferedStream` has optional threaded decoding, activated for streams that perform file IO.
21 class BufferedStream
22 {
23 public:
24 @nogc:
25 
26     enum streamingDecodingIncrement = 0.1f; // No more than 100ms in decoded at once. TODO tune
27     enum streamingBufferDuration = 1.0f; // Seems to be the default buffer size in foobar TODO tune
28 
29     this(const(char)[] path)
30     {
31         _bufMutex = makeMutex();
32         _stream.openFromFile(path);
33         _channels = _stream.getNumChannels();
34         startDecodingThreadIfNeeded();
35     }
36 
37     this(const(ubyte)[] data)
38     {
39         _bufMutex = makeMutex();
40         _stream.openFromMemory(data);
41         _channels = _stream.getNumChannels();
42         startDecodingThreadIfNeeded();
43     }  
44 
45     ~this()
46     {
47         if (_threaded)
48         {
49             atomicStore(_decodeThreadShouldDie, true);
50             _decodeThread.join();
51             _decodeBuffer.reallocBuffer(0);
52         }
53     }
54 
55     int getNumChannels() nothrow
56     {
57         return _stream.getNumChannels();
58     }
59 
60     float getSamplerate() nothrow
61     {
62         return _stream.getSamplerate();
63     }
64 
65     int readSamplesFloat(float* outData, int frames)
66     {
67         if (!_threaded)
68         {
69             // Non-threaded version
70             return _stream.readSamplesFloat(outData, frames);
71         }
72 
73         // <CONSUMER>
74         int decodedFrames = 0;
75 
76         while(true)
77         {
78             if (decodedFrames == frames)
79                 break;
80 
81             assert(decodedFrames < frames);
82 
83             // Get number of frames in ring buffer.
84             _bufMutex.lock();
85 
86         loop:
87 
88             int bufFrames = _bufferLength / _channels;
89             if (bufFrames == 0)
90             {
91                 if (atomicLoad(_streamIsFinished))
92                 {
93                     _bufMutex.unlock();
94                     break;
95                 }
96                 _bufferIsEmpty.wait(&_bufMutex);
97                 goto loop; // maybe it is filled now,w ait for data
98             }
99             int framesNeeded = frames - decodedFrames;
100             if (bufFrames > framesNeeded)
101                 bufFrames = framesNeeded;
102 
103             // PERF: why not read newbufFrames instead of bufFrames ?
104             int newbufFrames = _bufferLength / _channels; // re-read length to get oldest frame location
105             const(float)* readPointerOldest = _streamingBuffer.readPointer() - (_bufferLength - 1);
106             size_t bytes = float.sizeof * bufFrames * _channels; 
107             memcpy(&outData[decodedFrames * _channels], readPointerOldest, bytes);
108             _bufferLength -= bufFrames * _channels;
109             assert(_bufferLength >= 0);
110             
111             decodedFrames += bufFrames;
112 
113             _bufMutex.unlock();
114             _bufferIsFull.notifyOne(); // Buffer is probably not full anymore.
115 
116             // stream buffer is probably not full anymore
117         }
118 
119         return decodedFrames;
120 
121         // </CONSUMER>
122     }
123 
124 private:
125     Thread _decodeThread;
126     AudioStream _stream;
127     bool _threaded = false;
128     int _channels;
129     shared(bool) _decodeThreadShouldDie = false;
130     shared(bool) _streamIsFinished = false;
131 
132 
133     UncheckedMutex _bufMutex;
134     ConditionVariable _bufferIsFull;
135     ConditionVariable _bufferIsEmpty;
136 
137     // all protected by _bufMutex too
138     int _bufferLength;   // is counted in individual samples, not frames
139     int _bufferCapacity; // is counted in individual samples, not frames
140     Delayline!float _streamingBuffer;
141 
142     int _decodeIncrement; // max number of samples to decode at once, to avoid longer mutex hold
143     float[] _decodeBuffer; // producer-only buffer before-pushgin
144 
145     void startDecodingThreadIfNeeded()
146     {
147         if (!_stream.realtimeSafe())
148         {
149             _threaded = true;
150 
151             _bufferIsEmpty = makeConditionVariable();
152             _bufferIsFull = makeConditionVariable();
153 
154             // compute amount of buffer we want
155             int streamingBufferSamples = cast(int)(streamingBufferDuration * _stream.getSamplerate() * _channels);
156             
157             _streamingBuffer.initialize(streamingBufferSamples);
158             _bufferLength = 0;
159             _bufferCapacity = streamingBufferSamples;
160 
161             _decodeIncrement = cast(int)(streamingDecodingIncrement * _stream.getSamplerate());
162             _decodeBuffer.reallocBuffer(_decodeIncrement * _channels);
163 
164             // start event thread
165             _decodeThread = makeThread(&decodeStream);
166             _decodeThread.start();
167         }
168     }
169 
170     void decodeStream() nothrow
171     {
172         // <PRODUCER>
173 
174         loop:
175         while(!atomicLoad(_decodeThreadShouldDie))
176         {            
177             // Get available room in the delayline.
178             _bufMutex.lock();
179             
180             // How much room there is in the streaming buffer?
181             int roomFrames = ( _bufferCapacity - _bufferLength) / _stream.getNumChannels();
182 
183             assert(roomFrames >= 0);
184             if (roomFrames > _decodeIncrement)
185                 roomFrames = _decodeIncrement;
186 
187             if (roomFrames == 0)
188             {
189                 // buffer is full, wait on condition
190                 _bufferIsFull.wait(&_bufMutex);
191                 _bufMutex.unlock();
192                 goto loop;                    
193             }
194             _bufMutex.unlock();
195 
196             assert(roomFrames != 0);
197 
198             // Decode that much frames, but without holding the mutex.
199             int framesRead;
200             try
201             {
202                 framesRead = _stream.readSamplesFloat(_decodeBuffer.ptr, roomFrames);
203             }
204             catch(Exception e)
205             {
206                 // decode error, stop decoding
207                 framesRead = 0;
208             }
209 
210             bool streamIsFinished = (framesRead != roomFrames);
211             if (streamIsFinished)
212             {
213                 atomicStore(_streamIsFinished, true);
214             }
215 
216             if (framesRead)
217             {
218                 // Re-lock the mutex in order to fill the buffer
219                 _bufMutex.lock();
220                 int samples = framesRead * _channels;
221                 _streamingBuffer.feedBuffer( _decodeBuffer[0..samples] );
222                 _bufferLength += samples;
223                 assert(_bufferLength <= _bufferCapacity);
224                 _bufMutex.unlock();
225                 _bufferIsEmpty.notifyOne(); // stream buffer is probably not empty anymore
226             }
227             
228             if (streamIsFinished)
229                 return;
230         }
231         // <PRODUCER>
232     }
233 }