libcyberradio  22.01.24
FlowControlClient.cpp
1 /***************************************************************************
2  * \file FlowControlClient.cpp
3  *
4  * \brief NDR651 flow control client.
5  *
6  * \author NH
7  * \copyright Copyright (c) 2015-2021 CyberRadio Solutions, Inc.
8  *
9  */
10 
#include "LibCyberRadio/NDR651/FlowControlClient.h"
#include "boost/format.hpp"
#include <boost/algorithm/string.hpp>
#include <math.h>
#include <stdlib.h>
#include <stdarg.h>
#include <unistd.h>
17 
18 #define BOOL_DEBUG(x) (x ? "true" : "false")
19 
20 using namespace boost::algorithm;
21 
22 
23 namespace LibCyberRadio
24 {
25  namespace NDR651
26  {
27  FlowControlClient::FlowControlClient(unsigned int ducChannel,
28  bool config_tx,
29  unsigned int updatesPerSecond,
30  bool debug) :
31  Thread("FlowControlClient", "FlowControlClient"),
32  Debuggable(debug, "FlowControlClient"),
33  _config_tx(config_tx),
34  _ducEnable(false),
35  _ducChannel(0),
36  _ducRateIndex(0),
37  _ducStreamId(0),
38  _ducTxChannel(0),
39  _ducTenGbePort(0),
40  _ducAttenuation(0),
41  _txAttenuation(0),
42  _txFreq(0.0),
43  _shfMode(-1),
44  _ducFreq(0),
45  _statusSockfd(-1)
46  {
47  this->debug("construction\n");
48  _firstUpdate = true;
49  _tbsQuery = std::string("\n");
50  _651freeSpace = 0;
51  _multiChannel = false;
52  _mySocket = NULL;
53  _initStatusFrame();
54  _statusSockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
55  _initStatusAddress();
56  setDucChannel( ducChannel );
57  setUpdateRate( updatesPerSecond );
58  //~ _clearDucSettingsInRadio();
59  }
60 
62  this->debug("destruction\n");
63  if (_mySocket->isConnected()) {
64  _mySocket->disconnect();
65  }
66  if (this->isRunning()) {
67  this->interrupt();
68  }
69  this->debug("Goodbye.\n");
70  }
71 
72  void FlowControlClient::_initStatusFrame() {
73  memset(&_statusFrame, 0, sizeof(TxStatusFrame));
74  _statusFrame.v49.frameStart = VRLP;
75  _statusFrame.v49.frameSize = 14;
76  _statusFrame.v49.C = 1;
77  _statusFrame.v49.packetSize = 11;
78  _statusFrame.v49.TSI = 1;
79  _statusFrame.v49.TSF = 0;
80  _statusFrame.vend.frameEnd = VEND;
81  }
82 
83  void FlowControlClient::_initStatusAddress() {
84  _statusDestLen = sizeof(_statusDestAddr);
85  memset(&_statusDestAddr, 0, _statusDestLen);
86  _statusDestAddr.sin_family = AF_INET;
87  _statusDestAddr.sin_addr.s_addr = htonl( INADDR_LOOPBACK );
88  _statusDestAddr.sin_port = htons(_ducStreamId);
89 
90  memset(&_statusCopyAddr, 0, _statusDestLen);
91  _statusCopyAddr.sin_family = AF_INET;
92  _statusCopyAddr.sin_addr.s_addr = htonl( INADDR_LOOPBACK );
93  _statusCopyAddr.sin_port = htons(0xfff0);
94  }
95 
96  void FlowControlClient::_sendStatusFrame() {
97  _statusFrame.status.emptyFlag = _tbsEmptyFlag;
98  _statusFrame.status.fullFlag = _tbsFullFlag;
99  _statusFrame.status.spaceAvailable = _tbsSpace;
100  _statusFrame.status.underrunFlag = _tbsUnderrunFlag;
101  _statusFrame.status.underrunCount = _tbsUnderrunCount;
102  _statusFrame.status.overrunFlag = _tbsOverrunFlag;
103  _statusFrame.status.overrunCount = _tbsOverrunCount;
104  _statusFrame.v49.timeSeconds = _utc;
105  _statusFrame.v49.frameCount = (_statusFrame.v49.frameCount+1)%4096;
106  _statusFrame.v49.packetCount = (_statusFrame.v49.packetCount+1)%4096;
107  if (_statusSockfd>=0) {
108  sendto(_statusSockfd, (char *) &_statusFrame, sizeof(TxStatusFrame), 0, (struct sockaddr *) &_statusDestAddr, _statusDestLen);
109  sendto(_statusSockfd, (char *) &_statusFrame, sizeof(TxStatusFrame), 0, (struct sockaddr *) &_statusCopyAddr, _statusDestLen);
110  } else {
111  std::cerr << "No socket to send status?" << std::endl;
112  }
113  }
114 
116  this->debug("entering run loop\n");
117  _firstUpdate = true;
118  while(true)
119  {
120  if ( (_mySocket != NULL) && _mySocket->isConnected())
121  {
122  _queryUtc();
123  update();
124  }
125  this->sleep( ((double)_fcUpdateDelay)/1e6 );
126  }
127  this->debug("exiting run loop\n");
128  }
129 
131  _queryBufferState();
132  }
133 
134  bool FlowControlClient::connectToRadio(const std::string& hostname, unsigned int port) {
135  bool ret;
136  this->debug("connecting\n");
137  _radioHostname = hostname;
138  _radioTcpPort = port;
139  if ( _mySocket != NULL )
140  {
141  delete _mySocket;
142  _mySocket = NULL;
143  }
144  _mySocket = new ClientSocket(hostname, port, _debug);
145  if ( _mySocket != NULL )
146  {
147  _mySocket->connectToServer();
148  if (_mySocket->isConnected()) {
149  disableDuc();
150  }
151  }
152  ret = (_mySocket != NULL) && _mySocket->isConnected();
153  this->debug("connect = %s\n", BOOL_DEBUG(ret));
154  return ret;
155  }
156 
157  unsigned int FlowControlClient::setUpdateRate(unsigned int updatesPerSecond) {
158  _fcUpdateRate = updatesPerSecond<=20?updatesPerSecond:20;
159  _fcUpdateDelay = 1000000/_fcUpdateRate;
160  _samplesPerUpdate = _getDucSampleRate() / _fcUpdateRate;
161  return _fcUpdateRate;
162  }
163 
164  void FlowControlClient::setDucChannel(unsigned int ducChannel) {
165  this->debug("setting duc channel = %d\n", ducChannel);
166  _ducChannel = ducChannel;
167  _tbsQuery = (boost::format("TBS? %d\n") % _ducChannel).str();
168  this->debug("duc channel ok\n");
169  }
170 
172  this->debug("disconnecting\n");
173  return _mySocket->disconnect();
174  }
175 
177  _queryBufferState();
178  }
179 
180  std::string FlowControlClient::getRadioMac(unsigned int tenGbeIndex) {
181  //this->debug("query radio mac\n");
182  std::string dmac;
183  std::string cmd = (boost::format("#MAC? %d\n") % tenGbeIndex).str();
184  BasicStringList rspVec, splitRes;
185  BasicStringList::iterator rspIter;
186  if ( _mySocket != NULL )
187  {
188  if (!_mySocket->sendCmdAndGetRsp(cmd, rspVec, 100)) {
189  //std::cout << "...success!\n";
190  } else {
191  //std::cout << "...ERROR?\n";
192  }
193  for (rspIter=rspVec.begin(); rspIter!=rspVec.end(); rspIter++) {
194  if (strstr((*rspIter).c_str(),"#MAC ")!=NULL) {
195  //std::cout << "MAC ADDRESS!!!!" << *rspIter << std::endl;
196  split(splitRes, *rspIter, is_any_of(", "));
197  dmac = splitRes.back();
198  }
199  }
200  //this->debug("radio mac = %s\n", dmac.c_str());
201  }
202  else
203  {
204  //this->debug("radio mac skipped\n");
205  }
206  return dmac;
207  }
208 
209  std::string FlowControlClient::getRadioIp(unsigned int tenGbeIndex) {
210  std::string dip;
211  std::string cmd = (boost::format("SIP? %d\n") % tenGbeIndex).str();
212  BasicStringList rspVec, splitRes;
213  BasicStringList::iterator rspIter;
214  if ( _mySocket != NULL )
215  {
216  if (!_mySocket->sendCmdAndGetRsp(cmd, rspVec, 100)) {
217  //std::cout << "...success!\n";
218  } else {
219  //std::cout << "...ERROR?\n";
220  }
221  for (rspIter=rspVec.begin(); rspIter!=rspVec.end(); rspIter++) {
222  if (strstr((*rspIter).c_str(),"SIP ")!=NULL) {
223  //std::cout << "IP ADDRESS!!!!" << *rspIter << std::endl;
224  split(splitRes, *rspIter, is_any_of(", "));
225  dip = splitRes.back();
226  }
227  //this->debug("radio ip = %s\n", dip.c_str());
228  }
229  }
230  else
231  {
232  //this->debug("radio ip skipped\n");
233  }
234  return dip;
235  }
236 
238  _ducEnable = false;
239  std::string cmd = (boost::format("DUCHS %d, 0, 0, 0, 0, 0, 0, 0\n") % _ducChannel).str();
240  std::string qry = (boost::format("DUCHS? %d\n") % _ducChannel ).str();
241  _sendCmdAndQry(cmd, qry);
242  return _sendDuc();
243  }
244 
245  bool FlowControlClient::enableDuc(unsigned int rateIndex,
246  unsigned int txChannel,
247  unsigned int streamId,
248  unsigned int tenGbeIndex,
249  float attenuation,
250  double txFreq,
251  long ducFreq,
252  unsigned int txAtten,
253  bool ducEnable ) {
254  setDucTenGbePort(tenGbeIndex, false);
255  setDucEnable(ducEnable, false);
256  setDucRateIndex(rateIndex, false);
257  setDucStreamId(streamId, false);
258  setDucFrequency(ducFreq, false);
259  setTxFrequency(txFreq, false);
260  setTxAttenuation(txAtten, false);
261  setDucAttenuation(attenuation, false);
262  return setDucTxChannel(txChannel, true);
263  }
264 
265  //Optional method to set _txFreq and
266  bool FlowControlClient::setTxFrequency(double txFreq, bool applySetting) {
267  _txFreq = txFreq;
268  return applySetting?_sendTxf():false;
269  }
270 
271  /* _setTxAttenuation(unsigned int)
272  *
273  * sets internal variable _txAttenuation and then sends commands
274  *
275  */
276  bool FlowControlClient::setTxAttenuation(unsigned int txAttenuation, bool applySetting) {
277  _txAttenuation = txAttenuation;
278  return applySetting?_sendTxa():false;
279  }
280 
281 
282  bool FlowControlClient::setDucTenGbePort(unsigned int ducTenGbePort, bool applySetting) {
283  _ducTenGbePort = ducTenGbePort;
284  return applySetting?_sendDuc():false;
285  }
286 
287  bool FlowControlClient::setDucEnable(bool ducEnable, bool applySetting) {
288  _ducEnable = ducEnable;
289  return applySetting?_sendDuc():false;
290  }
291 
292  /* _setDucTxChannel(unsigned int, bool)
293  *
294  * setter for _ducTxChannel; boolean flag controls application
295  * of frequency and
296  *
297  */
298  bool FlowControlClient::setDucTxChannel(unsigned int txChannel, bool applySetting) {
299  bool cmdError = false;
300  _ducTxChannel = txChannel;
301  if (applySetting) {
302  cmdError |= _sendTxp();
303  cmdError |= _sendTxf();
304  cmdError |= _sendTxa();
305  cmdError |= _sendDuc();
306  }
307  return cmdError;
308  }
309 
310  bool FlowControlClient::setDucRateIndex(unsigned int rateIndex, bool applySetting) {
311  _ducRateIndex = rateIndex;
312  _samplesPerUpdate = _getDucSampleRate() / _fcUpdateRate;
313  return applySetting?_sendDuc():false;
314  }
315 
316  bool FlowControlClient::setDucStreamId(unsigned int streamId, bool applySetting) {
317  _ducStreamId = streamId;
318  _statusFrame.v49.streamId = _ducStreamId;
319  _initStatusAddress();
320  return applySetting?_sendDuc():false;
321  }
322 
323  bool FlowControlClient::setDucAttenuation(float attenuation, bool applySetting) {
324  _ducAttenuation = attenuation;
325  return applySetting?_sendWba():false;
326  }
327 
328  bool FlowControlClient::setDucFrequency(long ducFreq, bool applySetting) {
329  _ducFreq = ducFreq;
330  return applySetting?_sendWbf():false;
331  }
332 
333  bool FlowControlClient::setDucTxinvMode(unsigned int txinvMode, bool applySetting) {
334  _txinvMode = txinvMode;
335  return applySetting?_sendTxinv():false;
336  }
337 
338  bool FlowControlClient::okToSend(long int numSamples, bool lockIfOk) {
339  _fcMutex.lock();
340  bool ok = _651freeSpace>numSamples;
341  if (!(ok&&lockIfOk)) {
342  _fcMutex.unlock();
343  } else {
344  _sendLock = true;
345  }
346  return ok;
347  }
348 
350  boost::mutex::scoped_lock lock(_fcMutex);
351  return _651freeSpace;
352  }
353 
354  bool FlowControlClient::sentNSamples(long int samplesSent) {
355  if (!_sendLock) {
356  _fcMutex.lock();
357  }
358  _651freeSpace -= samplesSent;
359  _fcMutex.unlock();
360  return _651freeSpace>TX_BUFFER_MIN_SIZE;
361  }
362 
363  bool FlowControlClient::_sendCmdAndQry(const std::string& cmd, const std::string& qry) {
364  bool cmdError = false;
365  _rspVec.clear();
366  cmdError |= _mySocket->sendCmdAndGetRsp(cmd, _rspVec, 10000);
367  if (!cmdError) {
368  _rspVec.clear();
369  cmdError |= _mySocket->sendCmdAndGetRsp(qry, _rspVec, 10000);
370  }
371  this->debug("cmd = \"%s\" rsp = \"%s\" err = %s\n",
372  cmd.c_str(), _rspVec[0].c_str(),
373  BOOL_DEBUG(cmdError));
374  return cmdError;
375  }
376 
377  bool FlowControlClient::_sendShf() {
378  bool cmdError = false;
379  if (_config_tx&&(_shfMode>=0)) {
380  for (int chan=1; chan<=2; chan++) {
381  if ((_ducTxChannel&chan)>0)
382  {
383  this->debug("_sendShf: txf=%f and shfMode=%d\n", _txFreq, _shfMode);
384  int currentShfMode;
385  BasicStringList::iterator rspIter;
386  BasicStringList splitRes;
387  std::string cmd, qry;
388  cmd = (boost::format("SHF %d, 1, %d\n") % chan %_shfMode ).str();
389  qry = (boost::format("SHF? %d, 1\n") % chan ).str();
390  _rspVec.clear();
391  cmdError = _mySocket->sendCmdAndGetRsp(qry, _rspVec, 10000);
392  for (rspIter = _rspVec.begin();
393  rspIter != _rspVec.end();
394  rspIter++)
395  {
396  if (strstr((*rspIter).c_str(),"SHF ") != NULL)
397  {
398  //this->debug("buffer state response = %s\n",
399  // (*rspIter).c_str());
400  split(splitRes, *rspIter, is_any_of(", "));
401  // Sanity check -- make sure response is long enough
402  // to parse
403  if ( splitRes.size() >= 2 )
404  {
405  for (int ii=0; ii<splitRes.size(); ii++) {
406  std::cout << "splitRes.at(" << ii << ") = " << splitRes.at(ii).c_str() << std::endl;
407  }
408  currentShfMode = strtol( splitRes.at(5).c_str(), NULL, 10 );
409  this->debug("currentShfMode = %d\n", currentShfMode);
410  if (currentShfMode!=_shfMode)
411  {
412  cmdError = _sendCmdAndQry(cmd, qry);
413  } else {
414  std::cout << "NOT sending SHF" << std::endl;
415  }
416  break;
417  }
418  }
419  }
420  }
421  }
422  }
423  return cmdError;
424  }
425 
426  bool FlowControlClient::_sendTxf() {
427  bool cmdError = false;
428  bool shfChange = false;
429  if (_config_tx) {
430  std::string cmd, qry;
431  if (_ducTxChannel!=0) {
432  _shfMode = (_txFreq<200.0)?1:0;
433  _sendShf();
434  }
435  //~ if ((_txFreq<200.0)&&(_shfMode!=1)) {
436  //~ _shfMode = 1;
437  //~ shfChange = true;
438  //~ } else if ((_txFreq>=200.0)&&(_shfMode!=0)) {
439  //~ _shfMode = 0;
440  //~ shfChange = true;
441  //~ }
442  //~ if (shfChange) {
443  //~ cmdError |= _sendShf();
444  //~ }
445  for (int chan=1; chan<=2; chan++) {
446  if ((_ducTxChannel&chan)>0) {
447  cmd = (boost::format("TXF %d, %f\n") % chan % _txFreq ).str();
448  qry = (boost::format("TXF? %d\n") % chan ).str();
449  cmdError |= _sendCmdAndQry(cmd, qry);
450  }
451  }
452  }
453  return cmdError;
454  }
455 
456  /* _sendTxa()
457  *
458  * using internal variable _txAttenuation to formulate radio commands
459  *
460  */
461  bool FlowControlClient::_sendTxa() {
462  bool cmdError = false;
463  if (_config_tx) {
464  std::string cmd, qry;
465  for (int chan=1; chan<=2; chan++) {
466  if ((_ducTxChannel&chan)>0) {
467  cmd = (boost::format("TXA %d, %d\n") % chan % _txAttenuation ).str();
468  qry = (boost::format("TXA? %d\n") % chan ).str();
469  cmdError |= _sendCmdAndQry(cmd, qry);
470  }
471  }
472  }
473  return cmdError;
474  }
475 
476  bool FlowControlClient::_sendTxp() {
477  bool cmdError = false;
478  if (_config_tx) {
479  std::string cmd, qry;
480  BasicStringList::iterator rspIter;
481  BasicStringList splitRes;
482  unsigned int channel, txstate;
483  for (int chan=1; chan<=2; chan++) {
484  if ((_ducTxChannel&chan)>0) {
485  cmd = (boost::format("TXP %d, 1\n") % chan ).str();
486  qry = (boost::format("TXP? %d\n") % chan ).str();
487  _rspVec.clear();
488  cmdError = _mySocket->sendCmdAndGetRsp(qry, _rspVec, 10000);
489  for (rspIter = _rspVec.begin();
490  rspIter != _rspVec.end();
491  rspIter++)
492  {
493  if (strstr((*rspIter).c_str(),"TXP ") != NULL)
494  {
495  //this->debug("buffer state response = %s\n",
496  // (*rspIter).c_str());
497  split(splitRes, *rspIter, is_any_of(", "));
498  // Sanity check -- make sure response is long enough
499  // to parse
500  if ( splitRes.size() >= 2 )
501  {
502  channel = strtol( splitRes.at(1).c_str(), NULL, 10 );
503  txstate = strtol( splitRes.at(3).c_str(), NULL, 10 );
504  if ((channel==chan) && (txstate==0))
505  {
506  cmdError = _sendCmdAndQry(cmd, qry);
507  break;
508  }
509  }
510  }
511  }
512  }
513  }
514  }
515  return cmdError;
516  }
517 
518  bool FlowControlClient::_sendTxinv() {
519  bool cmdError = false;
520  if (_config_tx) {
521  std::string cmd, qry;
522  cmd = (boost::format("TXINV %d, %d\n") % _ducChannel % _txinvMode ).str();
523  qry = (boost::format("TXINV? %d\n") % _ducChannel ).str();
524  cmdError |= _sendCmdAndQry(cmd, qry);
525  }
526  return cmdError;
527  }
528 
529  bool FlowControlClient::_sendWbduc() {
530  std::string cmd = (boost::format("WBDUC %d, %d, %d, %f, %d, %d, 0, %d\n") % _ducChannel % _ducTenGbePort % _ducFreq % _ducAttenuation % _ducRateIndex % (_ducEnable ? _ducTxChannel : 0) % _ducStreamId ).str();
531  std::string qry = (boost::format("WBDUC? %d\n") % _ducChannel ).str();
532  return _sendCmdAndQry(cmd, qry);
533  }
534 
535  bool FlowControlClient::_sendDucp() {
536  std::string cmd = (boost::format("DUCP %d, %d\n") % _ducChannel % (_ducEnable ? 1 : 0) ).str();
537  std::string qry = (boost::format("DUCP? %d\n") % _ducChannel ).str();
538  return _sendCmdAndQry(cmd, qry);
539  }
540 
541  bool FlowControlClient::_sendDuc() {
542  std::string cmd = (boost::format("DUC %d, %d, %d, %f, %d, %d, %d, %d\n")
543  % _ducChannel
544  % _ducTenGbePort
545  % _ducFreq
546  % _ducAttenuation
547  % _ducRateIndex
548  % (_ducEnable ? _ducTxChannel : 0)
549  % (_ducEnable ? 0 : 1)
550  % _ducStreamId ).str();
551  std::string qry = (boost::format("DUC? %d\n") % _ducChannel ).str();
552  return _sendCmdAndQry(cmd, qry);
553  }
554 
555  bool FlowControlClient::_sendWba() {
556  std::string cmd = (boost::format("DUCA %d, %f\n") % _ducChannel % _ducAttenuation ).str();
557  std::string qry = (boost::format("DUCA? %d\n") % _ducChannel ).str();
558  return _sendCmdAndQry(cmd, qry);
559  }
560 
561  bool FlowControlClient::_sendWbf() {
562  std::string cmd = (boost::format("DUCF %d, %d\n") % _ducChannel % _ducFreq ).str();
563  std::string qry = (boost::format("DUCF? %d\n") % _ducChannel ).str();
564  return _sendCmdAndQry(cmd, qry);
565  }
566 
567  bool FlowControlClient::_sendDip() {
568  return true;
569  }
570 
571  bool FlowControlClient::_sendDuchs() {
572  return true;
573  }
574 
575  bool FlowControlClient::unpause() {
576  return _sendDucp();
577  }
578 
579  bool FlowControlClient::setDucDipStatusEntry(int dipIndex, std::string dip, std::string dmac, unsigned int ducStatusPort) {
580  if (dipIndex<0) {
581  _ducDipIndex = 32-_ducChannel;
582  } else {
583  _ducDipIndex = dipIndex;
584  }
585  std::cout << "dIP = " << dip << " & dMAC = " << dmac << std::endl;
586  std::string cmd = (boost::format("DIP %d, %d, %s, %s, %d, %d\n") % _ducTenGbePort % _ducDipIndex % dip % dmac % ducStatusPort % ducStatusPort ).str();
587  std::string qry = (boost::format("DIP? %d, %d\n") % _ducTenGbePort % _ducDipIndex ).str();
588  _sendCmdAndQry(cmd, qry);
589  return false;
590  }
591 
592  bool FlowControlClient::setDuchsParameters(unsigned int duchsFullThresh, unsigned int duchsEmptyThresh, unsigned int duchsPeriod) {
593  _duchsFullThresh = duchsFullThresh;
594  _duchsEmptyThresh = duchsEmptyThresh;
595  _duchsPeriod = duchsPeriod;
596  std::string cmd = (boost::format("DUCHS %d, %d, %d, %d, %d, %d, 0, %d\n") % _ducChannel % _ducTenGbePort % _duchsFullThresh % _duchsEmptyThresh % _duchsPeriod % _ducDipIndex % _ducStreamId ).str();
597  std::string qry = (boost::format("DUCHS? %d\n") % _ducChannel ).str();
598  return _sendCmdAndQry(cmd, qry);
599  }
600 
601  bool FlowControlClient::_clearDucSettingsInRadio() {
602  bool rv = false;
603  std::string cmd = (boost::format("DUC %d, 0, 0, 0, 0, 0, 0, 0\n") % _ducChannel).str();
604  std::string qry = (boost::format("DUC? %d\n") % _ducChannel ).str();
605  rv = rv||_sendCmdAndQry(cmd, qry);
606 
607  cmd = (boost::format("DUCHS %d, 0, 0, 0, 0, 0, 0, 0\n") % _ducChannel).str();
608  qry = (boost::format("DUCHS? %d\n") % _ducChannel ).str();
609  rv = rv||_sendCmdAndQry(cmd, qry);
610  return rv;
611  }
612 
613 
614 
615  void FlowControlClient::_queryBufferState()
616  {
617  BasicStringList rspVec, splitRes;
618  BasicStringList::iterator rspIter;
619  _fcMutex.lock();
620  if ( (_mySocket != NULL) &&
621  !_mySocket->sendCmdAndGetRsp(_tbsQuery, rspVec, 100, _debug))
622  {
623  _651freeSpaceLast = _651freeSpace;
624  for (rspIter = rspVec.begin();
625  rspIter != rspVec.end();
626  rspIter++)
627  {
628  if (strstr((*rspIter).c_str(),"TBS ") != NULL)
629  {
630  //this->debug("buffer state response = %s\n",
631  // (*rspIter).c_str());
632  split(splitRes, *rspIter, is_any_of(", "));
633  // Sanity check -- make sure response is long enough
634  // to parse
635  if ( splitRes.size() >= 16 )
636  {
637  // Index 1 -- DUC channel
638  // Index 7 -- Sample space available
639  // Index 9 -- Underrun event happened
640  // Index 13 -- Overrun event happened
641  _tbsChannel = strtol( splitRes.at(1).c_str(), NULL, 10 );
642  if (_tbsChannel == _ducChannel)
643  {
644  _tbsEmptyFlag = strtol( splitRes.at(3).c_str(), NULL, 10 );
645  _tbsFullFlag = strtol( splitRes.at(5).c_str(), NULL, 10 );
646  _tbsSpace = strtol( splitRes.at(7).c_str(), NULL, 10 );
647  _tbsUnderrunFlag = strtol( splitRes.at(9).c_str(), NULL, 10 );
648  _tbsUnderrunCount = strtol( splitRes.at(11).c_str(), NULL, 10 );
649  _tbsOverrunFlag = strtol( splitRes.at(13).c_str(), NULL, 10 );
650  _tbsOverrunCount = strtol( splitRes.at(15).c_str(), NULL, 10 );
651 
652  _651freeSpace = _tbsSpace;
653  _fcMutex.unlock();
654  //~ if (_tbsOverrunFlag || _tbsFullFlag)
655  //~ {
656  //~ this->debug(" O_%d@%lu_O\n", _ducChannel, _utc);
657  //~ std::cerr << (boost::format(" O_%d@%lu_O\n") % _ducChannel % _utc).str();
658  //~ std::cerr << "O" << _ducChannel << "@" << _utc << "o " << std::flush;
659  //~ }
660  //~ if (_tbsUnderrunFlag || _tbsEmptyFlag) {
661  //~ std::cerr << "U" << _ducChannel << "@" << _utc << "u " << std::flush;
662  //~ }
663  _sendStatusFrame();
664  break;
665  }
666  }
667  _firstUpdate = false;
668  }
669  }
670  } else {
671  std::cerr << "FC Query Error." << std::endl;
672  }
673  _fcMutex.unlock();
674  }
675 
676  void FlowControlClient::_queryUtc() {
677  BasicStringList rspVec, splitRes;
678  BasicStringList::iterator rspIter;
679  if ( (_mySocket != NULL) &&
680  !_mySocket->sendCmdAndGetRsp("UTC?\n", rspVec, 100, _debug)) {
681  for (rspIter=rspVec.begin(); rspIter!=rspVec.end(); rspIter++) {
682  if (strstr((*rspIter).c_str(),"UTC ")!=NULL) {
683  split(splitRes, *rspIter, is_any_of(", "));
684  _utc = strtol( splitRes.back().c_str(), NULL, 10 );
685  _statusFrame.v49.timeSeconds = _utc;
686  }
687  }
688  } else {
689  }
690  }
691 
692  void FlowControlClient::_queryStatus() {
693  BasicStringList rspVec, splitRes;
694  BasicStringList::iterator rspIter;
695  if ( (_mySocket != NULL) ) {
696  _mySocket->sendCmdAndGetRsp("STAT?\n", rspVec, 100, _debug);
697  //~ for (rspIter=rspVec.begin(); rspIter!=rspVec.end(); rspIter++) {
698  //~ if (strstr((*rspIter).c_str(),"STAT ")!=NULL) {
699  //~ split(splitRes, *rspIter, is_any_of(", "));
700  //~ _stat = strtol( splitRes.back().c_str(), NULL, 10 );
701  //~ }
702  //~ }
703  _mySocket->sendCmdAndGetRsp("TSTAT?\n", rspVec, 100, _debug);
704  //~ for (rspIter=rspVec.begin(); rspIter!=rspVec.end(); rspIter++) {
705  //~ if (strstr((*rspIter).c_str(),"TSTAT ")!=NULL) {
706  //~ split(splitRes, *rspIter, is_any_of(", "));
707  //~ _tstat = strtol( splitRes.back().c_str(), NULL, 10 );
708  //~ }
709  //~ }
710  }
711  }
712 
713  long FlowControlClient::_getDucSampleRate(void) const
714  {
715  long ret = 0;
716  if ( _ducRateIndex == 16 )
717  ret = 270833;
718  else
719  ret = (long)(102.4e6 / pow(2, _ducRateIndex));
720  return ret;
721  }
722 
723  } /* namespace NDR651 */
724 }
725 
uint16_t packetSize
Packet size, in 32-bit words.
Definition: PacketTypes.h:43
uint32_t timeSeconds
Timestamp integer field.
Definition: PacketTypes.h:65
virtual bool isRunning() const
Determines if the thread is running or not.
Definition: Thread.cpp:72
uint32_t underrunCount
Underrun count.
Definition: PacketTypes.h:107
virtual void interrupt()
Interrupts (stops) the thread.
Definition: Thread.cpp:60
std::string getRadioIp(unsigned int tenGbeIndex)
Gets the IP address of a 10GigE port on the radio.
uint32_t frameCount
Frame Count.
Definition: PacketTypes.h:42
bool setTxFrequency(double txFreq, bool applySetting=true)
Sets the transmitter frequency.
bool connectToServer(void)
Connects to the server.
bool disconnect(void)
Disconnects from the server.
bool sendCmdAndGetRsp(const std::string &cmd, BasicStringList &rsp, float timeout, bool print)
Sends a command to the server and gets the response.
virtual ~FlowControlClient()
Destroys a FlowControlClient object.
uint32_t overrunFlag
Overrun flag.
Definition: PacketTypes.h:106
bool isConnected(void)
Gets whether or not the socket is connected.
Definition: ClientSocket.h:62
bool enableDuc(unsigned int rateIndex, unsigned int txChannel, unsigned int streamId, unsigned int tenGbeIndex, float attenuation, double txFreq, long ducFreq, unsigned int txAtten, bool ducEnable=true)
Enables the DUC.
uint32_t underrunFlag
Underrun flag.
Definition: PacketTypes.h:108
bool setDucRateIndex(unsigned int rateIndex, bool applySetting=true)
Sets the DUC rate index.
bool setDucStreamId(unsigned int streamId, bool applySetting=true)
Sets the Stream ID.
Class that supports debug output.
Definition: Debuggable.h:38
bool setTxAttenuation(unsigned int txAttenuation, bool applySetting=true)
Sets the transmitter attenuation.
uint32_t overrunCount
Overrun count.
Definition: PacketTypes.h:105
std::string getRadioMac(unsigned int tenGbeIndex)
Gets the MAC address of a 10GigE port on the radio.
uint16_t packetCount
Packet counter.
Definition: PacketTypes.h:44
uint32_t spaceAvailable
Space available.
Definition: PacketTypes.h:103
bool setDucTxChannel(unsigned int txChannel, bool applySetting=true)
Sets the DUC transmitter bitmap.
struct Vita49Header v49
VITA 49 frame header.
Definition: PacketTypes.h:125
bool okToSend(long int pendingSamples, bool lockIfOk)
Determines if it is OK to send data.
bool disconnect(void)
Disconnects the flow control client.
virtual int debug(const char *format,...)
Outputs debug information.
Definition: Debuggable.cpp:95
bool setDucEnable(bool ducEnable, bool applySetting=true)
Sets whether or not the DUC is enabled.
BASIC_LIST_CONTAINER< std::string > BasicStringList
Type representing a list of strings.
Definition: BasicList.h:25
uint16_t TSF
Timestamp fractional field format.
Definition: PacketTypes.h:45
bool setDucFrequency(long ducFreq, bool applySetting=true)
Sets the DUC frequency.
void testQueries(void)
Tests the flow control queries.
Defines functionality for LibCyberRadio applications.
Definition: App.h:23
virtual void run()
Executes the main processing loop for the thread.
bool connectToRadio(const std::string &hostname, unsigned int port)
Connects to the radio.
uint16_t C
Class ID field present indicator.
Definition: PacketTypes.h:49
uint32_t frameEnd
Frame end word (ASCII string "VEND")
Definition: PacketTypes.h:81
uint32_t frameSize
Frame size, in 32-bit words.
Definition: PacketTypes.h:41
bool setDucAttenuation(float attenuation, bool applySetting=true)
Sets the DUC attenuation.
uint32_t frameStart
Frame start word (ASCII string "VRLP")
Definition: PacketTypes.h:40
Transmit status frame information.
Definition: PacketTypes.h:124
long int getFreeSpace(void)
Gets the amount of free space available.
struct Vita49Trailer vend
VITA 49 frame trailer.
Definition: PacketTypes.h:127
uint16_t TSI
Timestamp integer field format.
Definition: PacketTypes.h:46
bool setDucTxinvMode(unsigned int txinvMode, bool applySetting=true)
Sets the DUC TX Inversion Mode.
bool sentNSamples(long int samplesSent)
Updates status based on the number of samples sent.
void setDucChannel(unsigned int ducChannel)
Sets the DUC channel number.
Base class for a thread object, based on Boost Threads.
Definition: Thread.h:48
void update(void)
Updates the flow controller.
virtual void sleep(double secs)
Pauses thread execution for a given time, checking for user interrupts during that time...
Definition: Thread.cpp:65
unsigned int setUpdateRate(unsigned int updatesPerSecond)
Sets the update rate.
bool setDucTenGbePort(unsigned int ducTenGbePort, bool applySetting=true)
Sets the 10GigE port used by the DUC.
struct TxStatusPayload status
Transmit status information.
Definition: PacketTypes.h:126