#ifndef RECIPIENT_HH #define RECIPIENT_HH #include #include #include #include "log.hh" /** * \author cpellegrino */ typedef boost::circular_buffer CircularBuffer; class Recipient { boost::asio::ip::tcp::socket m_sock; boost::asio::ip::tcp::endpoint m_endpoint; CircularBuffer m_cbuffer; bool m_connected; public: Recipient(boost::asio::io_service& service, const boost::asio::ip::tcp::endpoint& endpoint, size_t circbuff_size) : m_sock(service), m_endpoint(endpoint), m_cbuffer(circbuff_size), m_connected(false) { LOG_NOTICE << "Trying to connect to " << m_endpoint; connect(); if (!m_connected) { LOG_ERROR << "Connection to " << m_endpoint << " failed"; } } void sock_reset() { stop(); connect(); } bool sendIfPossible(const Frame& data) { if (! m_connected) { sock_reset(); if (! m_connected) { m_cbuffer.push_back(data); return false; } } while (m_cbuffer.size()) { if (send(m_cbuffer.front())) { m_cbuffer.pop_front(); } else { break; } } if (! send(data)) { m_cbuffer.push_back(data); } return m_connected; } ~Recipient() { stop(); } friend class RecipientsHandler; private: void connect() { boost::system::error_code ec; m_sock.connect(m_endpoint, ec); m_connected = !ec; if (m_connected) { boost::asio::socket_base::send_buffer_size option(67108864); m_sock.set_option(option); LOG_NOTICE << "Connection to " << m_endpoint << " succeeded"; boost::system::error_code ec; m_sock.shutdown(boost::asio::ip::tcp::socket::shutdown_receive, ec); } } void stop() { boost::system::error_code ec; m_sock.shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec); m_sock.close(ec); } /** * Send data * * \param data the Frame to send. * \return true if OK; else false */ bool send(const Frame& data) { boost::system::error_code ec; boost::asio::write(m_sock, boost::asio::buffer(data.data(), data.getFrameLength()), ec); m_connected = !ec; if (ec) { LOG_ERROR << "Error transmitting data to " << m_endpoint << ": " << ec; } return m_connected; } }; #endif // RECIPIENT_HH