zmqaudio.cc 12.5 KB
Newer Older
Your Name's avatar
Your Name committed
1
2
//
// zmqaudio.cc -- play audio samples received over zeromq sockets
Your Name's avatar
Your Name committed
3
// Copyright (c) 2020 Günter Windau
Your Name's avatar
Your Name committed
4
// based on the example in paex_sine.c
Your Name's avatar
Your Name committed
5
//
Your Name's avatar
Your Name committed
6

Your Name's avatar
Your Name committed
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//
// This program uses the PortAudio Portable Audio Library.
// For more information see: http://www.portaudio.com/
// Copyright (c) 1999-2000 Ross Bencina and Phil Burk
//
// Permission is hereby granted, free of charge, to any person obtaining
// a copy of this software and associated documentation files
// (the "Software"), to deal in the Software without restriction,
// including without limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of the Software,
// and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
// ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
// CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//

//
// The text above constitutes the entire PortAudio license; however,
// the PortAudio community also makes the following non-binding requests:
//
// Any person wishing to distribute modifications to the Software is
// requested to send the modifications to the original developer so that
// they can be incorporated into the canonical version. It is also
// requested that these non-binding requests be included along with the
// license above.
//
Your Name's avatar
Your Name committed
42
43
#include "zmqaudio.h"
#include "portaudio.h"
Your Name's avatar
Your Name committed
44
#include "zhelpers.hpp"
Your Name's avatar
Your Name committed
45
#include <iostream>
Your Name's avatar
Your Name committed
46
47
#include <math.h>
#include <stdio.h>
Your Name's avatar
Your Name committed
48
#include <sched.h>
Your Name's avatar
Your Name committed
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

#define NUM_SECONDS (5)
#define SAMPLE_RATE (44100)

#ifndef M_PI
#define M_PI (3.14159265)
#endif

#define TABLE_SIZE (200)
typedef struct {
    float sine[TABLE_SIZE];
    int left_phase;
    int right_phase;
    char message[20];
} paTestData;

Your Name's avatar
Your Name committed
65
66
zmq_audiobuffer *zmq_audiobuffer::_playing_now = nullptr;

Your Name's avatar
Your Name committed
67
int verbose = 1;
Your Name's avatar
Your Name committed
68
const int nrbuffers = 22;
Your Name's avatar
Your Name committed
69
zmq_audiobuffer *buffer[nrbuffers];
Your Name's avatar
Your Name committed
70

Your Name's avatar
Your Name committed
71
72
73
74
75
76
77
78
79
PaError print_error(PaError e) {
    if (e != paNoError)
        Pa_Terminate();
    fprintf(stderr, "An error occured while using the portaudio stream\n");
    fprintf(stderr, "Error number: %d\n", e);
    fprintf(stderr, "Error message: %s\n", Pa_GetErrorText(e));
    return e;
}

Your Name's avatar
Your Name committed
80
zmq_audiobuffer::zmq_audiobuffer(std::uint32_t _fsamp, std::uint32_t _nsamp,
Your Name's avatar
Your Name committed
81
82
83
                                 std::uint32_t _nchan)
    : fsamp{_fsamp}, nsamp{_nsamp}, nchan{_nchan},
      data{new float[_nsamp * _nchan]}, idata{0},
Your Name's avatar
Your Name committed
84
      frames_per_buffer{paFramesPerBufferUnspecified}, playing{false}, stream{nullptr} {
Your Name's avatar
Your Name committed
85
    assert(sizeof(float) == 4); // need 32 bit IEEE 754 floats
Your Name's avatar
Your Name committed
86
    frames_per_buffer = 100;
Your Name's avatar
Your Name committed
87
88
}

Your Name's avatar
Your Name committed
89
zmq_audiobuffer::~zmq_audiobuffer() {
Your Name's avatar
Your Name committed
90
    delete[] data;
Your Name's avatar
Your Name committed
91
92
93
94
    Pa_StopStream(stream);
    Pa_CloseStream(stream);
}

Your Name's avatar
Your Name committed
95
96
97
98
// This routine will be called by the PortAudio engine when audio is needed.
// It may called at interrupt level on some machines so don't do anything
// that could mess up the system like calling malloc() or free().
//
Your Name's avatar
Your Name committed
99
int zmq_audiobuffer::stream_callback(
Your Name's avatar
Your Name committed
100
    const void *, void *outputBuffer,
Your Name's avatar
Your Name committed
101
    unsigned long framesPerBuffer,
Your Name's avatar
Your Name committed
102
103
    const PaStreamCallbackTimeInfo *,
    PaStreamCallbackFlags, void *userData) {
Your Name's avatar
Your Name committed
104
    std::cerr << "<callback>" << std::flush;
Your Name's avatar
Your Name committed
105
    zmq_audiobuffer *pbuf = (zmq_audiobuffer *)userData;
Your Name's avatar
Your Name committed
106
107
108
    float *out = (float *)outputBuffer;
    unsigned long i;

Your Name's avatar
Your Name committed
109
    int ret = paContinue;
Your Name's avatar
Your Name committed
110
    for (i = 0; i < framesPerBuffer; i++) {
111
112
113
        if (pbuf->idata < (pbuf->nsamp * pbuf->nchan)) {
            for (int j = 0; j < pbuf->nchan; j++)
               *out++ = pbuf->data[pbuf->idata++];
Your Name's avatar
Your Name committed
114
115
        }
        else {
116
117
            for (int j = 0; j < pbuf->nchan; j++)
               *out++ = 0;
Your Name's avatar
Your Name committed
118
            ret = paComplete;
Your Name's avatar
Your Name committed
119
120
            //std::cerr << "</callback>" << std::endl;
            //return ret;
Your Name's avatar
Your Name committed
121
122
        }
    }
Your Name's avatar
Your Name committed
123
124
    std::cerr << "idata=" << pbuf->idata << " fpb=" << framesPerBuffer << "  nsamp=" << pbuf->nsamp << std::flush;
    std::cerr << "</callback>" << std::endl;
Your Name's avatar
Your Name committed
125
    return ret;
Your Name's avatar
Your Name committed
126
127
}

Your Name's avatar
Your Name committed
128
129
130
//
// This routine is called by portaudio when playback is done.
//
Your Name's avatar
Your Name committed
131
void zmq_audiobuffer::stream_finished(void *userData) {
Your Name's avatar
Your Name committed
132
133
    zmq_audiobuffer *pbuf = (zmq_audiobuffer *)userData;
    pbuf->playing = false;
Your Name's avatar
Your Name committed
134
    zmq_audiobuffer::_playing_now = nullptr;
Your Name's avatar
Your Name committed
135
    printf("Stream Completed: %s\n", pbuf->message);
Your Name's avatar
Your Name committed
136
137
}

Your Name's avatar
Your Name committed
138
int zmq_audiobuffer::play() {
Your Name's avatar
Your Name committed
139

Your Name's avatar
Your Name committed
140
141
142
143
144
145
146
147
148
    if (zmq_audiobuffer * p = zmq_audiobuffer::_playing_now) {
        std::cerr
          << "warning: zmq_audiobuffer::play: a buffer is already playing, stopping it"
          << std::endl;
        p->stop();
    }
    std::cerr << "Start!" << std::endl;
    zmq_audiobuffer::_playing_now = this;

Your Name's avatar
Your Name committed
149
    idata = 0;
150

Your Name's avatar
Your Name committed
151
152
153
154
    PaError err = Pa_Initialize();
    if (err != paNoError)
        return print_error(err);

155
156
    PaStreamParameters outputParameters;
    outputParameters.device =
Your Name's avatar
Your Name committed
157
        Pa_GetDefaultOutputDevice(); // default output device
158
159
160
161
    if (outputParameters.device == paNoDevice) {
        fprintf(stderr, "Error: No default output device.\n");
        return paNoDevice;
    }
Your Name's avatar
Your Name committed
162
    outputParameters.channelCount = nchan; // stereo output
163
    outputParameters.sampleFormat =
Your Name's avatar
Your Name committed
164
        paFloat32; // 32 bit floating point output
165
166
167
168
    outputParameters.suggestedLatency =
        Pa_GetDeviceInfo(outputParameters.device)->defaultLowOutputLatency;
    outputParameters.hostApiSpecificStreamInfo = NULL;

Your Name's avatar
Your Name committed
169
    //PaError err;
Your Name's avatar
Your Name committed
170
    err = Pa_OpenStream(&stream, NULL, // no input
171
                        &outputParameters, fsamp, frames_per_buffer,
Your Name's avatar
Your Name committed
172
173
                        paClipOff, // we won't output out of range samples so
                                   // don't bother clipping them
174
                        zmq_audiobuffer::stream_callback, this);
Your Name's avatar
Your Name committed
175
    sprintf(message, "No Message");
176
    err =
Your Name's avatar
Your Name committed
177
        Pa_SetStreamFinishedCallback(stream, &zmq_audiobuffer::stream_finished);
Your Name's avatar
Your Name committed
178
179
    if (err != paNoError) {
        print_error(err);
Your Name's avatar
Your Name committed
180
        return err;
Your Name's avatar
Your Name committed
181
    }
Your Name's avatar
Your Name committed
182
183

    err = Pa_StartStream(stream);
Your Name's avatar
Your Name committed
184
185
    if (err != paNoError) {
        print_error(err);
Your Name's avatar
Your Name committed
186
        return err;
Your Name's avatar
Your Name committed
187
188
    }
    
Your Name's avatar
Your Name committed
189
    playing = true;
Your Name's avatar
Your Name committed
190

Your Name's avatar
Your Name committed
191
192
    //printf("Play for %d seconds.\n", NUM_SECONDS);
    //Pa_Sleep((unsigned int)(duration() * 1000));
Your Name's avatar
Your Name committed
193
    return paNoError;
Your Name's avatar
Your Name committed
194
}
Your Name's avatar
Your Name committed
195

Your Name's avatar
Your Name committed
196
197
198
199
200
void zmq_audiobuffer::abort() {
    std::cerr << "Abort!" << std::endl;
    Pa_AbortStream(stream);
    Pa_CloseStream(stream);
}
Your Name's avatar
Your Name committed
201

Your Name's avatar
Your Name committed
202
203
204
205
206
207
void zmq_audiobuffer::stop() {
    std::cerr << "Stop!" << std::endl;
    Pa_StopStream(stream);
    Pa_CloseStream(stream);
    Pa_Terminate();
}
Your Name's avatar
Your Name committed
208

Your Name's avatar
Your Name committed
209
int zmq_audiobuffer::fill() {
Your Name's avatar
Your Name committed
210
    // initialise sinusoidal wavetable
Your Name's avatar
Your Name committed
211
212
213
214
215
216
    const float fl{440};
    const float fr{880};
    const float vol{0.1};
    for (int i = 0; i < nsamp - 1; i++) {
        data[2 * i] = vol * sin((2 * M_PI * fl * i) / fsamp);
        data[2 * i + 1] = vol * sin((2 * M_PI * fr * i) / fsamp);
Your Name's avatar
Your Name committed
217
    }
Your Name's avatar
Your Name committed
218
219
220
221
222
223
224
225
    return 0;
}

int zmq_audiobuffer::fill(zmq::message_t &msg) {
    uint32_t ndata = msg.size()/sizeof(float);
    if (ndata != (nsamp*nchan)) {
        std::cerr <<
           "in int zmq_audiobuffer::fill(zmq::message_t &msg): audio data size mismatch" <<
Your Name's avatar
Your Name committed
226
227
228
           std::endl <<
           "ndata=" << ndata << std::endl <<
           "nsamp=" << nsamp << ", nchan=" << nchan << " total=" << nsamp*nchan <<
Your Name's avatar
Your Name committed
229
230
231
232
233
           std::endl;
        return 1;
    }
    std::memcpy(data, msg.data(), msg.size());
    return 0;
Your Name's avatar
Your Name committed
234
235
}

Your Name's avatar
Your Name committed
236
237
238
239
240
241
242
243
244
245
246
void zmq_audiobuffer::dump(int nsamples)
{
    std::cerr << "in void zmq_audiobuffer::dump(int nsamples):" << std::endl;
    std::cerr << "fsamp=" << fsamp << std::endl;
    std::cerr << "nsamp=" << nsamp << std::endl;
    std::cerr << "nchan=" << nchan << std::endl;
    std::cerr << "frames_per_buffer=" << frames_per_buffer << std::endl;
    std::cerr << "idata=" << idata << std::endl;
    for (int i=0; i<nsamples; i++) {
        std::cerr << "data[" << i << "]=" << data[i] << std::endl;
    }
Your Name's avatar
Your Name committed
247
248
}

Your Name's avatar
Your Name committed
249

Your Name's avatar
Your Name committed
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
int zmq_recv_multi(zmq::socket_t &socket, zmq::message_t parts[], int nmax) {
    int64_t more;
    size_t more_size = sizeof more;
    int n = 0;
    try {
        do {
            if (n < nmax) {
                socket.recv(&parts[n]);
                if (verbose)
                    std::cout << "recv part " << n << std::endl;
                n++;
            }
            else {
                zmq::message_t discard;
                socket.recv(&discard);
                if (verbose)
Your Name's avatar
Your Name committed
266
267
                    std::cout << "recv part " << n << " (discarded)."
                              << std::endl;
Your Name's avatar
Your Name committed
268
269
270
271
272
273
            }
            socket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
        } while (more);
    }
    catch (std::exception &error) {
        std::cout << "zmq_rev_multi while receiving frame " << n << ": "
Your Name's avatar
Your Name committed
274
                  << error.what() << std::endl;
Your Name's avatar
Your Name committed
275
276
277
    }
    return n;
}
Your Name's avatar
Your Name committed
278

Your Name's avatar
Your Name committed
279
280
281
282
283
284
285
286
287
288
289
290
291
292
int rtpriority(int n) {
    int sched_policy = SCHED_FIFO;
    struct sched_param sched;

    memset(&sched, 0, sizeof(sched));

    if (n > sched_get_priority_max(sched_policy))
        sched.sched_priority = sched_get_priority_max(sched_policy);
    else
        sched.sched_priority = n;

    return sched_setscheduler(0, sched_policy, &sched);
}

Your Name's avatar
Your Name committed
293
294
int main(void) {

Your Name's avatar
Your Name committed
295
296
    int prio1 = 40;
    int prio2 = 50;
Your Name's avatar
Your Name committed
297

Your Name's avatar
Your Name committed
298
299
300
301
302
303
    rtpriority(prio2);
    //PaError err = Pa_Initialize();
    //if (err != paNoError)
    //    return print_error(err);

    rtpriority(prio1);
Your Name's avatar
Your Name committed
304
305
306
307
    //  Prepare our context and socket
    zmq::context_t context(1);

    zmq::socket_t ssub(context, ZMQ_SUB);
Your Name's avatar
Your Name committed
308
    ssub.connect("tcp://raspi6.local:5557");
Your Name's avatar
Your Name committed
309
    // ssub.connect("tcp://lsldert00.local:5557");
Your Name's avatar
Your Name committed
310
    ssub.setsockopt(ZMQ_SUBSCRIBE, "A", 1);
Your Name's avatar
Your Name committed
311

Your Name's avatar
Your Name committed
312
    rtpriority(prio2);
Your Name's avatar
Your Name committed
313
314
315
    buffer[0] = new zmq_audiobuffer(44100, 5 * 48000, 2);
    buffer[0]->fill();

Your Name's avatar
Your Name committed
316
317
318
319
320
321
322
323
324
325
    int run = 1;
    while (run) {
        const int maxmsg = 3;
        zmq::message_t msg[maxmsg];
        int nmsg = 0;

        nmsg = zmq_recv_multi(ssub, msg, maxmsg);
        zmq::message_t &request = msg[0];
        std::string str((char *)request.data());

Your Name's avatar
Your Name committed
326
327
328
329
330
331
        std::uint32_t *audio_header = nullptr;
        if (nmsg > 2) {
            audio_header = new std::uint32_t[msg[1].size()/sizeof(std::uint32_t)];
            std::memcpy(audio_header, msg[1].data(), msg[1].size());
        }

Your Name's avatar
Your Name committed
332
333
334
335
336
337
338
        if (verbose) {
            std::cout << "Received: '" << str << "' (" << request.size()
                      << " bytes)" << std::endl;
            if (nmsg > 1) {
                for (int i = 1; i < nmsg; i++)
                    std::cout << "  ...one more message, " << msg[i].size()
                              << " bytes" << std::endl;
Your Name's avatar
Your Name committed
339
            }
Your Name's avatar
Your Name committed
340
        }
Your Name's avatar
Your Name committed
341

Your Name's avatar
Your Name committed
342
343
344
345
346
        // parse the command and data in the message
        std::istringstream s(str);
        std::string cmd;
        unsigned int ibuf;
        s >> cmd >> ibuf;
Your Name's avatar
Your Name committed
347

Your Name's avatar
Your Name committed
348
349
        if (verbose)
            std::cout << "cmd='" << cmd << "' ibuf=" << ibuf << std::endl;
Your Name's avatar
Your Name committed
350

Your Name's avatar
Your Name committed
351
352
353
354
        if (ibuf >= nrbuffers) {
            std::cerr << "buffer index out of range" << std::endl;
            continue;
        }
Your Name's avatar
Your Name committed
355

Your Name's avatar
Your Name committed
356
        if (cmd == "AF") { // Fill
Your Name's avatar
Your Name committed
357
358
359
360
361
362
363
364
            delete buffer[ibuf];
            
            if (audio_header) {
               std::uint32_t Fs = audio_header[0];
               std::uint32_t nchan = audio_header[1];
               std::uint32_t nsamp = audio_header[2];
               buffer[ibuf] = new zmq_audiobuffer(Fs, nsamp, nchan);
               buffer[ibuf]->fill(msg[2]);
Your Name's avatar
Your Name committed
365
366
               if (verbose)
                  buffer[ibuf]->dump(50);
Your Name's avatar
Your Name committed
367
368
369
            }
            else
               buffer[ibuf]=nullptr;
Your Name's avatar
Your Name committed
370
371
        }
        else if (cmd == "AP") { // Play
Your Name's avatar
Your Name committed
372
            if (buffer[ibuf]) {
Your Name's avatar
Your Name committed
373
               PaError err=buffer[ibuf]->play();
Your Name's avatar
Your Name committed
374
375
376
377
378
               if (err != paNoError) {
                   print_error(err);
                   return err;
               }
             }
Your Name's avatar
Your Name committed
379
380
        }
        else if (cmd == "AS") { // Stop
Your Name's avatar
Your Name committed
381
            if (buffer[ibuf]) {
Your Name's avatar
Your Name committed
382
               buffer[ibuf]->stop();
Your Name's avatar
Your Name committed
383
384
385
               if (verbose)
                  buffer[ibuf]->dump(4);
            }
Your Name's avatar
Your Name committed
386
        }
Your Name's avatar
Your Name committed
387

Your Name's avatar
Your Name committed
388
389
390
        delete[] audio_header;
        //delete[] audio_data;

Your Name's avatar
Your Name committed
391
392
393
394
395
        //buffer[0] = new zmq_audiobuffer(44100, 5 * 48000, 2);
        //zmq_audiobuffer *data = buffer[0];
        //data->fill();
        //printf("PortAudio Test: output sine wave. SR = %d, BufSize = %d\n",
               //SAMPLE_RATE, data->frames_per_buffer);
Your Name's avatar
Your Name committed
396

Your Name's avatar
Your Name committed
397
398
399
        //err = data->play();
        //if (err != paNoError)
            //return print_error(err);
Your Name's avatar
Your Name committed
400
    }
Your Name's avatar
Your Name committed
401
//    Pa_Terminate();
Your Name's avatar
Your Name committed
402
403
    printf("Test finished.\n");

Your Name's avatar
Your Name committed
404
405
    return 0;
    //return err;
Your Name's avatar
Your Name committed
406
}