#
#     Copyright (C) 2006
#
#     This code is distributed under the terms and conditions of the
#     CCP4 Program Suite Licence Agreement as a CCP4 Library.
#     A copy of the CCP4 licence can be obtained by writing to the
#     CCP4 Secretary, Daresbury Laboratory, Warrington WA4 4AD, UK.
#
#====================================================================
# CCP4 Interface - dbsocket.py
#
# Classes and methods for handling pseudo-asynchronous communications
# with the db handler
#
# Peter Briggs
#
#====================================================================
#CCP4i_cvs_Id $Id: dbsocket.py,v 1.9 2008/08/12 10:47:59 pjx Exp $
#####################################################################

import socket
import select
import sys
import time
import threading
import re
import xml.dom.minidom
import xml.sax.saxutils as su

# The two imports below only exist under these names on Python 2.  They
# are guarded so that the request/response classes can be imported (and
# unit tested) elsewhere.
# NOTE(review): the dbsocket class itself still passes native 'str'
# objects to the socket and uses Thread.isAlive(), i.e. the network
# layer has only been written for Python 2.
try:
    import Queue
except ImportError:
    import queue as Queue

try:
    # No longer referenced by this module; retained so that the module
    # namespace is unchanged for any code that reaches into it.
    import exceptions
except ImportError:
    exceptions = None

# Module globals
_request_id = 0
_debug = False
#_debug = True

# Markers used by dbResponse.parse to classify a line from the handler.
# NOTE(review): in the file as received both tests were made against the
# empty string, which classified every non-empty line as a broadcast and
# left the response branch unreachable.  The values below are inferred
# from the terminology used throughout this module - confirm them
# against the handler before relying on them.
_BROADCAST_MARKER = "<broadcast>"
_RESPONSE_MARKER = "<response>"

# Element names extracted from a broadcast, in the order in which they
# appear in the list returned by dbResponse.broadcast()
_BROADCAST_FIELDS = ("message", "project", "jobid", "operation", "agent")

# The idea is that the client API should use this as the way to
# communicate with the handler.
# The client would need to implement its own listener to
# check for broadcast messages - this would be important for a
# visualiser application which would need to respond to changes
# in the db content


class dbRequest:
    """Class representing a handler request."""

    def __init__(self, command, arguments):
        """Command is the base command being issued; arguments
        is a tuple of appropriate options.

        Each request is given its own id string by get_request_id()."""
        self.__command = command
        self.__arg_list = arguments
        self.__id = get_request_id()

    def __str__(self):
        """Return the command followed by its arguments, separated by
        single spaces."""
        words = [str(self.__command)]
        for arg in self.__arg_list:
            words.append(str(arg))
        return " ".join(words)

    def xml(self):
        """Transform the request into the string sent to the handler.

        The command, each argument (or each item of an argument that
        is a list) and the request id are appended in that order; all
        but the id are passed through xml.sax.saxutils.escape.

        NOTE(review): every "" literal below marks a position where
        the original appears to have carried an XML tag that has since
        been lost - as written the values are concatenated with no
        markup at all.  The tag names cannot be recovered from this
        file; restore them from the handler protocol."""
        xml_request = "" + su.escape(str(self.__command)) + ""
        for arg in self.__arg_list:
            if isinstance(arg, list):
                xml_request = xml_request + ""
                for item in arg:
                    xml_request = xml_request + "" + \
                                  su.escape(str(item)) + ""
                xml_request = xml_request + ""
            else:
                xml_request = xml_request + \
                              "" + su.escape(str(arg)) + ""
        xml_request = xml_request + \
                      "" + self.__id + ""
        return xml_request

    def request_id(self):
        """Return the request id string."""
        return self.__id


class dbResponse:
    """Class representing a response from the handler.

    The object is initialised with the raw XML returned by the
    handler, which may hold several newline-separated lines.  Each
    line that can be parsed is classified as either a response to a
    request or as a broadcast, and is then addressed by its index
    (0 .. nresponses()-1) in the accessor methods."""

    def __init__(self, xml_response):
        """xml_response is the raw response from the handler.

        Raises whatever the XML parser raises if a line identified as
        a broadcast cannot be parsed (see the parse method)."""
        self.__xml_response = xml_response
        # Parameters for response
        self.__xml_line = []
        self.__is_response = []
        self.__status = []
        self.__result = []
        self.__id = []
        # Parameters for broadcast
        self.__is_broadcast = []
        self.__broadcast = []
        self.__message = []
        self.__project = []
        self.__jobid = []
        self.__operation = []
        self.__agent = []
        # Number of lines successfully processed
        self.__count = 0
        # Parse the input one line at a time; lines that cannot be
        # processed (including empty ones) are dropped
        for xml_line in xml_response.split("\n"):
            if self.parse(xml_line):
                self.__xml_line.append(xml_line)
                self.__count = self.__count + 1

    def __str__(self):
        """String representation of the response object: one line of
        text for each processed line of input."""
        lines = []
        for i in range(self.__count):
            if self.is_broadcast(i):
                lines.append("Broadcast: " + str(self.broadcast(i)))
            elif self.is_response(i):
                lines.append("Response: " + str(self.status(i)) +
                             " " + str(self.result(i)))
            else:
                lines.append("Unrecognised: " + str(self.__xml_line[i]))
        return "\n".join(lines)

    def __reserve_slot(self):
        """Internal method: make sure that every per-line list has an
        entry, reset to its default, at the index of the line being
        processed.  Returns that index."""
        i = self.__count
        defaults = ((self.__is_response, False),
                    (self.__status, ""),
                    (self.__result, ""),
                    (self.__id, ""),
                    (self.__is_broadcast, False),
                    (self.__broadcast, ""),
                    (self.__message, ""),
                    (self.__project, ""),
                    (self.__jobid, ""),
                    (self.__operation, ""),
                    (self.__agent, ""))
        for values, default in defaults:
            if len(values) > i:
                values[i] = default
            else:
                values.append(default)
        return i

    def parse(self, xml_line):
        """Attempt to process the XML and extract the components.

        The extracted values are stored at the index of the next
        unprocessed line.  Return False if the line is empty, is not
        recognised, or is a response that cannot be parsed, and True
        otherwise.  A line identified as a broadcast that cannot be
        parsed is reported and the parser's exception is re-raised."""
        # Check for empty string
        if xml_line == "":
            return False
        # Test for broadcast first
        if _BROADCAST_MARKER in xml_line:
            return self.__parse_broadcast(xml_line)
        if _RESPONSE_MARKER in xml_line:
            return self.__parse_response(xml_line)
        # Something unrecognised
        return False

    def __parse_broadcast(self, xml_line):
        """Internal method: extract the components of a broadcast."""
        try:
            doc = xml.dom.minidom.parseString(xml_line)
            fields = [self.getBroadcastElement(doc, name)
                      for name in _BROADCAST_FIELDS]
        except Exception:
            # Parsing failed
            e = sys.exc_info()[1]
            print("Exception in dbResponse parse method")
            print("Input xml: "+str(xml_line))
            print("response.parse (broadcast): Exception: "+str(e))
            raise
        # Only store anything once the whole line has been processed
        i = self.__reserve_slot()
        self.__message[i] = fields[0]
        self.__project[i] = fields[1]
        self.__jobid[i] = fields[2]
        self.__operation[i] = fields[3]
        self.__agent[i] = fields[4]
        self.__broadcast[i] = fields
        self.__is_broadcast[i] = True
        return True

    def __parse_response(self, xml_line):
        """Internal method: extract the components of a response."""
        try:
            doc = xml.dom.minidom.parseString(xml_line)
            # Status tag
            status = doc.getElementsByTagName("status")[0].childNodes[0].data
            # Result tag
            result_node = doc.getElementsByTagName("result")[0]
            if not result_node.hasChildNodes():
                # An empty result is still a valid response
                result = ""
            else:
                first_child = result_node.childNodes[0]
                if first_child.nodeType == first_child.TEXT_NODE:
                    # Single value
                    result = first_child.data
                else:
                    # Retrieve the items from the nested tag
                    result = self.getXMLitems(first_child.childNodes, [])
            # Id tag
            try:
                responseid = doc.getElementsByTagName(
                    "response_id")[0].childNodes[0].data
            except IndexError:
                # The response id is not compulsory
                responseid = ""
        except Exception:
            # Parsing failed
            print("Exception in dbResponse parse method")
            print("Input xml: "+str(xml_line))
            print("response.parse (response): not a response:")
            return False
        # Successfully processed the XML
        i = self.__reserve_slot()
        self.__status[i] = status
        self.__result[i] = result
        self.__id[i] = responseid
        self.__is_response[i] = True
        return True

    def getBroadcastElement(self, doc, element):
        """Return value for a broadcast element.

        'doc' is the result of a xml.dom.minidom.parseString()
        call on a string representing an XML broadcast; 'element'
        is a broadcast element name e.g. 'message', 'project',
        etc.

        Returns the text enclosed by the first element of that name.
        An empty string is returned if the element is not found or
        has no content."""
        nodes = doc.getElementsByTagName(element)
        if len(nodes) > 0 and nodes[0].hasChildNodes():
            return nodes[0].childNodes[0].data
        return ""

    def getXMLitems(self, items, varlist):
        """Retrieve items from an XML-wrapped result list.

        This is called from the parse method to deal with lists
        in the XML response.  'items' is the sequence of nodes inside
        a 'list' element; 'varlist' is the list to which the collected
        values are appended (parse passes an empty list).

        The data of each item is collected in order.  An item whose
        first child is itself a 'list' element is collected as a
        nested list, and an item with no content as an empty string.
        If 'varlist' was empty the collected list itself is returned,
        otherwise 'varlist' with the collected list appended."""
        value = varlist
        tmp = []
        for item in items:
            try:
                if item.childNodes[0].nodeName == 'list':
                    # Nested list: the recursive call appends the
                    # nested values to tmp as a single entry
                    nesteditems = item.childNodes[0].childNodes
                    self.getXMLitems(nesteditems, tmp)
                else:
                    # Plain text: take the data from the node
                    tmp.append(item.childNodes[0].data)
            except IndexError:
                # This seems to happen when the "item" node contains no
                # data - append an empty data item
                tmp.append(u"")
        # append the tmp result to the return value
        value.append(tmp)
        if len(value) == 1:
            # only have one element in the list, then return the element.
            return value[0]
        else:
            return value

    def nresponses(self):
        """Return the number of responses (lines) processed."""
        return self.__count

    def xml_line(self, i):
        """Return the i'th line of input XML."""
        return self.__xml_line[i]

    def is_response(self, i):
        """Check if the i'th line contains an XML response."""
        return self.__is_response[i]

    def status(self, i):
        """Return the status component of the i'th response."""
        if self.__is_response[i]:
            return self.__status[i]
        else:
            return ""

    def result(self, i):
        """Return the result component of the i'th response."""
        if self.__is_response[i]:
            return self.__result[i]
        else:
            return ""

    def response_id(self, i):
        """Return the id component of the i'th response."""
        if self.__is_response[i]:
            return self.__id[i]
        else:
            return ""

    def is_broadcast(self, i):
        """Check if the i'th object contains an XML broadcast."""
        return self.__is_broadcast[i]

    def broadcast(self, i):
        """Return the i'th broadcast message from the XML, as the list
        [message, project, jobid, operation, agent]."""
        if self.__is_broadcast[i]:
            return self.__broadcast[i]
        else:
            return ""

    def message(self, i):
        """Return the message component of the i'th broadcast."""
        if self.__is_broadcast[i]:
            return self.__message[i]
        else:
            return ""

    def project(self, i):
        """Return the project component of the i'th broadcast."""
        if self.__is_broadcast[i]:
            return self.__project[i]
        else:
            return ""

    def jobid(self, i):
        """Return the jobid component of the i'th broadcast."""
        if self.__is_broadcast[i]:
            return self.__jobid[i]
        else:
            return ""

    def operation(self, i):
        """Return the operation component of the i'th broadcast."""
        if self.__is_broadcast[i]:
            return self.__operation[i]
        else:
            return ""

    def agent(self, i):
        """Return the agent component of the i'th broadcast."""
        if self.__is_broadcast[i]:
            return self.__agent[i]
        else:
            return ""

    def report(self):
        """Report the contents of the response object to stdout."""
        print("******************************************")
        print("Number of lines: "+str(self.nresponses()))
        for i in range(0, self.nresponses()):
            print("Response #"+str(i))
            print("\tData line: "+str(self.__xml_line[i]))
            if self.is_broadcast(i):
                print("\tBroadcast message:")
                print("\t\tmessage: "+str(self.message(i)))
                print("\t\tproject: "+str(self.project(i)))
                print("\t\tjobid : "+str(self.jobid(i)))
                print("\t\toperation: "+str(self.operation(i)))
                print("\t\tagent : "+str(self.agent(i)))
            elif self.is_response(i):
                print("\tRequest response:")
                print("\t\tStatus: "+str(self.status(i)))
                print("\t\tResult: "+str(self.result(i)))
                print("\t\tId : "+str(self.response_id(i)))
            else:
                print("\tUnrecognised:")
                print("\t\t"+str(self.__xml_line[i]))


class dbsocket(threading.Thread):
    """Class for handling interactions between client and handler.

    A dbsocket object is instantiated by providing it with a host
    name and port number. It then opens a socket connection to the
    handler via that host port.

    The client can send requests to the handler, (and get back the
    responses) using the 'request' method of the dbsocket object.

    The dbsocket intercepts handler broadcasts and ensures that
    these don't interfere with the client's request/response pairs.
    The client can check and collect broadcasts separately using
    the nbroadcasts and getbroadcast methods.

    Responses to requests are in the form of lists: [status,result]."""

    def __init__(self, host, port, timeout=30):
        """Connect to the handler at (host,port) and start the thread.

        'timeout' is the number of seconds to wait for the response
        to a request before reporting an error for it.

        The failure is reported and re-raised if the connection
        cannot be made."""
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.requestQueue = Queue.Queue()
        self.responseQueue = Queue.Queue()
        self.broadcastQueue = Queue.Queue()
        # Setup the socket connection
        self.host = host
        self.port = port
        self.addr = (self.host, self.port)
        self.socket = None
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.connect(self.addr)
        except Exception:
            # Connection attempt failed
            e = sys.exc_info()[1]
            print("dbsocket: init failed with exception:")
            print(str(e))
            # Don't leave a half-open socket behind
            if self.socket is not None:
                self.socket.close()
            raise
        # Start the queue
        self.active = True
        self.timeout = timeout
        self.request_id = 0
        self.start()

    # Check that the dbsocket is still active
    def isActive(self):
        """Return True if the dbsocket is still active, False if not."""
        return self.active

    # Application should use this to send a
    # request to the handler
    def request(self, command, args):
        """Send a request to the handler.

        The request consists of a command and a list of arguments.
        The result of the request method is the response from the
        handler as a [status,result] list.  An ["ERROR",message] list
        is returned instead if no response was received within the
        timeout period, or if the dbsocket is no longer active."""
        debug = _debug
        if not self.active:
            # Nothing is servicing the queues any more, so waiting
            # for a response would block forever
            return ["ERROR", "dbsocket is not active"]
        request = dbRequest(command, args)
        if debug: print("dbsocket: received request \""+str(request)+"\"")
        self.requestQueue.put(request)
        if debug: print("dbsocket: request placed in the queue, waiting")
        response = self.responseQueue.get()
        if debug: print("dbsocket: got the response \""+str(response)+"\"")
        if debug: print("dbsocket: returning")
        return response

    # Start the dbsocket process running
    def run(self):
        """Called by start method to start the dbsocket process.

        This handles sending requests to the handler, processing
        the responses, and dealing with any broadcast messages.
        Only one request is outstanding at any time; the next one is
        not sent until the current one has been answered or has
        timed out."""
        debug = _debug
        # Use start_time as a flag to indicate whether we're
        # currently waiting for a request
        start_time = -1
        request = None
        # Data read from the socket but not yet processed
        xml_response = ""
        # Set up a loop to run whileever the dbsocket is active
        while self.active:
            if self.requestQueue.qsize() > 0 and start_time < 0:
                # Get the next request and send it
                request = self.requestQueue.get()
                xml_request = request.xml()
                if debug: print("dbsocket.run: got request: \""+str(xml_request)+"\"")
                # sendall, because send may only write part of the data
                self.socket.sendall(str(xml_request)+"\n")
                if debug: print("dbsocket.run: request sent")
                # Get the current time in case of timeout
                start_time = time.time()
                if debug: print("dbsocket.run: start_time = "+str(start_time))
            if socket_is_readable(self.socket):
                if debug: print("dbsocket.run: socket is readable")
                data = self.socket.recv(4096)
                if not data:
                    # A readable socket that yields no data has been
                    # closed at the other end
                    if debug: print("dbsocket.run: connection closed by handler")
                    if start_time >= 0:
                        self.responseQueue.put(["ERROR",
                                                "Connection closed by handler"])
                        start_time = -1
                    self.active = False
                    break
                xml_response = xml_response+str(data)
                if debug: print("dbsocket.run: read data from socket: \""+str(xml_response)+"\"")
            # Process the data once it ends with a complete line
            if xml_response != "" and xml_response[-1] == "\n":
                start_time = self.__process_lines(xml_response, request,
                                                  start_time)
                # Reset the response buffer
                xml_response = ""
            if self.timeout_exceeded(start_time):
                # No response from server after timeout
                # has elapsed
                if debug: print("dbsocket.run: timeout condition reached")
                self.responseQueue.put(["ERROR", "Server timed out"])
                # Clear this request
                start_time = -1

    def __process_lines(self, xml_response, request, start_time):
        """Internal method: deal with complete lines from the handler.

        Broadcasts are added to the broadcast queue.  A response
        whose id matches that of the outstanding 'request' is added
        to the response queue; other responses are ignored.

        Returns the new value of start_time, which is negative if
        there is no longer an outstanding request."""
        debug = _debug
        try:
            response = dbResponse(xml_response)
        except Exception:
            # dbResponse raises if a broadcast can't be parsed; the
            # data is discarded rather than letting this thread die
            report_error(sys.exc_info()[1])
            return start_time
        for i in range(0, response.nresponses()):
            if response.xml_line(i) == "":
                # Empty string received from the handler
                if debug: print("dbsocket.run: empty string received "
                                +"from handler")
                self.responseQueue.put(["ERROR", "Empty string received"])
                # Clear the current request
                start_time = -1
            elif response.is_broadcast(i):
                # Add to the broadcast queue
                if debug:
                    print("dbsocket.run: detected broadcast")
                    print(str(response.broadcast(i)))
                self.broadcastQueue.put(response.broadcast(i))
            elif start_time >= 0.0:
                # Check whether this is a response to a request
                if response.is_response(i):
                    if response.response_id(i) == request.request_id():
                        # This is the correct response
                        # Add it to the response queue
                        if debug: print("dbsocket.run: response id matched request id")
                        self.responseQueue.put([response.status(i),
                                                response.result(i)])
                        # Reset the timer
                        start_time = -1
                    else:
                        # A response but with the wrong id
                        if debug: print("dbsocket.run: bad response received: \""
                                        +str(xml_response)+"\" (ignored)")
                else:
                    # Not sure what this is
                    if debug: print("dbsocket.run: error assumed")
                    report_error(response.result(i))
        return start_time

    # Close the connection
    def close(self):
        """Close the dbsocket connection.

        Flag the dbsocket as inactive, wait (for up to about ten
        seconds) for the thread to terminate, and then close the
        socket connection to the handler.  Always returns True."""
        self.active = False
        ntries = 0
        while self.isAlive() and ntries < 100:
            # Keep checking until the thread has terminated
            time.sleep(0.1)
            ntries = ntries+1
        if self.isAlive():
            print("dbsocket.close: warning, thread is still active")
        self.socket.close()
        return True

    # Return true if timeout period has been exceeded
    def timeout_exceeded(self, start_time):
        """Internal method: check whether the timeout has been exceeded.

        Returns True if the elapsed time has exceeded the timeout
        period, and False if not.  A negative start_time means that
        no request is outstanding, and always gives False."""
        if start_time < 0:
            return False
        return (time.time() - start_time) > self.timeout

    # Return number of broadcast messages in the queue
    def nbroadcasts(self):
        """Return the number of broadcast messages waiting processing."""
        return self.broadcastQueue.qsize()

    # Pull the oldest broadcast message from the queue
    def getbroadcast(self):
        """Return the oldest available broadcast message for processing.

        Broadcast messages are put in a queue when received.  Fetching
        a broadcast message removes it from the queue.  An empty
        string is returned if there are no messages waiting."""
        try:
            # Fetch exactly one message, and never wait for one
            broadcast = self.broadcastQueue.get_nowait()
        except Queue.Empty:
            return ""
        if _debug:
            print('broadcast:')
            print(broadcast)
        return broadcast


# Check and report broadcasts
class report_broadcast(threading.Thread):
    """Example of a thread process to monitor and report on broadcasts.

    Given a dbsocket object, the report_broadcast object continually
    checks for broadcast messages and prints them to stdout."""

    def __init__(self, dbsock):
        """dbsock is the dbsocket object to monitor; the monitoring
        thread is started immediately."""
        threading.Thread.__init__(self)
        self.setDaemon(1)
        self.socket = dbsock
        self.start()

    def run(self):
        """Print broadcasts for as long as the dbsocket is active."""
        while self.socket.isActive():
            while self.socket.nbroadcasts() > 0:
                broadcast = self.socket.getbroadcast()
                print("******************************************************")
                print("Received broadcast message:\n")
                print(str(broadcast))
                print("******************************************************")
            # Don't spin at full speed while there is nothing to report
            time.sleep(0.1)


# Test whether a specific socket is readable
def socket_is_readable(socket):
    """Checks whether the specified socket object is readable.

    Returns True if data can be read from the socket, False if not.
    The check waits for at most 0.01 seconds.  Any exception raised
    by the check is reported and treated as 'not readable'."""
    if not socket:
        return False
    try:
        readable = select.select([socket], [], [], 0.01)[0]
        if len(readable) > 0:
            # Socket is ready to read
            return True
    except Exception:
        e = sys.exc_info()[1]
        print("Exception in socket_is_readable: "+str(e))
    return False


# Generate a unique request id
def get_request_id():
    """Generate a unique identifier string for a request.

    The identifier is 'request#' followed by the value of a
    module-level counter, which is incremented on every call."""
    global _request_id
    _request_id = _request_id + 1
    requestid = "request#"+str(_request_id)
    return requestid


# Report error
def report_error(error):
    """Internal error reporting method: print the error to stdout."""
    print("dbsocket: error \""+str(error)+"\"")