11 #include "LibCyberRadio/NDR651/FlowControlClient.h" 12 #include "boost/format.hpp" 13 #include <boost/algorithm/string.hpp> 18 #define BOOL_DEBUG(x) (x ? "true" : "false") 27 FlowControlClient::FlowControlClient(
unsigned int ducChannel,
29 unsigned int updatesPerSecond,
31 Thread(
"FlowControlClient",
"FlowControlClient"),
33 _config_tx(config_tx),
47 this->
debug(
"construction\n");
49 _tbsQuery = std::string(
"\n");
51 _multiChannel =
false;
54 _statusSockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
62 this->
debug(
"destruction\n");
69 this->
debug(
"Goodbye.\n");
72 void FlowControlClient::_initStatusFrame() {
76 _statusFrame.
v49.
C = 1;
83 void FlowControlClient::_initStatusAddress() {
84 _statusDestLen =
sizeof(_statusDestAddr);
85 memset(&_statusDestAddr, 0, _statusDestLen);
86 _statusDestAddr.sin_family = AF_INET;
87 _statusDestAddr.sin_addr.s_addr = htonl( INADDR_LOOPBACK );
88 _statusDestAddr.sin_port = htons(_ducStreamId);
90 memset(&_statusCopyAddr, 0, _statusDestLen);
91 _statusCopyAddr.sin_family = AF_INET;
92 _statusCopyAddr.sin_addr.s_addr = htonl( INADDR_LOOPBACK );
93 _statusCopyAddr.sin_port = htons(0xfff0);
96 void FlowControlClient::_sendStatusFrame() {
107 if (_statusSockfd>=0) {
108 sendto(_statusSockfd, (
char *) &_statusFrame,
sizeof(TxStatusFrame), 0, (
struct sockaddr *) &_statusDestAddr, _statusDestLen);
109 sendto(_statusSockfd, (
char *) &_statusFrame,
sizeof(TxStatusFrame), 0, (
struct sockaddr *) &_statusCopyAddr, _statusDestLen);
111 std::cerr <<
"No socket to send status?" << std::endl;
116 this->
debug(
"entering run loop\n");
120 if ( (_mySocket != NULL) && _mySocket->
isConnected())
125 this->
sleep( ((
double)_fcUpdateDelay)/1e6 );
127 this->
debug(
"exiting run loop\n");
136 this->
debug(
"connecting\n");
137 _radioHostname = hostname;
138 _radioTcpPort = port;
139 if ( _mySocket != NULL )
145 if ( _mySocket != NULL )
152 ret = (_mySocket != NULL) && _mySocket->
isConnected();
153 this->
debug(
"connect = %s\n", BOOL_DEBUG(ret));
158 _fcUpdateRate = updatesPerSecond<=20?updatesPerSecond:20;
159 _fcUpdateDelay = 1000000/_fcUpdateRate;
160 _samplesPerUpdate = _getDucSampleRate() / _fcUpdateRate;
161 return _fcUpdateRate;
165 this->
debug(
"setting duc channel = %d\n", ducChannel);
166 _ducChannel = ducChannel;
167 _tbsQuery = (boost::format(
"TBS? %d\n") % _ducChannel).str();
168 this->
debug(
"duc channel ok\n");
172 this->
debug(
"disconnecting\n");
183 std::string cmd = (boost::format(
"#MAC? %d\n") % tenGbeIndex).str();
185 BasicStringList::iterator rspIter;
186 if ( _mySocket != NULL )
193 for (rspIter=rspVec.begin(); rspIter!=rspVec.end(); rspIter++) {
194 if (strstr((*rspIter).c_str(),
"#MAC ")!=NULL) {
196 split(splitRes, *rspIter, is_any_of(
", "));
197 dmac = splitRes.back();
211 std::string cmd = (boost::format(
"SIP? %d\n") % tenGbeIndex).str();
213 BasicStringList::iterator rspIter;
214 if ( _mySocket != NULL )
221 for (rspIter=rspVec.begin(); rspIter!=rspVec.end(); rspIter++) {
222 if (strstr((*rspIter).c_str(),
"SIP ")!=NULL) {
224 split(splitRes, *rspIter, is_any_of(
", "));
225 dip = splitRes.back();
239 std::string cmd = (boost::format(
"DUCHS %d, 0, 0, 0, 0, 0, 0, 0\n") % _ducChannel).str();
240 std::string qry = (boost::format(
"DUCHS? %d\n") % _ducChannel ).str();
241 _sendCmdAndQry(cmd, qry);
246 unsigned int txChannel,
247 unsigned int streamId,
248 unsigned int tenGbeIndex,
252 unsigned int txAtten,
268 return applySetting?_sendTxf():
false;
277 _txAttenuation = txAttenuation;
278 return applySetting?_sendTxa():
false;
283 _ducTenGbePort = ducTenGbePort;
284 return applySetting?_sendDuc():
false;
288 _ducEnable = ducEnable;
289 return applySetting?_sendDuc():
false;
299 bool cmdError =
false;
300 _ducTxChannel = txChannel;
302 cmdError |= _sendTxp();
303 cmdError |= _sendTxf();
304 cmdError |= _sendTxa();
305 cmdError |= _sendDuc();
311 _ducRateIndex = rateIndex;
312 _samplesPerUpdate = _getDucSampleRate() / _fcUpdateRate;
313 return applySetting?_sendDuc():
false;
317 _ducStreamId = streamId;
318 _statusFrame.v49.streamId = _ducStreamId;
319 _initStatusAddress();
320 return applySetting?_sendDuc():
false;
324 _ducAttenuation = attenuation;
325 return applySetting?_sendWba():
false;
330 return applySetting?_sendWbf():
false;
334 _txinvMode = txinvMode;
335 return applySetting?_sendTxinv():
false;
340 bool ok = _651freeSpace>numSamples;
341 if (!(ok&&lockIfOk)) {
350 boost::mutex::scoped_lock lock(_fcMutex);
351 return _651freeSpace;
358 _651freeSpace -= samplesSent;
360 return _651freeSpace>TX_BUFFER_MIN_SIZE;
363 bool FlowControlClient::_sendCmdAndQry(
const std::string& cmd,
const std::string& qry) {
364 bool cmdError =
false;
371 this->
debug(
"cmd = \"%s\" rsp = \"%s\" err = %s\n",
372 cmd.c_str(), _rspVec[0].c_str(),
373 BOOL_DEBUG(cmdError));
377 bool FlowControlClient::_sendShf() {
378 bool cmdError =
false;
379 if (_config_tx&&(_shfMode>=0)) {
380 for (
int chan=1; chan<=2; chan++) {
381 if ((_ducTxChannel&chan)>0)
383 this->
debug(
"_sendShf: txf=%f and shfMode=%d\n", _txFreq, _shfMode);
385 BasicStringList::iterator rspIter;
387 std::string cmd, qry;
388 cmd = (boost::format(
"SHF %d, 1, %d\n") % chan %_shfMode ).str();
389 qry = (boost::format(
"SHF? %d, 1\n") % chan ).str();
392 for (rspIter = _rspVec.begin();
393 rspIter != _rspVec.end();
396 if (strstr((*rspIter).c_str(),
"SHF ") != NULL)
400 split(splitRes, *rspIter, is_any_of(
", "));
403 if ( splitRes.size() >= 2 )
405 for (
int ii=0; ii<splitRes.size(); ii++) {
406 std::cout <<
"splitRes.at(" << ii <<
") = " << splitRes.at(ii).c_str() << std::endl;
408 currentShfMode = strtol( splitRes.at(5).c_str(), NULL, 10 );
409 this->
debug(
"currentShfMode = %d\n", currentShfMode);
410 if (currentShfMode!=_shfMode)
412 cmdError = _sendCmdAndQry(cmd, qry);
414 std::cout <<
"NOT sending SHF" << std::endl;
426 bool FlowControlClient::_sendTxf() {
427 bool cmdError =
false;
428 bool shfChange =
false;
430 std::string cmd, qry;
431 if (_ducTxChannel!=0) {
432 _shfMode = (_txFreq<200.0)?1:0;
445 for (
int chan=1; chan<=2; chan++) {
446 if ((_ducTxChannel&chan)>0) {
447 cmd = (boost::format(
"TXF %d, %f\n") % chan % _txFreq ).str();
448 qry = (boost::format(
"TXF? %d\n") % chan ).str();
449 cmdError |= _sendCmdAndQry(cmd, qry);
461 bool FlowControlClient::_sendTxa() {
462 bool cmdError =
false;
464 std::string cmd, qry;
465 for (
int chan=1; chan<=2; chan++) {
466 if ((_ducTxChannel&chan)>0) {
467 cmd = (boost::format(
"TXA %d, %d\n") % chan % _txAttenuation ).str();
468 qry = (boost::format(
"TXA? %d\n") % chan ).str();
469 cmdError |= _sendCmdAndQry(cmd, qry);
476 bool FlowControlClient::_sendTxp() {
477 bool cmdError =
false;
479 std::string cmd, qry;
480 BasicStringList::iterator rspIter;
482 unsigned int channel, txstate;
483 for (
int chan=1; chan<=2; chan++) {
484 if ((_ducTxChannel&chan)>0) {
485 cmd = (boost::format(
"TXP %d, 1\n") % chan ).str();
486 qry = (boost::format(
"TXP? %d\n") % chan ).str();
489 for (rspIter = _rspVec.begin();
490 rspIter != _rspVec.end();
493 if (strstr((*rspIter).c_str(),
"TXP ") != NULL)
497 split(splitRes, *rspIter, is_any_of(
", "));
500 if ( splitRes.size() >= 2 )
502 channel = strtol( splitRes.at(1).c_str(), NULL, 10 );
503 txstate = strtol( splitRes.at(3).c_str(), NULL, 10 );
504 if ((channel==chan) && (txstate==0))
506 cmdError = _sendCmdAndQry(cmd, qry);
518 bool FlowControlClient::_sendTxinv() {
519 bool cmdError =
false;
521 std::string cmd, qry;
522 cmd = (boost::format(
"TXINV %d, %d\n") % _ducChannel % _txinvMode ).str();
523 qry = (boost::format(
"TXINV? %d\n") % _ducChannel ).str();
524 cmdError |= _sendCmdAndQry(cmd, qry);
529 bool FlowControlClient::_sendWbduc() {
530 std::string cmd = (boost::format(
"WBDUC %d, %d, %d, %f, %d, %d, 0, %d\n") % _ducChannel % _ducTenGbePort % _ducFreq % _ducAttenuation % _ducRateIndex % (_ducEnable ? _ducTxChannel : 0) % _ducStreamId ).str();
531 std::string qry = (boost::format(
"WBDUC? %d\n") % _ducChannel ).str();
532 return _sendCmdAndQry(cmd, qry);
535 bool FlowControlClient::_sendDucp() {
536 std::string cmd = (boost::format(
"DUCP %d, %d\n") % _ducChannel % (_ducEnable ? 1 : 0) ).str();
537 std::string qry = (boost::format(
"DUCP? %d\n") % _ducChannel ).str();
538 return _sendCmdAndQry(cmd, qry);
541 bool FlowControlClient::_sendDuc() {
542 std::string cmd = (boost::format(
"DUC %d, %d, %d, %f, %d, %d, %d, %d\n")
548 % (_ducEnable ? _ducTxChannel : 0)
549 % (_ducEnable ? 0 : 1)
550 % _ducStreamId ).str();
551 std::string qry = (boost::format(
"DUC? %d\n") % _ducChannel ).str();
552 return _sendCmdAndQry(cmd, qry);
555 bool FlowControlClient::_sendWba() {
556 std::string cmd = (boost::format(
"DUCA %d, %f\n") % _ducChannel % _ducAttenuation ).str();
557 std::string qry = (boost::format(
"DUCA? %d\n") % _ducChannel ).str();
558 return _sendCmdAndQry(cmd, qry);
561 bool FlowControlClient::_sendWbf() {
562 std::string cmd = (boost::format(
"DUCF %d, %d\n") % _ducChannel % _ducFreq ).str();
563 std::string qry = (boost::format(
"DUCF? %d\n") % _ducChannel ).str();
564 return _sendCmdAndQry(cmd, qry);
567 bool FlowControlClient::_sendDip() {
571 bool FlowControlClient::_sendDuchs() {
575 bool FlowControlClient::unpause() {
579 bool FlowControlClient::setDucDipStatusEntry(
int dipIndex, std::string dip, std::string dmac,
unsigned int ducStatusPort) {
581 _ducDipIndex = 32-_ducChannel;
583 _ducDipIndex = dipIndex;
585 std::cout <<
"dIP = " << dip <<
" & dMAC = " << dmac << std::endl;
586 std::string cmd = (boost::format(
"DIP %d, %d, %s, %s, %d, %d\n") % _ducTenGbePort % _ducDipIndex % dip % dmac % ducStatusPort % ducStatusPort ).str();
587 std::string qry = (boost::format(
"DIP? %d, %d\n") % _ducTenGbePort % _ducDipIndex ).str();
588 _sendCmdAndQry(cmd, qry);
592 bool FlowControlClient::setDuchsParameters(
unsigned int duchsFullThresh,
unsigned int duchsEmptyThresh,
unsigned int duchsPeriod) {
593 _duchsFullThresh = duchsFullThresh;
594 _duchsEmptyThresh = duchsEmptyThresh;
595 _duchsPeriod = duchsPeriod;
596 std::string cmd = (boost::format(
"DUCHS %d, %d, %d, %d, %d, %d, 0, %d\n") % _ducChannel % _ducTenGbePort % _duchsFullThresh % _duchsEmptyThresh % _duchsPeriod % _ducDipIndex % _ducStreamId ).str();
597 std::string qry = (boost::format(
"DUCHS? %d\n") % _ducChannel ).str();
598 return _sendCmdAndQry(cmd, qry);
601 bool FlowControlClient::_clearDucSettingsInRadio() {
603 std::string cmd = (boost::format(
"DUC %d, 0, 0, 0, 0, 0, 0, 0\n") % _ducChannel).str();
604 std::string qry = (boost::format(
"DUC? %d\n") % _ducChannel ).str();
605 rv = rv||_sendCmdAndQry(cmd, qry);
607 cmd = (boost::format(
"DUCHS %d, 0, 0, 0, 0, 0, 0, 0\n") % _ducChannel).str();
608 qry = (boost::format(
"DUCHS? %d\n") % _ducChannel ).str();
609 rv = rv||_sendCmdAndQry(cmd, qry);
615 void FlowControlClient::_queryBufferState()
618 BasicStringList::iterator rspIter;
620 if ( (_mySocket != NULL) &&
623 _651freeSpaceLast = _651freeSpace;
624 for (rspIter = rspVec.begin();
625 rspIter != rspVec.end();
628 if (strstr((*rspIter).c_str(),
"TBS ") != NULL)
632 split(splitRes, *rspIter, is_any_of(
", "));
635 if ( splitRes.size() >= 16 )
641 _tbsChannel = strtol( splitRes.at(1).c_str(), NULL, 10 );
642 if (_tbsChannel == _ducChannel)
644 _tbsEmptyFlag = strtol( splitRes.at(3).c_str(), NULL, 10 );
645 _tbsFullFlag = strtol( splitRes.at(5).c_str(), NULL, 10 );
646 _tbsSpace = strtol( splitRes.at(7).c_str(), NULL, 10 );
647 _tbsUnderrunFlag = strtol( splitRes.at(9).c_str(), NULL, 10 );
648 _tbsUnderrunCount = strtol( splitRes.at(11).c_str(), NULL, 10 );
649 _tbsOverrunFlag = strtol( splitRes.at(13).c_str(), NULL, 10 );
650 _tbsOverrunCount = strtol( splitRes.at(15).c_str(), NULL, 10 );
652 _651freeSpace = _tbsSpace;
667 _firstUpdate =
false;
671 std::cerr <<
"FC Query Error." << std::endl;
676 void FlowControlClient::_queryUtc() {
678 BasicStringList::iterator rspIter;
679 if ( (_mySocket != NULL) &&
681 for (rspIter=rspVec.begin(); rspIter!=rspVec.end(); rspIter++) {
682 if (strstr((*rspIter).c_str(),
"UTC ")!=NULL) {
683 split(splitRes, *rspIter, is_any_of(
", "));
684 _utc = strtol( splitRes.back().c_str(), NULL, 10 );
685 _statusFrame.v49.timeSeconds = _utc;
692 void FlowControlClient::_queryStatus() {
694 BasicStringList::iterator rspIter;
695 if ( (_mySocket != NULL) ) {
713 long FlowControlClient::_getDucSampleRate(
void)
const 716 if ( _ducRateIndex == 16 )
719 ret = (long)(102.4e6 / pow(2, _ducRateIndex));
uint32_t emptyFlag
Empty flag.
virtual bool isRunning() const
Determines if the thread is running or not.
uint32_t underrunCount
Underrun count.
virtual void interrupt()
Interrupts (stops) the thread.
std::string getRadioIp(unsigned int tenGbeIndex)
Gets the IP address of a 10GigE port on the radio.
bool setTxFrequency(double txFreq, bool applySetting=true)
Sets the transmitter frequency.
bool connectToServer(void)
Connects to the server.
bool disconnect(void)
Disconnects from the server.
bool sendCmdAndGetRsp(const std::string &cmd, BasicStringList &rsp, float timeout, bool print)
Sends a command to the server and gets the response.
virtual ~FlowControlClient()
Destroys a FlowControlClient object.
uint32_t overrunFlag
Overrun flag.
bool isConnected(void)
Gets whether or not the socket is connected.
bool enableDuc(unsigned int rateIndex, unsigned int txChannel, unsigned int streamId, unsigned int tenGbeIndex, float attenuation, double txFreq, long ducFreq, unsigned int txAtten, bool ducEnable=true)
Enables the DUC.
uint32_t underrunFlag
Underrun flag.
bool setDucRateIndex(unsigned int rateIndex, bool applySetting=true)
Sets the DUC rate index.
bool setDucStreamId(unsigned int streamId, bool applySetting=true)
Sets the Stream ID.
Class that supports debug output.
bool setTxAttenuation(unsigned int txAttenuation, bool applySetting=true)
Sets the transmitter attenuation.
uint32_t overrunCount
Overrun count.
std::string getRadioMac(unsigned int tenGbeIndex)
Gets the MAC address of a 10GigE port on the radio.
uint32_t spaceAvailable
Space available.
bool setDucTxChannel(unsigned int txChannel, bool applySetting=true)
Sets the DUC transmitter bitmap.
struct Vita49Header v49
VITA 49 frame header.
bool okToSend(long int pendingSamples, bool lockIfOk)
Determines if it is OK to send data.
bool disconnect(void)
Disconnects the flow control client.
virtual int debug(const char *format,...)
Outputs debug information.
bool setDucEnable(bool ducEnable, bool applySetting=true)
Sets whether or not the DUC is enabled.
BASIC_LIST_CONTAINER< std::string > BasicStringList
Type representing a list of strings.
bool setDucFrequency(long ducFreq, bool applySetting=true)
Sets the DUC frequency.
void testQueries(void)
Tests the flow control queries.
Defines functionality for LibCyberRadio applications.
virtual void run()
Executes the main processing loop for the thread.
bool connectToRadio(const std::string &hostname, unsigned int port)
Connects to the radio.
uint32_t frameEnd
Frame end word (ASCII string "VEND")
bool setDucAttenuation(float attenuation, bool applySetting=true)
Sets the DUC attenuation.
Transmit status frame information.
uint32_t fullFlag
Full flag.
long int getFreeSpace(void)
Gets the amount of free space available.
struct Vita49Trailer vend
VITA 49 frame trailer.
bool setDucTxinvMode(unsigned int txinvMode, bool applySetting=true)
Sets the DUC TX Inversion Mode.
bool sentNSamples(long int samplesSent)
Updates status based on the number of samples sent.
void setDucChannel(unsigned int ducChannel)
Sets the DUC channel number.
Base class for a thread object, based on Boost Threads.
void update(void)
Updates the flow controller.
virtual void sleep(double secs)
Pauses thread execution for a given time, checking for user interrupts during that time...
unsigned int setUpdateRate(unsigned int updatesPerSecond)
Sets the update rate.
bool setDucTenGbePort(unsigned int ducTenGbePort, bool applySetting=true)
Sets the 10GigE port used by the DUC.
struct TxStatusPayload status
Transmit status information.
bool disableDuc()
Disables the DUC.