Commit 119b9d6a authored by Your Name's avatar Your Name
Browse files

added lsldert_proxy.cc

parent e9b0bbb6
......@@ -10,7 +10,7 @@ LDLIBS+=-lzmq -lboost_program_options -lpigpio -llsl32
#INCLUDE+=RS-232
CXXFLAGS+=-Wall -std=c++11
all: zmq_trigger playtone msgqueue_pubsub msgqueue_xpubxsub zmq_trigger_subscriber playsound lsldertc hwserver hwserver2 hwserver3
all: zmq_trigger playtone msgqueue_pubsub lsldert_proxy zmq_trigger_subscriber playsound lsldertc hwserver hwserver2 hwserver3
develop: zmqaudio
lsldertc: lsldertc.c
......@@ -23,7 +23,8 @@ pcm_min: pcm_min.c
playtone: playtone.cc
zmq_trigger: zmq_trigger.cc
msgqueue_pubsub: msgqueue_pubsub.cc
msgqueue_xpubxsub: msgqueue_xpubxsub.cc
lsldert_proxy: lsldert_proxy.cc
lsldert_proxy: LDLIBS+=-lpthread
zmq_trigger_subscriber: zmq_trigger_subscriber.cc
zmqaudio: zmqaudio.cc zmqaudio.h
paex_sine: paex_sine.c
......
......@@ -8,6 +8,7 @@
// makes a difference at which point in the code you set
// the real time scheduling options
#include <pthread.h>
#include <zmq.hpp>
#include <lsl_cpp.h>
#include <iostream>
......@@ -102,6 +103,44 @@ int rtpriority(int n)
return sched_setscheduler (0, sched_policy, &sched) ;
}
void *listener_routine (void *arg)
{
zmq::context_t *context = (zmq::context_t*)arg;
zmq::socket_t socket(*context, ZMQ_SUB);
socket.connect("inproc://listeners");
while (1) {
int part = 0;
int more = 1;
do {
zmq::message_t message;
size_t more_size = sizeof (more);
// Process all parts of the message
socket.recv(&message);
int msgsize = message.size();
part++;
socket.getsockopt( ZMQ_RCVMORE, &more, &more_size);
if (part==1) {
std::string str((char*)message.data(), msgsize);
if (verbose) {
cout << "size=" << msgsize << endl;
cout << "Received: " << str << endl;
}
}
else {
if (verbose) {
std::string str((char*)message.data(), msgsize);
cout << "size=" << msgsize << endl;
cout << "Received another message part..." << str << endl;
}
}
} while (more);
}
return (NULL);
}
int main (int argc, const char *argv[])
{
parse_options(argc, argv);
......@@ -131,11 +170,6 @@ int main (int argc, const char *argv[])
//zmq_ctx_set(c,ZMQ_THREAD_SCHED_POLICY,sched_policy);
//zmq_ctx_set(c,ZMQ_THREAD_PRIORITY,sched_priority);
//if (rtpriority(sched_priority) < 0) {
//perror("rtpriority");
//return 1;
//}
// Socket facing clients
zmq::socket_t frontend (context, ZMQ_XSUB);
frontend.bind("tcp://*:5556");
......@@ -144,8 +178,15 @@ int main (int argc, const char *argv[])
zmq::socket_t backend (context, ZMQ_XPUB);
backend.bind("tcp://*:5557");
pthread_t listener;
pthread_create (&listener, nullptr, listener_routine, (void*)&context);
zmq::proxy(static_cast<void*>(frontend), static_cast<void*>(backend), nullptr);
return 0;
// not needed for XPUB/XSUB? frontend.setsockopt(ZMQ_SUBSCRIBE, "", 0);
// Shunt messages out to our own subscribers
while (1) {
int part = 0;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment