13 #include <sys/socket.h> 14 #include <netinet/in.h> 15 #include <arpa/inet.h> 16 #include "LibCyberRadio/NDR651/PacketTypes.h" 17 #include "LibCyberRadio/NDR651/StatusReceiver.h" 18 #include <sys/types.h> 19 #include <sys/ioctl.h> 39 _freeSpaceMax(MAX_RADIO_BUFFSIZE - RADIO_BUFFER_RESERVE),
43 bzero(&_rxbuff, MAX_RX_SIZE);
50 this->
debug(
"Interrupting\n");
56 this->
debug(
"Closing socket\n");
60 this->
debug(
"Goodbye!\n");
63 bool StatusReceiver::_makeSocket(
void) {
65 struct sockaddr_in serveraddr;
66 char ip_addr_string[INET_ADDRSTRLEN];
74 if ((_sockfd<0)&&(_port>0)) {
75 _sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
77 std::cerr <<
"Error opening socket" << std::endl;
82 setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR,
83 (
const void *)&optval ,
sizeof(
int));
92 memset((
char *) &serveraddr, 0,
sizeof(serveraddr));
93 serveraddr.sin_family = AF_INET;
94 serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
95 inet_ntop(AF_INET, &(serveraddr.sin_addr), ip_addr_string, INET_ADDRSTRLEN);
96 serveraddr.sin_port = htons((
unsigned short)_port);
100 if (bind(_sockfd, (
struct sockaddr *) &serveraddr,
sizeof(serveraddr)) < 0) {
101 std::cerr <<
"ERROR on binding socket" << std::endl;
105 std::cerr <<
"Status socket bound" << std::endl;
108 FD_SET(_sockfd, &
set);
119 if (makeSocketFlag) {
120 return _makeSocket();
133 if (makeSocketFlag) {
134 return _makeSocket();
140 bool StatusReceiver::setUpdatePE(
bool updatePE) {
141 boost::mutex::scoped_lock lock(this->objectAccessMutex);
142 std::cerr << std::endl << std::endl <<
"setUpdatePE(" << updatePE <<
")" << std::endl << std::endl;
143 _updatePE = updatePE;
144 return _updatePE==updatePE;
147 int StatusReceiver::setMaxFreeSpace(
float fs,
float maxLatency) {
148 boost::mutex::scoped_lock lock(this->objectAccessMutex);
149 int maxSamplesLatency = (int)std::floor(maxLatency*fs);
151 _freeSpaceMax = std::min( MAX_RADIO_BUFFSIZE-RADIO_BUFFER_RESERVE, maxSamplesLatency );
152 return _freeSpaceMax;
156 std::cout <<
"StatusReceiver::run() " << _651freeSpace << std::endl;
157 struct timespec spec;
159 struct sockaddr_in clientaddr;
160 socklen_t clientlen =
sizeof(clientaddr);
163 long int oldFreeSpace = 0;
164 while(this->
isRunning() && (!_shutdown)) {
165 tout.tv_sec = (
long int) 0;
166 tout.tv_usec = (
long int) 500000;
168 FD_SET(_sockfd, &
set);
169 if (select(FD_SETSIZE, &
set, NULL, NULL, &tout)>0) {
171 numBytesRx = recvfrom(_sockfd, _rxbuff, MAX_RX_SIZE, 0, (
struct sockaddr *) &clientaddr, &clientlen);
175 oldFreeSpace = _651freeSpace;
179 std::cout << std::endl;
183 std::cout <<
"Free space = " << oldFreeSpace <<
"->" << _651freeSpace <<
"?" << status->
status.
spaceAvailable <<
" [" << _freeSpaceMax <<
"]";
213 std::cerr <<
"> " << std::endl;
229 void StatusReceiver::blockUntilAvailable(
unsigned int numSamples)
231 boost::mutex::scoped_lock lock(this->objectAccessMutex);
232 while (!this->
okToSend(numSamples,
false))
234 this->waitCondition.wait(lock);
238 bool StatusReceiver::_setFreeSpace(
int updateFromRadio,
bool flagPP,
bool flagPE,
bool flagPF) {
240 boost::mutex::scoped_lock lock(this->objectAccessMutex);
241 bool updated =
false;
242 if (((!_updatePE)&&flagPP)||(_updatePE&&flagPE)) {
243 _651freeSpace = std::min( _freeSpaceMax, updateFromRadio - RADIO_BUFFER_RESERVE );
246 this->waitCondition.notify_one();
252 bool ok = _651freeSpace>=numSamples;
257 boost::mutex::scoped_lock lock(this->objectAccessMutex);
258 return _651freeSpace;
262 boost::mutex::scoped_lock lock(this->objectAccessMutex);
263 _651freeSpace -= samplesSent;
264 return _651freeSpace>0;
uint32_t emptyFlag
Empty flag.
virtual bool isRunning() const
Determines if the thread is running or not.
uint32_t underrunCount
Underrun count.
virtual void interrupt()
Interrupts (stops) the thread.
bool okToSend(long int pendingSamples, bool lockIfOk)
Determines if it is OK to send data.
uint32_t packetLossCount
Packet loss count.
StatusReceiver(std::string ifname, unsigned int port, bool debug, bool updatePE)
Constructs a StatusReceiver object.
uint32_t overrunFlag
Overrun flag.
uint32_t underrunFlag
Underrun flag.
virtual void run()
Executes the main processing loop for the thread.
Class that supports debug output.
bool setStatusPort(unsigned int port)
Sets the UDP port.
bool setStatusInterface(std::string ifname)
Sets the interface name.
uint32_t overrunCount
Overrun count.
bool sentNSamples(long int samplesSent)
Updates status based on the number of samples sent.
uint32_t packetLossFlag
Packet loss flag.
virtual ~StatusReceiver()
Destroys a StatusReceiver object.
uint32_t spaceAvailable
Space available.
struct Vita49Header v49
VITA 49 frame header.
virtual int debug(const char *format,...)
Outputs debug information.
Defines functionality for LibCyberRadio applications.
Transmit status frame information.
uint32_t PE
Emptying trigger.
long int getFreeSpace(void)
Gets the amount of free space available.
Base class for a thread object, based on Boost Threads.
virtual void sleep(double secs)
Pauses thread execution for a given time, checking for user interrupts during that time...
uint32_t PF
Filling trigger.
struct TxStatusPayload status
Transmit status information.
uint32_t PP
Periodic notification.