libcyberradio 22.01.24
StatusReceiver.cpp
1/*
2 * StatusReceiver.cpp
3 *
4 * Created on: Jun 20, 2017
5 * Author: nh
6 * Author: JVM
7 */
8#include <cstdio>
9#include <stdlib.h>
10#include <stdarg.h>
11#include <string.h>
12#include <time.h>
13#include <sys/socket.h>
14#include <netinet/in.h>
15#include <arpa/inet.h>
16#include "LibCyberRadio/NDR651/PacketTypes.h"
17#include "LibCyberRadio/NDR651/StatusReceiver.h"
18#include <sys/types.h>
19#include <sys/ioctl.h>
20#include <net/if.h>
21#include <algorithm> // std::min
22#include <iostream>
23
24
25namespace LibCyberRadio {
26
27 namespace NDR651 {
28
29 StatusReceiver::StatusReceiver(std::string ifname, unsigned int port, bool debug, bool updatePE) :
30 LibCyberRadio::Thread("StatusReceiver", "StatusReceiver"),
31 LibCyberRadio::Debuggable(debug, "StatusReceiver"),
32 _sockfd(-1),
33 _shutdown(false),
34 _651freeSpace(0), // 2^26 - 2^18
35 _sendLock(false),
36 _ifname(ifname),
37 _port(port),
38 _updatePE(updatePE),
39 _freeSpaceMax(MAX_RADIO_BUFFSIZE - RADIO_BUFFER_RESERVE),
40 timeoutCount(0)
41 {
42 // TODO Auto-generated constructor stub
43 bzero(&_rxbuff, MAX_RX_SIZE);
44 FD_ZERO(&set);
45 _makeSocket();
46 }
47
49 // TODO Auto-generated destructor stub
50 this->debug("Interrupting\n");
51 if (this->isRunning()) {
52 this->interrupt();
53 }
54 _shutdown = true;
55 if (_sockfd>=0) {
56 this->debug("Closing socket\n");
57 //TODO: Close the socket...
58 // this interferes with the select statement in run(), so care must be taken.
59 }
60 this->debug("Goodbye!\n");
61 }
62
63 bool StatusReceiver::_makeSocket(void) {
64 int optval; /* flag value for setsockopt */
65 struct sockaddr_in serveraddr; /* server's addr */
66 char ip_addr_string[INET_ADDRSTRLEN];
67 // Kill existing socket if it exists.
68 if (_sockfd>=0) {
69 close(_sockfd);
70 _sockfd = -1;
71 FD_ZERO(&set);
72 }
73 // Create new socket.
74 if ((_sockfd<0)&&(_port>0)) {
75 _sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
76 if (_sockfd<0) {
77 std::cerr << "Error opening socket" << std::endl;
78 return false;
79 }
80
81 optval = 1;
82 setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR,
83 (const void *)&optval , sizeof(int));
84
85 // We're binding to a specific device. With this, we won't need to bind to an IP.
86 //~ setsockopt(_sockfd, SOL_SOCKET, SO_BINDTODEVICE,
87 //~ (void *)_ifname.c_str(), _ifname.length()+1);
88
89 /*
90 * build the server's Internet address
91 */
92 memset((char *) &serveraddr, 0, sizeof(serveraddr));
93 serveraddr.sin_family = AF_INET;
94 serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
95 inet_ntop(AF_INET, &(serveraddr.sin_addr), ip_addr_string, INET_ADDRSTRLEN);
96 serveraddr.sin_port = htons((unsigned short)_port);
97 /*
98 * bind: associate the parent socket with a port
99 */
100 if (bind(_sockfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) {
101 std::cerr << "ERROR on binding socket" << std::endl;
102 _sockfd = -1;
103 return false;
104 } else {
105 std::cerr << "Status socket bound" << std::endl;
106 }
107 FD_ZERO(&set);
108 FD_SET(_sockfd, &set);
109 }
110 return _sockfd>=0;
111 }
112
113 bool StatusReceiver::setStatusInterface(std::string ifname) {
114 return setStatusInterface(ifname, true);
115 }
116
117 bool StatusReceiver::setStatusInterface(std::string ifname, bool makeSocketFlag) {
118 _ifname = ifname;
119 if (makeSocketFlag) {
120 return _makeSocket();
121 } else {
122 return true;
123 }
124 }
125
126 bool StatusReceiver::setStatusPort(unsigned int port) {
127 _port = port;
128 return setStatusPort(port, true);
129 }
130
131 bool StatusReceiver::setStatusPort(unsigned int port, bool makeSocketFlag) {
132 _port = port;
133 if (makeSocketFlag) {
134 return _makeSocket();
135 } else {
136 return true;
137 }
138 }
139
140 bool StatusReceiver::setUpdatePE(bool updatePE) {
141 boost::mutex::scoped_lock lock(this->objectAccessMutex);
142 std::cerr << std::endl << std::endl << "setUpdatePE(" << updatePE << ")" << std::endl << std::endl;
143 _updatePE = updatePE;
144 return _updatePE==updatePE;
145 }
146
147 int StatusReceiver::setMaxFreeSpace(float fs, float maxLatency) {
148 boost::mutex::scoped_lock lock(this->objectAccessMutex);
149 int maxSamplesLatency = (int)std::floor(maxLatency*fs);
150 int maxSamplesLog2;
151 _freeSpaceMax = std::min( MAX_RADIO_BUFFSIZE-RADIO_BUFFER_RESERVE, maxSamplesLatency );
152 return _freeSpaceMax;
153 }
154
156 std::cout << "StatusReceiver::run() " << _651freeSpace << std::endl;
157 struct timespec spec;
158 struct timeval tout;
159 struct sockaddr_in clientaddr; /* client addr */
160 socklen_t clientlen = sizeof(clientaddr); /* byte size of client's address */
161 struct TxStatusFrame * status;
162 int numBytesRx;
163 long int oldFreeSpace = 0;
164 while(this->isRunning() && (!_shutdown)) {
165 tout.tv_sec = (long int) 0;
166 tout.tv_usec = (long int) 500000;
167 FD_ZERO(&set);
168 FD_SET(_sockfd, &set);
169 if (select(FD_SETSIZE, &set, NULL, NULL, &tout)>0) {
170 //std::cout << "Select trigger" << std::endl;
171 numBytesRx = recvfrom(_sockfd, _rxbuff, MAX_RX_SIZE, 0, (struct sockaddr *) &clientaddr, &clientlen);
172 //std::cout << "# Bytes Rx = " << numBytesRx << std::endl;
173 if (numBytesRx==sizeof(TxStatusFrame)) {
174 status = (struct TxStatusFrame *)_rxbuff;
175 oldFreeSpace = _651freeSpace;
176 if ((bool)status->status.PE||(bool)status->status.PF) {
177 std::cout << "DUCHS FRAME (" << status->v49.streamId << "): PEF = " << status->status.PP << status->status.PE << status->status.PF;
178 std::cout << ", Free Space = " << status->status.spaceAvailable << " samples";
179 std::cout << std::endl;
180 }
181 if ( this->_setFreeSpace( status->status.spaceAvailable, (bool)status->status.PP, (bool)status->status.PE, (bool)status->status.PF ) ) {
182 if (status->status.PE) {
183 std::cout << "Free space = " << oldFreeSpace << "->" << _651freeSpace << "?" << status->status.spaceAvailable << " [" << _freeSpaceMax << "]";
184 std::cout << " (" << status->status.PP << status->status.PE << status->status.PF << "), ";
185 std::cout << "@ time = " << status->v49.timeSeconds << " " << status->v49.timeFracSecMSB << " " << status->v49.timeFracSecLSB << std::endl;
186 }
187 }
188 if (status->status.emptyFlag||status->status.underrunFlag||status->status.overrunFlag||status->status.packetLossFlag) {
189 //~ if (status->status.emptyFlag||status->status.underrunFlag||status->status.overrunFlag) {
190 std::cerr << "<" << status->v49.streamId << "@" << status->v49.timeSeconds << "." << status->v49.timeFracSecLSB << ":";
191 if (status->status.PP) {
192 std::cerr << "P";
193 }
194 if (status->status.PE) {
195 std::cerr << "E";
196 }
197 if (status->status.PF) {
198 std::cerr << "F";
199 }
200 std::cerr << "(" << ( status->status.spaceAvailable-67108862 ) << ")";
201 if (status->status.emptyFlag) {
202 std::cerr << "_e";
203 }
204 if (status->status.underrunFlag) {
205 std::cerr << "_u" << status->status.underrunCount;
206 }
207 if (status->status.overrunFlag) {
208 std::cerr << "_o" << status->status.overrunCount;
209 }
210 if (status->status.packetLossFlag) {
211 std::cerr << "_p" << status->status.packetLossCount;
212 }
213 std::cerr << "> " << std::endl;
214 }
215 //~ else if (status->status.PF) {
216 //~ std::cout << "\tFree space notification = " << status->status.spaceAvailable << ", current = " << _651freeSpace;
217 //~ std::cout << "@ time = " << status->v49.timeSeconds << " " << status->v49.timeFracSecMSB << " " << status->v49.timeFracSecLSB << std::endl;
218 //~ }
219 }
220 } else {
221 timeoutCount += 1;
222 //~ this->debug("Timeout\n");
223 //~ usleep(1000);
224 }
225 this->sleep(2e-6);
226 }
227 }
228
229 void StatusReceiver::blockUntilAvailable(unsigned int numSamples)
230 {
231 boost::mutex::scoped_lock lock(this->objectAccessMutex);
232 while (!this->okToSend(numSamples, false))
233 {
234 this->waitCondition.wait(lock);
235 }
236 }
237
238 bool StatusReceiver::_setFreeSpace(int updateFromRadio, bool flagPP, bool flagPE, bool flagPF) {
239
240 boost::mutex::scoped_lock lock(this->objectAccessMutex);
241 bool updated = false;
242 if (((!_updatePE)&&flagPP)||(_updatePE&&flagPE)) {
243 _651freeSpace = std::min( _freeSpaceMax, updateFromRadio - RADIO_BUFFER_RESERVE );
244 updated = true;
245 }
246 this->waitCondition.notify_one();
247 return updated;
248 }
249
250 // I removed locking from this function, because blockUntilAvailable should handle it for us --JVM
251 bool StatusReceiver::okToSend(long int numSamples, bool lockIfOk) {
252 bool ok = _651freeSpace>=numSamples;
253 return ok;
254 }
255
257 boost::mutex::scoped_lock lock(this->objectAccessMutex);
258 return _651freeSpace;
259 }
260
261 bool StatusReceiver::sentNSamples(long int samplesSent) {
262 boost::mutex::scoped_lock lock(this->objectAccessMutex);
263 _651freeSpace -= samplesSent;
264 return _651freeSpace>0;
265 }
266
267 } /* namespace NDR651 */
268
269} /* namespace CyberRadio */
virtual int debug(const char *format,...)
Outputs debug information.
Debuggable(bool debug=false, const std::string &debug_name="", FILE *debug_fp=DEBUG_FP, const std::string &debug_timefmt=DEBUG_TIME_FMT)
Constructs a Debuggable object.
virtual void run()
Executes the main processing loop for the thread.
bool okToSend(long int pendingSamples, bool lockIfOk)
Determines if it is OK to send data.
StatusReceiver(std::string ifname, unsigned int port, bool debug, bool updatePE)
Constructs a StatusReceiver object.
bool setStatusInterface(std::string ifname)
Sets the interface name.
bool setStatusPort(unsigned int port)
Sets the UDP port.
virtual ~StatusReceiver()
Destroys a StatusReceiver object.
long int getFreeSpace(void)
Gets the amount of free space available.
bool sentNSamples(long int samplesSent)
Updates status based on the number of samples sent.
Thread(const std::string &name="", const std::string &cls="")
Creates a Thread object.
Definition Thread.cpp:19
virtual void sleep(double secs)
Pauses thread execution for a given time, checking for user interrupts during that time.
Definition Thread.cpp:65
virtual bool isRunning() const
Determines if the thread is running or not.
Definition Thread.cpp:72
virtual void interrupt()
Interrupts (stops) the thread.
Definition Thread.cpp:60
Provides programming elements for controlling the CyberRadio Solutions NDR651 radio.
Defines functionality for LibCyberRadio applications.
Definition App.h:24
Transmit status frame information.
struct Vita49Header v49
VITA 49 frame header.
struct TxStatusPayload status
Transmit status information.
uint32_t spaceAvailable
Space available.
uint32_t packetLossCount
Packet loss count.
uint32_t underrunCount
Underrun count.
uint32_t underrunFlag
Underrun flag.
uint32_t PP
Periodic notification.
uint32_t overrunCount
Overrun count.
uint32_t packetLossFlag
Packet loss flag.
uint32_t timeSeconds
Timestamp integer field.
Definition PacketTypes.h:65
uint32_t timeFracSecLSB
Timestamp fractional field, LSW.
Definition PacketTypes.h:67
uint32_t timeFracSecMSB
Timestamp fractional field, MSW.
Definition PacketTypes.h:66