libcyberradio 22.01.24
UdpStatusReceiver.cpp
1/*
2 * UdpStatusReceiver.cpp
3 *
4 * Created on: Dec 1, 2015
5 * Author: nh
6 */
7#include <cstdio>
8#include <stdlib.h>
9#include <stdarg.h>
10#include <string.h>
11#include <time.h>
12#include <sys/socket.h>
13#include <netinet/in.h>
14#include <arpa/inet.h>
15#include "LibCyberRadio/NDR651/PacketTypes.h"
16#include "LibCyberRadio/NDR651/UdpStatusReceiver.h"
17#include <sys/types.h>
18#include <sys/ioctl.h>
19#include <net/if.h>
20#include <algorithm> // std::min
21#include <iostream>
22
23
24namespace LibCyberRadio {
25
26 namespace NDR651 {
27
28 UdpStatusReceiver::UdpStatusReceiver(std::string ifname, unsigned int port, bool debug, bool updatePE) :
29 LibCyberRadio::Thread("UdpStatusReceiver", "UdpStatusReceiver"),
30 LibCyberRadio::Debuggable(debug, "UdpStatusReceiver"),
31 _sockfd(-1),
32 _shutdown(false),
33 _651freeSpace(0), // 2^26 - 2^18
34 _sendLock(false),
35 _ifname(ifname),
36 _port(port),
37 _updatePE(updatePE),
38 _freeSpaceMax(MAX_RADIO_BUFFSIZE - RADIO_BUFFER_RESERVE),
39 timeoutCount(0)
40 {
41 // TODO Auto-generated constructor stub
42 bzero(&_rxbuff, MAX_RX_SIZE);
43 FD_ZERO(&set);
44 _makeSocket();
45 }
46
48 // TODO Auto-generated destructor stub
49 this->debug("Interrupting\n");
50 if (this->isRunning()) {
51 this->interrupt();
52 }
53 _shutdown = true;
54 this->_fcMutex.lock();
55 this->_selMutex.lock();
56 if (_sockfd>=0) {
57 this->debug("Closing socket\n");
58 //TODO: Close the socket...
59 // this interferes with the select statement in run(), so care must be taken.
60 }
61 this->debug("Goodbye!\n");
62 }
63
64 bool UdpStatusReceiver::_makeSocket(void) {
65 int optval; /* flag value for setsockopt */
66 struct sockaddr_in serveraddr; /* server's addr */
67 char ip_addr_string[INET_ADDRSTRLEN];
68 _selMutex.lock();
69 // Kill existing socket if it exists.
70 if (_sockfd>=0) {
71 close(_sockfd);
72 _sockfd = -1;
73 FD_ZERO(&set);
74 }
75 // Create new socket.
76 if ((_sockfd<0)&&(_port>0)) {
77 _sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
78 if (_sockfd<0) {
79 std::cerr << "Error opening socket" << std::endl;
80 return false;
81 }
82
83 optval = 1;
84 setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR,
85 (const void *)&optval , sizeof(int));
86
87 // We're binding to a specific device. With this, we won't need to bind to an IP.
88 //~ setsockopt(_sockfd, SOL_SOCKET, SO_BINDTODEVICE,
89 //~ (void *)_ifname.c_str(), _ifname.length()+1);
90
91 /*
92 * build the server's Internet address
93 */
94 memset((char *) &serveraddr, 0, sizeof(serveraddr));
95 serveraddr.sin_family = AF_INET;
96 serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
97 inet_ntop(AF_INET, &(serveraddr.sin_addr), ip_addr_string, INET_ADDRSTRLEN);
98 serveraddr.sin_port = htons((unsigned short)_port);
99 /*
100 * bind: associate the parent socket with a port
101 */
102 if (bind(_sockfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) {
103 std::cerr << "ERROR on binding socket" << std::endl;
104 _sockfd = -1;
105 return false;
106 } else {
107 std::cerr << "Status socket bound" << std::endl;
108 }
109 FD_ZERO(&set);
110 FD_SET(_sockfd, &set);
111 }
112 _selMutex.unlock();
113 return _sockfd>=0;
114 }
115
116 bool UdpStatusReceiver::setStatusInterface(std::string ifname) {
117 return setStatusInterface(ifname, true);
118 }
119
120 bool UdpStatusReceiver::setStatusInterface(std::string ifname, bool makeSocketFlag) {
121 _ifname = ifname;
122 if (makeSocketFlag) {
123 return _makeSocket();
124 } else {
125 return true;
126 }
127 }
128
129 bool UdpStatusReceiver::setStatusPort(unsigned int port) {
130 _port = port;
131 return setStatusPort(port, true);
132 }
133
134 bool UdpStatusReceiver::setStatusPort(unsigned int port, bool makeSocketFlag) {
135 _port = port;
136 if (makeSocketFlag) {
137 return _makeSocket();
138 } else {
139 return true;
140 }
141 }
142
143 bool UdpStatusReceiver::setUpdatePE(bool updatePE) {
144 std::cerr << std::endl << std::endl << "setUpdatePE(" << updatePE << ")" << std::endl << std::endl;
145 _fcMutex.lock();
146 _selMutex.lock();
147 _updatePE = updatePE;
148 _fcMutex.unlock();
149 _selMutex.unlock();
150 return _updatePE==updatePE;
151 }
152
153 int UdpStatusReceiver::setMaxFreeSpace(float fs, float maxLatency) {
154 int maxSamplesLatency = (int)std::floor(maxLatency*fs);
155 int maxSamplesLog2;
156 _fcMutex.lock();
157 _freeSpaceMax = std::min( MAX_RADIO_BUFFSIZE-RADIO_BUFFER_RESERVE, maxSamplesLatency );
158 _fcMutex.unlock();
159 return _freeSpaceMax;
160 }
161
163 std::cout << "UdpStatusReceiver::run() " << _651freeSpace << std::endl;
164 struct timespec spec;
165 struct timeval tout;
166 struct sockaddr_in clientaddr; /* client addr */
167 socklen_t clientlen = sizeof(clientaddr); /* byte size of client's address */
168 struct TxStatusFrame * status;
169 int numBytesRx;
170 long int oldFreeSpace = 0;
171 while(this->isRunning() && (!_shutdown)) {
172 tout.tv_sec = (long int) 0;
173 tout.tv_usec = (long int) 500000;
174 FD_ZERO(&set);
175 FD_SET(_sockfd, &set);
176 _selMutex.lock();
177 if (select(FD_SETSIZE, &set, NULL, NULL, &tout)>0) {
178 //std::cout << "Select trigger" << std::endl;
179 numBytesRx = recvfrom(_sockfd, _rxbuff, MAX_RX_SIZE, 0, (struct sockaddr *) &clientaddr, &clientlen);
180 _selMutex.unlock();
181 //std::cout << "# Bytes Rx = " << numBytesRx << std::endl;
182 if (numBytesRx==sizeof(TxStatusFrame)) {
183 status = (struct TxStatusFrame *)_rxbuff;
184 oldFreeSpace = _651freeSpace;
185 if ((bool)status->status.PE||(bool)status->status.PF) {
186 std::cout << "DUCHS FRAME: PEF = " << status->status.PP << status->status.PE << status->status.PF;
187 std::cout << ", Free Space = " << status->status.spaceAvailable << " samples";
188 std::cout << std::endl;
189 }
190 if ( this->_setFreeSpace( status->status.spaceAvailable, (bool)status->status.PP, (bool)status->status.PE, (bool)status->status.PF ) ) {
191 if (status->status.PE) {
192 std::cout << "Free space = " << oldFreeSpace << "->" << _651freeSpace << "?" << status->status.spaceAvailable << " [" << _freeSpaceMax << "]";
193 std::cout << " (" << status->status.PP << status->status.PE << status->status.PF << "), ";
194 std::cout << "@ time = " << status->v49.timeSeconds << " " << status->v49.timeFracSecMSB << " " << status->v49.timeFracSecLSB << std::endl;
195 }
196 }
197 if (status->status.emptyFlag||status->status.underrunFlag||status->status.overrunFlag||status->status.packetLossFlag) {
198 //~ if (status->status.emptyFlag||status->status.underrunFlag||status->status.overrunFlag) {
199 std::cerr << "<" << status->v49.streamId << "@" << status->v49.timeSeconds << "." << status->v49.timeFracSecLSB << ":";
200 if (status->status.PP) {
201 std::cerr << "P";
202 }
203 if (status->status.PE) {
204 std::cerr << "E";
205 }
206 if (status->status.PF) {
207 std::cerr << "F";
208 }
209 std::cerr << "(" << ( status->status.spaceAvailable-67108862 ) << ")";
210 if (status->status.emptyFlag) {
211 std::cerr << "_e";
212 }
213 if (status->status.underrunFlag) {
214 std::cerr << "_u" << status->status.underrunCount;
215 }
216 if (status->status.overrunFlag) {
217 std::cerr << "_o" << status->status.overrunCount;
218 }
219 if (status->status.packetLossFlag) {
220 std::cerr << "_p" << status->status.packetLossCount;
221 }
222 std::cerr << "> " << std::endl;
223 }
224 //~ else if (status->status.PF) {
225 //~ std::cout << "\tFree space notification = " << status->status.spaceAvailable << ", current = " << _651freeSpace;
226 //~ std::cout << "@ time = " << status->v49.timeSeconds << " " << status->v49.timeFracSecMSB << " " << status->v49.timeFracSecLSB << std::endl;
227 //~ }
228 }
229 } else {
230 _selMutex.unlock();
231 timeoutCount += 1;
232 //~ this->debug("Timeout\n");
233 //~ usleep(1000);
234 }
235 this->sleep(2e-6);
236 //~ _selMutex.unlock();
237 }
238 }
239
240 bool UdpStatusReceiver::_setFreeSpace(int updateFromRadio, bool flagPP, bool flagPE, bool flagPF) {
241 bool updated = false;
242 _fcMutex.lock();
243 if (((!_updatePE)&&flagPP)||(_updatePE&&flagPE)) {
244 _651freeSpace = std::min( _freeSpaceMax, updateFromRadio - RADIO_BUFFER_RESERVE );
245 updated = true;
246 }
247 _fcMutex.unlock();
248 return updated;
249 }
250
251 bool UdpStatusReceiver::okToSend(long int numSamples, bool lockIfOk) {
252 _fcMutex.lock();
253 bool ok = _651freeSpace>=numSamples;
254 if (!(ok&&lockIfOk)) {
255 _fcMutex.unlock();
256 } else {
257 _sendLock = true;
258 }
259 return ok;
260 }
261
263 boost::mutex::scoped_lock lock(_fcMutex);
264 return _651freeSpace;
265 }
266
267 bool UdpStatusReceiver::sentNSamples(long int samplesSent) {
268 if (!_sendLock) {
269 _fcMutex.lock();
270 }
271 _651freeSpace -= samplesSent;
272 _fcMutex.unlock();
273 return _651freeSpace>0;
274 }
275
276 } /* namespace NDR651 */
277
278} /* namespace LibCyberRadio */
virtual int debug(const char *format,...)
Outputs debug information.
Debuggable(bool debug=false, const std::string &debug_name="", FILE *debug_fp=DEBUG_FP, const std::string &debug_timefmt=DEBUG_TIME_FMT)
Constructs a Debuggable object.
virtual void run()
Executes the main processing loop for the thread.
bool okToSend(long int pendingSamples, bool lockIfOk)
Determines if it is OK to send data.
UdpStatusReceiver(std::string ifname, unsigned int port, bool debug, bool updatePE)
Constructs a UdpStatusReceiver object.
bool setStatusInterface(std::string ifname)
Sets the interface name.
bool setStatusPort(unsigned int port)
Sets the UDP port.
virtual ~UdpStatusReceiver()
Destroys a UdpStatusReceiver object.
long int getFreeSpace(void)
Gets the amount of free space available.
bool sentNSamples(long int samplesSent)
Updates status based on the number of samples sent.
Thread(const std::string &name="", const std::string &cls="")
Creates a Thread object.
Definition Thread.cpp:19
virtual void sleep(double secs)
Pauses thread execution for a given time, checking for user interrupts during that time.
Definition Thread.cpp:65
virtual bool isRunning() const
Determines if the thread is running or not.
Definition Thread.cpp:72
virtual void interrupt()
Interrupts (stops) the thread.
Definition Thread.cpp:60
Provides programming elements for controlling the CyberRadio Solutions NDR651 radio.
Defines functionality for LibCyberRadio applications.
Definition App.h:24
Transmit status frame information.
struct Vita49Header v49
VITA 49 frame header.
struct TxStatusPayload status
Transmit status information.
uint32_t spaceAvailable
Space available.
uint32_t packetLossCount
Packet loss count.
uint32_t underrunCount
Underrun count.
uint32_t underrunFlag
Underrun flag.
uint32_t PP
Periodic notification.
uint32_t overrunCount
Overrun count.
uint32_t packetLossFlag
Packet loss flag.
uint32_t timeSeconds
Timestamp integer field.
Definition PacketTypes.h:65
uint32_t timeFracSecLSB
Timestamp fractional field, LSW.
Definition PacketTypes.h:67
uint32_t timeFracSecMSB
Timestamp fractional field, MSW.
Definition PacketTypes.h:66