#ifndef __JRUNCONTROL__JDAQDRIVER__
#define __JRUNCONTROL__JDAQDRIVER__

#include <string>
#include <iostream>
#include <sstream>
#include <limits>
#include <unistd.h>

#include "JLang/JException.hh"
#include "JLang/JRedirectStream.hh"
#include "JLang/JWhiteSpacesFacet.hh"

#include "JRuncontrol/JDAQClient.hh"
#include "JRuncontrol/JClient.hh"
#include "JRuncontrol/JClientList.hh"


/**
 * \author mdejong
 */

namespace KM3NETDAQ {

  using JLANG::JException;
  using JLANG::JIOException;
  using JLANG::JControlHostException;


  /**
   * Simple driver for run control clients.
   * This class can be used to start a set of run control clients,
   * trigger events and eventually stop the clients.
   */
  class JDAQDriver :
    public JDAQClient
  {
  public:
    /**
     * Constructor.
     *
     * \param  name                 name of driver
     * \param  server               name of command message server
     * \param  logger               logger
     * \param  level                debug level 
     * \param  timeout_s            timeout_s [s]
     */
    JDAQDriver(const std::string& name,
	       const std::string& server,
	       JLogger*           logger,
	       const int          level,
	       const int          timeout_s) :
      JDAQClient(name, server, logger, level),
      timeout_s (timeout_s),
      is_alive  (false)
    {}


    /**
     * Enter the state machine.
     * The driver will subscribe to the ControlHost tags corresponding to born, died and
     * reply messages of the clients instead of the standard tags for run control commands.
     * The clients are started after the driver is ready to receive ControlHost messages.
     * In case of an error, a message is printed on the terminal and the state machine
     * is not entered.
     */
    virtual bool enter() override
    {
      using namespace std;
      using namespace JLANG;
      using namespace JNET;

      if (server.is_valid() && logger.is_valid()) {

	try {
	  
	  server->Subscribe(JSubscriptionAll(RC_REPLY)     + 
			    JSubscriptionAll(RC_LOG)       + 
			    JSubscriptionAll(DISPTAG_Born) + 
			    JSubscriptionAll(DISPTAG_Died));
	  server->SendMeAlways();
	  server->MyId(getFullName());

	  // check alive of this driver

	  for (int i = 0; i != timeout_s && !is_alive; ) {
	    if (!update(true)) {
	      sleep(1);
	      ++i;
	    }
	  }
	  
	  if (is_alive) {

	    {
	      clientList.start();
	      
	      for (int i = 0; i != timeout_s; ++i) {

		while (update(true)) {}

		if (clientList.count() >= clientList.count(JClient::ACTIVE))
		  break;
		else
		  sleep(1);
	      }
	    }

	    if (clientList.count() != clientList.count(JClient::ACTIVE)) {
	      for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
		if (i->isActive() && i->getBorn() <= i->getDied()) {
		  JWarningStream(logger) << "client " << i->getFullName() << " did not start.";
		}
	      }
	    }

	    return CHSM::machine::enter();

	  } else {
	    cerr << "Timeout at subscription." << endl;
	  }
	}
	catch(const JControlHostException& error) {
	  cerr << error << endl;
	}

      } else {
	cerr << "Message server or logger not properly initialised." << endl;
      }

      return false;
    }


    /**
     * Exit the state machine.
     *
     * This method waits for the clients to terminate using the died message generated by ControlHost.
     * In case of a timeout, the process is terminated by calling the method stop() of the corresponding client.
     */
    void actionExit()
    {
      using namespace std;
      using namespace JLANG;

      for (int i = 0; i != timeout_s; ++i) {

	while (update(true)) {}

	if (clientList.count() == 0)
	  break;
	else
	  sleep(1);
      }

      if (clientList.count() != 0) {
	JWarningStream(logger) << "Timeout at transition " << "exit(); forced stop.";
      }

      clientList.stop();
    }


    /**
     * Action when entering state.
     * This method waits for all clients to produce the enter state message.
     * In case of a timeout, no specific action is taken but an error message is logged.
     *
     * \param  state                entered state
     * \param  event                event that triggered transition
     */
    virtual void enterState(const CHSM::state& state, const CHSM::event& event) override
    {
      for (int i = 0; i != timeout_s && clientList.count(state) < clientList.count(JClient::ACTIVE) && clientList.count() != 0; ) {
	if (!update(true)) {
	  sleep(1);
	  ++i;
	}
      }

      if (clientList.count(state) < clientList.count(JClient::ACTIVE)) {
	JWarningStream(logger) << "Timeout at transition " << event.name() << " to state " << state.name(); 
      }
    }


    /**
     * Update client list with incoming ControlHost message.
     * This method receives and processes a message.
     * The client list is updated accordingly.
     * If the no-wait option is set to true, it returns in the absence of a pending message immediately.
     * The return value is then false.
     * If the no-wait option is set to false, it waits until the next message is received.
     *
     * \param  no_wait              wait option
     * \return                      true if message received; else false
     */
    bool update(const bool no_wait)
    {
      using namespace std;
      using namespace JNET;
      using JLANG::JWhiteSpacesFacet;

      try {

	string        tag;
	long long int length = 0;

	if ( no_wait && server->CheckHead(tag, length) <= 0) { return false; }
	if (!no_wait && server->WaitHead (tag, length) <  0) { return false; }
	
	char* data= new char[length];
	
	server->GetFullData(data, length);
	
	const string buffer(data, length);

	delete [] data;
	
	JDebugStream(logger) << "Got message " << tag << ' ' << buffer;
	
	if (tag == RC_LOG) {

	  rc_log = buffer;

	} else if (buffer.find(getFullName()) != string::npos) {
	  
	  if      (tag == DISPTAG_Born)
	    is_alive  = true;
	  else if (tag == DISPTAG_Died)
	    is_alive  = false;
	  
	} else {

	  JClientList::iterator i = clientList.find(buffer);
	  
	  if (i != clientList.end()) {

	    i->update(tag, buffer);

	  } else {

	    JErrorStream(logger) << "Message fom illegal client " << buffer;

	    try {

	      if (tag == DISPTAG_Born ||
		  tag == DISPTAG_Died ||
		  tag == RC_REPLY) {

		string key, hostname, name;
		
		istringstream is(buffer);
      
		const locale loc(is.getloc(), new JWhiteSpacesFacet(is.getloc(), TOKEN_DELIMETER));
      
		is.imbue(loc);
		
		if (is >> key >> hostname >> name && key == RUN_CONTROL_CLIENT) {

		  JClient client(name, hostname);

		  client.update(tag, buffer);
		  client.setMode(JClient::ILLEGAL);
	    
		  clientList.insert(client);
		  
		  JWarningStream(logger) << "Added illegal client " << client.getFullName();

		} else {
		  THROW(JIOException, "JClient: Error reading " << buffer);
		}
	      }
	    }
	    catch(const JException& error) {
	      JErrorStream(logger) << error;
	    }
	  }
	}

	return true;
      }
      catch(const JControlHostException& error) {
	JErrorStream(logger) << error;
      }

      return false;
    }

    virtual void actionStart(int, const char*) override
    {
      rc_log = "";
    }

    virtual void actionStop(int, const char*) override
    {
      if (rc_log != "") 
	JNoticeStream(logger) << rc_log;
      else
	JErrorStream (logger) << "Missing message from JDataWriter with tag " << RC_LOG;
    }

    /**
     * Run driver with user input.
     */
    void run()
    {
      run(std::cin);
    }


    /**
     * Run driver.
     *
     * Example input format:
     * <pre>
     * # comment line.
     *
     * process \<process name\> \<host name\> [\<start command\>];
     *
     * # The following tokens in \<start command\> will be substituted: 
     * #   $HOST$    by  \<host name\>;
     * #   $NAME$    by  \<process name\>;
     * #   $SERVER$  by  JClient::SERVER;
     * #   $LOGGER$  by  JClient::LOGGER;
     * #   $ARGS$    by  part following '/' in \<process name\>;
     *
     * # enter state machine.
     *
     * enter
     *
     * # trigger event
     * # data can be provided online and mixed with data from a separate file (optional).
     * # multiple tags should be separated by a new line.
     *
     * event \<event name\> {
     *   [\<tag 1\> [data]]
     *   [\<tag 2\> [data][%\<file name\>%][data]]
     * }
     *
     * # optionally quit before end of input
     * [quit]
     *
     * # optionally kill processes that did not properly terminate.
     * [exit]
     * </pre>
     *
     * \param  in                   input stream
     */
    void run(std::istream& in)
    {
      using namespace std;
      
      for (string key; in >> key; ) {

	if (key[0] == '#') {

	  in.ignore(numeric_limits<streamsize>::max(), '\n');

	} else if (key == "enter") {
	  
	  enter();
	  
	  if (!active()) {
	    cerr << "State machine not entered; abort." << endl;
	    return;
	  }

	} else if (key == "exit") {
	  
	  timeout_s = 0;
	  exit();
	  
	} else if (key == "quit") {
	  
	  break;
	  
	} else if (key == "sleep") {

	  int sec;

	  if (in >> sec) {
	    sleep(sec);
	  }
	  
	} else if (key == "process") {

	  string buffer;

	  getline(in, buffer, ';');
	  
	  istringstream is(buffer);
	  
	  JClient client;
	  
	  if (is >> client) {

	    client.setMode(JClient::ACTIVE);

	    if (!clientList.insert(client).second) {
	      JWarningStream(logger) << "Process already exists " << client;
	    }

	  } else {
	    JErrorStream(logger) << "Error reading key word process.";
	  }

	} else if (key == "event" || key == "event*") {

	  JEvent_t   event;
	  char       c;
	  string     buffer;
	  const char eol = '\n';
	  
	  if (in >> event >> c && c == '{' && getline(in, buffer, '}')) {

	    if (clientList.count() != 0) {

	      JDAQEvent_t* pev = findEvent(RC_CMD, event.getName());
	    
	      if (pev != NULL) {

		if (pev->active() || key == "event*") {

		  istringstream is(buffer);
		
		  for (string tag; is >> tag; ) {
		  
		    ostringstream os;

		    os << event << getTokenDelimeter();

		    copy(is, os, eol);

		    server->PutFullString(tag, os.str());
		  }

		  if (key != "event*") {
		    (*pev)(0, NULL);   // trigger driver
		  }

		} else {
		  JErrorStream(logger) << "Inactive event " << event;
		}

	      } else {
		JErrorStream(logger) << "Unknown event " << event;
	      }

	    } else {
	      JErrorStream(logger) << "No active client to trigger event.";
	    }

	  } else {
	    JErrorStream(logger) << "Error reading key word event.";
	  }

	} else if (key == "message") {

	  string tag;
	  string buffer;

	  if (in >> tag && getline(in, buffer, ';'))
	    server->PutFullString(tag, buffer);
	  else
	    JErrorStream(logger) << "Invalid message: <" << tag << "> \"" << buffer << "\"";
	  
	} else if (key == "print") {

	  for (JClientList::const_iterator i = clientList.begin(); i != clientList.end(); ++i) {
	    JNoticeStream(logger) << i->getFullName()     << ' '
				  << i->getStartCommand() << ' '
				  << i->getAlive()        << ' '
				  << i->getStatename();
	  }

	} else if (key == "filter") {

	  string client;
	  string buffer;

	  getline(in, buffer, ';');

	  for (istringstream is(buffer); is >> client; ) {
	    filter(client);
	  }

	} else if (key == "sync") {

	  synchronise();
	  
	} else {

	  JErrorStream(logger) << "Unknown key: " << key;

	  in.ignore(numeric_limits<streamsize>::max(), '\n');
	}
      }
    }


    /**
     * Update client list with incoming ControlHost messages until the client list 
     * is synchronised with the current state or until the timeout.
     */
    void update()
    {
      using namespace std;

      const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);

      for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {

        if (state->active()) {
	  for (int i = 0; i != timeout_s && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
	    if (!update(true)) {
	      sleep(1);
	      ++i;
	    }
	  }
	}
      }
    }


    /**
     * Synchronise clients.
     */
    void synchronise()
    {
      using namespace std;

      const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);

      for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {

        if (state->active()) {

	  if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {

	    JDebugStream(logger) << "Synchronising " << state->name();
	    
	    for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {

	      if (i->getMode() == JClient::ACTIVE) {
		
		if (!i->getAlive()) {
	      
		  try {
		
		    string buffer;
		
		    if (JControlHost::WhereIs(JClient::SERVER, i->getFullName(), buffer) > 0) {

		      i->setAlive(true);

		      if (buffer.find(i->getHostname()) == string::npos) {
			JErrorStream(logger) << i->getFullName() << " running on " << buffer << " but not alive.";
		      }
		    }
		  }
		  catch(const JControlHostException& error) {
		    JErrorStream(logger) << error;
		  }
		}

		if (i->getAlive() && i->getStatename() != state->name()) {
		  server->PutFullString(KM3NETDAQ::getUniqueTag(i->getHostname(), i->getName()), ev_check.name()); 
		}
	      }
	    }
	    
	    
	    for (int i = 0; i != timeout_s && clientList.count(*state) < clientList.count(JClient::ACTIVE); ) {
	      if (!update(true)) {
		sleep(1);
		++i;
	      }
	    }
	    
	    if (clientList.count(*state) < clientList.count(JClient::ACTIVE)) {
	      JWarningStream(logger) << "Timeout at synchronisation.";
	    }
	  }
	}
      }
    }


    /**
     * Filter client list by putting failing clients to sleep.
     * In this, only clients with names that contain the given character sequence are considered.
     *
     * \param  target               target name of client(s)
     */
    void filter(const std::string& target = "")
    {
      const CHSM::parent& parent = static_cast<const CHSM::parent&>(Main.RunControl);

      for (CHSM::parent::const_iterator state = parent.begin(); state != parent.end(); ++state) {

        if (state->active()) {

	  for (JClientList::iterator i = clientList.begin(); i != clientList.end(); ++i) {

	    if (target == "" || i->getName().find(target) != std::string::npos) {

	      if (!i->getAlive() || i->getStatename() != state->name()) {

		JNoticeStream(logger) << "Put to sleep " << i->getFullName();

		i->setMode(JClient::SLEEP);
	      }
	    }
	  }
	}
      }
    }
    
    int         timeout_s;      //!< timeout of state transitions [s]

  protected:

    JClientList clientList;
    bool        is_alive;
    std::string rc_log;

    /**
     * Copy data from input to output stream.
     * Tagged file names are recursively expanded.
     *
     * \param  in                   input  stream
     * \param  out                  output stream
     * \param  eol                  end of line
     */
    static void copy(std::istream& in, std::ostream& out, const char eol = '\n') 
    {
      using namespace std;

      string buffer; 

      if (getline(in, buffer, eol)) {

	for (string::size_type pos = 0; pos < buffer.length(); ) {
	  
	  string::size_type lpos = buffer.substr(pos).find(FILENAME_PREFIX);
	  string::size_type rpos = buffer.substr(pos).find(FILENAME_POSTFIX);
	  
	  if (lpos != string::npos && 
	      rpos != string::npos) {
	    
	    out << buffer.substr(pos, lpos);
	    
	    lpos += FILENAME_PREFIX.length();
	    pos  += lpos;
	    
	    ifstream file(buffer.substr(pos, rpos - lpos).c_str());
	    
	    copy(file, out, '\0');
	    
	    rpos += FILENAME_POSTFIX.length();
	    pos  += rpos - lpos;
	    
	  } else {
	    
	    out << buffer.substr(pos);
	    
	    pos  += buffer.substr(pos).length();
	  }
	}
      }
    }
  };
}

#endif