# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import re from typing import Dict, List from xml.etree import ElementTree as ET # noqa from libcloud.common.base import XmlResponse, ConnectionUserAndKey # API HOST to connect API_HOST = "durabledns.com" def _schema_builder(urn_nid, method, attributes): """ Return a xml schema used to do an API request. :param urn_nid: API urn namespace id. :type urn_nid: type: ``str`` :param method: API method. :type method: type: ``str`` :param attributes: List of attributes to include. :type attributes: ``list`` of ``str`` rtype: :class:`Element` """ soap = ET.Element("soap:Body", {"xmlns:m": "https://durabledns.com/services/dns/%s" % method}) urn = ET.SubElement(soap, "urn:{}:{}".format(urn_nid, method)) # Attributes specification for attribute in attributes: ET.SubElement(urn, "urn:{}:{}".format(urn_nid, attribute)) return soap SCHEMA_BUILDER_MAP = { "list_zones": { "urn_nid": "listZoneswsdl", "method": "listZones", "attributes": ["apiuser", "apikey"], }, "list_records": { "urn_nid": "listRecordswsdl", "method": "listRecords", "attributes": ["apiuser", "apikey", "zonename"], }, "get_zone": { "urn_nid": "getZonewsdl", "method": "getZone", "attributes": ["apiuser", "apikey", "zonename"], }, "get_record": { "urn_nid": "getRecordwsdl", "method": "getRecord", "attributes": ["apiuser", "apikey", "zonename", "recordid"], }, "create_zone": { "urn_nid": "createZonewsdl", "method": "createZone", "attributes": [ "apiuser", "apikey", "zonename", "ns", "mbox", "refresh", "retry", "expire", "minimum", "ttl", "xfer", "update_acl", ], }, "create_record": { "urn_nid": "createRecordwsdl", "method": "createRecord", "attributes": [ "apiuser", "apikey", "zonename", "name", "type", "data", "aux", "ttl", "ddns_enabled", ], }, "update_zone": { "urn_nid": "updateZonewsdl", "method": "updateZone", "attributes": [ "apiuser", "apikey", "zonename", "ns", "mbox", "refresh", "retry", "expire", "minimum", "ttl", "xfer", "update_acl", ], }, "update_record": { "urn_nid": "updateRecordwsdl", "method": "updateRecord", "attributes": [ "apiuser", "apikey", "zonename", "id", "name", "aux", "data", "ttl", "ddns_enabled", ], }, "delete_zone": { "urn_nid": "deleteZonewsdl", "method": "deleteZone", "attributes": ["apiuser", "apikey", "zonename"], }, "delete_record": { "urn_nid": "deleteRecordwsdl", "method": "deleteRecord", "attributes": ["apiuser", "apikey", "zonename", "id"], }, } class DurableDNSException(Exception): def __init__(self, code, message): self.code = code self.message = message self.args = (code, message) def __str__(self): return "{} {}".format(self.code, self.message) def __repr__(self): return "DurableDNSException {} {}".format(self.code, self.message) class DurableResponse(XmlResponse): errors = [] # type: List[Dict] objects = [] # type: List[Dict] def __init__(self, response, connection): super().__init__(response=response, connection=connection) self.objects, self.errors = self.parse_body_and_error() if self.errors: raise self._make_excp(self.errors[0]) def parse_body_and_error(self): """ Used to parse body from httplib.HttpResponse object. """ objects = [] errors = [] error_dict = {} extra = {} zone_dict = {} record_dict = {} xml_obj = self.parse_body() # pylint: disable=no-member envelop_body = list(xml_obj)[0] method_resp = list(envelop_body)[0] # parse the xml_obj # handle errors if "Fault" in method_resp.tag: fault = [fault for fault in list(method_resp) if fault.tag == "faultstring"][0] error_dict["ERRORMESSAGE"] = fault.text.strip() error_dict["ERRORCODE"] = self.status errors.append(error_dict) # parsing response from listZonesResponse if "listZonesResponse" in method_resp.tag: answer = list(method_resp)[0] for element in answer: zone_dict["id"] = list(element)[0].text objects.append(zone_dict) # reset the zone_dict zone_dict = {} # parse response from listRecordsResponse if "listRecordsResponse" in method_resp.tag: answer = list(method_resp)[0] for element in answer: for child in list(element): if child.tag == "id": record_dict["id"] = child.text.strip() objects.append(record_dict) # reset the record_dict for later usage record_dict = {} # parse response from getZoneResponse if "getZoneResponse" in method_resp.tag: for child in list(method_resp): if child.tag == "origin": zone_dict["id"] = child.text.strip() zone_dict["domain"] = child.text.strip() elif child.tag == "ttl": zone_dict["ttl"] = int(child.text.strip()) elif child.tag == "retry": extra["retry"] = int(child.text.strip()) elif child.tag == "expire": extra["expire"] = int(child.text.strip()) elif child.tag == "minimum": extra["minimum"] = int(child.text.strip()) else: if child.text: extra[child.tag] = child.text.strip() else: extra[child.tag] = "" zone_dict["extra"] = extra objects.append(zone_dict) # parse response from getRecordResponse if "getRecordResponse" in method_resp.tag: answer = list(method_resp)[0] for child in list(method_resp): if child.tag == "id" and child.text: record_dict["id"] = child.text.strip() elif child.tag == "name" and child.text: record_dict["name"] = child.text.strip() elif child.tag == "type" and child.text: record_dict["type"] = child.text.strip() elif child.tag == "data" and child.text: record_dict["data"] = child.text.strip() elif child.tag == "aux" and child.text: record_dict["aux"] = child.text.strip() elif child.tag == "ttl" and child.text: record_dict["ttl"] = child.text.strip() if not record_dict: error_dict["ERRORMESSAGE"] = "Record does not exist" error_dict["ERRORCODE"] = 404 errors.append(error_dict) objects.append(record_dict) record_dict = {} if "createZoneResponse" in method_resp.tag: answer = list(method_resp)[0] if answer.tag == "return" and answer.text: record_dict["id"] = answer.text.strip() objects.append(record_dict) # catch Record does not exists error when deleting record if "deleteRecordResponse" in method_resp.tag: answer = list(method_resp)[0] if "Record does not exists" in answer.text.strip(): errors.append({"ERRORMESSAGE": answer.text.strip(), "ERRORCODE": self.status}) # parse response in createRecordResponse if "createRecordResponse" in method_resp.tag: answer = list(method_resp)[0] record_dict["id"] = answer.text.strip() objects.append(record_dict) record_dict = {} return (objects, errors) def parse_body(self): # A problem arise in the api response because there are undeclared # xml namespaces. In order to fix that at the moment, we use the # _fix_response method to clean up since we won't always have lxml # library. self._fix_response() body = super().parse_body() return body def success(self): """ Used to determine if the request was successful. """ return len(self.errors) == 0 def _make_excp(self, error): return DurableDNSException(error["ERRORCODE"], error["ERRORMESSAGE"]) def _fix_response(self): items = re.findall('', self.body, flags=0) for item in items: parts = item.split(" ") prefix = parts[0].replace("<", "").split(":")[1] new_item = "<" + prefix + ">" close_tag = "" new_close_tag = "" self.body = self.body.replace(item, new_item) self.body = self.body.replace(close_tag, new_close_tag) class DurableConnection(ConnectionUserAndKey): host = API_HOST responseCls = DurableResponse def add_default_params(self, params): params["user_id"] = self.user_id params["key"] = self.key return params def add_default_headers(self, headers): headers["Content-Type"] = "text/xml" headers["Content-Encoding"] = "gzip; charset=ISO-8859-1" return headers