Files
HAddons/samba/rootfs/usr/local/bin/mqtt_daemon.py
ai-dev e8681cdead test
2025-10-11 12:14:55 +02:00

651 lines
24 KiB
Python

# python 3.11
import logging
import subprocess
import time
import argparse
import collections
import asyncio
import json
import os
import typing
import hashlib
from pySMART import DeviceList, Device
from pySMART.interface import NvmeAttributes, AtaAttributes
import psutil
from psutil._common import sdiskio, sdiskusage
from ha_mqtt_discoverable import Settings, DeviceInfo
from ha_mqtt_discoverable.sensors import Sensor, SensorInfo, BinarySensor
from ha_mqtt_discoverable.sensors import BinarySensorInfo
import signal
from typing import Any, Callable, List, Literal, NoReturn, Self, Tuple, Union
import re
import humanize
from abc import ABC, abstractmethod
from diskinfo import Disk
def _str_to_bool(value: str) -> bool:
    """Convert a command-line token into a real boolean.

    ``type=bool`` must not be used with argparse: ``bool("False")`` is ``True``
    because every non-empty string is truthy, so the option could never be
    switched off from the command line.

    Raises:
        argparse.ArgumentTypeError: if *value* is not a recognised spelling.
    """
    token = value.strip().lower()
    if token in {"1", "true", "t", "yes", "y", "on"}:
        return True
    # The empty string stays falsy, exactly as it was with ``type=bool``.
    if token in {"", "0", "false", "f", "no", "n", "off"}:
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


# Command-line interface of the daemon. The arguments are parsed once, at
# import time, and exposed to the rest of the module through ``args``.
ap = argparse.ArgumentParser()
ap.add_argument("-b", "--broker", required=True, help="Broker")
ap.add_argument("-p", "--port", required=True, help="Broker Port")
ap.add_argument("-u", "--user", required=True, help="Broker User")
ap.add_argument("-P", "--password", required=True, help="Broker Password")
ap.add_argument("-t", "--topic", required=False, help="Topic", default="sambanas")
ap.add_argument(
    "-i",
    "--hdidle_log",
    required=False,
    help="HD_IDLE log to listen for sleeping drivers",
)
ap.add_argument(
    "-T", "--discovery_topic", required=False, help="Topic", default="homeassistant"
)
# NOTE(review): the help text "Topic" looks copy-pasted from the option above;
# confirm the intended wording before changing the user-facing string.
ap.add_argument(
    "-d",
    "--persist_discovery",
    required=False,
    help="Topic",
    default=True,
    type=_str_to_bool,
)
ap.add_argument(
    "-v", "--addon_version", required=False, help="Addon Version", default="latest"
)
ap.add_argument(
    "-l",
    "--logLevel",
    required=False,
    default="WARNING",
    choices=[
        "ALL",
        "TRACE",
        "DEBUG",
        "INFO",
        "NOTICE",
        "WARNING",
        "ERROR",
        "CRITICAL",
        "FATAL",
        "OFF",
    ],
)
args: dict[str, Any] = vars(ap.parse_args())
# Translate the level name given with --logLevel into a ``logging`` level.
# NOTE(review): "INFO" is mapped to WARNING and "NOTICE" to INFO, exactly as
# before; this looks inverted but may be deliberate to keep the per-second
# "Updating ..." messages quiet - confirm before changing.
_LOG_LEVELS: dict[str, int] = {
    "ALL": logging.DEBUG,
    "TRACE": logging.DEBUG,
    "DEBUG": logging.DEBUG,
    "NOTICE": logging.INFO,
    "INFO": logging.WARNING,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
    "FATAL": logging.CRITICAL,
    # NOTSET on the root logger means "process every message", the opposite of
    # OFF. A threshold above CRITICAL silences all standard levels.
    "OFF": logging.CRITICAL + 1,
}
# Unknown names fall back to WARNING. The former ``case "_":`` only matched the
# literal string "_", so it never acted as a default branch.
logging.basicConfig(
    level=_LOG_LEVELS.get(str(args["logLevel"]).upper(), logging.WARNING)
)
# Broker connection and topic layout shared by every sensor created below.
# All values come straight from the command line; only the port is converted,
# because argparse delivers it as a string.
mqtt_settings = Settings.MQTT(
    # where to connect
    host=args["broker"],
    port=int(args["port"]),
    # credentials
    username=args["user"],
    password=args["password"],
    # topic prefixes: state messages vs. Home Assistant discovery messages
    state_prefix=args["topic"],
    discovery_prefix=args["discovery_topic"],
)
# Global sensors registry
class ConfigEntity(ABC):
def __init__(
self,
sensorInfo: Union[SensorInfo, BinarySensorInfo],
state_function: Callable[[Self], Any],
attributes_function: Callable[[Self], dict[str, Any]] | None = None,
) -> None:
self.sensorInfo: SensorInfo | BinarySensorInfo = sensorInfo
self.state_function = state_function
self.attributes_function = attributes_function
self.sensor: Union[Sensor, BinarySensor] = None # type: ignore
def createSensor(self) -> Self:
if self.sensor is not None:
return self.sensor # type: ignore
settings = Settings(mqtt=mqtt_settings, entity=self.sensorInfo)
if isinstance(self.sensorInfo, BinarySensorInfo):
self.sensor = BinarySensor(settings) # type: ignore
logging.debug(
"BinarySensor '%s' created", self.sensor.generate_config()["name"]
)
elif isinstance(self.sensorInfo, SensorInfo):
self.sensor = Sensor(settings) # type: ignore
logging.debug("Sensor '%s' created", self.sensor.generate_config()["name"])
return self
@abstractmethod
def detstroy(self) -> None:
pass
class ConfigEntityAutonomous(ConfigEntity):
    """Entity that carries no extra data source of its own.

    Its callbacks work only from what the base class already stores.
    """

    def detstroy(self) -> None:
        """Call ``delete()`` on the wrapped sensor object."""
        published = self.sensor
        published.delete()
class ConfigEntityFromDevice(ConfigEntity):
    """Entity whose callbacks read from a pySMART ``Device``.

    Attributes:
        device: the disk handle the state/attribute callbacks inspect.
    """

    def __init__(
        self,
        sensorInfo: Union[SensorInfo, BinarySensorInfo],
        device: Device,
        state_function: Callable[[Self], Any],
        # Declared optional like the sibling entity classes; the previous
        # non-optional annotation needed a ``type: ignore`` for its None default.
        attributes_function: Callable[[Self], dict[str, Any]] | None = None,
    ) -> None:
        super().__init__(sensorInfo, state_function, attributes_function)
        self.device = device

    def detstroy(self) -> None:
        """Call ``delete()`` on the wrapped sensor object."""
        self.sensor.delete()
class ConfigEntityFromHDIdle(ConfigEntity):
    """Entity whose state follows the hd-idle power status of one disk.

    Attributes:
        device: the disk handle; its ``name`` is matched against the disk
            names reported through the hd-idle log.
    """

    def __init__(
        self,
        sensorInfo: Union[SensorInfo, BinarySensorInfo],
        device: Device,
        state_function: Callable[[Self], Any],
        # Declared optional like the sibling entity classes; the previous
        # non-optional annotation needed a ``type: ignore`` for its None default.
        attributes_function: Callable[[Self], dict[str, Any]] | None = None,
    ) -> None:
        super().__init__(sensorInfo, state_function, attributes_function)
        self.device = device

    def detstroy(self) -> None:
        """Call ``delete()`` on the wrapped sensor object."""
        self.sensor.delete()
class ConfigEntityFromIoStat(ConfigEntity):
    """Entity fed by per-disk I/O counter snapshots.

    The two most recent snapshots are retained so that a callback can compute
    a rate from their difference. Each snapshot is a
    ``(timestamp_ns, counters_by_device)`` pair; the placeholder ``(0, None)``
    stands for "not sampled yet".

    Attributes:
        iostat_device: key of this entity's device inside a counters mapping.
    """

    def __init__(
        self,
        sensorInfo: Union[SensorInfo, BinarySensorInfo],
        iostat_device: str,
        state_function: Callable[[Self], Any],
        attributes_function: Callable[[Self], dict[str, Any]] | None = None,
    ) -> None:
        super().__init__(sensorInfo, state_function, attributes_function)
        self.iostat_device: str = iostat_device
        not_sampled: tuple[int, dict[str, sdiskio] | None] = (0, None)
        # maxlen=2: appending a snapshot silently drops the oldest one.
        self._iostat: collections.deque[tuple[int, dict[str, sdiskio] | None]] = (
            collections.deque([not_sampled, not_sampled], maxlen=2)
        )

    @property
    def iostat(self) -> tuple[int, dict[str, sdiskio] | None]:
        """Most recent snapshot."""
        return self._iostat[-1]

    @property
    def h_iostat(self) -> tuple[int, dict[str, sdiskio] | None]:
        """Snapshot taken just before the most recent one."""
        older, _newer = self._iostat
        return older

    def addIostat(self, iostat: dict) -> None:
        """Record a shallow copy of *iostat*, stamped with the current time in ns."""
        stamped = (time.time_ns(), iostat.copy())
        self._iostat.append(stamped)

    def detstroy(self) -> None:
        """Call ``delete()`` on the wrapped sensor object."""
        published = self.sensor
        published.delete()
class ConfigEntityFromSamba(ConfigEntity):
    """Entity whose callbacks read from a Samba metrics dictionary.

    Attributes:
        samba: the metrics mapping the callbacks index into; replaced by the
            publisher each time fresh metrics are collected.
    """

    def __init__(
        self,
        sensorInfo: Union[SensorInfo, BinarySensorInfo],
        samba: dict,
        state_function: Callable[[Self], Any],
        attributes_function: Callable[[Self], dict[str, Any]] | None = None,
    ) -> None:
        super().__init__(
            sensorInfo,
            state_function,
            attributes_function,
        )
        self.samba = samba

    def detstroy(self):
        """Call ``delete()`` on the wrapped sensor object."""
        published = self.sensor
        published.delete()
def sambaMetricCollector() -> dict[str, Any]:
    """Collect connection metrics by running ``smbstatus -jf``.

    Collection is best effort: if the command cannot be started or its output
    is not a JSON object, a warning is logged and default values are returned.

    Returns:
        A dictionary with the keys ``samba_version`` (``"Unknown"`` by
        default), ``users``, ``users_json``, ``connections`` and ``open_files``.
    """
    jsondata: Any = {}
    try:
        # stderr is left attached to the daemon's own stderr, as before.
        completed = subprocess.run(
            ["smbstatus", "-jf"], stdout=subprocess.PIPE, check=False
        )
        jsondata = json.loads(completed.stdout.decode())
        logging.debug("SmbClient: %s", jsondata)
    except (OSError, subprocess.SubprocessError, ValueError) as exc:
        # OSError: command missing / not executable.
        # ValueError: undecodable bytes or invalid JSON.
        logging.warning("Exception on smbstat comunication! (%s)", exc)
        jsondata = {}
    if not isinstance(jsondata, dict):
        # Valid JSON, but not an object: nothing below could be looked up.
        logging.warning("Unexpected smbstatus output of type %s", type(jsondata).__name__)
        jsondata = {}

    data: dict[str, Any] = {"samba_version": jsondata.get("version", "Unknown")}
    if "sessions" in jsondata:
        data["users"] = len(jsondata["sessions"])
        data["users_json"] = jsondata["sessions"]
    else:
        data["users"] = 0
        data["users_json"] = {}
    data["connections"] = len(jsondata["tcons"]) if "tcons" in jsondata else 0
    data["open_files"] = len(jsondata["open_files"]) if "open_files" in jsondata else 0
    logging.debug(data)
    return data
# Registry of (key, entity) pairs; the publish_* coroutines iterate over it.
sensorList: List[Tuple[str, ConfigEntity]] = []

# Last status reported through the hd-idle log, keyed by disk name.
devicePowerStatus: dict[str, str] = {}

# Main device SambaNas. Its software version comes from an initial metrics
# collection, and its identifier from the HOSTNAME environment variable.
samba: dict[str, Any] = sambaMetricCollector()
sambanas_device_info = DeviceInfo(
    identifiers=[os.environ.get("HOSTNAME", "local_test")],
    name="SambaNas",
    manufacturer="@Dianlight",
    model="Addon",
    hw_version=args["addon_version"],
    sw_version=samba["samba_version"],
)

devlist = DeviceList()

# One-off snapshot of the per-disk I/O counters; its keys are the block device
# names the registration loop walks over.
psdata: dict[str, sdiskio] = psutil.disk_io_counters(perdisk=True, nowrap=True)
# ---------------------------------------------------------------------------
# Per-disk sensor registration.
#
# The callbacks below depend only on the entity passed to them, so they are
# defined once here instead of being redefined on every loop iteration.
# ---------------------------------------------------------------------------


def _unique_id(label: str) -> str:
    """Return the hex MD5 digest of *label*, used as a stable entity id."""
    return hashlib.md5(label.encode("utf-8")).hexdigest()


def smartAssesmentAttribute(ce: ConfigEntityFromDevice) -> dict[str, Any]:
    """Build the attribute dictionary of a S.M.A.R.T. sensor.

    General device fields are always present. For ATA devices each legacy
    attribute adds its raw value plus a ``<name>_Health`` entry; for NVMe
    devices every non-``None`` field of the attribute object is copied.
    """
    device = ce.device
    attributes: dict[str, Any] = {
        "smart_capable": device.smart_capable,
        "smart_enabled": device.smart_enabled,
        "assessment": device.assessment,
        "messages": device.messages,
        "rotation_rate": device.rotation_rate,
        "_test_running": device._test_running,
        "_test_progress": device._test_progress,
    }
    smart_attributes = device.if_attributes
    if smart_attributes is None:
        return attributes
    if isinstance(smart_attributes, AtaAttributes):
        for atattr in smart_attributes.legacyAttributes:
            if atattr is None:
                continue
            attributes[atattr.name] = atattr.raw
            # NOTE(review): an attribute without a threshold is reported as
            # "FAIL" (unchanged behaviour) - confirm this is intended.
            health_value: Literal["OK", "FAIL"] = (
                "OK"
                if (atattr.thresh is not None and atattr.worst > atattr.thresh)
                else "FAIL"
            )
            attributes[f"{atattr.name}_Health"] = health_value
    elif isinstance(smart_attributes, NvmeAttributes):
        for field_name, field_value in vars(smart_attributes).items():
            if field_value is not None:
                attributes[field_name] = field_value
    return attributes


def totalDiskRate(ce: ConfigEntityFromIoStat) -> float | Literal[0]:
    """Return the combined read+write throughput in kB/s between two snapshots.

    Returns 0 while fewer than two distinct snapshots exist, or when the
    device is missing from either snapshot. The latter used to raise KeyError,
    which would have terminated the whole publisher task.
    """
    previous_ns, previous_counters = ce.h_iostat
    current_ns, current_counters = ce.iostat
    if (
        previous_ns in (0, current_ns)
        or current_counters is None
        or previous_counters is None
    ):
        return 0
    current = current_counters.get(ce.iostat_device)
    previous = previous_counters.get(ce.iostat_device)
    if current is None or previous is None:
        return 0
    t_read: int = int(current.read_bytes) - int(previous.read_bytes)
    t_write: int = int(current.write_bytes) - int(previous.write_bytes)
    t_ns_time: int = current_ns - previous_ns
    logging.debug("%s %d %d %d", ce.iostat_device, t_read, t_write, t_ns_time)
    return round(((t_read + t_write) * 1000000000 / t_ns_time) / 1024, 2)  # kB/s


def iostatAttribute(ce: ConfigEntityFromIoStat) -> dict[str, Any]:
    """Return every counter field of the device's latest snapshot.

    Returns an empty dictionary when no snapshot exists yet or when the device
    is missing from it.
    """
    counters = ce.iostat[1]
    if counters is None:
        return {}
    sample = counters.get(ce.iostat_device)
    if sample is None:
        return {}
    return {key: getattr(sample, key) for key in sample._fields}


def usageAttribute(ce: ConfigEntityAutonomous) -> dict[str, str]:
    """Return human-readable used/total/free sizes of the partition.

    The path that is measured is the last identifier of the sensor's device,
    which is where the registration loop appends the mount point.
    """
    if ce.sensorInfo.device is None or ce.sensorInfo.device.identifiers is None:
        return {}
    usage: sdiskusage = psutil.disk_usage(ce.sensorInfo.device.identifiers[-1])
    return {
        "used": humanize.naturalsize(usage.used),
        "total": humanize.naturalsize(usage.total),
        "free": humanize.naturalsize(usage.free),
    }


def partitionUsage(ce: ConfigEntityAutonomous) -> Any:
    """Return the usage percentage of the partition, or ``None`` without a device."""
    if ce.sensorInfo.device is None or ce.sensorInfo.device.identifiers is None:
        return None
    logging.debug(
        "Collecting Usage from %s [%s]",
        ce.sensorInfo.device.identifiers,
        ce.sensorInfo.device.identifiers[-1],
    )
    return psutil.disk_usage(ce.sensorInfo.device.identifiers[-1]).percent


# A name ending in digits is normally a partition ("sda1") or a virtual device
# ("loop0") and is skipped. NVMe namespaces ("nvme0n1") also end in a digit but
# are whole disks; without this exception the NvmeAttributes branch above
# could never be reached.
_NVME_NAMESPACE = re.compile(r"nvme\d+n\d+")

# The mount table is read once; it is only used to find mount points below.
mounted_partitions = psutil.disk_partitions()

for dev_name in psdata:
    if re.search(r"\d+$", dev_name) and not _NVME_NAMESPACE.fullmatch(dev_name):
        continue
    dev = Device(dev_name)
    disk_device_info = DeviceInfo(
        name=f"SambaNas Disk {dev_name}",
        model=dev.model,
        sw_version=dev.firmware,
        identifiers=[dev.serial or "Unknown(%s)" % dev_name],
        via_device=sambanas_device_info.identifiers[0],  # type: ignore
    )

    # --- S.M.A.R.T. health (a "problem" sensor: on when the check fails) ---
    if dev.smart_capable:
        smartAssessment = ConfigEntityFromDevice(
            sensorInfo=BinarySensorInfo(
                name=f"S.M.A.R.T {dev.name}",
                unique_id=_unique_id(f"S.M.A.R.T {dev.name}"),
                device=disk_device_info,
                device_class="problem",
            ),
            state_function=lambda ce: ce.device.assessment != "PASS",
            attributes_function=smartAssesmentAttribute,
            device=dev,
        )
        sensorList.append((f"smart_{dev.name}", smartAssessment.createSensor()))

    # --- Power state, only when an hd-idle log was configured ---
    if args["hdidle_log"] is not None:
        hdIdleAssessment = ConfigEntityFromHDIdle(
            sensorInfo=BinarySensorInfo(
                name=f"POWER {dev.name}",
                unique_id=_unique_id(f"POWER {dev.name}"),
                device=disk_device_info,
                device_class="power",
            ),
            # Any status other than "spindown" (including "never reported")
            # counts as powered.
            state_function=lambda ce: devicePowerStatus.get(ce.device.name) != "spindown",
            # NOTE(review): the appended "s" looks accidental - confirm.
            attributes_function=lambda ce: {"pipe_file": args["hdidle_log"] + "s"},
            device=dev,
        )
        sensorList.append((f"power_{dev.name}", hdIdleAssessment.createSensor()))

    # --- Whole-disk throughput ---
    diskInfo = ConfigEntityFromIoStat(
        sensorInfo=SensorInfo(
            name=f"IOSTAT {dev.name}",
            unique_id=_unique_id(f"IOSTAT {dev.name}"),
            device=disk_device_info,
            unit_of_measurement="kB/s",
            device_class="data_rate",
        ),
        state_function=totalDiskRate,
        attributes_function=iostatAttribute,
        # dev_name is by construction a key of the psutil counters mapping.
        iostat_device=dev_name,
    )
    sensorList.append((f"iostat_{dev.name}", diskInfo.createSensor()))

    # --- One Home Assistant device per labelled partition of this disk ---
    partitionDevices: typing.Dict[str, DeviceInfo] = {}
    for partition in Disk(dev_name).get_partition_list():
        partition_name = partition.get_name()
        partition_label = partition.get_fs_label() or partition.get_fs_uuid()
        # Covers both "" and None (the former ``== ""`` test let None through).
        if not partition_label:
            continue
        if partition_name not in partitionDevices:
            partitionDevices[partition_name] = DeviceInfo(
                name=f"SambaNas Partition {partition_label}",
                model=partition_label,
                manufacturer=partition.get_fs_type(),
                identifiers=[partition_label],
                via_device=disk_device_info.identifiers[0],  # type: ignore
            )
        partition_info = partitionDevices[partition_name]
        mounts = [
            part for part in mounted_partitions if part.device.endswith(partition_name)
        ]
        if mounts and mounts[0].mountpoint:
            # The mount point is appended as the LAST identifier; the usage
            # callbacks read it back from there.
            if partition_info.identifiers is None:
                partition_info.identifiers = []
            elif isinstance(partition_info.identifiers, str):
                partition_info.identifiers = [partition_info.identifiers]
            partition_info.identifiers.append(mounts[0].mountpoint)  # type: ignore
    logging.debug(
        "Generated %d Partitiond Device for %s", len(partitionDevices), dev.name
    )

    for partition_device, partitionDeviceInfo in partitionDevices.items():
        # --- Partition throughput ---
        partitionInfo = ConfigEntityFromIoStat(
            sensorInfo=SensorInfo(
                name=f"IOSTAT {partitionDeviceInfo.model}",
                unique_id=_unique_id(f"IOSTAT {partitionDeviceInfo.model}"),
                device=partitionDeviceInfo,
                unit_of_measurement="kB/s",
                device_class="data_rate",
            ),
            state_function=totalDiskRate,
            attributes_function=iostatAttribute,
            iostat_device=partition_device,
        )
        sensorList.append((f"iostat_{partition_device}", partitionInfo.createSensor()))

        # --- Partition usage: only when a mount point was appended above ---
        if (
            partitionDeviceInfo.identifiers is not None
            and len(partitionDeviceInfo.identifiers) > 1
        ):
            partitionInfo = ConfigEntityAutonomous(
                sensorInfo=SensorInfo(
                    name=f"Usage {partitionDeviceInfo.model}",
                    unique_id=_unique_id(f"Usage {partitionDeviceInfo.model}"),
                    device=partitionDeviceInfo,
                    icon="mdi:harddisk",
                    unit_of_measurement="%",
                    device_class="power_factor",
                ),
                state_function=partitionUsage,
                attributes_function=usageAttribute,
            )
            sensorList.append(
                (f"usage_{partition_device}", partitionInfo.createSensor())
            )
# Server-wide Samba sensors, attached to the main SambaNas device. Both read
# from the metrics dictionary stored on the entity.


def _samba_user_count(ce: ConfigEntityFromSamba) -> Any:
    """State of the "Online Users" sensor."""
    return ce.samba["users"]


def _samba_connection_count(ce: ConfigEntityFromSamba) -> Any:
    """State of the "Active Connections" sensor."""
    return ce.samba["connections"]


def _samba_open_files(ce: ConfigEntityFromSamba) -> dict[str, Any]:
    """Attributes of the "Active Connections" sensor."""
    return {"open_files": ce.samba["open_files"]}


_users_label = "Online Users"
sambaUsers = ConfigEntityFromSamba(
    sensorInfo=SensorInfo(
        name=_users_label,
        device=sambanas_device_info,
        unique_id=hashlib.md5(_users_label.encode("utf-8")).hexdigest(),
    ),
    samba=samba,
    state_function=_samba_user_count,
)
sensorList.append(("samba_users", sambaUsers.createSensor()))

_connections_label = "Active Connections"
sambaConnections = ConfigEntityFromSamba(
    sensorInfo=SensorInfo(
        name=_connections_label,
        device=sambanas_device_info,
        unique_id=hashlib.md5(_connections_label.encode("utf-8")).hexdigest(),
    ),
    samba=samba,
    state_function=_samba_connection_count,
    attributes_function=_samba_open_files,
)
sensorList.append(("samba_connections", sambaConnections.createSensor()))

# Publisher tasks started by run(); kept at module level so they can be cancelled.
tasks: list[asyncio.Task] = []
async def publish_states() -> NoReturn:
    """Every 5 seconds, publish state and attributes of the autonomous entities.

    Only registry entries that are ``ConfigEntityAutonomous`` are handled here;
    the other entity kinds have their own publisher coroutine.
    """
    while True:
        for key, entity in sensorList:
            if not isinstance(entity, ConfigEntityAutonomous):
                continue
            logging.info("Updating sensor %s", key)
            entity.sensor.set_state(entity.state_function(entity))  # type: ignore
            compute_attributes = entity.attributes_function
            if compute_attributes is not None:
                entity.sensor.set_attributes(compute_attributes(entity))
        await asyncio.sleep(5)
async def publish_idle_states() -> None:
    """Follow the hd-idle log and publish power-state changes as they arrive.

    Each line is expected to look like ``"<disk> <status>"``. The status is
    stored in ``devicePowerStatus`` and the matching power sensors are updated.
    The coroutine returns when the log reaches end of file ("Pipe Broken").

    The file is read in a daemon thread that hands lines over through a queue.
    Opening and reading a pipe are blocking calls; done directly inside the
    coroutine (which then never awaits) they would freeze the event loop and
    with it every other publisher. A daemon thread, unlike an executor thread,
    does not keep the process alive while it is stuck in ``readline``.

    Raises:
        Exception: whatever opening or reading the log raised, re-raised here.
    """
    import threading  # function-scope import: only this coroutine needs it

    loop = asyncio.get_running_loop()
    pending: asyncio.Queue[str | BaseException | None] = asyncio.Queue()

    def _forward(item: str | BaseException | None) -> None:
        """Hand *item* to the coroutine; give up quietly once the loop is closed."""
        try:
            loop.call_soon_threadsafe(pending.put_nowait, item)
        except RuntimeError:
            # The event loop is already closed: the daemon is shutting down.
            pass

    def _read_log() -> None:
        """Blocking reader: forwards lines, then ``None`` on EOF or the error."""
        try:
            with open(args["hdidle_log"]) as pipe_hdidle:
                for line in iter(pipe_hdidle.readline, ""):
                    _forward(line)
        except Exception as exc:  # boundary of the thread: report, never die silently
            _forward(exc)
        else:
            _forward(None)

    threading.Thread(target=_read_log, name="hd-idle-reader", daemon=True).start()

    while True:
        item = await pending.get()
        if item is None:
            # Lazy %-formatting; the previous f-string nested the same quote
            # character, which is a syntax error before Python 3.12.
            logging.error("Pipe Broken %s!", args["hdidle_log"])
            break
        if isinstance(item, BaseException):
            raise item
        # Parse line with idle status
        # Aug 8 00:14:55 enterprise hd-idle[9958]: sda spindown
        # Aug 8 00:14:55 enterprise hd-idle[9958]: sdb spindown
        # Aug 8 00:14:56 enterprise hd-idle[9958]: sdc spindown
        # Aug 8 00:17:55 enterprise hd-idle[9958]: sdb spinup
        # Aug 8 00:28:55 enterprise hd-idle[9958]: sdb spindown
        # NOTE(review): the split below takes the FIRST token as the disk, so
        # the lines received here are presumably already reduced to
        # "<disk> <status>" - confirm against whatever feeds the log.
        fields = item.split(" ", maxsplit=1)
        if len(fields) != 2:
            # A line without a space used to crash the task on unpacking.
            logging.warning("Ignoring malformed hd-idle line: %r", item)
            continue
        disk, status = fields[0], fields[1].strip()
        logging.info("Disk %s change to status %s", disk, status)
        devicePowerStatus[disk] = status
        logging.debug(devicePowerStatus)
        for key, entity in sensorList:
            if not (isinstance(entity, ConfigEntityFromHDIdle) and entity.device.name == disk):
                continue
            logging.info("Updating sensor %s", key)
            if isinstance(entity.sensor, BinarySensor):
                entity.sensor.update_state(entity.state_function(entity))
            elif isinstance(entity.sensor, Sensor):
                entity.sensor.set_state(entity.state_function(entity))  # type: ignore
            if entity.attributes_function is not None:
                entity.sensor.set_attributes(entity.attributes_function(entity))
async def publish_device_states() -> NoReturn:
    """Every 5 seconds, refresh and publish the device-backed entities.

    A disk whose last reported power status is "spindown" is skipped, so that
    refreshing its data does not touch the sleeping disk.
    """
    while True:
        for key, entity in sensorList:
            if not isinstance(entity, ConfigEntityFromDevice):
                continue
            power_status = devicePowerStatus.get(entity.device.name)
            logging.info(
                "Updating Device sensor %s (pw:%s)",
                key,
                power_status,
            )
            if power_status == "spindown":
                continue
            entity.device.update()
            published = entity.sensor
            # Binary sensors and plain sensors expose different setter names.
            if isinstance(published, BinarySensor):
                published.update_state(entity.state_function(entity))
            elif isinstance(published, Sensor):
                published.set_state(entity.state_function(entity))
            if entity.attributes_function is not None:
                published.set_attributes(entity.attributes_function(entity))
        await asyncio.sleep(5)
async def publish_iostate_states() -> NoReturn:
    """Every second, sample the I/O counters and publish the I/O entities.

    One counters snapshot is taken per cycle and handed to every
    ``ConfigEntityFromIoStat`` entity before its state is computed.
    """
    while True:
        snapshot: dict[str, sdiskio] = psutil.disk_io_counters(perdisk=True, nowrap=True)
        for key, entity in sensorList:
            if not isinstance(entity, ConfigEntityFromIoStat):
                continue
            logging.info("Updating Iostat sensor %s", key)
            entity.addIostat(snapshot)
            entity.sensor.set_state(entity.state_function(entity))  # type: ignore
            compute_attributes = entity.attributes_function
            if compute_attributes is not None:
                entity.sensor.set_attributes(compute_attributes(entity))
        await asyncio.sleep(1)
async def publish_samba_states() -> NoReturn:
    """Every 10 seconds, collect Samba metrics and publish the Samba entities.

    The freshly collected metrics dictionary replaces the one stored on each
    ``ConfigEntityFromSamba`` entity before its state is computed.
    """
    while True:
        metrics: dict[str, Any] = sambaMetricCollector()
        for key, entity in sensorList:
            if not isinstance(entity, ConfigEntityFromSamba):
                continue
            logging.info("Updating Samba sensor %s", key)
            entity.samba = metrics
            entity.sensor.set_state(entity.state_function(entity))  # type: ignore
            compute_attributes = entity.attributes_function
            if compute_attributes is not None:
                entity.sensor.set_attributes(compute_attributes(entity))
        await asyncio.sleep(10)
async def run() -> None:
    """Start every publisher task and keep running until they have all ended.

    SIGINT, SIGTERM and SIGHUP cancel the tasks. As soon as the first task
    ends, for whatever reason, every registered sensor is unpublished.
    """
    loop = asyncio.get_running_loop()
    unpublished = False

    def doneCallback(task: asyncio.Task) -> None:
        """Unpublish all sensors; runs once although it is attached to every task."""
        nonlocal unpublished
        if unpublished:
            return
        unpublished = True
        for key, entity in sensorList:
            logging.info("Unpublish sensor %s", key)
            entity.sensor.delete()

    def ask_exit(signame: str) -> None:
        """Signal handler: cancel every publisher task.

        This is a plain callback; the loop calls it directly, so no
        unreferenced helper task has to be scheduled.
        """
        # signame is a string, hence %s (it was %x, which cannot format it).
        logging.warning("Signal %s. Unpublish %d sensors", signame, len(sensorList))
        # remove_signal_handler() needs the signal number, not its name; with
        # the name it raised TypeError before any task had been cancelled.
        loop.remove_signal_handler(getattr(signal, signame))
        for task in tasks:
            task.cancel()
            logging.warning("Task %s cancelled!", task.get_name())

    publishers = [
        (publish_states, "Publish States"),
        (publish_device_states, "Publish Device States"),
        (publish_iostate_states, "Publish IO States"),
        (publish_samba_states, "Publish Samba States"),
    ]
    if args["hdidle_log"] is not None:
        publishers.insert(0, (publish_idle_states, "Read and Publish HD-IDLE states"))

    # Loop Status publish
    async with asyncio.TaskGroup() as tg:
        for publisher, task_name in publishers:
            task = tg.create_task(publisher(), name=task_name)
            task.add_done_callback(doneCallback)
            tasks.append(task)
        # Installed inside the group body: leaving the body waits for every
        # task, so code placed after the ``async with`` would only run once
        # the publishers had already finished.
        for signame in ("SIGINT", "SIGTERM", "SIGHUP"):
            loop.add_signal_handler(getattr(signal, signame), ask_exit, signame)
        # No explicit gather(): the task group itself awaits all tasks on
        # exit, and tasks cancelled by ask_exit() then end the group cleanly.
# Script entry point: drive the run() coroutine to completion on a new event loop.
if __name__ == "__main__":
    main_coroutine = run()
    asyncio.run(main_coroutine)