PdCom  5.3
Process data communication client
Loading...
Searching...
No Matches
advanced_example.cpp

Base class for receiving notifications.

Base class for receiving notifications.This class is in charge of passing notifications from the library to client code. Make sure that the subscriber outlives all assigned subscriptions. Otherwise use-after-free bugs will occur, so be careful.

The Subscriber class is also used to group subscriptions with the same transmission mode. The newValues method is called everytime, after all active subscriptions got their values updated for one realtime cycle.

/*****************************************************************************
*
* Copyright (C) 2021 Bjarne von Horn (vh at igh dot de).
*
* This file is part of the PdCom library.
*
* The PdCom library is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* The PdCom library is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with the PdCom library. If not, see <http://www.gnu.org/licenses/>.
*
*****************************************************************************/
/* Advanced PdCom example.
*
* This is an interactive sample application with two signals and one parameter.
* The parameter can be changed via keyboard. If you press 'q', the application
* exits.
*
* The Process basically has the following states:
* -# waiting for connected
* -# waiting for both subscriptions and the parameter
* -# active
* -# finished
*
* The connected() callback switches from state 1 to 2. asyncData() is then
* called until all subscriptions and the parameter are available. After that,
* keyboard inputs are processed and subscription updates are shown until the
* 'q' key is pressed. Then, everything is torn down.
*/
#include <cassert>
#include <cstdio>
#include <iostream>
#include <pdcom5/Process.h>
#include <termios.h>
#include <unistd.h>
#include <unordered_set>
// RAII helper to configure the shell. Nothing to do with PdCom.
class TerminalGuard
{
termios old_;
public:
TerminalGuard()
{
tcgetattr(STDIN_FILENO, &old_);
termios newt = old_;
newt.c_lflag &= ~(ECHO | ICANON);
tcsetattr(STDIN_FILENO, TCSANOW, &newt);
}
~TerminalGuard() { tcsetattr(STDIN_FILENO, TCSANOW, &old_); }
};
template <class Future>
class FutureManager
{
std::unordered_set<Future> futures_;
public:
FutureManager() = default;
const Future &push(Future future)
{
auto ans = futures_.insert(std::move(future));
return *ans.first;
}
void pop(const Future &future) { futures_.erase(future); }
};
class MyProcess :
{
// Two subscriptions, they will be destructed by ~MyProcess before the
// Subscriber part of MyProcess goes away.
PdCom::Subscription s1_, s2_;
PdCom::Variable p1_;
FutureManager<PdCom::Variable::SetValueFuture> setValue_futures_;
int read(char *buf, int count) override
{
const int ans = posixRead(buf, count);
// stop on EOF
if (ans == 0)
running_ = false;
return ans;
}
void write(const char *buf, size_t count) override
{
posixWriteBuffered(buf, count);
}
void flush() override { posixFlush(); }
public:
MyProcess(const char *host = "127.0.0.1", unsigned short port = 2345) :
PdCom::Process(),
PdCom::PosixProcess(host, port),
PdCom::Subscriber(
PdCom::event_mode) //{std::chrono::milliseconds(100)})
{}
void execute();
void connected() override;
void findReply(const PdCom::Variable &var) override
{
// the parameter has arrived
p1_ = var;
assert(!p1_.empty());
std::cout << "Found parameter!" << std::endl;
}
void stateChanged(PdCom::Subscription const &s) override
{
// the state of one of our subscriptions has changed.
using State = PdCom::Subscription::State;
if (s.getState() == State::Active) {
if (&s == &s1_)
s1_active_ = true;
if (&s == &s2_)
s2_active_ = true;
}
else if (s.getState() == State::Invalid) {
std::cout << "Invalid subscription!" << std::endl;
running_ = false;
}
}
void newValues(std::chrono::nanoseconds /* time_ns */) override
{
std::cout << "New Data: ";
s1_.print(std::cout, ',');
std::cout << " and ";
s2_.print(std::cout, ',');
std::cout << "\n";
}
bool s1_active_ = false, s2_active_ = false;
bool running_ = true;
bool connected_ = false;
};
void MyProcess::connected()
{
// connection is established, start subscriptions and query the server for
// the parameter.
std::cout << "Connected!" << std::endl;
s1_ = PdCom::Subscription(*this, *this, "/osc/cos");
s2_ = PdCom::Subscription(*this, *this, "/osc/sin");
find("/parameter01");
connected_ = true;
}
void MyProcess::execute()
{
fd_set fds;
const int max_fd = std::max<int>(fd_, STDIN_FILENO);
TerminalGuard tg;
// wait until everything is set up
while (running_ and !(s1_active_ and s2_active_ and !p1_.empty()))
asyncData();
// now we're ready
std::cout << "Ready to rumble!" << std::endl;
while (running_) {
FD_ZERO(&fds);
FD_SET(fd_, &fds);
FD_SET(STDIN_FILENO, &fds);
select(max_fd + 1, &fds, NULL, NULL, NULL);
// process input
if (FD_ISSET(STDIN_FILENO, &fds) and !p1_.empty()) {
char buf;
std::cin.read(&buf, 1);
if (buf == 'q') {
break;
}
const auto &future = setValue_futures_.push(
p1_.setValue(static_cast<unsigned char>(buf)));
future.then([&future, buf, this]() {
std::cout << "Changed Parameter to " << buf << std::endl;
this->setValue_futures_.pop(future);
});
future.handle_exception([&future,
this](PdCom::Exception const &ex) {
std::cout << "Future got exception " << ex.what() << std::endl;
this->running_ = false;
this->setValue_futures_.pop(future);
});
}
// ask PdCom to process incoming data
if (FD_ISSET(fd_, &fds)) {
asyncData();
}
}
}
int main(int argc, char **argv)
{
// usage: example2 <host> <port>
MyProcess p(
argc >= 2 ? argv[1] : "127.0.0.1",
argc >= 3 ? strtoul(argv[2], nullptr, 10) : 2345);
p.execute();
}
Wrapper around POSIX socket.
Definition PosixProcess.h:44
void posixFlush()
Flush internal buffer to socket.
void posixWriteBuffered(const char *buf, size_t count)
Buffered Wrapper for write().
int posixRead(char *buf, int count)
Wrapper for read().
Base class for PdCom protocol handler.
Definition Process.h:87
virtual void write(const char *buf, size_t count)=0
Write data to server.
virtual int read(char *buf, int count)=0
Read data from server.
virtual void connected()=0
Protocol initialization completed.
virtual void findReply(const Variable &variable)
Reply to find()
virtual void flush()=0
Flush unsent data in output buffer.
Definition Subscriber.h:107
PdCom Subscription interface.
Definition Subscription.h:65
void print(std::ostream &os, char delimiter) const
Print the value(s).
State getState() const noexcept
Get the current state.
Definition Subscription.h:140
std::enable_if<!std::is_arithmetic< T >::value, SetValueFuture >::type setValue(T const &data, const Selector &selector={nullptr}) const
Write to a variable.
Definition Variable.h:104
bool empty() const noexcept
Checks whether this instance is empty.
Definition Variable.h:245
Definition Exception.h:34