#include #include #include #include #include #include #include "Jeep/JParser.hh" #include "Jeep/JMessage.hh" #include "JLang/JMemory.hh" #include "JLang/JSharedCounter.hh" #include "JNet/JTCPSocket.hh" #include "JNet/JSocketChannel.hh" #include "JNet/JServerSocket.hh" #include "JNet/JSelect.hh" #include "JNet/JControlHost.hh" #include "JSystem/JSystemToolkit.hh" #include "JMath/JConstants.hh" namespace JNET { using namespace JPP; template class JMemory_t : public JMalloc {}; // Memory manager typedef JPrefix JPrefix_t; typedef JSocketInputChannel JSocketInputChannel_t; /** * Get total size of internet packet. * * \param prefix prefix * \return number of bytes */ inline int getSizeOfPacket(const JPrefix_t& prefix) { return prefix.getSize() + sizeof(JPrefix_t); } /** * Set total size of internet packet. * * \param size number of bytes * \param prefix prefix */ inline void setSizeOfPacket(const int size, JPrefix_t& prefix) { prefix.setSize(size - sizeof(JPrefix_t)); } /** * Data structure of a ControlHost message. * This data structure consists of a copy of the ControlHost prefix and * an array of bytes (including the ControlHost prefix). * A new array of bytes is created by the appropriate constructor. * The allocated memory is released upon destruction of the last object of this class. */ class JDispatch : public JPrefix_t, public JSharedCounter { public: typedef JMalloc JMemory_t; static long long int MEMORY_TOTAL; //!< Total size of data [Bytes] static long long int MEMORY_LIMIT; //!< Limit size of data [Bytes] /** * Default constructor. */ JDispatch() : JPrefix_t(), JSharedCounter(), buffer(NULL) {} /** * Constructor. * Note that the input data should contain a copy of the prefix. * * \param prefix prefix * \param data data */ JDispatch(const JPrefix_t& prefix, const char* data) : JPrefix_t(prefix), JSharedCounter() { create(); memcpy(this->buffer, data, size()); } /** * Constructor. * Note that the given message is appended to a copy of the prefix. 
* * \param tag tag * \param message message */ JDispatch(const JTag& tag, const std::string& message) : JPrefix_t(tag, message.size()), JSharedCounter() { create(); memcpy(this->buffer, static_cast(this), sizeof(JPrefix_t)); memcpy(this->buffer + sizeof(JPrefix_t), message.data(), message.size()); } /** * Copy constructor. * * \param message message */ JDispatch(const JDispatch& message) { static_cast(*this) = message; buffer = message.buffer; attach(message); } /** * Destructor. */ ~JDispatch() { if (detach()) { release(); } } /** * Assignment operator. * * \param message message * \return this JDispatch */ JDispatch& operator=(const JDispatch& message) { if (buffer != message.buffer) { if (detach()) { release(); } buffer = message.buffer; attach(message); } return *this; } /** * Type conversion operator. * * \return socket output buffer */ operator JSocketOutputBuffer () const { return JSocketOutputBuffer(this->data(), this->size()); } /** * Get size. * * \return number of bytes */ int size() const { return getSizeOfPacket(static_cast(*this)); } /** * Get data. * * \return pointer to data */ const char* data() const { return buffer; } protected: /** * Allocate memory. */ void create() { buffer = JMemory_t::create(size()); if (buffer != NULL) { JSharedCounter::initialise(); MEMORY_TOTAL += size(); } else { throw JMallocException("Not enough space in memory."); } } /** * Release memory. */ void release() { JMemory_t::release(buffer); MEMORY_TOTAL -= size(); } char* buffer; }; /** * ControlHost client manager. */ class JClient : public JTCPSocket { public: static unsigned int QUEUE_LIMIT; //!< Maximum number of messages in queue /** * Default constructor. */ JClient() : JTCPSocket(), in (*this), out(*this), requestAll(false), requestCounter(0) {} /** * Constructor. * * \param socket socket */ JClient(const JTCPSocket& socket) : JTCPSocket(socket), in (*this), out(*this), requestAll(false), requestCounter(0) {} /** * Get nick name. 
* * \return nick name */ const std::string& getNickname() const { return nick_name; } /** * Set nick name. * * \param nick_name nick name */ void setNickname(const std::string& nick_name) { this->nick_name = nick_name; } /** * Check request. * * \return true if request can be honoured; else false */ bool checkRequest() const { return requestAll || requestCounter != 0; } /** * Increment request by one. */ void incrementRequest() { ++requestCounter; } /** * Decrement request by one. */ void decrementRequest() { --requestCounter; } /** * Set no request. */ void setRequestAll() { requestAll = true; } /** * Get subscription. * * \return subscription */ const std::set& getSubscriptionAll() const { return subscriptionAll; } /** * Get subscription. * * \return subscription */ const std::set& getSubscriptionAny() const { return subscriptionAny; } /** * Set subcription. * * \param subscription subscription * \return true of OK; else false */ bool setSubscription(const std::string& subscription) { using namespace std; subscriptionAll.clear(); subscriptionAny.clear(); try { char c; JTag tag; for (istringstream is(subscription); is >> c >> tag; ) { if (c == SUBSCRIBE_ALL) subscriptionAll.insert(tag); else if (c == SUBSCRIBE_ANY) subscriptionAny.insert(tag); //else if (c == SUBSCRIBE_SHARED_MEMORY) subscriptionAny.insert(tag); } } catch(const JControlHostException& error) { return false; } return true; } /** * Check subscription for given prefix. * * \param prefix prefix * \return true if subscription valid; else false */ bool checkSubscriptionAll(const JPrefix_t& prefix) const { return subscriptionAll.find(prefix) != subscriptionAll.end(); } /** * Check subscription for given prefix. * * \param prefix prefix * \return true if subscription valid; else false */ bool checkSubscriptionAny(const JPrefix_t& prefix) const { return subscriptionAny.find(prefix) != subscriptionAny.end() && checkRequest() && queue.size() < QUEUE_LIMIT; } /** * Check subscription for given prefix. 
* * \param prefix prefix * \return true if subscription valid; else false */ bool checkSubscription(const JPrefix_t& prefix) const { return checkSubscriptionAll(prefix) || checkSubscriptionAny(prefix); } /** * Add message to client queues depending on subscription of each client. * Note that adding a message may result in dropping (other) messages. * * \param message message */ void add(const JDispatch& message) { if (checkSubscription(message)) { queue.push_back(message); if (queue.size() > QUEUE_LIMIT) { drop(); } } } /** * Drop all messages for which the client has not the 'all' subscription. */ void drop() { for (std::deque::iterator i = queue.begin(); i != queue.end(); ) { if (!checkSubscriptionAll(*i) && (i != queue.begin() || !out.isBusy())) i = queue.erase(i); else ++i; } } JSocketInputChannel_t in; //!< reader for incoming messages JSocketNonblockingWriter out; //!< writer for outgoing messages std::deque queue; //!< queue for outgoing messages protected: std::set subscriptionAll; std::set subscriptionAny; std::string nick_name; bool requestAll; int requestCounter; }; /** * List of ControlHost client managers. */ class JClientList : public std::vector { public: /** * Default constructor. */ JClientList() : std::vector() {} /** * Add message to client queues depending on subscription of each client. * * \param message message */ void add(const JDispatch& message) { for (iterator i = this->begin(); i != this->end(); ++i) { i->add(message); } } /** * Drop all messages from client queues for which the client has not the 'all' subscription. */ void drop() { for (iterator i = this->begin(); i != this->end(); ++i) { i->drop(); } } }; /** * Print message. * * \param out output stream * \param message message * \return output stream */ inline std::ostream& operator<<(std::ostream& out, const JDispatch& message) { return out << "(" << message.getTag() << "," << message.size() << ")"; } /** * Print socket. 
* * \param out output stream * \param socket socket * \return output stream */ inline std::ostream& operator<<(std::ostream& out, const JSocket& socket) { return out << "[" << socket.getFileDescriptor() << "]"; } /** * Print socket status. * * \param out output stream * \param status socket status * \return output stream */ inline std::ostream& operator<<(std::ostream& out, const JSocketStatus& status) { return out << "(" << status.isReady() << "," << status.getCounter() << ")"; } /** * Print socket input buffer. * * \param out output stream * \param buffer socket buffer * \return output stream */ inline std::ostream& operator<<(std::ostream& out, const JSocketInputBuffer& buffer) { return out << "(" << buffer.isReady() << "," << buffer.getCounter() << "," << buffer.getSize() << ")"; } long long int JDispatch::MEMORY_TOTAL = 0; //!< Total memory allocation. long long int JDispatch::MEMORY_LIMIT; //!< Limit memory allocation. unsigned int JClient::QUEUE_LIMIT; //!< queue size limit } /** * \file * * Quicksilver Messenger Service - a ControlHost server. 
* \author mdejong
 */
int main(int argc, char* argv[])
{
  using namespace std;
  using namespace JPP;

  int  port;
  int  backlog;
  int  timeout_us;
  int  buffer_size;
  int  debug;

  try {

    JParser<> zap("ControlHost server.");

    zap['P'] = make_field(port)                     = DISPATCH_PORT;
    zap['q'] = make_field(backlog)                  = 1024;
    zap['T'] = make_field(timeout_us)               = 5;
    zap['s'] = make_field(buffer_size)              = JSocket::getDefaultBufferSize();
    zap['Q'] = make_field(JClient::QUEUE_LIMIT)     = 100;
    zap['M'] = make_field(JDispatch::MEMORY_LIMIT)  = (JSYSTEM::getRAM() >> 1);
    zap['d'] = make_field(debug)                    = 0;

    zap(argc, argv);
  }
  catch(const exception &error) {
    FATAL(error.what() << endl);
  }


  JServerSocket server(port, backlog);
  JSelect       select;
  JClientList   clientList;


  DEBUG("Port " << setw(10) << port << endl);
  DEBUG("Memory limit " << setw(10) << JDispatch::MEMORY_LIMIT << endl);
  DEBUG("Queue limit " << setw(10) << JClient::QUEUE_LIMIT << endl);


  for ( ; ; ) {

    // Step 1: build the masks for select.

    select.reset();
    select.setReaderMask(server);

    for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ++client) {

      // only listen to this client when there is no complete message pending

      if (!client->in.isReady()) {
        select.setReaderMask(*client);
      }

      if (client->out.isReset()) {

        // writer is idle: start sending the next queued message, if any

        if (!client->queue.empty()) {

          DEBUG("Client" << *client << ".set" << client->queue.front() << endl);

          client->out.set(client->queue.front());

          client->decrementRequest();

          select.setWriterMask(*client);
        }

      } else if (client->out.isBusy()) {

        // continue sending the current message

        select.setWriterMask(*client);
      }
    }


    // Step 2: wait for I/O.

    int nfds = 0;

    try {
      nfds = select(timeout_us);
    }
    catch(const exception& error) {
      ERROR("" << error.what() << endl);
    }


    if (nfds > 0) {

      // Step 3: handle I/O of clients.
      // The iterator is only incremented at the end of the body because clients may be erased on the way.

      for (JClientList::iterator client = clientList.begin(); client != clientList.end(); ) {

        try {

          if (select.hasReaderMask(*client)) {

            try {
              client->in.read();
            }
            catch(const exception& error) {

              ERROR("Remove (3) client" << *client << "<" << client->getNickname() << ">: " << error.what() << endl);

              if (client->getNickname() != "") {
                clientList.add(JDispatch(DISPTAG_Died, client->getNickname()));
              }

              client->shutdown();

              client = clientList.erase(client);

              continue;
            }

            DEBUG("Client" << *client << ".read" << static_cast<const JSocketInputBuffer&>(client->in) << endl);
          }


          if (client->in.isReady()) {

            DEBUG("Message" << *client << ' ' << client->in.prefix.c_str() << ' ' << client->in.size() << endl);

            bool special = JControlHost::maybe_special(client->in.prefix);

            if (special) {

              client->in.seekg(sizeof(JPrefix_t));   // skip prefix

              if        (client->in.prefix.getTag() == DISPTAG_Subscribe) {

                if (!client->setSubscription(string(client->in.getRemainingData(), client->in.getRemainingSize()))) {
                  WARNING("Client" << *client << " invalid subscription" << endl);
                }

              } else if (client->in.prefix.getTag() == DISPTAG_MyId) {

                client->setNickname(string(client->in.getRemainingData(), client->in.getRemainingSize()));

                clientList.add(JDispatch(DISPTAG_Born, client->getNickname()));

              } else if (client->in.prefix.getTag() == DISPTAG_Gime) {

                client->incrementRequest();

              } else if (client->in.prefix.getTag() == DISPTAG_Always) {

                client->setRequestAll();

              } else if (client->in.prefix.getTag() == DISPTAG_WhereIs) {

                // reply with the host names of all clients with the given nick name and disconnect

                string nick_name(client->in.getRemainingData(), client->in.getRemainingSize());
                string buffer;

                for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {
                  if (i->getNickname() == nick_name) {
                    buffer += " " + i->getHostname();
                  }
                }

                JControlHost socket(*client);

                socket.PutFullString(DISPTAG_WhereIs, buffer.substr(buffer.empty() ? 0 : 1));   // strip leading space

                DEBUG("Remove (1) client" << *client << endl);

                client->shutdown();

                client = clientList.erase(client);

                continue;    // skip any action

              } else if (client->in.prefix.getTag() == DISPTAG_ShowStat) {

                // disconnect the requesting client and print the status of the remaining clients

                client->shutdown();

                client = clientList.erase(client);

                for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {

                  int total = 0;

                  for (std::deque<JDispatch>::const_iterator message = i->queue.begin(); message != i->queue.end(); ++message) {
                    total += message->size();
                  }

                  cout << "client[" << i->getFileDescriptor() << "] " << i->getNickname() << endl;

                  cout << "tag - all:";

                  for (std::set<JTag>::const_iterator tag = i->getSubscriptionAll().begin(); tag != i->getSubscriptionAll().end(); ++tag) {
                    cout << ' ' << *tag;
                  }

                  cout << endl;

                  cout << "tag - any:";

                  for (std::set<JTag>::const_iterator tag = i->getSubscriptionAny().begin(); tag != i->getSubscriptionAny().end(); ++tag) {
                    cout << ' ' << *tag;
                  }

                  cout << endl;

                  cout << "queue " << i->queue.size() << ' ' << total << "B" << endl;
                }

                continue;    // skip any action

              } else if (client->in.prefix.getTag() == DISPTAG_Debug) {

                istringstream is(string(client->in.getRemainingData(), client->in.getRemainingSize()));

                is >> debug;

              } else {

                special = false;    // not a reserved tag.
              }
            }

            if (!special) {

              // ordinary message: distribute to the subscribed clients

              clientList.add(JDispatch(client->in.prefix, client->in.data()));

              if (JDispatch::MEMORY_TOTAL > JDispatch::MEMORY_LIMIT) {

                WARNING("Memory " << setw(10) << JDispatch::MEMORY_TOTAL << " > " << setw(10) << JDispatch::MEMORY_LIMIT << endl);

                clientList.drop();
              }
            }

            client->in.reset();
          }


          if (select.hasWriterMask(*client)) {

            client->out.write();

            DEBUG("Client" << *client << ".write" << static_cast<const JSocketStatus&>(client->out) << endl);

            if (client->out.isReady()) {

              // message completely sent

              client->out.reset();
              client->queue.pop_front();
            }
          }

          ++client;
        }
        catch(const exception& error) {

          DEBUG("Remove (2) client" << *client << "<" << client->getNickname() << ">: " << error.what() << endl);

          if (client->getNickname() != "") {
            clientList.add(JDispatch(DISPTAG_Died, client->getNickname()));
          }

          client->shutdown();

          client = clientList.erase(client);
        }
      }


      // Step 4: accept new client.

      if (select.hasReaderMask(server)) {

        // A failure to accept or configure a single connection should not terminate the server;
        // it is reported in the same way as failures of select and of the existing clients.
        // NOTE(review): assumes JTCPSocket and its setters report failures via exceptions - confirm.

        try {

          JTCPSocket socket(server.getFileDescriptor());

          socket.setSendBufferSize   (buffer_size);
          socket.setReceiveBufferSize(buffer_size);

          socket.setReuseAddress(true);
          socket.setKeepAlive   (true);
          socket.setNonBlocking (true);

          DEBUG("New client" << socket << endl);

          clientList.push_back(JClient(socket));
        }
        catch(const exception& error) {
          ERROR("Accept: " << error.what() << endl);
        }
      }
    }
  }
}