WIP3
This commit is contained in:
@@ -0,0 +1,338 @@
|
||||
#!/usr/bin/python3 -tt
|
||||
# SPDX-License-Identifier: BSD-3-Clause
|
||||
# vim: fileencoding=utf8
|
||||
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
import uuid
|
||||
|
||||
# pylint: disable=import-error, no-name-in-module
|
||||
from ansible.module_utils.network_lsr import MyError
|
||||
|
||||
|
||||
class Util:
|
||||
|
||||
PY3 = sys.version_info[0] == 3
|
||||
|
||||
STRING_TYPE = str if PY3 else basestring # noqa:F821
|
||||
|
||||
@staticmethod
|
||||
def first(iterable, default=None, pred=None):
|
||||
for v in iterable:
|
||||
if pred is None or pred(v):
|
||||
return v
|
||||
return default
|
||||
|
||||
@staticmethod
|
||||
def check_output(argv):
|
||||
# subprocess.check_output is python 2.7.
|
||||
with open("/dev/null", "wb") as DEVNULL:
|
||||
import subprocess
|
||||
|
||||
env = os.environ.copy()
|
||||
env["LANG"] = "C"
|
||||
p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=DEVNULL, env=env)
|
||||
# FIXME: Can we assume this to always be UTF-8?
|
||||
out = p.communicate()[0].decode("UTF-8")
|
||||
if p.returncode != 0:
|
||||
raise MyError("failure calling %s: exit with %s" % (argv, p.returncode))
|
||||
return out
|
||||
|
||||
@classmethod
|
||||
def create_uuid(cls):
|
||||
return str(uuid.uuid4())
|
||||
|
||||
@classmethod
|
||||
def NM(cls):
|
||||
n = getattr(cls, "_NM", None)
|
||||
if n is None:
|
||||
# Installing pygobject in a tox virtualenv does not work out of the
|
||||
# box
|
||||
# pylint: disable=import-error
|
||||
import gi
|
||||
|
||||
gi.require_version("NM", "1.0")
|
||||
from gi.repository import NM, GLib, Gio, GObject
|
||||
|
||||
cls._NM = NM
|
||||
cls._GLib = GLib
|
||||
cls._Gio = Gio
|
||||
cls._GObject = GObject
|
||||
n = NM
|
||||
return n
|
||||
|
||||
@classmethod
|
||||
def GLib(cls):
|
||||
cls.NM()
|
||||
return cls._GLib
|
||||
|
||||
@classmethod
|
||||
def Gio(cls):
|
||||
cls.NM()
|
||||
return cls._Gio
|
||||
|
||||
@classmethod
|
||||
def GObject(cls):
|
||||
cls.NM()
|
||||
return cls._GObject
|
||||
|
||||
@classmethod
|
||||
def Timestamp(cls):
|
||||
return cls.GLib().get_monotonic_time()
|
||||
|
||||
@classmethod
|
||||
def GMainLoop(cls):
|
||||
gmainloop = getattr(cls, "_GMainLoop", None)
|
||||
if gmainloop is None:
|
||||
gmainloop = cls.GLib().MainLoop()
|
||||
cls._GMainLoop = gmainloop
|
||||
return gmainloop
|
||||
|
||||
@classmethod
|
||||
def GMainLoop_run(cls, timeout=None):
|
||||
if timeout is None:
|
||||
cls.GMainLoop().run()
|
||||
return True
|
||||
|
||||
GLib = cls.GLib()
|
||||
timeout_reached = []
|
||||
loop = cls.GMainLoop()
|
||||
|
||||
def _timeout_cb(unused):
|
||||
timeout_reached.append(1)
|
||||
loop.quit()
|
||||
return False
|
||||
|
||||
timeout_id = GLib.timeout_add(int(timeout * 1000), _timeout_cb, None)
|
||||
loop.run()
|
||||
if not timeout_reached:
|
||||
GLib.source_remove(timeout_id)
|
||||
return not timeout_reached
|
||||
|
||||
@classmethod
|
||||
def GMainLoop_iterate(cls, may_block=False):
|
||||
return cls.GMainLoop().get_context().iteration(may_block)
|
||||
|
||||
@classmethod
|
||||
def GMainLoop_iterate_all(cls):
|
||||
c = 0
|
||||
while cls.GMainLoop_iterate():
|
||||
c += 1
|
||||
return c
|
||||
|
||||
@classmethod
|
||||
def call_async_method(cls, object_, action, args, mainloop_timeout=10):
|
||||
""" Asynchronously call a NetworkManager method """
|
||||
cancellable = cls.create_cancellable()
|
||||
async_action = action + "_async"
|
||||
# NM does not use a uniform naming for the async methods,
|
||||
# for checkpoints it is:
|
||||
# NMClient.checkpoint_create() and NMClient.checkpoint_create_finish(),
|
||||
# but for reapply it is:
|
||||
# NMDevice.reapply_async() and NMDevice.reapply_finish()
|
||||
# NMDevice.reapply() is a synchronous version
|
||||
# Therefore check if there is a method if an `async` suffix and use the
|
||||
# one without the suffix otherwise
|
||||
if not hasattr(object_, async_action):
|
||||
async_action = action
|
||||
finish = action + "_finish"
|
||||
user_data = {}
|
||||
|
||||
fullargs = []
|
||||
fullargs += args
|
||||
fullargs += (cancellable, cls.create_callback(finish), user_data)
|
||||
|
||||
getattr(object_, async_action)(*fullargs)
|
||||
|
||||
if not cls.GMainLoop_run(mainloop_timeout):
|
||||
cancellable.cancel()
|
||||
raise MyError("failure to call %s.%s(): timeout" % object_, async_action)
|
||||
|
||||
success = user_data.get("success", None)
|
||||
if success is not None:
|
||||
return success
|
||||
|
||||
raise MyError(
|
||||
"failure to %s checkpoint: %s: %r"
|
||||
% (action, user_data.get("error", "unknown error"), user_data)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def create_cancellable(cls):
|
||||
return cls.Gio().Cancellable.new()
|
||||
|
||||
@classmethod
|
||||
def create_callback(cls, finish_method):
|
||||
"""
|
||||
Create a callback that will return the result of the finish method and
|
||||
quit the GMainLoop
|
||||
|
||||
:param finish_method str: Name of the finish method to call from the
|
||||
source object in the callback
|
||||
"""
|
||||
|
||||
def callback(source_object, res, user_data):
|
||||
success = None
|
||||
try:
|
||||
success = getattr(source_object, finish_method)(res)
|
||||
except Exception as e:
|
||||
if cls.error_is_cancelled(e):
|
||||
return
|
||||
user_data["error"] = str(e)
|
||||
user_data["success"] = success
|
||||
cls.GMainLoop().quit()
|
||||
|
||||
return callback
|
||||
|
||||
@classmethod
|
||||
def error_is_cancelled(cls, e):
|
||||
GLib = cls.GLib()
|
||||
if isinstance(e, GLib.GError):
|
||||
if (
|
||||
e.domain == "g-io-error-quark"
|
||||
and e.code == cls.Gio().IOErrorEnum.CANCELLED
|
||||
):
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def ifname_valid(ifname):
|
||||
# see dev_valid_name() in kernel's net/core/dev.c
|
||||
if not ifname:
|
||||
return False
|
||||
if ifname in [".", ".."]:
|
||||
return False
|
||||
if len(ifname) >= 16:
|
||||
return False
|
||||
if any([c == "/" or c == ":" or c.isspace() for c in ifname]):
|
||||
return False
|
||||
# FIXME: encoding issues regarding python unicode string
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def mac_aton(mac_str, force_len=None):
|
||||
# we also accept None and '' for convenience.
|
||||
# - None yiels None
|
||||
# - '' yields []
|
||||
if mac_str is None:
|
||||
return mac_str
|
||||
i = 0
|
||||
b = []
|
||||
for c in mac_str:
|
||||
if i == 2:
|
||||
if c != ":":
|
||||
raise MyError("not a valid MAC address: '%s'" % (mac_str))
|
||||
i = 0
|
||||
continue
|
||||
try:
|
||||
if i == 0:
|
||||
n = int(c, 16) * 16
|
||||
i = 1
|
||||
else:
|
||||
assert i == 1
|
||||
n = n + int(c, 16)
|
||||
i = 2
|
||||
b.append(n)
|
||||
except Exception:
|
||||
raise MyError("not a valid MAC address: '%s'" % (mac_str))
|
||||
if i == 1:
|
||||
raise MyError("not a valid MAC address: '%s'" % (mac_str))
|
||||
if force_len is not None:
|
||||
if force_len != len(b):
|
||||
raise MyError(
|
||||
"not a valid MAC address of length %s: '%s'" % (force_len, mac_str)
|
||||
)
|
||||
return b
|
||||
|
||||
@staticmethod
|
||||
def mac_ntoa(mac):
|
||||
if mac is None:
|
||||
return None
|
||||
return ":".join(["%02x" % c for c in mac])
|
||||
|
||||
@staticmethod
|
||||
def mac_norm(mac_str, force_len=None):
|
||||
return Util.mac_ntoa(Util.mac_aton(mac_str, force_len))
|
||||
|
||||
@staticmethod
|
||||
def boolean(arg):
|
||||
if arg is None or isinstance(arg, bool):
|
||||
return arg
|
||||
arg0 = arg
|
||||
if isinstance(arg, Util.STRING_TYPE):
|
||||
arg = arg.lower()
|
||||
|
||||
if arg in ["y", "yes", "on", "1", "true", 1, True]:
|
||||
return True
|
||||
if arg in ["n", "no", "off", "0", "false", 0, False]:
|
||||
return False
|
||||
|
||||
raise MyError("value '%s' is not a boolean" % (arg0))
|
||||
|
||||
@staticmethod
|
||||
def parse_ip(addr, family=None):
|
||||
if addr is None:
|
||||
return (None, None)
|
||||
if family is not None:
|
||||
Util.addr_family_check(family)
|
||||
a = socket.inet_pton(family, addr)
|
||||
else:
|
||||
a = None
|
||||
family = None
|
||||
try:
|
||||
a = socket.inet_pton(socket.AF_INET, addr)
|
||||
family = socket.AF_INET
|
||||
except Exception:
|
||||
a = socket.inet_pton(socket.AF_INET6, addr)
|
||||
family = socket.AF_INET6
|
||||
return (socket.inet_ntop(family, a), family)
|
||||
|
||||
@staticmethod
|
||||
def addr_family_check(family):
|
||||
if family != socket.AF_INET and family != socket.AF_INET6:
|
||||
raise MyError("invalid address family %s" % (family))
|
||||
|
||||
@staticmethod
|
||||
def addr_family_to_v(family):
|
||||
if family is None:
|
||||
return ""
|
||||
if family == socket.AF_INET:
|
||||
return "v4"
|
||||
if family == socket.AF_INET6:
|
||||
return "v6"
|
||||
raise MyError("invalid address family '%s'" % (family))
|
||||
|
||||
@staticmethod
|
||||
def addr_family_default_prefix(family):
|
||||
Util.addr_family_check(family)
|
||||
if family == socket.AF_INET:
|
||||
return 24
|
||||
else:
|
||||
return 64
|
||||
|
||||
@staticmethod
|
||||
def addr_family_valid_prefix(family, prefix):
|
||||
Util.addr_family_check(family)
|
||||
if family == socket.AF_INET:
|
||||
m = 32
|
||||
else:
|
||||
m = 128
|
||||
return prefix >= 0 and prefix <= m
|
||||
|
||||
@staticmethod
|
||||
def parse_address(address, family=None):
|
||||
try:
|
||||
parts = address.split()
|
||||
addr_parts = parts[0].split("/")
|
||||
if len(addr_parts) != 2:
|
||||
raise MyError("expect two addr-parts: ADDR/PLEN")
|
||||
a, family = Util.parse_ip(addr_parts[0], family)
|
||||
prefix = int(addr_parts[1])
|
||||
if not Util.addr_family_valid_prefix(family, prefix):
|
||||
raise MyError("invalid prefix %s" % (prefix))
|
||||
if len(parts) > 1:
|
||||
raise MyError("too many parts")
|
||||
return {"address": a, "family": family, "prefix": prefix}
|
||||
except Exception:
|
||||
raise MyError("invalid address '%s'" % (address))
|
||||
Reference in New Issue
Block a user