Commit 0409487f authored by Your Name's avatar Your Name
Browse files

add msgqueue_pubsub.cc

parent 12c39498
No preview for this file type
......@@ -6,12 +6,98 @@
#include "zhelpers.hpp"
#include <lsl_cpp.h>
#include <iostream>
#include <mutex>
#include <boost/program_options.hpp>
using namespace std;
using namespace lsl;
using namespace boost;
static int verbose = 0;
stream_outlet *outlet;
string info_name, info_type, info_sourceid;
std::mutex lsl_mutex;
void parse_options(int argc, const char* argv[])
{
namespace po = boost::program_options;
// Declare the supported options.
po::options_description desc("Usage:");
desc.add_options()
("help,h", "show this message")
("lsl-name", po::value<string>(), "set the name of the LSL stream")
("lsl-type", po::value<string>(), "set the type of the LSL stream")
("lsl-sourceid", po::value<string>(), "set the sourceid of the LSL stream")
("verbose,v", "enable verbose output")
;
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
if (vm.count("help")) {
cout << desc << "\n";
exit(0);
}
if (vm.count("verbose")) {
verbose = 1;
}
if (vm.count("lsl-name"))
info_name = vm["lsl-name"].as<string>();
else
info_name = "Raspberry Pi Digital Triggers";
if (verbose)
cout << "LSL name is: " << info_name << endl;
int main (int argc, char *argv[])
if (vm.count("lsl-type"))
info_type = vm["lsl-type"].as<string>();
else {
char hostname[255];
gethostname(hostname,255);
ostringstream s;
s << "Digital Triggers @ " << hostname;
info_type = s.str();
}
if (verbose)
cout << "LSL type is: " << info_type << endl;
if (vm.count("lsl-sourceid"))
info_sourceid = vm["lsl-sourceid"].as<string>();
else {
char hostname[255];
gethostname(hostname,255);
ostringstream s;
s << argv[0] << "@" << hostname;
info_sourceid = s.str();
}
if (verbose)
cout << "LSL sourceid is: " << info_sourceid << endl;
}
int main (int argc, const char *argv[])
{
parse_options(argc, argv);
// make a new stream_info and open an outlet with it
stream_info info(
info_name.c_str(),
info_type.c_str(),
1,
lsl::IRREGULAR_RATE,
lsl::cf_string,
info_sourceid.c_str()
);
outlet = new stream_outlet(info);
zmq::context_t context(1);
// Socket facing clients
......@@ -22,7 +108,6 @@ int main (int argc, char *argv[])
zmq::socket_t backend (context, ZMQ_PUB);
backend.bind("tcp://*:5557");
#if 1
frontend.setsockopt(ZMQ_SUBSCRIBE, "", 0);
// Shunt messages out to our own subscribers
......@@ -37,18 +122,14 @@ int main (int argc, char *argv[])
frontend.getsockopt( ZMQ_RCVMORE, &more, &more_size);
backend.send(message, more? ZMQ_SNDMORE: 0);
if (!more) {
cout << (char*)message.data() << endl;
std::string str((char*)message.data());
outlet->push_sample(&str);
if (verbose)
cout << "Received: " << str << endl;
break; // Last message part
}
}
}
return 0;
#else
// Start the proxy
zmq::proxy(static_cast<void*>(frontend),
static_cast<void*>(backend),
nullptr);
return 0;
#endif
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment