Commit 81bc37d2 authored by Your Name's avatar Your Name
Browse files

add capturing to lsldert_proxy

parent cb247ef4
......@@ -2,7 +2,7 @@
LSL_BUILD_DIR=labstreaminglayer/build/install
LSL_INSTALL_DIR=/usr/local
SUBDIRS=ledbox lslder lsldert zmqmqbroker lslhidev
SUBDIRS=ledbox lslder lsldert zmqmqbroker lslhider
INSTALL=install -v
......
#!/bin/bash
echo "running rc.raspi6"
run lsldert_proxy
run lsldert_proxy
No preview for this file type
......@@ -8,13 +8,12 @@
// makes a difference at which point in the code you set
// the real time scheduling options
#include <pthread.h>
#include <zmq.hpp>
#include <lsl_cpp.h>
#include <boost/program_options.hpp>
#include <iostream>
#include <lsl_cpp.h>
#include <mutex>
#include <boost/program_options.hpp>
#include <pthread.h>
#include <zmq.hpp>
using namespace std;
using namespace lsl;
......@@ -28,195 +27,155 @@ stream_outlet *outlet;
string info_name, info_type, info_sourceid;
std::mutex lsl_mutex;
void parse_options(int argc, const char* argv[])
{
void parse_options(int argc, const char *argv[]) {
namespace po = boost::program_options;
// Declare the supported options.
po::options_description desc("Usage:");
desc.add_options()
("help,h", "show this message")
("lsl-name", po::value<string>(), "set the name of the LSL stream")
("lsl-type", po::value<string>(), "set the type of the LSL stream")
("lsl-sourceid", po::value<string>(), "set the sourceid of the LSL stream")
("verbose,v", "enable verbose output")
;
desc.add_options()("help,h", "show this message")(
"lsl-name", po::value<string>(), "set the name of the LSL stream")(
"lsl-type", po::value<string>(), "set the type of the LSL stream")(
"lsl-sourceid", po::value<string>(),
"set the sourceid of the LSL stream")("verbose,v",
"enable verbose output");
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
if (vm.count("help")) {
cout << desc << "\n";
exit(0);
cout << desc << "\n";
exit(0);
}
if (vm.count("verbose")) {
verbose = 1;
verbose = 1;
}
if (vm.count("lsl-name"))
info_name = vm["lsl-name"].as<string>();
info_name = vm["lsl-name"].as<string>();
else
info_name = "Raspberry Pi Digital Triggers";
info_name = "Raspberry Pi Digital Triggers";
if (verbose)
cout << "LSL name is: " << info_name << endl;
cout << "LSL name is: " << info_name << endl;
if (vm.count("lsl-type"))
info_type = vm["lsl-type"].as<string>();
info_type = vm["lsl-type"].as<string>();
else {
char hostname[255];
gethostname(hostname,255);
ostringstream s;
s << "Digital Triggers @ " << hostname;
info_type = s.str();
char hostname[255];
gethostname(hostname, 255);
ostringstream s;
s << "Digital Triggers @ " << hostname;
info_type = s.str();
}
if (verbose)
cout << "LSL type is: " << info_type << endl;
cout << "LSL type is: " << info_type << endl;
if (vm.count("lsl-sourceid"))
info_sourceid = vm["lsl-sourceid"].as<string>();
info_sourceid = vm["lsl-sourceid"].as<string>();
else {
char hostname[255];
gethostname(hostname,255);
ostringstream s;
s << argv[0] << "@" << hostname;
info_sourceid = s.str();
char hostname[255];
gethostname(hostname, 255);
ostringstream s;
s << argv[0] << "@" << hostname;
info_sourceid = s.str();
}
if (verbose)
cout << "LSL sourceid is: " << info_sourceid << endl;
cout << "LSL sourceid is: " << info_sourceid << endl;
}
int rtpriority(int n)
{
struct sched_param sched ;
int rtpriority(int n) {
struct sched_param sched;
memset (&sched, 0, sizeof(sched)) ;
memset(&sched, 0, sizeof(sched));
if (n > sched_get_priority_max (sched_policy))
sched.sched_priority = sched_get_priority_max (sched_policy) ;
else
sched.sched_priority = n ;
if (n > sched_get_priority_max(sched_policy))
sched.sched_priority = sched_get_priority_max(sched_policy);
else
sched.sched_priority = n;
return sched_setscheduler (0, sched_policy, &sched) ;
return sched_setscheduler(0, sched_policy, &sched);
}
void *listener_routine (void *arg)
{
zmq::context_t *context = (zmq::context_t*)arg;
void *capture_routine(void *arg) {
zmq::context_t *context = (zmq::context_t *)arg;
zmq::socket_t socket(*context, ZMQ_SUB);
socket.connect("inproc://listeners");
socket.connect("inproc://capture");
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
while (1) {
int part = 0;
int more = 1;
do {
zmq::message_t message;
size_t more_size = sizeof (more);
size_t more_size = sizeof(more);
// Process all parts of the message
socket.recv(&message);
int msgsize = message.size();
part++;
socket.getsockopt( ZMQ_RCVMORE, &more, &more_size);
if (part==1) {
std::string str((char*)message.data(), msgsize);
socket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
if (part == 1) {
std::string str((char *)message.data(), msgsize);
outlet->push_sample(&str);
if (verbose) {
cout << "size=" << msgsize << endl;
cout << "Received: " << str << endl;
cout << "size=" << msgsize << endl;
cout << "Received: " << str << endl;
}
}
else {
if (verbose) {
std::string str((char*)message.data(), msgsize);
cout << "size=" << msgsize << endl;
cout << "Received another message part..." << str << endl;
}
if (verbose) {
std::string str((char *)message.data(), msgsize);
cout << "size=" << msgsize << endl;
cout << "Received another message part..." << str << endl;
}
}
} while (more);
}
return (NULL);
}
int main (int argc, const char *argv[])
{
int main(int argc, const char *argv[]) {
parse_options(argc, argv);
if (rtpriority(sched_priority) < 0) {
perror("rtpriority");
return 1;
perror("rtpriority");
return 1;
}
// make a new stream_info and open an outlet with it
stream_info info(
info_name.c_str(),
info_type.c_str(),
1,
lsl::IRREGULAR_RATE,
lsl::cf_string,
info_sourceid.c_str()
);
stream_info info(info_name.c_str(), info_type.c_str(), 1,
lsl::IRREGULAR_RATE, lsl::cf_string,
info_sourceid.c_str());
outlet = new stream_outlet(info);
zmq::context_t context(1);
// set priority
// GW: check threads with 'ps -mLca'
//void * c = (void *) context;
//zmq_ctx_set(c,ZMQ_THREAD_SCHED_POLICY,sched_policy);
//zmq_ctx_set(c,ZMQ_THREAD_PRIORITY,sched_priority);
// void * c = (void *) context;
// zmq_ctx_set(c,ZMQ_THREAD_SCHED_POLICY,sched_policy);
// zmq_ctx_set(c,ZMQ_THREAD_PRIORITY,sched_priority);
// Socket facing clients
zmq::socket_t frontend (context, ZMQ_XSUB);
zmq::socket_t frontend(context, ZMQ_XSUB);
frontend.bind("tcp://*:5556");
// Socket facing services
zmq::socket_t backend (context, ZMQ_XPUB);
zmq::socket_t backend(context, ZMQ_XPUB);
backend.bind("tcp://*:5557");
pthread_t listener;
pthread_create (&listener, nullptr, listener_routine, (void*)&context);
zmq::proxy(static_cast<void*>(frontend), static_cast<void*>(backend), nullptr);
return 0;
// not needed for XPUB/XSUB? frontend.setsockopt(ZMQ_SUBSCRIBE, "", 0);
// Socket capturing messages
zmq::socket_t capture(context, ZMQ_PUB);
capture.bind("inproc://capture");
pthread_t capture_thread;
pthread_create(&capture_thread, nullptr, capture_routine, (void *)&context);
// Shunt messages out to our own subscribers
while (1) {
int part = 0;
int more = 1;
do {
zmq::message_t message;
size_t more_size = sizeof (more);
// Process all parts of the message
frontend.recv(&message);
int msgsize = message.size();
part++;
frontend.getsockopt( ZMQ_RCVMORE, &more, &more_size);
backend.send(message, more? ZMQ_SNDMORE: 0);
if (part==1) {
std::string str((char*)message.data(), msgsize);
outlet->push_sample(&str);
if (verbose) {
cout << "size=" << msgsize << endl;
cout << "Received: " << str << endl;
}
}
else {
if (verbose) {
std::string str((char*)message.data(), msgsize);
cout << "size=" << msgsize << endl;
cout << "Received another message part..." << str << endl;
}
}
} while (more);
}
zmq::proxy(static_cast<void *>(frontend), static_cast<void *>(backend),
static_cast<void *>(capture));
return 0;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment