libcyberradio  22.01.24
UdpStatusReceiver.cpp
1 /*
2  * UdpStatusReceiver.cpp
3  *
4  * Created on: Dec 1, 2015
5  * Author: nh
6  */
7 #include <cstdio>
8 #include <stdlib.h>
9 #include <stdarg.h>
10 #include <string.h>
11 #include <time.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include "LibCyberRadio/NDR651/PacketTypes.h"
16 #include "LibCyberRadio/NDR651/UdpStatusReceiver.h"
17 #include <sys/types.h>
18 #include <sys/ioctl.h>
19 #include <net/if.h>
20 #include <algorithm> // std::min
21 #include <iostream>
22 
23 
24 namespace LibCyberRadio {
25 
26  namespace NDR651 {
27 
28  UdpStatusReceiver::UdpStatusReceiver(std::string ifname, unsigned int port, bool debug, bool updatePE) :
29  LibCyberRadio::Thread("UdpStatusReceiver", "UdpStatusReceiver"),
30  LibCyberRadio::Debuggable(debug, "UdpStatusReceiver"),
31  _sockfd(-1),
32  _shutdown(false),
33  _651freeSpace(0), // 2^26 - 2^18
34  _sendLock(false),
35  _ifname(ifname),
36  _port(port),
37  _updatePE(updatePE),
38  _freeSpaceMax(MAX_RADIO_BUFFSIZE - RADIO_BUFFER_RESERVE),
39  timeoutCount(0)
40  {
41  // TODO Auto-generated constructor stub
42  bzero(&_rxbuff, MAX_RX_SIZE);
43  FD_ZERO(&set);
44  _makeSocket();
45  }
46 
48  // TODO Auto-generated destructor stub
49  this->debug("Interrupting\n");
50  if (this->isRunning()) {
51  this->interrupt();
52  }
53  _shutdown = true;
54  this->_fcMutex.lock();
55  this->_selMutex.lock();
56  if (_sockfd>=0) {
57  this->debug("Closing socket\n");
58  //TODO: Close the socket...
59  // this interferes with the select statement in run(), so care must be taken.
60  }
61  this->debug("Goodbye!\n");
62  }
63 
64  bool UdpStatusReceiver::_makeSocket(void) {
65  int optval; /* flag value for setsockopt */
66  struct sockaddr_in serveraddr; /* server's addr */
67  char ip_addr_string[INET_ADDRSTRLEN];
68  _selMutex.lock();
69  // Kill existing socket if it exists.
70  if (_sockfd>=0) {
71  close(_sockfd);
72  _sockfd = -1;
73  FD_ZERO(&set);
74  }
75  // Create new socket.
76  if ((_sockfd<0)&&(_port>0)) {
77  _sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
78  if (_sockfd<0) {
79  std::cerr << "Error opening socket" << std::endl;
80  return false;
81  }
82 
83  optval = 1;
84  setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR,
85  (const void *)&optval , sizeof(int));
86 
87  // We're binding to a specific device. With this, we won't need to bind to an IP.
88  //~ setsockopt(_sockfd, SOL_SOCKET, SO_BINDTODEVICE,
89  //~ (void *)_ifname.c_str(), _ifname.length()+1);
90 
91  /*
92  * build the server's Internet address
93  */
94  memset((char *) &serveraddr, 0, sizeof(serveraddr));
95  serveraddr.sin_family = AF_INET;
96  serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
97  inet_ntop(AF_INET, &(serveraddr.sin_addr), ip_addr_string, INET_ADDRSTRLEN);
98  serveraddr.sin_port = htons((unsigned short)_port);
99  /*
100  * bind: associate the parent socket with a port
101  */
102  if (bind(_sockfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) {
103  std::cerr << "ERROR on binding socket" << std::endl;
104  _sockfd = -1;
105  return false;
106  } else {
107  std::cerr << "Status socket bound" << std::endl;
108  }
109  FD_ZERO(&set);
110  FD_SET(_sockfd, &set);
111  }
112  _selMutex.unlock();
113  return _sockfd>=0;
114  }
115 
116  bool UdpStatusReceiver::setStatusInterface(std::string ifname) {
117  return setStatusInterface(ifname, true);
118  }
119 
120  bool UdpStatusReceiver::setStatusInterface(std::string ifname, bool makeSocketFlag) {
121  _ifname = ifname;
122  if (makeSocketFlag) {
123  return _makeSocket();
124  } else {
125  return true;
126  }
127  }
128 
129  bool UdpStatusReceiver::setStatusPort(unsigned int port) {
130  _port = port;
131  return setStatusPort(port, true);
132  }
133 
134  bool UdpStatusReceiver::setStatusPort(unsigned int port, bool makeSocketFlag) {
135  _port = port;
136  if (makeSocketFlag) {
137  return _makeSocket();
138  } else {
139  return true;
140  }
141  }
142 
143  bool UdpStatusReceiver::setUpdatePE(bool updatePE) {
144  std::cerr << std::endl << std::endl << "setUpdatePE(" << updatePE << ")" << std::endl << std::endl;
145  _fcMutex.lock();
146  _selMutex.lock();
147  _updatePE = updatePE;
148  _fcMutex.unlock();
149  _selMutex.unlock();
150  return _updatePE==updatePE;
151  }
152 
153  int UdpStatusReceiver::setMaxFreeSpace(float fs, float maxLatency) {
154  int maxSamplesLatency = (int)std::floor(maxLatency*fs);
155  int maxSamplesLog2;
156  _fcMutex.lock();
157  _freeSpaceMax = std::min( MAX_RADIO_BUFFSIZE-RADIO_BUFFER_RESERVE, maxSamplesLatency );
158  _fcMutex.unlock();
159  return _freeSpaceMax;
160  }
161 
163  std::cout << "UdpStatusReceiver::run() " << _651freeSpace << std::endl;
164  struct timespec spec;
165  struct timeval tout;
166  struct sockaddr_in clientaddr; /* client addr */
167  socklen_t clientlen = sizeof(clientaddr); /* byte size of client's address */
168  struct TxStatusFrame * status;
169  int numBytesRx;
170  long int oldFreeSpace = 0;
171  while(this->isRunning() && (!_shutdown)) {
172  tout.tv_sec = (long int) 0;
173  tout.tv_usec = (long int) 500000;
174  FD_ZERO(&set);
175  FD_SET(_sockfd, &set);
176  _selMutex.lock();
177  if (select(FD_SETSIZE, &set, NULL, NULL, &tout)>0) {
178  //std::cout << "Select trigger" << std::endl;
179  numBytesRx = recvfrom(_sockfd, _rxbuff, MAX_RX_SIZE, 0, (struct sockaddr *) &clientaddr, &clientlen);
180  _selMutex.unlock();
181  //std::cout << "# Bytes Rx = " << numBytesRx << std::endl;
182  if (numBytesRx==sizeof(TxStatusFrame)) {
183  status = (struct TxStatusFrame *)_rxbuff;
184  oldFreeSpace = _651freeSpace;
185  if ((bool)status->status.PE||(bool)status->status.PF) {
186  std::cout << "DUCHS FRAME: PEF = " << status->status.PP << status->status.PE << status->status.PF;
187  std::cout << ", Free Space = " << status->status.spaceAvailable << " samples";
188  std::cout << std::endl;
189  }
190  if ( this->_setFreeSpace( status->status.spaceAvailable, (bool)status->status.PP, (bool)status->status.PE, (bool)status->status.PF ) ) {
191  if (status->status.PE) {
192  std::cout << "Free space = " << oldFreeSpace << "->" << _651freeSpace << "?" << status->status.spaceAvailable << " [" << _freeSpaceMax << "]";
193  std::cout << " (" << status->status.PP << status->status.PE << status->status.PF << "), ";
194  std::cout << "@ time = " << status->v49.timeSeconds << " " << status->v49.timeFracSecMSB << " " << status->v49.timeFracSecLSB << std::endl;
195  }
196  }
197  if (status->status.emptyFlag||status->status.underrunFlag||status->status.overrunFlag||status->status.packetLossFlag) {
198  //~ if (status->status.emptyFlag||status->status.underrunFlag||status->status.overrunFlag) {
199  std::cerr << "<" << status->v49.streamId << "@" << status->v49.timeSeconds << "." << status->v49.timeFracSecLSB << ":";
200  if (status->status.PP) {
201  std::cerr << "P";
202  }
203  if (status->status.PE) {
204  std::cerr << "E";
205  }
206  if (status->status.PF) {
207  std::cerr << "F";
208  }
209  std::cerr << "(" << ( status->status.spaceAvailable-67108862 ) << ")";
210  if (status->status.emptyFlag) {
211  std::cerr << "_e";
212  }
213  if (status->status.underrunFlag) {
214  std::cerr << "_u" << status->status.underrunCount;
215  }
216  if (status->status.overrunFlag) {
217  std::cerr << "_o" << status->status.overrunCount;
218  }
219  if (status->status.packetLossFlag) {
220  std::cerr << "_p" << status->status.packetLossCount;
221  }
222  std::cerr << "> " << std::endl;
223  }
224  //~ else if (status->status.PF) {
225  //~ std::cout << "\tFree space notification = " << status->status.spaceAvailable << ", current = " << _651freeSpace;
226  //~ std::cout << "@ time = " << status->v49.timeSeconds << " " << status->v49.timeFracSecMSB << " " << status->v49.timeFracSecLSB << std::endl;
227  //~ }
228  }
229  } else {
230  _selMutex.unlock();
231  timeoutCount += 1;
232  //~ this->debug("Timeout\n");
233  //~ usleep(1000);
234  }
235  this->sleep(2e-6);
236  //~ _selMutex.unlock();
237  }
238  }
239 
240  bool UdpStatusReceiver::_setFreeSpace(int updateFromRadio, bool flagPP, bool flagPE, bool flagPF) {
241  bool updated = false;
242  _fcMutex.lock();
243  if (((!_updatePE)&&flagPP)||(_updatePE&&flagPE)) {
244  _651freeSpace = std::min( _freeSpaceMax, updateFromRadio - RADIO_BUFFER_RESERVE );
245  updated = true;
246  }
247  _fcMutex.unlock();
248  return updated;
249  }
250 
251  bool UdpStatusReceiver::okToSend(long int numSamples, bool lockIfOk) {
252  _fcMutex.lock();
253  bool ok = _651freeSpace>=numSamples;
254  if (!(ok&&lockIfOk)) {
255  _fcMutex.unlock();
256  } else {
257  _sendLock = true;
258  }
259  return ok;
260  }
261 
263  boost::mutex::scoped_lock lock(_fcMutex);
264  return _651freeSpace;
265  }
266 
267  bool UdpStatusReceiver::sentNSamples(long int samplesSent) {
268  if (!_sendLock) {
269  _fcMutex.lock();
270  }
271  _651freeSpace -= samplesSent;
272  _fcMutex.unlock();
273  return _651freeSpace>0;
274  }
275 
276  } /* namespace NDR651 */
277 
278 } /* namespace CyberRadio */
uint32_t timeSeconds
Timestamp integer field.
Definition: PacketTypes.h:65
virtual bool isRunning() const
Determines if the thread is running or not.
Definition: Thread.cpp:72
uint32_t underrunCount
Underrun count.
Definition: PacketTypes.h:107
uint32_t timeFracSecMSB
Timestamp fractional field, MSW.
Definition: PacketTypes.h:66
bool okToSend(long int pendingSamples, bool lockIfOk)
Determines if it is OK to send data.
bool sentNSamples(long int samplesSent)
Updates status based on the number of samples sent.
virtual void interrupt()
Interrupts (stops) the thread.
Definition: Thread.cpp:60
uint32_t packetLossCount
Packet loss count.
Definition: PacketTypes.h:109
uint32_t overrunFlag
Overrun flag.
Definition: PacketTypes.h:106
uint32_t underrunFlag
Underrun flag.
Definition: PacketTypes.h:108
bool setStatusInterface(std::string ifname)
Sets the interface name.
Class that supports debug output.
Definition: Debuggable.h:38
virtual ~UdpStatusReceiver()
Destroys a UdpStatusReceiver object.
uint32_t overrunCount
Overrun count.
Definition: PacketTypes.h:105
virtual void run()
Executes the main processing loop for the thread.
long int getFreeSpace(void)
Gets the amount of free space available.
UdpStatusReceiver(std::string ifname, unsigned int port, bool debug, bool updatePE)
Constructs a UdpStatusReceiver object.
uint32_t packetLossFlag
Packet loss flag.
Definition: PacketTypes.h:110
uint32_t spaceAvailable
Space available.
Definition: PacketTypes.h:103
struct Vita49Header v49
VITA 49 frame header.
Definition: PacketTypes.h:125
virtual int debug(const char *format,...)
Outputs debug information.
Definition: Debuggable.cpp:95
Defines functionality for LibCyberRadio applications.
Definition: App.h:23
Transmit status frame information.
Definition: PacketTypes.h:124
uint32_t PE
Emptying trigger.
Definition: PacketTypes.h:116
bool setStatusPort(unsigned int port)
Sets the UDP port.
Base class for a thread object, based on Boost Threads.
Definition: Thread.h:48
virtual void sleep(double secs)
Pauses thread execution for a given time, checking for user interrupts during that time...
Definition: Thread.cpp:65
uint32_t PF
Filling trigger.
Definition: PacketTypes.h:115
uint32_t timeFracSecLSB
Timestamp fractional field, LSW.
Definition: PacketTypes.h:67
struct TxStatusPayload status
Transmit status information.
Definition: PacketTypes.h:126
uint32_t PP
Periodic notification.
Definition: PacketTypes.h:117