13#include <sys/socket.h>
14#include <netinet/in.h>
16#include "LibCyberRadio/NDR651/PacketTypes.h"
17#include "LibCyberRadio/NDR651/StatusReceiver.h"
39 _freeSpaceMax(MAX_RADIO_BUFFSIZE - RADIO_BUFFER_RESERVE),
43 bzero(&_rxbuff, MAX_RX_SIZE);
50 this->
debug(
"Interrupting\n");
56 this->
debug(
"Closing socket\n");
60 this->
debug(
"Goodbye!\n");
63 bool StatusReceiver::_makeSocket(
void) {
65 struct sockaddr_in serveraddr;
66 char ip_addr_string[INET_ADDRSTRLEN];
74 if ((_sockfd<0)&&(_port>0)) {
75 _sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
77 std::cerr <<
"Error opening socket" << std::endl;
82 setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR,
83 (
const void *)&optval ,
sizeof(
int));
92 memset((
char *) &serveraddr, 0,
sizeof(serveraddr));
93 serveraddr.sin_family = AF_INET;
94 serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
95 inet_ntop(AF_INET, &(serveraddr.sin_addr), ip_addr_string, INET_ADDRSTRLEN);
96 serveraddr.sin_port = htons((
unsigned short)_port);
100 if (bind(_sockfd, (
struct sockaddr *) &serveraddr,
sizeof(serveraddr)) < 0) {
101 std::cerr <<
"ERROR on binding socket" << std::endl;
105 std::cerr <<
"Status socket bound" << std::endl;
108 FD_SET(_sockfd, &set);
119 if (makeSocketFlag) {
120 return _makeSocket();
133 if (makeSocketFlag) {
134 return _makeSocket();
140 bool StatusReceiver::setUpdatePE(
bool updatePE) {
141 boost::mutex::scoped_lock lock(this->objectAccessMutex);
142 std::cerr << std::endl << std::endl <<
"setUpdatePE(" << updatePE <<
")" << std::endl << std::endl;
143 _updatePE = updatePE;
144 return _updatePE==updatePE;
147 int StatusReceiver::setMaxFreeSpace(
float fs,
float maxLatency) {
148 boost::mutex::scoped_lock lock(this->objectAccessMutex);
149 int maxSamplesLatency = (int)std::floor(maxLatency*fs);
151 _freeSpaceMax = std::min( MAX_RADIO_BUFFSIZE-RADIO_BUFFER_RESERVE, maxSamplesLatency );
152 return _freeSpaceMax;
156 std::cout <<
"StatusReceiver::run() " << _651freeSpace << std::endl;
157 struct timespec spec;
159 struct sockaddr_in clientaddr;
160 socklen_t clientlen =
sizeof(clientaddr);
163 long int oldFreeSpace = 0;
164 while(this->
isRunning() && (!_shutdown)) {
165 tout.tv_sec = (
long int) 0;
166 tout.tv_usec = (
long int) 500000;
168 FD_SET(_sockfd, &set);
169 if (select(FD_SETSIZE, &set, NULL, NULL, &tout)>0) {
171 numBytesRx = recvfrom(_sockfd, _rxbuff, MAX_RX_SIZE, 0, (
struct sockaddr *) &clientaddr, &clientlen);
175 oldFreeSpace = _651freeSpace;
179 std::cout << std::endl;
183 std::cout <<
"Free space = " << oldFreeSpace <<
"->" << _651freeSpace <<
"?" << status->
status.
spaceAvailable <<
" [" << _freeSpaceMax <<
"]";
213 std::cerr <<
"> " << std::endl;
229 void StatusReceiver::blockUntilAvailable(
unsigned int numSamples)
231 boost::mutex::scoped_lock lock(this->objectAccessMutex);
232 while (!this->
okToSend(numSamples,
false))
234 this->waitCondition.wait(lock);
238 bool StatusReceiver::_setFreeSpace(
int updateFromRadio,
bool flagPP,
bool flagPE,
bool flagPF) {
240 boost::mutex::scoped_lock lock(this->objectAccessMutex);
241 bool updated =
false;
242 if (((!_updatePE)&&flagPP)||(_updatePE&&flagPE)) {
243 _651freeSpace = std::min( _freeSpaceMax, updateFromRadio - RADIO_BUFFER_RESERVE );
246 this->waitCondition.notify_one();
252 bool ok = _651freeSpace>=numSamples;
257 boost::mutex::scoped_lock lock(this->objectAccessMutex);
258 return _651freeSpace;
262 boost::mutex::scoped_lock lock(this->objectAccessMutex);
263 _651freeSpace -= samplesSent;
264 return _651freeSpace>0;
virtual int debug(const char *format,...)
Outputs debug information.
Debuggable(bool debug=false, const std::string &debug_name="", FILE *debug_fp=DEBUG_FP, const std::string &debug_timefmt=DEBUG_TIME_FMT)
Constructs a Debuggable object.
virtual void run()
Executes the main processing loop for the thread.
bool okToSend(long int pendingSamples, bool lockIfOk)
Determines if it is OK to send data.
StatusReceiver(std::string ifname, unsigned int port, bool debug, bool updatePE)
Constructs a StatusReceiver object.
bool setStatusInterface(std::string ifname)
Sets the interface name.
bool setStatusPort(unsigned int port)
Sets the UDP port.
virtual ~StatusReceiver()
Destroys a StatusReceiver object.
long int getFreeSpace(void)
Gets the amount of free space available.
bool sentNSamples(long int samplesSent)
Updates status based on the number of samples sent.
Thread(const std::string &name="", const std::string &cls="")
Creates a Thread object.
virtual void sleep(double secs)
Pauses thread execution for a given time, checking for user interrupts during that time.
virtual bool isRunning() const
Determines if the thread is running or not.
virtual void interrupt()
Interrupts (stops) the thread.
Provides programming elements for controlling the CyberRadio Solutions NDR651 radio.
Defines functionality for LibCyberRadio applications.
Transmit status frame information.
struct Vita49Header v49
VITA 49 frame header.
struct TxStatusPayload status
Transmit status information.
uint32_t emptyFlag
Empty flag.
uint32_t spaceAvailable
Space available.
uint32_t overrunFlag
Overrun flag.
uint32_t packetLossCount
Packet loss count.
uint32_t underrunCount
Underrun count.
uint32_t underrunFlag
Underrun flag.
uint32_t PF
Filling trigger.
uint32_t PP
Periodic notification.
uint32_t PE
Emptying trigger.
uint32_t overrunCount
Overrun count.
uint32_t packetLossFlag
Packet loss flag.