Base class for receiving notifications.
Base class for receiving notifications.This class is in charge of passing notifications from the library to client code. Make sure that the subscriber outlives all assigned subscriptions. Otherwise use-after-free bugs will occur, so be careful.
The Subscriber class is also used to group subscriptions with the same transmission mode. The newValues method is called everytime, after all active subscriptions got their values updated for one realtime cycle.
#include <cassert>
#include <cstdio>
#include <iostream>
#include <termios.h>
#include <unistd.h>
#include <unordered_set>
class TerminalGuard
{
termios old_;
public:
TerminalGuard()
{
tcgetattr(STDIN_FILENO, &old_);
termios newt = old_;
newt.c_lflag &= ~(ECHO | ICANON);
tcsetattr(STDIN_FILENO, TCSANOW, &newt);
}
~TerminalGuard() { tcsetattr(STDIN_FILENO, TCSANOW, &old_); }
};
template <class Future>
class FutureManager
{
std::unordered_set<Future> futures_;
public:
FutureManager() = default;
const Future &push(Future future)
{
auto ans = futures_.insert(std::move(future));
return *ans.first;
}
void pop(const Future &future) { futures_.erase(future); }
};
class MyProcess :
{
PdCom::Subscription s1_, s2_;
PdCom::Variable p1_;
FutureManager<PdCom::Variable::SetValueFuture> setValue_futures_;
int read(
char *buf,
int count)
override
{
if (ans == 0)
running_ = false;
return ans;
}
void write(
const char *buf,
size_t count)
override
{
}
public:
MyProcess(const char *host = "127.0.0.1", unsigned short port = 2345) :
PdCom::Process(),
PdCom::PosixProcess(host, port),
PdCom::Subscriber(
PdCom::event_mode)
{}
void execute();
void findReply(
const PdCom::Variable &var)
override
{
p1_ = var;
std::cout << "Found parameter!" << std::endl;
}
void stateChanged(PdCom::Subscription const &s) override
{
using State = PdCom::Subscription::State;
if (&s == &s1_)
s1_active_ = true;
if (&s == &s2_)
s2_active_ = true;
}
else if (s.
getState() == State::Invalid) {
std::cout << "Invalid subscription!" << std::endl;
running_ = false;
}
}
void newValues(std::chrono::nanoseconds ) override
{
std::cout << "New Data: ";
s1_.
print(std::cout,
',');
std::cout << " and ";
s2_.
print(std::cout,
',');
std::cout << "\n";
}
bool s1_active_ = false, s2_active_ = false;
bool running_ = true;
bool connected_ = false;
};
void MyProcess::connected()
{
std::cout << "Connected!" << std::endl;
find("/parameter01");
connected_ = true;
}
void MyProcess::execute()
{
fd_set fds;
const int max_fd = std::max<int>(fd_, STDIN_FILENO);
TerminalGuard tg;
while (running_ and !(s1_active_ and s2_active_ and !p1_.
empty()))
asyncData();
std::cout << "Ready to rumble!" << std::endl;
while (running_) {
FD_ZERO(&fds);
FD_SET(fd_, &fds);
FD_SET(STDIN_FILENO, &fds);
select(max_fd + 1, &fds, NULL, NULL, NULL);
if (FD_ISSET(STDIN_FILENO, &fds) and !p1_.
empty()) {
char buf;
std::cin.read(&buf, 1);
if (buf == 'q') {
break;
}
const auto &future = setValue_futures_.push(
p1_.
setValue(
static_cast<unsigned char>(buf)));
future.then([&future, buf, this]() {
std::cout << "Changed Parameter to " << buf << std::endl;
this->setValue_futures_.pop(future);
});
future.handle_exception([&future,
std::cout << "Future got exception " << ex.what() << std::endl;
this->running_ = false;
this->setValue_futures_.pop(future);
});
}
if (FD_ISSET(fd_, &fds)) {
asyncData();
}
}
}
int main(int argc, char **argv)
{
MyProcess p(
argc >= 2 ? argv[1] : "127.0.0.1",
argc >= 3 ? strtoul(argv[2], nullptr, 10) : 2345);
p.execute();
}
Wrapper around POSIX socket.
Definition PosixProcess.h:44
void posixFlush()
Flush internal buffer to socket.
void posixWriteBuffered(const char *buf, size_t count)
Buffered Wrapper for write().
int posixRead(char *buf, int count)
Wrapper for read().
Base class for PdCom protocol handler.
Definition Process.h:87
virtual void write(const char *buf, size_t count)=0
Write data to server.
virtual int read(char *buf, int count)=0
Read data from server.
virtual void connected()=0
Protocol initialization completed.
virtual void findReply(const Variable &variable)
Reply to find()
virtual void flush()=0
Flush unsent data in output buffer.
Definition Subscriber.h:107
PdCom Subscription interface.
Definition Subscription.h:65
void print(std::ostream &os, char delimiter) const
Print the value(s).
State getState() const noexcept
Get the current state.
Definition Subscription.h:140
std::enable_if<!std::is_arithmetic< T >::value, SetValueFuture >::type setValue(T const &data, const Selector &selector={nullptr}) const
Write to a variable.
Definition Variable.h:104
bool empty() const noexcept
Checks whether this instance is empty.
Definition Variable.h:245
Definition Exception.h:34