# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Module which contains common Kubernetes related code. """ import os import base64 import warnings from typing import Optional from libcloud.utils.py3 import b, httplib from libcloud.common.base import ( JsonResponse, ConnectionKey, ConnectionUserAndKey, KeyCertificateConnection, ) from libcloud.common.types import InvalidCredsError __all__ = [ "KubernetesException", "KubernetesBasicAuthConnection", "KubernetesTLSAuthConnection", "KubernetesTokenAuthConnection", "KubernetesDriverMixin", "VALID_RESPONSE_CODES", ] VALID_RESPONSE_CODES = [ httplib.OK, httplib.ACCEPTED, httplib.CREATED, httplib.NO_CONTENT, ] class KubernetesException(Exception): def __init__(self, code, message): self.code = code self.message = message self.args = (code, message) def __str__(self): return "{} {}".format(self.code, self.message) def __repr__(self): return "KubernetesException {} {}".format(self.code, self.message) class KubernetesResponse(JsonResponse): valid_response_codes = [ httplib.OK, httplib.ACCEPTED, httplib.CREATED, httplib.NO_CONTENT, ] def parse_error(self): if self.status == 401: raise InvalidCredsError("Invalid credentials") return self.body def success(self): return self.status in self.valid_response_codes class KubernetesTLSAuthConnection(KeyCertificateConnection): responseCls = KubernetesResponse timeout = 60 def __init__( self, key, secure=True, host="localhost", port="6443", key_file=None, cert_file=None, **kwargs, ): super().__init__( key_file=key_file, cert_file=cert_file, secure=secure, host=host, port=port, url=None, proxy_url=None, timeout=None, backoff=None, retry_delay=None, ) if key_file: keypath = os.path.expanduser(key_file) is_file_path = os.path.exists(keypath) and os.path.isfile(keypath) if not is_file_path: raise InvalidCredsError( "You need an key PEM file to authenticate " "via tls. For more info please visit:" "https://kubernetes.io/docs/concepts/" "cluster-administration/certificates/" ) self.key_file = key_file certpath = os.path.expanduser(cert_file) is_file_path = os.path.exists(certpath) and os.path.isfile(certpath) if not is_file_path: raise InvalidCredsError( "You need an certificate PEM file to authenticate " "via tls. For more info please visit:" "https://kubernetes.io/docs/concepts/" "cluster-administration/certificates/" ) self.cert_file = cert_file def add_default_headers(self, headers): if "Content-Type" not in headers: headers["Content-Type"] = "application/json" return headers class KubernetesTokenAuthConnection(ConnectionKey): responseCls = KubernetesResponse timeout = 60 def add_default_headers(self, headers): if "Content-Type" not in headers: headers["Content-Type"] = "application/json" if self.key: headers["Authorization"] = "Bearer " + self.key else: raise ValueError("Please provide a valid token in the key param") return headers class KubernetesBasicAuthConnection(ConnectionUserAndKey): responseCls = KubernetesResponse timeout = 60 def add_default_headers(self, headers): """ Add parameters that are necessary for every request If user and password are specified, include a base http auth header """ if "Content-Type" not in headers: headers["Content-Type"] = "application/json" if self.user_id and self.key: auth_string = b("{}:{}".format(self.user_id, self.key)) user_b64 = base64.b64encode(auth_string) headers["Authorization"] = "Basic %s" % (user_b64.decode("utf-8")) return headers class KubernetesDriverMixin: """ Base driver class to be used with various Kubernetes drivers. NOTE: This base class can be used in different APIs such as container and compute one. """ def __init__( self, key=None, secret=None, secure=False, host="localhost", port=4243, key_file=None, cert_file=None, ca_cert=None, ex_token_bearer_auth=False, ): """ :param key: API key or username to be used (required) :type key: ``str`` :param secret: Secret password to be used (required) :type secret: ``str`` :param secure: Whether to use HTTPS or HTTP. Note: Some providers only support HTTPS, and it is on by default. :type secure: ``bool`` :param host: Override hostname used for connections. :type host: ``str`` :param port: Override port used for connections. :type port: ``int`` :param key_file: Path to the key file used to authenticate (when using key file auth). :type key_file: ``str`` :param cert_file: Path to the cert file used to authenticate (when using key file auth). :type cert_file: ``str`` :param ex_token_bearer_auth: True to use token bearer auth. :type ex_token_bearer_auth: ``bool`` :return: ``None`` """ if ex_token_bearer_auth: self.connectionCls = KubernetesTokenAuthConnection if not key: msg = 'The token must be a string provided via "key" argument' raise ValueError(msg) secure = True if key_file or cert_file: # Certificate based auth is used if not (key_file and cert_file): raise ValueError("Both key and certificate files are needed") if key_file: self.connectionCls = KubernetesTLSAuthConnection self.key_file = key_file self.cert_file = cert_file secure = True if host and host.startswith("https://"): secure = True host = self._santize_host(host=host) super().__init__( key=key, secret=secret, secure=secure, host=host, port=port, key_file=key_file, cert_file=cert_file, ) if ca_cert: self.connection.connection.ca_cert = ca_cert else: # do not verify SSL certificate warnings.warn( "Kubernetes has its own CA, since you didn't supply " "a CA certificate be aware that SSL verification " "will be disabled for this session." ) self.connection.connection.ca_cert = False self.connection.secure = secure self.connection.host = host if port is not None: self.connection.port = port def _ex_connection_class_kwargs(self): kwargs = {} if hasattr(self, "key_file"): kwargs["key_file"] = self.key_file if hasattr(self, "cert_file"): kwargs["cert_file"] = self.cert_file return kwargs def _santize_host(self, host=None): # type: (Optional[str]) -> Optional[str] """ Sanitize "host" argument any remove any protocol prefix (if specified). """ if not host: return None prefixes = ["http://", "https://"] for prefix in prefixes: if host.startswith(prefix): host = host.lstrip(prefix) return host