Adding Netbox

This commit is contained in:
Patrick Toal
2019-05-06 00:34:45 -04:00
parent 832502de34
commit 6e2205a046
278 changed files with 12767 additions and 0 deletions

View File

@@ -0,0 +1,174 @@
# (c) 2018, Ansible by Red Hat, inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'network'}
DOCUMENTATION = """
---
module: cli
author: Peter Sprygada (@privateip)
short_description: Runs the specific command and returns the output
description:
- The command specified in C(command) will be executed on the remote
device and its output will be returned to the module. This module
requires that the device is supported using the C(network_cli)
connection plugin and has a valid C(cliconf) plugin to work correctly.
version_added: "2.5"
options:
command:
description:
- The command to be executed on the remote node. The value for this
argument will be passed unchanged to the network device and the
output returned.
required: yes
default: null
parser:
description:
- The parser file to pass the output from the command through to
generate Ansible facts. If this argument is specified, the output
from the command will be parsed based on the rules in the
specified parser.
default: null
engine:
description:
- Defines the engine to use when parsing the output. This argument
accepts one of two valid values, C(command_parser) or C(textfsm_parser).
default: command_parser
choices:
- command_parser
- textfsm_parser
name:
description:
- The C(name) argument is used to define the top-level fact name to
hold the output of textfsm_engine parser. If this argument is not provided,
the output from parsing will not be exported. Note that this argument is
only considered when C(engine) is C(textfsm_parser).
default: null
"""
EXAMPLES = """
- name: return show version
cli:
command: show version
- name: return parsed command output
cli:
command: show version
parser: parser_templates/show_version.yaml
- name: parse with textfsm_parser engine
cli:
command: show version
parser: parser_templates/show_version
engine: textfsm_parser
name: system_facts
"""
RETURN = """
stdout:
description: returns the output from the command
returned: always
type: dict
json:
description: the output converted from json to a hash
returned: always
type: dict
"""
import json
from ansible.plugins.action import ActionBase
from ansible.module_utils.connection import Connection, ConnectionError
from ansible.module_utils._text import to_text
from ansible.errors import AnsibleError
from ansible.utils.display import Display
display = Display()
class ActionModule(ActionBase):
def run(self, tmp=None, task_vars=None):
''' handler for cli operations '''
if task_vars is None:
task_vars = dict()
result = super(ActionModule, self).run(tmp, task_vars)
del tmp # tmp no longer has any effect
try:
command = self._task.args['command']
parser = self._task.args.get('parser')
engine = self._task.args.get('engine', 'command_parser')
if engine == 'textfsm_parser':
name = self._task.args.get('name')
elif engine == 'command_parser' and self._task.args.get('name'):
display.warning('name is unnecessary when using command_parser and will be ignored')
del self._task.args['name']
except KeyError as exc:
raise AnsibleError(to_text(exc))
socket_path = getattr(self._connection, 'socket_path') or task_vars.get('ansible_socket')
connection = Connection(socket_path)
# make command a required argument
if not command:
raise AnsibleError('missing required argument `command`')
try:
output = connection.get(command)
except ConnectionError as exc:
raise AnsibleError(to_text(exc))
result['stdout'] = output
# try to convert the cli output to native json
try:
json_data = json.loads(output)
except Exception:
json_data = None
result['json'] = json_data
if parser:
if engine not in ('command_parser', 'textfsm_parser'):
raise AnsibleError('missing or invalid value for argument engine')
new_task = self._task.copy()
new_task.args = {
'file': parser,
'content': (json_data or output)
}
if engine == 'textfsm_parser':
new_task.args.update({'name': name})
kwargs = {
'task': new_task,
'connection': self._connection,
'play_context': self._play_context,
'loader': self._loader,
'templar': self._templar,
'shared_loader_obj': self._shared_loader_obj
}
task_parser = self._shared_loader_obj.action_loader.get(engine, **kwargs)
result.update(task_parser.run(task_vars=task_vars))
self._remove_tmp_path(self._connection._shell.tmpdir)
# this is needed so the strategy plugin can identify the connection as
# a persistent connection and track it, otherwise the connection will
# not be closed at the end of the play
socket_path = getattr(self._connection, 'socket_path') or task_vars.get('ansible_socket')
self._task.args['_ansible_socket'] = socket_path
return result

View File

@@ -0,0 +1,416 @@
# (c) 2018, Ansible by Red Hat, inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import sys
import collections
from ansible import constants as C
from ansible.plugins.action import ActionBase
from ansible.module_utils.common._collections_compat import Mapping
from ansible.module_utils.six import iteritems, string_types
from ansible.module_utils._text import to_text
from ansible.errors import AnsibleError
from ansible.utils.display import Display
try:
from ansible.module_utils.network.common.utils import to_list
except ImportError:
# keep role compatible with Ansible 2.4
from ansible.module_utils.network_common import to_list
sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.path.pardir, 'lib'))
from network_engine.plugins import template_loader, parser_loader
from network_engine.utils import dict_merge, generate_source_path
display = Display()
def warning(msg):
if C.ACTION_WARNINGS:
display.warning(msg)
class ActionModule(ActionBase):
VALID_FILE_EXTENSIONS = ('.yaml', '.yml', '.json')
VALID_GROUP_DIRECTIVES = ('pattern_group', 'block')
VALID_ACTION_DIRECTIVES = ('parser_metadata', 'pattern_match', 'set_vars', 'json_template')
VALID_DIRECTIVES = VALID_GROUP_DIRECTIVES + VALID_ACTION_DIRECTIVES
VALID_EXPORT_AS = ('list', 'elements', 'dict', 'object', 'hash')
def run(self, tmp=None, task_vars=None):
if task_vars is None:
task_vars = dict()
result = super(ActionModule, self).run(tmp, task_vars)
try:
source_dir = self._task.args.get('dir')
source_file = self._task.args.get('file')
content = self._task.args['content']
except KeyError as exc:
return {'failed': True, 'msg': 'missing required argument: %s' % exc}
if source_dir and source_file:
return {'failed': True, 'msg': '`dir` and `file` are mutually exclusive arguments'}
if source_dir:
sources = self.get_files(to_list(source_dir))
else:
if source_file:
sources = to_list(source_file)
else:
searchpath = []
searchpath = task_vars.get('ansible_search_path', [])
if not searchpath:
searchpath.append(self._loader._basedir)
if 'parser_templates' in os.listdir(searchpath[0]):
subdir_searchpath = os.path.join(searchpath[0], 'parser_templates')
# parser in {{ playbook_dir }}/parser_templates/{{ ansible_network_os }}
if task_vars['ansible_network_os'] in os.listdir(subdir_searchpath):
newsearchpath = os.path.join(subdir_searchpath, task_vars['ansible_network_os'])
sources = self.get_parser(path=newsearchpath)
# parser in {{ playbook_dir }}/parser_templates
else:
sources = self.get_parser(path=subdir_searchpath)
# parser in {{ playbook_dir }}
else:
sources = self.get_parser(path=searchpath[0])
facts = {}
self.template = template_loader.get('json_template', self._templar)
paths = self._task.get_search_path()
for src in sources:
src = generate_source_path(paths, src)
if src is None:
raise AnsibleError("src [%s] is either missing or invalid" % src)
tasks = self._loader.load_from_file(src)
self.ds = {'content': content}
self.ds.update(task_vars)
for task in tasks:
name = task.pop('name', None)
display.vvvv('processing directive: %s' % name)
register = task.pop('register', None)
extend = task.pop('extend', None)
if extend:
extend = self.template(extend, self.ds)
export = task.pop('export', False)
export_as = task.pop('export_as', 'list')
export_as = self.template(export_as, self.ds)
if export_as not in self.VALID_EXPORT_AS:
raise AnsibleError('invalid value for export_as, got %s' % export_as)
if 'export_facts' in task:
task['set_vars'] = task.pop('export_facts')
export = True
elif 'set_vars' not in task:
if export and not register:
warning('entry will not be exported due to missing register option')
when = task.pop('when', None)
if when is not None:
if not self._check_conditional(when, self.ds):
display.vvv('command_parser: skipping task [%s] due to conditional check' % name)
continue
loop = task.pop('loop', None)
loop_var = task.pop('loop_control', {}).get('loop_var') or 'item'
if loop is not None:
loop = self.template(loop, self.ds)
if not loop:
display.vvv('command_parser: loop option was defined but no loop data found')
res = list()
if loop:
# loop is a hash so break out key and value
if isinstance(loop, Mapping):
for loop_key, loop_value in iteritems(loop):
self.ds[loop_var] = {'key': loop_key, 'value': loop_value}
resp = self._process_directive(task)
res.append(resp)
# loop is either a list or a string
else:
for loop_item in loop:
self.ds[loop_var] = loop_item
resp = self._process_directive(task)
res.append(resp)
if 'set_vars' in task:
if register:
self.ds[register] = res
if export:
if extend:
facts.update(self.merge_facts(task_vars, extend, register, res))
else:
facts[register] = res
else:
self.ds.update(res)
if export:
facts.update(res)
elif register:
self.ds[register] = res
if export:
if export_as in ('dict', 'hash', 'object'):
if extend:
facts.update(self.merge_facts(task_vars, extend, register, res, expand=True))
else:
if register not in facts:
facts[register] = {}
for item in res:
facts[register] = self.rec_update(facts[register], item)
else:
if extend:
facts.update(self.merge_facts(task_vars, extend, register, res))
else:
facts[register] = res
else:
res = self._process_directive(task)
if 'set_vars' in task:
if register:
self.ds[register] = res
if export:
if extend:
facts.update(self.merge_facts(task_vars, extend, register, res))
else:
facts[register] = res
else:
self.ds.update(res)
if export:
facts.update(res)
elif res and register:
self.ds[register] = res
if export:
if register:
if extend:
facts.update(self.merge_facts(task_vars, extend, register, res))
else:
facts[register] = res
else:
for r in to_list(res):
for k, v in iteritems(r):
facts.update({to_text(k): v})
task_vars.update(facts)
result.update({
'ansible_facts': facts,
'included': sources
})
return result
def merge_facts(self, task_vars, extend, register, res, expand=False):
update = self.build_update(extend, register, res, expand)
root = extend.split('.')[0]
current = {root: task_vars.get(root, {})}
return dict_merge(current, update)
def build_update(self, path, child, value, expand=False):
"""Build an update based on the current results
This method will take the current results and build a nested dict
object. The keys for the nested dict object are identified by
path.
:param path: The path of the nest keys
:param child: The child key name to assign the value to
:param value: The value to assign to the child key
:param expand: When set to True, this will iterate over the value
:returns: A nest dict object
"""
update_set = dict()
working_set = update_set
if expand is True:
for key in path.split('.'):
working_set[key] = dict()
working_set = working_set[key]
working_set[child] = {}
for item in value:
working_set[child] = self.rec_update(working_set[child], item)
else:
for key in path.split('.'):
working_set[key] = dict()
working_set = working_set[key]
working_set[child] = value
return update_set
def get_parser(self, path):
sources = list()
src_file = list()
for i in os.listdir(path):
if i.startswith('show_'):
f, ext = os.path.splitext(i)
if ext in self.VALID_FILE_EXTENSIONS:
src_file.append(i)
if len(src_file) == 1:
sources.append(os.path.join(path, src_file[0]))
elif len(src_file) == 0:
raise AnsibleError("no parser file found in {0}, please create a parser".format(path))
else:
raise AnsibleError("too many files in {0}, please use `file` or `dir` parameter".format(path))
return sources
def get_files(self, source_dirs):
include_files = list()
_processed = set()
for source_dir in source_dirs:
if not os.path.isdir(source_dir):
raise AnsibleError('%s does not appear to be a valid directory' % source_dir)
for filename in os.listdir(source_dir):
fn, fext = os.path.splitext(filename)
if fn not in _processed:
_processed.add(fn)
filename = os.path.join(source_dir, filename)
if not os.path.isfile(filename) or fext not in self.VALID_FILE_EXTENSIONS:
continue
else:
include_files.append(filename)
return include_files
def rec_update(self, d, u):
for k, v in iteritems(u):
if isinstance(v, Mapping):
d[k] = self.rec_update(d.get(k, {}), v)
else:
d[k] = v
return d
def do_pattern_group(self, block):
results = list()
registers = {}
for entry in block:
task = entry.copy()
name = task.pop('name', None)
display.vvv("command_parser: starting pattern_match [%s] in pattern_group" % name)
register = task.pop('register', None)
when = task.pop('when', None)
if when is not None:
if not self._check_conditional(when, self.ds):
warning('skipping task due to conditional check failure')
continue
loop = task.pop('loop', None)
if loop:
loop = self.template(loop, self.ds)
loop_var = task.pop('loop_control', {}).get('loop_var') or 'item'
display.vvvv('command_parser: loop_var is %s' % loop_var)
if not set(task).issubset(('pattern_group', 'pattern_match')):
raise AnsibleError('invalid directive specified')
if 'pattern_group' in task:
if loop and isinstance(loop, collections.Iterable) and not isinstance(loop, string_types):
res = list()
for loop_item in loop:
self.ds[loop_var] = loop_item
res.append(self.do_pattern_group(task['pattern_group']))
else:
res = self.do_pattern_group(task['pattern_group'])
if res:
results.append(res)
if register:
registers[register] = res
elif isinstance(loop, collections.Iterable) and not isinstance(loop, string_types):
loop_result = list()
for loop_item in loop:
self.ds[loop_var] = loop_item
loop_result.append(self._process_directive(task))
results.append(loop_result)
if register:
registers[register] = loop_result
else:
res = self._process_directive(task)
if res:
results.append(res)
if register:
registers[register] = res
return registers
def _process_directive(self, task):
for directive, args in iteritems(task):
if directive == 'block':
display.deprecated('`block` is not longer supported, use `pattern_group` instead', version=2.6)
directive = 'pattern_group'
if directive not in self.VALID_DIRECTIVES:
raise AnsibleError('invalid directive in parser: %s' % directive)
meth = getattr(self, 'do_%s' % directive)
if meth:
if directive in self.VALID_GROUP_DIRECTIVES:
return meth(args)
elif directive in self.VALID_ACTION_DIRECTIVES:
return meth(**args)
else:
raise AnsibleError('invalid directive: %s' % directive)
def do_parser_metadata(self, version=None, command=None, network_os=None):
if version:
display.vvv('command_parser: using parser version %s' % version)
if network_os not in (None, self.ds['ansible_network_os']):
raise AnsibleError('parser expected %s, got %s' % (network_os, self.ds['ansible_network_os']))
def do_pattern_match(self, regex, content=None, match_all=None, match_until=None, match_greedy=None):
content = self.template(content, self.ds) or self.template("{{ content }}", self.ds)
regex = self.template(regex, self.ds)
parser = parser_loader.get('pattern_match', content)
return parser.match(regex, match_all, match_until, match_greedy)
def do_json_template(self, template):
return self.template.run(template, self.ds)
def do_set_vars(self, **kwargs):
return self.template(kwargs, self.ds)
def _check_conditional(self, when, variables):
conditional = "{%% if %s %%}True{%% else %%}False{%% endif %%}"
return self.template(conditional % when, variables)

View File

@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
# (c) 2018, Ansible by Red Hat, inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.module_utils.six import StringIO, string_types
from ansible.plugins.action import ActionBase
from ansible.errors import AnsibleError
try:
import textfsm
HAS_TEXTFSM = True
except ImportError:
HAS_TEXTFSM = False
class ActionModule(ActionBase):
def run(self, tmp=None, task_vars=None):
''' handler for textfsm action '''
if task_vars is None:
task_vars = dict()
result = super(ActionModule, self).run(tmp, task_vars)
del tmp # tmp no longer has any effect
try:
if not HAS_TEXTFSM:
raise AnsibleError('textfsm_parser engine requires the TextFSM library to be installed')
try:
filename = self._task.args.get('file')
src = self._task.args.get('src')
content = self._task.args['content']
name = self._task.args.get('name')
except KeyError as exc:
raise AnsibleError('missing required argument: %s' % exc)
if src and filename:
raise AnsibleError('`src` and `file` are mutually exclusive arguments')
if not isinstance(content, string_types):
return {'failed': True, 'msg': '`content` must be of type str, got %s' % type(content)}
if filename:
tmpl = open(filename)
else:
tmpl = StringIO()
tmpl.write(src.strip())
tmpl.seek(0)
try:
re_table = textfsm.TextFSM(tmpl)
fsm_results = re_table.ParseText(content)
except Exception as exc:
raise AnsibleError(str(exc))
final_facts = []
for item in fsm_results:
facts = {}
facts.update(dict(zip(re_table.header, item)))
final_facts.append(facts)
if name:
result['ansible_facts'] = {name: final_facts}
else:
result['ansible_facts'] = {}
finally:
self._remove_tmp_path(self._connection._shell.tmpdir)
return result

View File

@@ -0,0 +1,127 @@
# (c) 2018, Ansible by Red Hat, inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'network'}
DOCUMENTATION = """
---
module: validate_role_spec
author: Peter Sprygada (@privateip
short_description: Validate required arguments are set from facts
description:
- This module will accept an external argument spec file that will be used to
validate arguments have been configured and set properly in order to allow
the role to proceed. This validate specification file provides the
equivalent of the Ansible module argument spec.
version_added: "2.7"
options:
spec:
description:
- Relative or absolute path to the arugment specification file to use to
validate arguments are properly set for role execution.
required: yes
"""
EXAMPLES = """
- name: use spec file for role validation
validate_role_spec:
spec: args.yaml
"""
RETURN = """
"""
import os
import json
from ansible.plugins.action import ActionBase
from ansible.module_utils._text import to_text, to_bytes
from ansible.module_utils.six import iteritems, string_types
from ansible.module_utils import basic
from ansible.errors import AnsibleModuleError
from ansible.utils.display import Display
display = Display()
class ActionModule(ActionBase):
VALID_MODULE_KWARGS = (
'argument_spec', 'mutually_exclusive', 'required_if',
'required_one_of', 'required_together'
)
def run(self, tmp=None, task_vars=None):
''' handler for cli operations '''
if task_vars is None:
task_vars = dict()
result = super(ActionModule, self).run(tmp, task_vars)
del tmp # tmp no longer has any effect
try:
spec = self._task.args['spec']
except KeyError as exc:
raise AnsibleModuleError(to_text(exc))
if not spec:
raise AnsibleModuleError('missing required argument: spec')
spec_fp = os.path.join(task_vars['role_path'], 'meta/%s' % spec)
display.vvv('using role spec %s' % spec_fp)
spec = self._loader.load_from_file(spec_fp)
if 'argument_spec' not in spec:
return {'failed': True, 'msg': 'missing required field in specification file: argument_spec'}
argument_spec = spec['argument_spec']
args = {}
self._handle_options(task_vars, args, argument_spec)
basic._ANSIBLE_ARGS = to_bytes(json.dumps({'ANSIBLE_MODULE_ARGS': args}))
basic.AnsibleModule.fail_json = self.fail_json
spec = dict([(k, v) for k, v in iteritems(spec) if k in self.VALID_MODULE_KWARGS])
validated_spec = basic.AnsibleModule(**spec)
result['role_params'] = validated_spec.params
result['changed'] = False
self._remove_tmp_path(self._connection._shell.tmpdir)
return result
def fail_json(self, msg):
raise AnsibleModuleError(msg)
def _handle_options(self, task_vars, args, spec):
for key, attrs in iteritems(spec):
if attrs is None:
spec[key] = {'type': 'str'}
elif isinstance(attrs, dict):
suboptions_spec = attrs.get('options')
if suboptions_spec:
args[key] = dict()
self._handle_options(task_vars, args[key], suboptions_spec)
if key in task_vars:
if isinstance(task_vars[key], string_types):
value = self._templar.do_template(task_vars[key])
if value:
args[key] = value
else:
args[key] = task_vars[key]
elif attrs:
if 'aliases' in attrs:
for item in attrs['aliases']:
if item in task_vars:
args[key] = self._templar.do_template(task_vars[item])
else:
args[key] = None

View File

@@ -0,0 +1,188 @@
# (c) 2019, Ansible Inc,
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import yaml
import copy
import re
from ansible import constants as C
from ansible.module_utils._text import to_text
from ansible.playbook.role.requirement import RoleRequirement
from ansible.plugins.action import ActionBase
from ansible.utils.display import Display
display = Display()
class ActionModule(ActionBase):
def run(self, tmp=None, task_vars=None):
result = super(ActionModule, self).run(task_vars=task_vars)
self.META_MAIN = os.path.join('meta', 'main.yml')
self.META_INSTALL = os.path.join('meta', '.galaxy_install_info')
try:
role_path = self._task.args.get('role_path')
role_root_dir = os.path.split(role_path)[0]
except KeyError as exc:
return {'failed': True, 'msg': 'missing required argument: %s' % exc}
# Get dependancy version dict if not encoded in meta
depends_dict = self._task.args.get('depends_map')
try:
self._depends = self._get_role_dependencies(role_path)
# check if we know min_version for each dependant role
# from meta file or through user input to this plugin
(rc, msg) = self._check_depends(self._depends, depends_dict)
if not rc:
result['failed'] = True
result['msg'] = msg
return result
default_roles_path = copy.copy(C.DEFAULT_ROLES_PATH)
default_roles_path.append(role_root_dir)
(rc, msg) = self._find_dependant_role_version(
self._depends, default_roles_path)
if rc == 'Error':
result['failed'] = True
result['msg'] = msg
elif rc == 'Warning':
result['changed'] = True
result['Warning'] = True
result['msg'] = msg
elif rc == 'Success':
result['changed'] = False
result['msg'] = msg
except Exception as exc:
result['failed'] = True
result['msg'] = ('Exception received : %s' % exc)
return result
def _get_role_dependencies(self, role_path):
role_dependencies = []
dep_info = None
meta_path = os.path.join(role_path, self.META_MAIN)
if os.path.isfile(meta_path):
try:
f = open(meta_path, 'r')
metadata = yaml.safe_load(f)
role_dependencies = metadata.get('dependencies') or []
except (OSError, IOError):
display.vvv("Unable to load metadata for %s" % role_path)
return False
finally:
f.close()
if role_dependencies:
for dep in role_dependencies:
dep_req = RoleRequirement()
dep_info = dep_req.role_yaml_parse(dep)
return dep_info
def _find_dependant_role_version(self, dep_role, search_role_path):
found = False
dep_role_list = []
if isinstance(dep_role, dict):
# single role dependancy
dep_role_list.append(dep_role)
else:
dep_role_list = dep_role
# First preferrence is to find role in defined C.default_roles_path
for roles in dep_role_list:
for r_path in search_role_path:
dep_role_path = os.path.join(r_path, roles['name'])
if os.path.exists(dep_role_path):
found = True
install_ver = self._get_role_version(dep_role_path)
if install_ver == 'unknown':
msg = "WARNING! : role: %s installed version is unknown " \
"please check version if you downloded it from scm" % roles['name']
return ("Warning", msg)
if install_ver < roles['version']:
msg = "Error! : role: %s installed version :%s is less than " \
"required version: %s" % (roles['name'],
install_ver, roles['version'])
return ("Error", msg)
if not found:
msg = "role : %s is not installed in role search path: %s" \
% (roles['name'], search_role_path)
return ("Error", msg)
return ("Success", 'Success: All dependent roles meet min version requirements')
def _check_depends(self, depends, depends_dict):
depends_list = []
if isinstance(depends, dict):
# single role dependancy
depends_list.append(depends)
else:
depends_list = depends
for dep in depends_list:
if dep['version'] and depends_dict is None:
# Nothing to be done. Use version from meta
return (True, '')
if dep['version'] is None and depends_dict is None:
msg = "could not find min version from meta for dependent role : %s" \
" you can pass this info as depends_map arg e.g." \
"depends_map: - name: %s \n version: 2.6.5" \
% (dep['name'], dep['name'])
return (False, msg)
# Galaxy might return empty string when meta does not have version
# specified
if dep['version'] == '' and depends_dict is None:
msg = "could not find min version from meta for dependent role : %s" \
" you can pass this info as depends_map arg e.g." \
"depends_map: - name: %s \n version: 2.6.5" \
% (dep['name'], dep['name'])
return (False, msg)
for in_depends in depends_dict:
if in_depends['name'] == dep['name']:
if in_depends['version'] is None:
msg = 'min_version for role_name: %s is Unknown' % dep['name']
return (False, msg)
else:
ver = to_text(in_depends['version'])
# if version is defined without 'v<>' add 'v' for
# compliance with galaxy versioning
galaxy_compliant_ver = re.sub(r'^(\d+\..*)', r'v\1', ver)
dep['version'] = galaxy_compliant_ver
return (True, '')
def _get_role_version(self, role_path):
version = "unknown"
install_info = None
info_path = os.path.join(role_path, self.META_INSTALL)
if os.path.isfile(info_path):
try:
f = open(info_path, 'r')
install_info = yaml.safe_load(f)
except (OSError, IOError):
display.vvv(
"Unable to load galaxy install info for %s" % role_path)
return "unknown"
finally:
f.close()
if install_info:
version = install_info.get("version", None)
return version