Commit e9b0bbb6 authored by Günter Windau's avatar Günter Windau
Browse files

added msgqueue_xpubxsub.cc

parent e973a4a5
......@@ -10,7 +10,7 @@ LDLIBS+=-lzmq -lboost_program_options -lpigpio -llsl32
#INCLUDE+=RS-232
CXXFLAGS+=-Wall -std=c++11
all: zmq_trigger playtone msgqueue_pubsub zmq_trigger_subscriber playsound lsldertc hwserver hwserver2 hwserver3
all: zmq_trigger playtone msgqueue_pubsub msgqueue_xpubxsub zmq_trigger_subscriber playsound lsldertc hwserver hwserver2 hwserver3
develop: zmqaudio
lsldertc: lsldertc.c
......@@ -23,6 +23,7 @@ pcm_min: pcm_min.c
playtone: playtone.cc
zmq_trigger: zmq_trigger.cc
msgqueue_pubsub: msgqueue_pubsub.cc
msgqueue_xpubxsub: msgqueue_xpubxsub.cc
zmq_trigger_subscriber: zmq_trigger_subscriber.cc
zmqaudio: zmqaudio.cc zmqaudio.h
paex_sine: paex_sine.c
......
//
// Simple pub-sub broker in C++
// Same as request-reply broker but using QUEUE device
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
// notice, ZMQ as well as LSL spawns its own threads. So it
// makes a difference at which point in the code you set
// the real time scheduling options
#include <zmq.hpp>
#include <lsl_cpp.h>
#include <iostream>
#include <mutex>
#include <boost/program_options.hpp>
using namespace std;
using namespace lsl;
using namespace boost;
const int sched_policy = SCHED_FIFO;
const int sched_priority = 99;
static int verbose = 0;
stream_outlet *outlet;
string info_name, info_type, info_sourceid;
std::mutex lsl_mutex;
void parse_options(int argc, const char* argv[])
{
namespace po = boost::program_options;
// Declare the supported options.
po::options_description desc("Usage:");
desc.add_options()
("help,h", "show this message")
("lsl-name", po::value<string>(), "set the name of the LSL stream")
("lsl-type", po::value<string>(), "set the type of the LSL stream")
("lsl-sourceid", po::value<string>(), "set the sourceid of the LSL stream")
("verbose,v", "enable verbose output")
;
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
if (vm.count("help")) {
cout << desc << "\n";
exit(0);
}
if (vm.count("verbose")) {
verbose = 1;
}
if (vm.count("lsl-name"))
info_name = vm["lsl-name"].as<string>();
else
info_name = "Raspberry Pi Digital Triggers";
if (verbose)
cout << "LSL name is: " << info_name << endl;
if (vm.count("lsl-type"))
info_type = vm["lsl-type"].as<string>();
else {
char hostname[255];
gethostname(hostname,255);
ostringstream s;
s << "Digital Triggers @ " << hostname;
info_type = s.str();
}
if (verbose)
cout << "LSL type is: " << info_type << endl;
if (vm.count("lsl-sourceid"))
info_sourceid = vm["lsl-sourceid"].as<string>();
else {
char hostname[255];
gethostname(hostname,255);
ostringstream s;
s << argv[0] << "@" << hostname;
info_sourceid = s.str();
}
if (verbose)
cout << "LSL sourceid is: " << info_sourceid << endl;
}
int rtpriority(int n)
{
struct sched_param sched ;
memset (&sched, 0, sizeof(sched)) ;
if (n > sched_get_priority_max (sched_policy))
sched.sched_priority = sched_get_priority_max (sched_policy) ;
else
sched.sched_priority = n ;
return sched_setscheduler (0, sched_policy, &sched) ;
}
int main (int argc, const char *argv[])
{
parse_options(argc, argv);
if (rtpriority(sched_priority) < 0) {
perror("rtpriority");
return 1;
}
// make a new stream_info and open an outlet with it
stream_info info(
info_name.c_str(),
info_type.c_str(),
1,
lsl::IRREGULAR_RATE,
lsl::cf_string,
info_sourceid.c_str()
);
outlet = new stream_outlet(info);
zmq::context_t context(1);
// set priority
// GW: check threads with 'ps -mLca'
//void * c = (void *) context;
//zmq_ctx_set(c,ZMQ_THREAD_SCHED_POLICY,sched_policy);
//zmq_ctx_set(c,ZMQ_THREAD_PRIORITY,sched_priority);
//if (rtpriority(sched_priority) < 0) {
//perror("rtpriority");
//return 1;
//}
// Socket facing clients
zmq::socket_t frontend (context, ZMQ_XSUB);
frontend.bind("tcp://*:5556");
// Socket facing services
zmq::socket_t backend (context, ZMQ_XPUB);
backend.bind("tcp://*:5557");
// not needed for XPUB/XSUB? frontend.setsockopt(ZMQ_SUBSCRIBE, "", 0);
// Shunt messages out to our own subscribers
while (1) {
int part = 0;
int more = 1;
do {
zmq::message_t message;
size_t more_size = sizeof (more);
// Process all parts of the message
frontend.recv(&message);
int msgsize = message.size();
part++;
frontend.getsockopt( ZMQ_RCVMORE, &more, &more_size);
backend.send(message, more? ZMQ_SNDMORE: 0);
if (part==1) {
std::string str((char*)message.data(), msgsize);
outlet->push_sample(&str);
if (verbose) {
cout << "size=" << msgsize << endl;
cout << "Received: " << str << endl;
}
}
else {
if (verbose) {
std::string str((char*)message.data(), msgsize);
cout << "size=" << msgsize << endl;
cout << "Received another message part..." << str << endl;
}
}
} while (more);
}
return 0;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment