Base class for receiving notifications.
Base class for receiving notifications. This class is in charge of passing notifications from the library to client code. Make sure that the subscriber outlives all assigned subscriptions; otherwise use-after-free bugs will occur, so be careful.
The Subscriber class is also used to group subscriptions with the same transmission mode. The newValues method is called every time all active subscriptions have had their values updated for one realtime cycle.
#include <algorithm>
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <sys/select.h>
#include <termios.h>
#include <unistd.h>
#include <unordered_set>
/**
 * Scoped guard for the terminal attached to stdin.
 *
 * On construction the current terminal attributes are saved and the
 * ECHO and ICANON local flags are cleared, so single key presses can be
 * read without waiting for a newline and without being echoed. On
 * destruction the saved attributes are written back.
 *
 * If the attributes cannot be queried (for example because stdin is not
 * a terminal), the guard does nothing at all, neither on construction
 * nor on destruction.
 */
class TerminalGuard
{
    termios old_ {};
    // Set only after tcgetattr() succeeded; protects the destructor from
    // writing back a termios structure that was never filled in.
    bool saved_ = false;

  public:
    TerminalGuard()
    {
        if (tcgetattr(STDIN_FILENO, &old_) != 0)
            return;
        saved_ = true;

        termios newt = old_;
        newt.c_lflag &= ~(ECHO | ICANON);
        tcsetattr(STDIN_FILENO, TCSANOW, &newt);
    }

    ~TerminalGuard()
    {
        if (saved_)
            tcsetattr(STDIN_FILENO, TCSANOW, &old_);
    }

    // A guard owns exactly one "restore" action; copying it makes no sense.
    TerminalGuard(const TerminalGuard &) = delete;
    TerminalGuard &operator=(const TerminalGuard &) = delete;
};
/**
 * Keeps futures alive until they are explicitly released.
 *
 * Futures are stored in a std::unordered_set, so Future must be hashable
 * and equality comparable. References handed out by push() refer to the
 * element stored inside the set.
 */
template <class Future>
class FutureManager
{
    std::unordered_set<Future> futures_;

  public:
    FutureManager() = default;

    /**
     * Take ownership of a future.
     *
     * @param future Future to store; it is moved into the set.
     * @return Reference to the stored element. If an equal element was
     *         already present, that existing element is returned.
     */
    const Future &push(Future future)
    {
        const auto inserted = futures_.insert(std::move(future));
        const auto &position = inserted.first;
        return *position;
    }

    /**
     * Drop a stored future.
     *
     * @param future Key of the element to erase; no effect if absent.
     */
    void pop(const Future &future)
    {
        futures_.erase(future);
    }
};
// Example client class: handles the connection callbacks, prints the values
// of two subscriptions (s1_, s2_) and keeps a parameter handle (p1_).
//
// NOTE(review): this listing appears to have lost lines during extraction.
// The base-class list after ':' is missing; the constructor below initialises
// PdCom::Process, PdCom::PosixProcess and PdCom::Subscriber, so those are
// presumably the bases. The declarations of p1_, s1_, s2_ and of connected()
// are not visible either, although they are used. Restore from the original
// source before compiling.
class MyProcess :
{
// Keeps pending setValue() futures alive until their callbacks have run.
FutureManager<PdCom::Variable::SetValueFuture> setValue_futures_;
// Read callback. A result of 0 clears running_.
// NOTE(review): the statement that defines `ans` is missing here; presumably
// it is the result of posixRead(buf, count) - confirm.
int read(
char *buf,
int count)
override
{
if (ans == 0)
running_ = false;
return ans;
}
// Write callback.
// NOTE(review): the body is empty in this listing; presumably it forwarded
// to posixWriteBuffered(buf, count) - confirm.
void write(
const char *buf,
size_t count)
override
{
}
public:
// Constructs the process for the given server address (defaults:
// 127.0.0.1, port 2345) and sets up the subscriber with PdCom::event_mode.
MyProcess(const char *host = "127.0.0.1", unsigned short port = 2345) :
PdCom::Process(),
PdCom::PosixProcess(host, port),
PdCom::Subscriber(
PdCom::event_mode)
{}
// Runs the main loop; defined outside the class.
void execute();
// NOTE(review): orphaned body - its function header is missing. It stores a
// variable `var` in p1_ and reports it; presumably the findReply() override.
{
p1_ = var;
std::cout << "Found parameter!" << std::endl;
}
// NOTE(review): orphaned body - the function header and the opening `if` of
// the if/else chain are missing. The visible part flags s1_active_ or
// s2_active_ depending on which subscription `s` refers to, and stops the
// main loop (running_ = false) when the state of `s` is State::Invalid.
{
using State = PdCom::Subscription::State;
if (&s == &s1_)
s1_active_ = true;
if (&s == &s2_)
s2_active_ = true;
}
else if (s.
getState() == State::Invalid) {
std::cout << "Invalid subscription!" << std::endl;
running_ = false;
}
}
// Prints the current values of s1_ and s2_ on one line, using ',' as the
// delimiter within each subscription. The timestamp argument is unused.
void newValues(std::chrono::nanoseconds ) override
{
std::cout << "New Data: ";
s1_.
print(std::cout,
',');
std::cout << " and ";
s2_.
print(std::cout,
',');
std::cout << "\n";
}
// Set once the corresponding subscription has been matched in the handler above.
bool s1_active_ = false, s2_active_ = false;
// Main-loop flag; cleared on read() returning 0 or on an invalid subscription.
bool running_ = true;
// Set by connected().
bool connected_ = false;
};
/**
 * Connection-established callback.
 *
 * Reports the connection on stdout, issues a find() request for
 * "/parameter01" and finally records the connected state in connected_.
 */
void MyProcess::connected()
{
    // Newline followed by an explicit flush, so the message shows up immediately.
    std::cout << "Connected!" << '\n' << std::flush;

    find("/parameter01");

    connected_ = true;
}
void MyProcess::execute()
{
fd_set fds;
const int max_fd = std::max<int>(fd_, STDIN_FILENO);
TerminalGuard tg;
while (running_ and !(s1_active_ and s2_active_ and !p1_.
empty()))
asyncData();
std::cout << "Ready to rumble!" << std::endl;
while (running_) {
FD_ZERO(&fds);
FD_SET(fd_, &fds);
FD_SET(STDIN_FILENO, &fds);
select(max_fd + 1, &fds, NULL, NULL, NULL);
if (FD_ISSET(STDIN_FILENO, &fds) and !p1_.
empty()) {
char buf;
std::cin.read(&buf, 1);
if (buf == 'q') {
break;
}
const auto &future = setValue_futures_.push(
p1_.
setValue(
static_cast<unsigned char>(buf)));
future.then([&future, buf, this]() {
std::cout << "Changed Parameter to " << buf << std::endl;
this->setValue_futures_.pop(future);
});
future.handle_exception([&future,
std::cout << "Future got exception " << ex.what() << std::endl;
this->running_ = false;
this->setValue_futures_.pop(future);
});
}
if (FD_ISSET(fd_, &fds)) {
asyncData();
}
}
}
/**
 * Program entry point.
 *
 * Usage: <program> [host [port]]
 *
 * host defaults to "127.0.0.1" and port to 2345. The port must be a plain
 * decimal number in the range 1..65535; anything else is reported on
 * stderr and the program exits with status 1 instead of silently
 * connecting to a truncated or zero port.
 */
int main(int argc, char **argv)
{
    const char *host = argc >= 2 ? argv[1] : "127.0.0.1";
    unsigned short port = 2345;

    if (argc >= 3) {
        char *end = nullptr;
        const unsigned long parsed = std::strtoul(argv[2], &end, 10);
        // Reject empty/non-numeric input, trailing garbage and values that
        // do not fit a TCP port number (this also covers strtoul overflow).
        if (end == argv[2] or *end != '\0' or parsed == 0 or parsed > 65535UL) {
            std::cerr << "Invalid port: " << argv[2] << std::endl;
            return 1;
        }
        port = static_cast<unsigned short>(parsed);
    }

    MyProcess p(host, port);
    p.execute();
    return 0;
}
Wrapper around POSIX socket.
Definition: PosixProcess.h:44
void posixFlush()
Flush internal buffer to socket.
void posixWriteBuffered(const char *buf, size_t count)
Buffered Wrapper for write().
int posixRead(char *buf, int count)
Wrapper for read().
Base class for PdCom protocol handler.
Definition: Process.h:87
virtual void write(const char *buf, size_t count)=0
Write data to server.
virtual int read(char *buf, int count)=0
Read data from server.
virtual void connected()=0
Protocol initialization completed.
virtual void findReply(const Variable &variable)
Reply to find()
virtual void flush()=0
Flush unsent data in output buffer.
Definition: Subscriber.h:107
PdCom Subscription interface.
Definition: Subscription.h:65
void print(std::ostream &os, char delimiter) const
Print the value(s).
State getState() const noexcept
Get the current state.
Definition: Subscription.h:140
PdCom Variable interface.
Definition: Variable.h:68
std::enable_if<!std::is_arithmetic< T >::value, SetValueFuture >::type setValue(T const &data, const Selector &selector={nullptr}) const
Write to a variable.
Definition: Variable.h:104
bool empty() const noexcept
Checks whether this instance is empty.
Definition: Variable.h:245
Definition: Exception.h:34