Source code for clusterdock.utils

# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Various utilities to be used by other modules."""

import json
import logging
import operator
import os
import random
import re
import subprocess
from collections import namedtuple
from functools import reduce
from pkg_resources import get_distribution
from time import sleep, time

import docker

from .config import defaults

logger = logging.getLogger(__name__)

client = docker.from_env()


[docs]def nested_get(dict_, keys): """Utility function that returns the value of a sequence of nested keys. Example: >>> details = {'name': {'first': {'english': 'Dima'}}} >>> nested_get(details, ['name', 'first', 'english']) 'Dima' Args: dict_ (:obj:`dict`): Dictionary to access. keys (:obj:`list`): A list of keys to access in a nested manner. Returns: The value. """ return reduce(operator.getitem, keys, dict_)
# The `#:` constructs at the end of assignments are part of Sphinx's autodoc functionality. DEFAULT_TIME_BETWEEN_CHECKS = 1 #: DEFAULT_TIMEOUT = 60 #:
[docs]def wait_for_condition(condition, condition_args=None, condition_kwargs=None, time_between_checks=DEFAULT_TIME_BETWEEN_CHECKS, timeout=DEFAULT_TIMEOUT, time_to_success=0, success=None, failure=None): """Wait until a condition is satisfied (or timeout). Args: condition: Callable to evaluate. condition_args (optional): A list of args to pass to the ``condition``. Default: ``None`` condition_kwargs (optional): A dictionary of kwargs to pass to the ``condition``. Default: ``None`` time_between_checks (:obj:`int`, optional): Seconds between condition checks. Default: :py:const:`DEFAULT_TIME_BETWEEN_CHECKS` timeout (:obj:`int`, optional): Seconds to wait before timing out. Default: :py:const:`DEFAULT_TIMEOUT` time_to_success (:obj:`int`, optional): Seconds for the condition to hold true before it is considered satisfied. Default: ``0`` success (optional): Callable to invoke when ``condition`` succeeds. A ``time`` variable will be passed as an argument, so can be used. Default: ``None`` failure (optional): Callable to invoke when timeout occurs. ``timeout`` will be passed as an argument. Default: ``None`` Raises: :py:obj:`TimeoutError` """ start_time = time() stop_time = start_time + timeout success_start_time = None while time() < stop_time: outcome = condition(*condition_args or [], **condition_kwargs or {}) if outcome: success_start_time = success_start_time or time() if time() >= success_start_time + time_to_success: if success is not None: success(time='{:.3f}'.format(time() - start_time)) return else: success_start_time = None sleep(time_between_checks) failure(timeout=timeout)
[docs]def join_url_parts(*parts): """ Join a URL from a list of parts. See http://stackoverflow.com/questions/24814657 for examples of why urllib.parse.urljoin is insufficient for what we want to do. """ return '/'.join([piece.strip('/') for piece in parts])
[docs]def version_tuple(version): """ Convert a version string or tuple to a tuple. Will return (major, minor, release) kind of format. """ if isinstance(version, str): return tuple(int(x) for x in version.split('.')) elif isinstance(version, tuple): return version
[docs]def version_str(version): """ Convert a version tuple or string to a string. Will return major.minor.release kind of format. """ if isinstance(version, str): return version elif isinstance(version, tuple): return '.'.join([str(int(x)) for x in version])
[docs]def get_clusterdock_label(cluster_name=None): """ Generate a clusterdock meta data label in json format. Meta data such as: clusterdock package name, version, location of clusterdock install, etc. Args: cluster_name (:obj:`str`, optional): Cluster name to attach to meta data label. Default: ``None`` Returns: (json): clusterdock meta data label """ label_str = '' try: package = get_distribution('clusterdock') label_info = {'name': package.project_name, 'version': package.version, 'location': package.location} if cluster_name: label_info['cluster_name'] = cluster_name label_str = json.dumps(label_info) except: pass return label_str
ADJECTIVES = ['accurate', 'actual', 'angular', 'associative', 'astronomical', 'asymmetrical', 'available', 'beautiful', 'biggest', 'bimodal', 'biochemical', 'biological', 'bright', 'celestial', 'closest', 'colorful', 'comparable', 'computational', 'consistent', 'conspicuous', 'continuous', 'conventional', 'coolest', 'cosmic', 'cosmological', 'critical', 'crucial', 'cubic', 'deeper', 'different', 'difficult', 'distant', 'dynamical', 'early', 'easiest', 'efficient', 'electromagnetic', 'empirical', 'evolutionary', 'faster', 'favorable', 'fewer', 'fissile', 'fissionable', 'functional', 'galactic', 'gaseous', 'gaussian', 'gravitational', 'greater', 'gregarious', 'hard', 'heaviest', 'hierarchical', 'highest', 'historical', 'homogeneous]', 'hot', 'impervious', 'important', 'intelligent', 'intense', 'intergalactic', 'internal', 'interstellar', 'intrinsic', 'invisible', 'kinetic', 'largest', 'linear', 'magnetic', 'mechanical', 'molecular', 'morphological', 'naive', 'nearest', 'nuclear', 'obvious', 'oldest', 'optical', 'orbital', 'outer', 'outward', 'perceptible', 'photographic', 'photometric', 'physical', 'planetary', 'precise', 'proper', 'random', 'reliable', 'richest', 'robust', 'rotational', 'scientific', 'shortest', 'significant', 'similar', 'skeletal', 'smallest', 'solar', 'southern', 'spectral', 'spectroscopic', 'spherical', 'strong', 'subsequent', 'successful', 'sufficient', 'systematic', 'terrestrial', 'thematic', 'tidal', 'tighter', 'typical', 'uncertain', 'uncollected', 'unformed', 'unlikely', 'unrelated', 'unresolved', 'unstable', 'unusual', 'useful', 'violent', 'visible', 'visual', 'weak'] # Astro cluster names NAMES = ['antlia', 'bullet', 'carolines_rose', 'centaurus', 'chandelier', 'coathanger', 'coma', 'double', 'el_gordo', 'fornax', 'globular', 'hyades', 'hydra', 'laniakea_super', 'm22', 'm35', 'mayall2', 'musket_ball', 'ngc752', 'norma', 'omicron_velorum', 'pandora', 'phoenix', 'pleiades', 'praesepe', 'ptolemy', 'pyxis', 'reticulum', 'beehive', 'hercules', 'wild_duck', 'virgo']
[docs]def generate_cluster_name(): """ Generate a random cluster name. """ return '{}_{}'.format(random.choice(ADJECTIVES), random.choice(NAMES))
[docs]def get_containers(clusterdock=False): """ Get Docker containers. Args: clusterdock (:obj:`bool`, optional): clusterdock containers only. Default: ``False`` Returns: (:obj:`list`): List of containers. """ Container = namedtuple('Container', ['cluster_name', 'container']) label_key = defaults['DEFAULT_DOCKER_LABEL_KEY'] cluster_containers = [] if client.containers.list(): for container in client.containers.list(all=True): if not clusterdock: cluster_containers.append(Container(None, container)) else: labels = nested_get(container.attrs, ['Config', 'Labels']) if label_key in labels: label = json.loads(labels[label_key]) cluster_containers.append(Container(label['cluster_name'], container)) return cluster_containers
[docs]def max_len_list_dict_item(list_dict, attr): """ Returns max length of a given attribute from a list of dict items. """ length = 0 for item in list_dict: length = length if length > len(item[attr]) else len(item[attr]) return length
[docs]def get_container(hostname): """ Get running Docker container for a given hostname. """ for container in client.containers.list(): if nested_get(container.attrs, ['Config', 'Hostname']) == hostname: return container
[docs]class VersionSplit: """Util function to hold various parts of a version. Args: name (:obj:`str`) delimiter1 (:obj:`str`) version (:obj:`str`) delimiter2 (:obj:`str`) specifier (:obj:`str`) """ __slots__ = ['name', 'delimiter1', 'version', 'delimiter2', 'specifier'] def __init__(self, name, delimiter1, version, delimiter2, specifier): self.name = name self.delimiter1 = delimiter1 self.version = version self.delimiter2 = delimiter2 self.specifier = specifier def __iter__(self): for attr in self.__slots__: yield getattr(self, attr)
[docs]class Version: """Maven version string abstraction. Use this class to enable correct comparison of Maven versioned projects. For our purposes, any version is equivalent to any other version that has the same 4-digit version number (i.e. 3.0.0.0-SNAPSHOT == 3.0.0.0-RC2 == 3.0.0.0). Args: version (:obj:`str`) or (:obj:`int`) or (:obj:`float`): Version string (e.g. '2.5.0.0-SNAPSHOT'). """ # pylint: disable=protected-access,too-few-public-methods def __init__(self, version): self._pattern = '(^[a-zA-Z]+)?(-)?([\d.]+)(-)?(\w+)?' # name (^[a-zA-Z]+) May or may not exist # delimiter1 (-)? May or may not exist e.g. - or None # version ([\d.]+) Version number e.g. 3.8.2 # delimiter2 (-)? May or may not exist e.g. - or None # specifier (\w*) May or may not exist e.g. RC2 # Handle the case where version is int or float. if isinstance(version, (int, float)): version = str(version) self._str = version groups = re.search(self._pattern, self._str).groups() version_split = VersionSplit(*groups) # Parse the numeric part of versions. numeric_version_list = [int(i) for i in version_split.version.split('.')] # Add additional 0's to keep a min length of version. Not so pythonic but, probably is more readable and simple. while len(numeric_version_list) < 4: numeric_version_list.append(0) # Update version_split with appended zeros. version_split.version = numeric_version_list self._version_split = version_split self._tuple = tuple(version_split) def __repr__(self): return str(self._tuple) def __eq__(self, other): return (self._version_split.name == other._version_split.name and self._version_split.version == other._version_split.version) def __lt__(self, other): if not isinstance(other, Version): raise TypeError('Comparison can only be done for two Version instances.') if self._version_split.name != other._version_split.name: raise TypeError('Comparison can only be done between two Version instances with same name.') return self._version_split.version < other._version_split.version def __gt__(self, other): return other.__lt__(self) def __ge__(self, other): return self.__gt__(other) or self.__eq__(other) def __le__(self, other): return other.__ge__(self)