|
|
|
import re
|
|
|
|
import shutil
|
|
|
|
import subprocess
|
|
|
|
from dataclasses import dataclass
|
|
|
|
|
|
|
|
|
|
|
|
class Xfconf:
|
|
|
|
"""Interface around Xfconf, using xion-query behind the scene."""
|
|
|
|
|
|
|
|
def __init__(self, xq=None):
|
|
|
|
self._xq = xq or self.find_xq()
|
|
|
|
|
|
|
|
def xq(self, command, print_failures=True):
|
|
|
|
"""Run a xion-query command and return its output or None on error."""
|
|
|
|
command.insert(0, self._xq)
|
|
|
|
try:
|
|
|
|
return subprocess.check_output(
|
|
|
|
command, stderr=subprocess.STDOUT
|
|
|
|
).decode()
|
|
|
|
except subprocess.CalledProcessError as exc:
|
|
|
|
if not print_failures:
|
|
|
|
return None
|
|
|
|
print(f"xion-query command failed with code {exc.returncode}.")
|
|
|
|
if exc.stdout:
|
|
|
|
print("stdout:", exc.stdout.decode().strip())
|
|
|
|
if exc.stderr:
|
|
|
|
print("stderr:", exc.stderr.decode().strip())
|
|
|
|
return None
|
|
|
|
|
|
|
|
def xqs(self, command_str, print_failures=True):
|
|
|
|
"""Wrapper of xq, splitting a string command."""
|
|
|
|
return self.xq(command_str.split(" "), print_failures=print_failures)
|
|
|
|
|
|
|
|
def get_channel_list(self):
|
|
|
|
"""Return the channel list or None on error."""
|
|
|
|
output = self.xqs("-l")
|
|
|
|
if output is None:
|
|
|
|
return None
|
|
|
|
return output.splitlines()
|
|
|
|
|
|
|
|
def get_property_list(self, channel, root="/"):
|
|
|
|
"""Return the property list for this channel or None on error."""
|
|
|
|
output = self.xqs(f"-c {channel} -l")
|
|
|
|
if output is None:
|
|
|
|
return None
|
|
|
|
return [p for p in output.splitlines() if p.startswith(root)]
|
|
|
|
|
|
|
|
def does_property_exist(self, channel, prop):
|
|
|
|
"""Return True if this property exists."""
|
|
|
|
output = self.xqs(f"-c {channel} -p {prop}", print_failures=False)
|
|
|
|
return output is not None
|
|
|
|
|
|
|
|
def get_property(self, channel, prop):
|
|
|
|
"""Return this property or None on error."""
|
|
|
|
output = self.xqs(f"-c {channel} -p {prop}")
|
|
|
|
if output is None:
|
|
|
|
return None
|
|
|
|
return XfconfProperty.parse(output)
|
|
|
|
|
|
|
|
def set_property(self, channel, prop, prop_type, value):
|
|
|
|
"""Create or update this property."""
|
|
|
|
if not self.does_property_exist(channel, prop):
|
|
|
|
self.create_property(channel, prop, prop_type, value)
|
|
|
|
else:
|
|
|
|
self.update_property(channel, prop, value)
|
|
|
|
|
|
|
|
def create_property(self, channel, prop, prop_type, value):
|
|
|
|
"""Create a new property with those params, return True on success."""
|
|
|
|
if " " in value:
|
|
|
|
value = f'"{value}"'
|
|
|
|
output = self.xq(["-c", channel, "-p", prop, "-n",
|
|
|
|
"-t", prop_type, "-s", value])
|
|
|
|
if output is None:
|
|
|
|
return False
|
|
|
|
|
|
|
|
def update_property(self, channel, prop, value):
|
|
|
|
"""Update an existing property, return True on success."""
|
|
|
|
if " " in value:
|
|
|
|
value = f'"{value}"'
|
|
|
|
output = self.xq(["-c", channel, "-p", prop, "-s", value])
|
|
|
|
return output == ""
|
|
|
|
|
|
|
|
def reset_root(self, channel, root):
|
|
|
|
"""Reset all channel properties under root, return True on success."""
|
|
|
|
output = self.xqs(f"-c {channel} -p {root} -r -R")
|
|
|
|
return output == ""
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def find_xq():
|
|
|
|
xq = shutil.which("xion-query")
|
|
|
|
if not xq:
|
|
|
|
exit("Could not find xion-query in path.")
|
|
|
|
return xq
|
|
|
|
|
|
|
|
|
|
|
|
XION_PROP_RE = re.compile(r"t:(\S+) (.+)")
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
class XfconfProperty:
|
|
|
|
"""Hold type and value for an Xfconf property."""
|
|
|
|
gtype: str
|
|
|
|
value: None = None
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def parse(prop_str):
|
|
|
|
if prop_str.startswith("a:"):
|
|
|
|
return XfconfProperty.parse_array(prop_str)
|
|
|
|
return XfconfProperty._parse_property(prop_str)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def parse_array(prop_str):
|
|
|
|
expected_length = 0
|
|
|
|
properties = []
|
|
|
|
for line in prop_str.splitlines():
|
|
|
|
if line.startswith("a:"):
|
|
|
|
try:
|
|
|
|
expected_length = int(line.split(":")[1])
|
|
|
|
except ValueError:
|
|
|
|
print("Failed to get expected array length.")
|
|
|
|
return None
|
|
|
|
elif line.startswith("t:"):
|
|
|
|
prop = XfconfProperty._parse_property(line)
|
|
|
|
if not prop:
|
|
|
|
return None
|
|
|
|
properties.append(prop)
|
|
|
|
if len(properties) != expected_length:
|
|
|
|
print(f"Number of properties ({len(properties)}) received "
|
|
|
|
f"is different than expected ({expected_length}).")
|
|
|
|
return properties
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def _parse_property(prop_str):
|
|
|
|
match = XION_PROP_RE.match(prop_str)
|
|
|
|
if not match:
|
|
|
|
print(f"Failed to parse '{prop_str}'.")
|
|
|
|
return None
|
|
|
|
return XfconfProperty(gtype=match.group(1), value=match.group(2))
|