libcyberradio  22.01.24
StatusReceiver.cpp
1 /*
2  * StatusReceiver.cpp
3  *
4  * Created on: Jun 20, 2017
5  * Author: nh
6  * Author: JVM
7  */
8 #include <cstdio>
9 #include <stdlib.h>
10 #include <stdarg.h>
11 #include <string.h>
12 #include <time.h>
13 #include <sys/socket.h>
14 #include <netinet/in.h>
15 #include <arpa/inet.h>
16 #include "LibCyberRadio/NDR651/PacketTypes.h"
17 #include "LibCyberRadio/NDR651/StatusReceiver.h"
18 #include <sys/types.h>
19 #include <sys/ioctl.h>
20 #include <net/if.h>
21 #include <algorithm> // std::min
22 #include <iostream>
23 
24 
25 namespace LibCyberRadio {
26 
27  namespace NDR651 {
28 
29  StatusReceiver::StatusReceiver(std::string ifname, unsigned int port, bool debug, bool updatePE) :
30  LibCyberRadio::Thread("StatusReceiver", "StatusReceiver"),
31  LibCyberRadio::Debuggable(debug, "StatusReceiver"),
32  _sockfd(-1),
33  _shutdown(false),
34  _651freeSpace(0), // 2^26 - 2^18
35  _sendLock(false),
36  _ifname(ifname),
37  _port(port),
38  _updatePE(updatePE),
39  _freeSpaceMax(MAX_RADIO_BUFFSIZE - RADIO_BUFFER_RESERVE),
40  timeoutCount(0)
41  {
42  // TODO Auto-generated constructor stub
43  bzero(&_rxbuff, MAX_RX_SIZE);
44  FD_ZERO(&set);
45  _makeSocket();
46  }
47 
49  // TODO Auto-generated destructor stub
50  this->debug("Interrupting\n");
51  if (this->isRunning()) {
52  this->interrupt();
53  }
54  _shutdown = true;
55  if (_sockfd>=0) {
56  this->debug("Closing socket\n");
57  //TODO: Close the socket...
58  // this interferes with the select statement in run(), so care must be taken.
59  }
60  this->debug("Goodbye!\n");
61  }
62 
63  bool StatusReceiver::_makeSocket(void) {
64  int optval; /* flag value for setsockopt */
65  struct sockaddr_in serveraddr; /* server's addr */
66  char ip_addr_string[INET_ADDRSTRLEN];
67  // Kill existing socket if it exists.
68  if (_sockfd>=0) {
69  close(_sockfd);
70  _sockfd = -1;
71  FD_ZERO(&set);
72  }
73  // Create new socket.
74  if ((_sockfd<0)&&(_port>0)) {
75  _sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
76  if (_sockfd<0) {
77  std::cerr << "Error opening socket" << std::endl;
78  return false;
79  }
80 
81  optval = 1;
82  setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR,
83  (const void *)&optval , sizeof(int));
84 
85  // We're binding to a specific device. With this, we won't need to bind to an IP.
86  //~ setsockopt(_sockfd, SOL_SOCKET, SO_BINDTODEVICE,
87  //~ (void *)_ifname.c_str(), _ifname.length()+1);
88 
89  /*
90  * build the server's Internet address
91  */
92  memset((char *) &serveraddr, 0, sizeof(serveraddr));
93  serveraddr.sin_family = AF_INET;
94  serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
95  inet_ntop(AF_INET, &(serveraddr.sin_addr), ip_addr_string, INET_ADDRSTRLEN);
96  serveraddr.sin_port = htons((unsigned short)_port);
97  /*
98  * bind: associate the parent socket with a port
99  */
100  if (bind(_sockfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) {
101  std::cerr << "ERROR on binding socket" << std::endl;
102  _sockfd = -1;
103  return false;
104  } else {
105  std::cerr << "Status socket bound" << std::endl;
106  }
107  FD_ZERO(&set);
108  FD_SET(_sockfd, &set);
109  }
110  return _sockfd>=0;
111  }
112 
113  bool StatusReceiver::setStatusInterface(std::string ifname) {
114  return setStatusInterface(ifname, true);
115  }
116 
117  bool StatusReceiver::setStatusInterface(std::string ifname, bool makeSocketFlag) {
118  _ifname = ifname;
119  if (makeSocketFlag) {
120  return _makeSocket();
121  } else {
122  return true;
123  }
124  }
125 
126  bool StatusReceiver::setStatusPort(unsigned int port) {
127  _port = port;
128  return setStatusPort(port, true);
129  }
130 
131  bool StatusReceiver::setStatusPort(unsigned int port, bool makeSocketFlag) {
132  _port = port;
133  if (makeSocketFlag) {
134  return _makeSocket();
135  } else {
136  return true;
137  }
138  }
139 
140  bool StatusReceiver::setUpdatePE(bool updatePE) {
141  boost::mutex::scoped_lock lock(this->objectAccessMutex);
142  std::cerr << std::endl << std::endl << "setUpdatePE(" << updatePE << ")" << std::endl << std::endl;
143  _updatePE = updatePE;
144  return _updatePE==updatePE;
145  }
146 
147  int StatusReceiver::setMaxFreeSpace(float fs, float maxLatency) {
148  boost::mutex::scoped_lock lock(this->objectAccessMutex);
149  int maxSamplesLatency = (int)std::floor(maxLatency*fs);
150  int maxSamplesLog2;
151  _freeSpaceMax = std::min( MAX_RADIO_BUFFSIZE-RADIO_BUFFER_RESERVE, maxSamplesLatency );
152  return _freeSpaceMax;
153  }
154 
156  std::cout << "StatusReceiver::run() " << _651freeSpace << std::endl;
157  struct timespec spec;
158  struct timeval tout;
159  struct sockaddr_in clientaddr; /* client addr */
160  socklen_t clientlen = sizeof(clientaddr); /* byte size of client's address */
161  struct TxStatusFrame * status;
162  int numBytesRx;
163  long int oldFreeSpace = 0;
164  while(this->isRunning() && (!_shutdown)) {
165  tout.tv_sec = (long int) 0;
166  tout.tv_usec = (long int) 500000;
167  FD_ZERO(&set);
168  FD_SET(_sockfd, &set);
169  if (select(FD_SETSIZE, &set, NULL, NULL, &tout)>0) {
170  //std::cout << "Select trigger" << std::endl;
171  numBytesRx = recvfrom(_sockfd, _rxbuff, MAX_RX_SIZE, 0, (struct sockaddr *) &clientaddr, &clientlen);
172  //std::cout << "# Bytes Rx = " << numBytesRx << std::endl;
173  if (numBytesRx==sizeof(TxStatusFrame)) {
174  status = (struct TxStatusFrame *)_rxbuff;
175  oldFreeSpace = _651freeSpace;
176  if ((bool)status->status.PE||(bool)status->status.PF) {
177  std::cout << "DUCHS FRAME (" << status->v49.streamId << "): PEF = " << status->status.PP << status->status.PE << status->status.PF;
178  std::cout << ", Free Space = " << status->status.spaceAvailable << " samples";
179  std::cout << std::endl;
180  }
181  if ( this->_setFreeSpace( status->status.spaceAvailable, (bool)status->status.PP, (bool)status->status.PE, (bool)status->status.PF ) ) {
182  if (status->status.PE) {
183  std::cout << "Free space = " << oldFreeSpace << "->" << _651freeSpace << "?" << status->status.spaceAvailable << " [" << _freeSpaceMax << "]";
184  std::cout << " (" << status->status.PP << status->status.PE << status->status.PF << "), ";
185  std::cout << "@ time = " << status->v49.timeSeconds << " " << status->v49.timeFracSecMSB << " " << status->v49.timeFracSecLSB << std::endl;
186  }
187  }
188  if (status->status.emptyFlag||status->status.underrunFlag||status->status.overrunFlag||status->status.packetLossFlag) {
189  //~ if (status->status.emptyFlag||status->status.underrunFlag||status->status.overrunFlag) {
190  std::cerr << "<" << status->v49.streamId << "@" << status->v49.timeSeconds << "." << status->v49.timeFracSecLSB << ":";
191  if (status->status.PP) {
192  std::cerr << "P";
193  }
194  if (status->status.PE) {
195  std::cerr << "E";
196  }
197  if (status->status.PF) {
198  std::cerr << "F";
199  }
200  std::cerr << "(" << ( status->status.spaceAvailable-67108862 ) << ")";
201  if (status->status.emptyFlag) {
202  std::cerr << "_e";
203  }
204  if (status->status.underrunFlag) {
205  std::cerr << "_u" << status->status.underrunCount;
206  }
207  if (status->status.overrunFlag) {
208  std::cerr << "_o" << status->status.overrunCount;
209  }
210  if (status->status.packetLossFlag) {
211  std::cerr << "_p" << status->status.packetLossCount;
212  }
213  std::cerr << "> " << std::endl;
214  }
215  //~ else if (status->status.PF) {
216  //~ std::cout << "\tFree space notification = " << status->status.spaceAvailable << ", current = " << _651freeSpace;
217  //~ std::cout << "@ time = " << status->v49.timeSeconds << " " << status->v49.timeFracSecMSB << " " << status->v49.timeFracSecLSB << std::endl;
218  //~ }
219  }
220  } else {
221  timeoutCount += 1;
222  //~ this->debug("Timeout\n");
223  //~ usleep(1000);
224  }
225  this->sleep(2e-6);
226  }
227  }
228 
229  void StatusReceiver::blockUntilAvailable(unsigned int numSamples)
230  {
231  boost::mutex::scoped_lock lock(this->objectAccessMutex);
232  while (!this->okToSend(numSamples, false))
233  {
234  this->waitCondition.wait(lock);
235  }
236  }
237 
238  bool StatusReceiver::_setFreeSpace(int updateFromRadio, bool flagPP, bool flagPE, bool flagPF) {
239 
240  boost::mutex::scoped_lock lock(this->objectAccessMutex);
241  bool updated = false;
242  if (((!_updatePE)&&flagPP)||(_updatePE&&flagPE)) {
243  _651freeSpace = std::min( _freeSpaceMax, updateFromRadio - RADIO_BUFFER_RESERVE );
244  updated = true;
245  }
246  this->waitCondition.notify_one();
247  return updated;
248  }
249 
250  // I removed locking from this function, because blockUntilAvailable should handle it for us --JVM
251  bool StatusReceiver::okToSend(long int numSamples, bool lockIfOk) {
252  bool ok = _651freeSpace>=numSamples;
253  return ok;
254  }
255 
257  boost::mutex::scoped_lock lock(this->objectAccessMutex);
258  return _651freeSpace;
259  }
260 
261  bool StatusReceiver::sentNSamples(long int samplesSent) {
262  boost::mutex::scoped_lock lock(this->objectAccessMutex);
263  _651freeSpace -= samplesSent;
264  return _651freeSpace>0;
265  }
266 
267  } /* namespace NDR651 */
268 
269 } /* namespace LibCyberRadio */
uint32_t timeSeconds
Timestamp integer field.
Definition: PacketTypes.h:65
virtual bool isRunning() const
Determines if the thread is running or not.
Definition: Thread.cpp:72
uint32_t underrunCount
Underrun count.
Definition: PacketTypes.h:107
uint32_t timeFracSecMSB
Timestamp fractional field, MSW.
Definition: PacketTypes.h:66
virtual void interrupt()
Interrupts (stops) the thread.
Definition: Thread.cpp:60
bool okToSend(long int pendingSamples, bool lockIfOk)
Determines if it is OK to send data.
uint32_t packetLossCount
Packet loss count.
Definition: PacketTypes.h:109
StatusReceiver(std::string ifname, unsigned int port, bool debug, bool updatePE)
Constructs a StatusReceiver object.
uint32_t overrunFlag
Overrun flag.
Definition: PacketTypes.h:106
uint32_t underrunFlag
Underrun flag.
Definition: PacketTypes.h:108
virtual void run()
Executes the main processing loop for the thread.
Class that supports debug output.
Definition: Debuggable.h:38
bool setStatusPort(unsigned int port)
Sets the UDP port.
bool setStatusInterface(std::string ifname)
Sets the interface name.
uint32_t overrunCount
Overrun count.
Definition: PacketTypes.h:105
bool sentNSamples(long int samplesSent)
Updates status based on the number of samples sent.
uint32_t packetLossFlag
Packet loss flag.
Definition: PacketTypes.h:110
virtual ~StatusReceiver()
Destroys a StatusReceiver object.
uint32_t spaceAvailable
Space available.
Definition: PacketTypes.h:103
struct Vita49Header v49
VITA 49 frame header.
Definition: PacketTypes.h:125
virtual int debug(const char *format,...)
Outputs debug information.
Definition: Debuggable.cpp:95
Defines functionality for LibCyberRadio applications.
Definition: App.h:23
Transmit status frame information.
Definition: PacketTypes.h:124
uint32_t PE
Emptying trigger.
Definition: PacketTypes.h:116
long int getFreeSpace(void)
Gets the amount of free space available.
Base class for a thread object, based on Boost Threads.
Definition: Thread.h:48
virtual void sleep(double secs)
Pauses thread execution for a given time, checking for user interrupts during that time...
Definition: Thread.cpp:65
uint32_t PF
Filling trigger.
Definition: PacketTypes.h:115
uint32_t timeFracSecLSB
Timestamp fractional field, LSW.
Definition: PacketTypes.h:67
struct TxStatusPayload status
Transmit status information.
Definition: PacketTypes.h:126
uint32_t PP
Periodic notification.
Definition: PacketTypes.h:117