From 0558263616506d14ab06326fcc7fe45923b89c47 Mon Sep 17 00:00:00 2001 From: Bryant Mairs Date: Tue, 24 Dec 2013 00:11:09 -0500 Subject: [PATCH] Converted LinkInterface from a pure-virtual interface into an abstract base class. This centralized the data rate calculation code for all subclasses. Also added the necessary code to provide upstream/downstream data rate calculations for all LinkInterface-derived classes. --- src/comm/LinkInterface.h | 147 +++++++++++++++++++++++++++++++++++--- src/comm/MAVLinkSimulationLink.cc | 29 +++----- src/comm/OpalLink.cc | 7 ++ src/comm/SerialLink.cc | 123 ++----------------------------- src/comm/SerialLink.h | 32 --------- src/comm/TCPLink.cc | 8 +++ src/comm/UDPLink.cc | 14 +++- src/comm/XbeeLink.cpp | 13 +++- 8 files changed, 190 insertions(+), 183 deletions(-) diff --git a/src/comm/LinkInterface.h b/src/comm/LinkInterface.h index cfe0b95..833a804 100644 --- a/src/comm/LinkInterface.h +++ b/src/comm/LinkInterface.h @@ -33,6 +33,9 @@ along with PIXHAWK. If not, see . #define _LINKINTERFACE_H_ #include +#include +#include +#include /** * The link interface defines the interface for all links used to communicate @@ -43,8 +46,27 @@ class LinkInterface : public QThread { Q_OBJECT public: - LinkInterface(QObject* parent = 0) : QThread(parent) {} - virtual ~LinkInterface() { emit this->deleteLink(this); } + LinkInterface() : + QThread(0) + { + // Initialize everything for the data rate calculation buffers. + inDataIndex = 0; + outDataIndex = 0; + + // Initialize our data rate buffers manually, cause C++<03 is dumb. + for (int i = 0; i < dataRateBufferSize; ++i) + { + inDataWriteAmounts[i] = 0; + inDataWriteTimes[i] = 0; + outDataWriteAmounts[i] = 0; + outDataWriteTimes[i] = 0; + } + + } + + virtual ~LinkInterface() { + emit this->deleteLink(this); + } /* Connection management */ @@ -91,7 +113,10 @@ public: * * @return The data rate of the interface in bits per second, 0 if unknown **/ - virtual qint64 getCurrentInDataRate() const = 0; + qint64 getCurrentInDataRate() const + { + return getCurrentDataRate(inDataIndex, inDataWriteTimes, inDataWriteAmounts); + } /** * @Brief Get the current outgoing data rate. @@ -101,7 +126,10 @@ public: * * @return The data rate of the interface in bits per second, 0 if unknown **/ - virtual qint64 getCurrentOutDataRate() const = 0; + qint64 getCurrentOutDataRate() const + { + return getCurrentDataRate(outDataIndex, outDataWriteTimes, outDataWriteAmounts); + } /** * @brief Connect this interface logically @@ -177,10 +205,114 @@ signals: void communicationUpdate(const QString& linkname, const QString& text); - /** @brief destroying element */ - void deleteLink(LinkInterface* const link); + /** @brief destroying element */ + void deleteLink(LinkInterface* const link); protected: + + static const int dataRateBufferSize = 20; ///< Specify how many data points to capture for data rate calculations. + + static const qint64 dataRateCurrentTimespan = 500; ///< Set the maximum age of samples to use for data calculations (ms). + + // Implement a simple circular buffer for storing when and how much data was received. + // Used for calculating the incoming data rate. Use with *StatsBuffer() functions. + int inDataIndex; + quint64 inDataWriteAmounts[dataRateBufferSize]; // In bytes + qint64 inDataWriteTimes[dataRateBufferSize]; // in ms + + // Implement a simple circular buffer for storing when and how much data was transmit. + // Used for calculating the outgoing data rate. Use with *StatsBuffer() functions. + int outDataIndex; + quint64 outDataWriteAmounts[dataRateBufferSize]; // In bytes + qint64 outDataWriteTimes[dataRateBufferSize]; // in ms + + mutable QMutex dataRateMutex; // Mutex for accessing the data rate member variables + + /** + * @brief logDataRateToBuffer Stores transmission times/amounts for statistics + * + * This function logs the send times and amounts of datas to the given circular buffers. + * This data is used for calculating the transmission rate. + * + * @param bytesBuffer[out] The buffer to write the bytes value into. + * @param timeBuffer[out] The buffer to write the time value into + * @param writeIndex[out] The write index used for this buffer. + * @param bytes The amount of bytes transmit. + * @param time The time (in ms) this transmission occurred. + */ + static void logDataRateToBuffer(quint64 *bytesBuffer, qint64 *timeBuffer, int *writeIndex, quint64 bytes, qint64 time) + { + int i = *writeIndex; + + // Now write into the buffer, if there's no room, we just overwrite the first data point. + bytesBuffer[i] = bytes; + timeBuffer[i] = time; + + // Increment and wrap the write index + ++i; + if (i == dataRateBufferSize) + { + i = 0; + } + *writeIndex = i; + } + + /** + * @brief getCurrentDataRate Get the current data rate given a data rate buffer. + * + * This function attempts to use the times and number of bytes transmit into a current data rate + * estimation. Since it needs to use timestamps to get the timeperiods over when the data was sent, + * this is effectively a global data rate over the last dataRateBufferSize - 1 data points. Also note + * that data points older than NOW - dataRateCurrentTimespan are ignored. + * + * @param index The first valid sample in the data rate buffer. Refers to the oldest time sample. + * @param dataWriteTimes The time, in ms since epoch, that each data sample took place. + * @param dataWriteAmounts The amount of data (in bits) that was transferred. + * @return The bits per second of data transferrence of the interface over the last [-statsCurrentTimespan, 0] timespan. + */ + qint64 getCurrentDataRate(int index, const qint64 dataWriteTimes[], const quint64 dataWriteAmounts[]) const + { + const qint64 now = QDateTime::currentMSecsSinceEpoch(); + + // Limit the time we calculate to the recent past + const qint64 cutoff = now - dataRateCurrentTimespan; + + // Grab the mutex for working with the stats variables + QMutexLocker dataRateLocker(&dataRateMutex); + + // Now iterate through the buffer of all received data packets adding up all values + // within now and our cutof. + qint64 totalBytes = 0; + qint64 totalTime = 0; + qint64 lastTime = 0; + int size = dataRateBufferSize; + while (size-- > 0) + { + // If this data is within our cutoff time, include it in our calculations. + // This also accounts for when the buffer is empty and filled with 0-times. + if (dataWriteTimes[index] > cutoff && lastTime > 0) { + // Track the total time, using the previous time as our timeperiod. + totalTime += dataWriteTimes[index] - lastTime; + totalBytes += dataWriteAmounts[index]; + } + + // Track the last time sample for doing timespan calculations + lastTime = dataWriteTimes[index]; + + // Increment and wrap the index if necessary. + if (++index == dataRateBufferSize) + { + index = 0; + } + } + + // Return the final calculated value in bits / s, converted from bytes/ms. + qint64 dataRate = (totalTime != 0)?(qint64)((float)totalBytes * 8.0f / ((float)totalTime / 1000.0f)):0; + + // Finally return our calculated data rate. + return dataRate; + } + static int getNextLinkId() { static int nextId = 1; return nextId++; @@ -198,7 +330,4 @@ protected slots: }; -/* Declare C++ interface as Qt interface */ -//Q_DECLARE_INTERFACE(LinkInterface, "org.openground.comm.links.LinkInterface/1.0") - #endif // _LINKINTERFACE_H_ diff --git a/src/comm/MAVLinkSimulationLink.cc b/src/comm/MAVLinkSimulationLink.cc index c6db817..099ad79 100644 --- a/src/comm/MAVLinkSimulationLink.cc +++ b/src/comm/MAVLinkSimulationLink.cc @@ -55,7 +55,7 @@ This file is part of the QGROUNDCONTROL project * @param writeFile The received messages are written to that file * @param rate The rate at which the messages are sent (in intervals of milliseconds) **/ -MAVLinkSimulationLink::MAVLinkSimulationLink(QString readFile, QString writeFile, int rate, QObject* parent) : LinkInterface(parent), +MAVLinkSimulationLink::MAVLinkSimulationLink(QString readFile, QString writeFile, int rate, QObject* parent) : readyBytes(0), timeOffset(0) { @@ -857,6 +857,12 @@ void MAVLinkSimulationLink::writeBytes(const char* data, qint64 size) } } + // Log the amount and time written out for future data rate calculations. + // While this interface doesn't actually write any data to external systems, + // this data "transmit" here should still count towards the outgoing data rate. + QMutexLocker dataRateLocker(&dataRateMutex); + logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, size, QDateTime::currentMSecsSinceEpoch()); + readyBufferMutex.lock(); for (int i = 0; i < streampointer; i++) { @@ -884,25 +890,12 @@ void MAVLinkSimulationLink::readBytes() QByteArray b(data, len); emit bytesReceived(this, b); - readyBufferMutex.unlock(); -// if (len > 0) -// { -// qDebug() << "Simulation sent " << len << " bytes to groundstation: "; - -// /* Increase write counter */ -// //bitsSentTotal += size * 8; - -// //Output all bytes as hex digits -// int i; -// for (i=0; idequeue()); receiveDataMutex.unlock(); + // Log the amount and time received for future data rate calculations. + QMutexLocker dataRateLocker(&dataRateMutex); + logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, s, QDateTime::currentMSecsSinceEpoch()); } void OpalLink::receiveMessage(mavlink_message_t message) diff --git a/src/comm/SerialLink.cc b/src/comm/SerialLink.cc index 2656762..8e1bfed 100644 --- a/src/comm/SerialLink.cc +++ b/src/comm/SerialLink.cc @@ -23,19 +23,9 @@ SerialLink::SerialLink(QString portname, int baudRate, bool hardwareFlowControl, int dataBits, int stopBits) : m_bytesRead(0), m_port(NULL), - inDataIndex(0), - outDataIndex(0), m_stopp(false), m_reqReset(false) { - // Initialize our arrays manually, cause C++<03 is dumb. - for (int i = 0; i < buffer_size; ++i) - { - inDataWriteAmounts[i] = 0; - inDataWriteTimes[i] = 0; - outDataWriteAmounts[i] = 0; - outDataWriteTimes[i] = 0; - } // Get the name of the current port in use. m_portName = portname.trimmed(); @@ -212,8 +202,8 @@ void SerialLink::run() // Log this written data for this timestep. If this value ends up being 0 due to // write() failing, that's what we want as well. - QMutexLocker statsLocker(&m_statisticsMutex); - WriteDataStatsBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, numWritten, QDateTime::currentMSecsSinceEpoch()); + QMutexLocker dataRateLocker(&dataRateMutex); + logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, numWritten, QDateTime::currentMSecsSinceEpoch()); } //wait n msecs for data to be ready @@ -230,8 +220,8 @@ void SerialLink::run() emit bytesReceived(this, readData); // Log this data reception for this timestep - QMutexLocker statsLocker(&m_statisticsMutex); - WriteDataStatsBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, readData.length(), QDateTime::currentMSecsSinceEpoch()); + QMutexLocker dataRateLocker(&dataRateMutex); + logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, readData.length(), QDateTime::currentMSecsSinceEpoch()); // Track the total amount of data read. m_bytesRead += readData.length(); @@ -293,23 +283,6 @@ void SerialLink::run() } } -void SerialLink::WriteDataStatsBuffer(quint64 *bytesBuffer, qint64 *timeBuffer, int *writeIndex, quint64 bytes, qint64 time) -{ - int i = *writeIndex; - - // Now write into the buffer, if there's no room, we just overwrite the first data point. - bytesBuffer[i] = bytes; - timeBuffer[i] = time; - - // Increment and wrap the write index - ++i; - if (i == buffer_size) - { - i = 0; - } - *writeIndex = i; -} - void SerialLink::writeBytes(const char* data, qint64 size) { if(m_port && m_port->isOpen()) { @@ -555,94 +528,6 @@ qint64 SerialLink::getConnectionSpeed() const return dataRate; } -qint64 SerialLink::getCurrentOutDataRate() const -{ - const qint64 now = QDateTime::currentMSecsSinceEpoch(); - - // Limit the time we calculate to the recent past - const qint64 cutoff = now - stats_timespan; - - // Grab the mutex for working with the stats variables - QMutexLocker statsLocker(&m_statisticsMutex); - - // Now iterate through the buffer of all received data packets adding up all values - // within now and our cutof. - int index = outDataIndex; - qint64 totalBytes = 0; - qint64 totalTime = 0; - qint64 lastTime = 0; - int size = buffer_size; - while (size-- > 0) - { - // If this data is within our cutoff time, include it in our calculations. - // This also accounts for when the buffer is empty and filled with 0-times. - if (outDataWriteTimes[index] > cutoff && lastTime > 0) { - // Track the total time, using the previous time as our timeperiod. - totalTime += outDataWriteTimes[index] - lastTime; - totalBytes += outDataWriteAmounts[index]; - } - - // Track the last time sample for doing timespan calculations - lastTime = outDataWriteTimes[index]; - - // Increment and wrap the index if necessary. - if (++index == buffer_size) - { - index = 0; - } - } - - // Return the final calculated value in bits / s, converted from bytes/ms. - qint64 dataRate = (totalTime != 0)?(qint64)((float)totalBytes * 8.0f / ((float)totalTime / 1000.0f)):0; - - // Finally return our calculated data rate. - return dataRate; -} - -qint64 SerialLink::getCurrentInDataRate() const -{ - const qint64 now = QDateTime::currentMSecsSinceEpoch(); - - // Limit the time we calculate to the recent past - const qint64 cutoff = now - stats_timespan; - - // Grab the mutex for working with the stats variables - QMutexLocker statsLocker(&m_statisticsMutex); - - // Now iterate through the buffer of all received data packets adding up all values - // within now and our cutof. - int index = inDataIndex; - qint64 totalBytes = 0; - qint64 totalTime = 0; - qint64 lastTime = 0; - int size = buffer_size; - while (size-- > 0) - { - // If this data is within our cutoff time, include it in our calculations. - // This also accounts for when the buffer is empty and filled with 0-times. - if (inDataWriteTimes[index] > cutoff && lastTime > 0) { - // Track the total time, using the previous time as our timeperiod. - totalTime += inDataWriteTimes[index] - lastTime; - totalBytes += inDataWriteAmounts[index]; - } - - // Track the last time sample for doing timespan calculations - lastTime = inDataWriteTimes[index]; - - // Increment and wrap the index if necessary. - if (++index == buffer_size) - { - index = 0; - } - } - - // Return the final calculated value in bits / s, converted from bytes/ms. - qint64 dataRate = (totalTime != 0)?(qint64)((float)totalBytes * 8.0f / ((float)totalTime / 1000.0f)):0; - - // Finally return our calculated data rate. - return dataRate; -} - QString SerialLink::getPortName() const { return m_portName; diff --git a/src/comm/SerialLink.h b/src/comm/SerialLink.h index 73898a3..2245a0e 100644 --- a/src/comm/SerialLink.h +++ b/src/comm/SerialLink.h @@ -68,10 +68,6 @@ public: static const int poll_interval = SERIAL_POLL_INTERVAL; ///< Polling interval, defined in configuration.h - static const int buffer_size = 20; ///< Specify how many data points to capture for data rate calculations. - - static const qint64 stats_timespan = 500; ///< Set the maximum age of samples to use for data calculations (ms). - /** @brief Get a list of the currently available ports */ QList getCurrentPorts(); @@ -154,39 +150,11 @@ protected: QString m_portName; int m_timeout; int m_id; - // Implement a simple circular buffer for storing when and how much data was received. - // Used for calculating the incoming data rate. Use with *StatsBuffer() functions. - int inDataIndex; - quint64 inDataWriteAmounts[buffer_size]; // In bytes - qint64 inDataWriteTimes[buffer_size]; // in ms - - // Implement a simple circular buffer for storing when and how much data was transmit. - // Used for calculating the outgoing data rate. Use with *StatsBuffer() functions. - int outDataIndex; - quint64 outDataWriteAmounts[buffer_size]; // In bytes - qint64 outDataWriteTimes[buffer_size]; // in ms - - quint64 m_connectionStartTime; // Connection start time (ms since epoch) - mutable QMutex m_statisticsMutex; // Mutex for accessing the statistics member variables (inData*,outData*,bitsSent,bitsReceived) QMutex m_dataMutex; // Mutex for reading data from m_port QMutex m_writeMutex; // Mutex for accessing the m_transmitBuffer. QList m_ports; private: - /** - * @brief WriteDataStatsBuffer Stores transmission times/amounts for statistics - * - * This function logs the send times and amounts of datas to the given circular buffers. - * This data is used for calculating the transmission rate. - * - * @param bytesBuffer The buffer to write the bytes value into. - * @param timeBuffer The buffer to write the time value into - * @param writeIndex The write index used for this buffer. - * @param bytes The amount of bytes transmit. - * @param time The time (in ms) this transmission occurred. - */ - void WriteDataStatsBuffer(quint64 *bytesBuffer, qint64 *timeBuffer, int *writeIndex, quint64 bytes, qint64 time); - volatile bool m_stopp; volatile bool m_reqReset; QMutex m_stoppMutex; // Mutex for accessing m_stopp diff --git a/src/comm/TCPLink.cc b/src/comm/TCPLink.cc index f88a3ee..d2044a8 100644 --- a/src/comm/TCPLink.cc +++ b/src/comm/TCPLink.cc @@ -131,6 +131,10 @@ void TCPLink::writeBytes(const char* data, qint64 size) writeDebugBytes(data, size); #endif socket->write(data, size); + + // Log the amount and time written out for future data rate calculations. + QMutexLocker dataRateLocker(&dataRateMutex); + logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, size, QDateTime::currentMSecsSinceEpoch()); } /** @@ -152,6 +156,10 @@ void TCPLink::readBytes() emit bytesReceived(this, buffer); + // Log the amount and time received for future data rate calculations. + QMutexLocker dataRateLocker(&dataRateMutex); + logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, byteCount, QDateTime::currentMSecsSinceEpoch()); + #ifdef TCPLINK_READWRITE_DEBUG writeDebugBytes(buffer.data(), buffer.size()); #endif diff --git a/src/comm/UDPLink.cc b/src/comm/UDPLink.cc index baa7da4..101ee15 100644 --- a/src/comm/UDPLink.cc +++ b/src/comm/UDPLink.cc @@ -39,9 +39,10 @@ This file is part of the QGROUNDCONTROL project #include //#include -UDPLink::UDPLink(QHostAddress host, quint16 port) - : socket(NULL) +UDPLink::UDPLink(QHostAddress host, quint16 port) : + socket(NULL) { + this->host = host; this->port = port; this->connectState = false; @@ -197,6 +198,10 @@ void UDPLink::writeBytes(const char* data, qint64 size) qDebug() << "ASCII:" << ascii; #endif socket->writeDatagram(data, size, currentHost, currentPort); + + // Log the amount and time written out for future data rate calculations. + QMutexLocker dataRateLocker(&dataRateMutex); + logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, size, QDateTime::currentMSecsSinceEpoch()); } } @@ -220,6 +225,11 @@ void UDPLink::readBytes() // FIXME TODO Check if this method is better than retrieving the data by individual processes emit bytesReceived(this, datagram); + // Log this data reception for this timestep + QMutexLocker dataRateLocker(&dataRateMutex); + logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, datagram.length(), QDateTime::currentMSecsSinceEpoch()); + + // // Echo data for debugging purposes // std::cerr << __FILE__ << __LINE__ << "Received datagram:" << std::endl; // int i; diff --git a/src/comm/XbeeLink.cpp b/src/comm/XbeeLink.cpp index 9f7e8a6..760625d 100644 --- a/src/comm/XbeeLink.cpp +++ b/src/comm/XbeeLink.cpp @@ -198,6 +198,9 @@ void XbeeLink::writeBytes(const char *bytes, qint64 length) // TO DO: delete th } if(!xbee_nsenddata(this->m_xbeeCon,data,length)) // return value of 0 is successful written { + // Log the amount and time written out for future data rate calculations. + QMutexLocker dataRateLocker(&dataRateMutex); + logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, length, QDateTime::currentMSecsSinceEpoch()); } else { @@ -216,9 +219,13 @@ void XbeeLink::readBytes() for(unsigned int i=0;i<=xbeePkt->datalen;i++) { data.push_back(xbeePkt->data[i]); - } - qDebug() << data; - emit bytesReceived(this,data); + } + + emit bytesReceived(this, data); + + // Log the amount and time received for future data rate calculations. + QMutexLocker dataRateLocker(&dataRateMutex); + logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, data.length(), QDateTime::currentMSecsSinceEpoch()); } }