summary history files

app.py
import asyncio
import socket
import os
from datetime import datetime, timezone
from typing import List, Union, Dict, Optional
from influxdb_client.client.write_api import SYNCHRONOUS, WriteApi
from pychonet.lib.udpserver import UDPServer
from pychonet import ECHONETAPIClient, HomeAirConditioner
from pychonet import ECHONETAPIClient as api
from influxdb_client import InfluxDBClient, Point


async def main(hosts: List[str], **kwargs) -> None:
    influx_client: Optional[InfluxDBClient] = None
    write_api: Optional[WriteApi] = None
    influx_token: Optional[str] = kwargs.get("influx_token", None)
    influx_bucket: Optional[str] = kwargs.get("influx_bucket", None)
    influx_url: Optional[str] = kwargs.get("influx_url", None)
    influx_org: Optional[str] = kwargs.get("influx_org", None)

    udp: UDPServer = UDPServer()
    loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
    udp.run("0.0.0.0", 3610, loop=loop)
    server: ECHONETAPIClient = api(server=udp)

    if all(
        v is not None for v in (influx_token, influx_bucket, influx_url, influx_org)
    ):
        influx_client = InfluxDBClient(url=str(influx_url), token=str(influx_token))
        write_api = influx_client.write_api(write_options=SYNCHRONOUS)

    for host in hosts:
        try:
            ip_address: str = socket.gethostbyname(host)
        except socket.gaierror as e:
            print(f"Failed to resolve hostname {host}: {e}")
            continue

        await server.discover(ip_address)

        aircon: HomeAirConditioner = HomeAirConditioner(ip_address, server)

        room_temperature = await getRoomTemperature(aircon)
        print(f"Room temperature for host {host}: {room_temperature}")

        operational_temperature = await getOperationalTemperature(aircon)
        print(f"Operational temperature for host {host}: {operational_temperature}")

        operational_status = await getOperationalStatus(aircon)
        status = "off"
        if operational_status == 0:
            status = "on"
        print(f"Operational status for host {host}: {status}")

        cumulative_power = await getCumulativePower(aircon)
        print(f"Cumulative power for host {host}: {cumulative_power}")

        if write_api:
            points = {
                "room_temperature": {
                    "field": "temperature",
                    "field_value": room_temperature,
                },
                "operational_temperature": {
                    "field": "temperature",
                    "field_value": operational_temperature,
                },
                "operational_status": {
                    "field": "status",
                    "field_value": operational_status,
                },
                "cumulative_power": {
                    "field": "power",
                    "field_value": cumulative_power,
                },
            }
            for k, v in points.items():
                point: Point = (
                    Point(k)
                    .tag("host", host)
                    .field(v["field"], v["field_value"])
                    .time(datetime.now(timezone.utc))
                )
                write_api.write(str(influx_bucket), influx_org, point)
                print(point)


async def getOperationModeSetting(aircon) -> int:
    resp = await aircon.getMessage(0xB0)
    return int.from_bytes(bytes(resp), byteorder="big")


async def getCumulativePower(aircon) -> int:
    resp = await aircon.getMessage(0x85)
    return int.from_bytes(bytes(resp), byteorder="big")


async def getRoomTemperature(aircon) -> int:
    room_temperature: Union[bytes, bool] = await aircon.getRoomTemperature()
    if not room_temperature:
        raise Exception()
    return int.from_bytes(bytes(room_temperature), byteorder="big")


async def getOperationalTemperature(aircon) -> int:
    operational_temperature: Union[
        bytes, bool
    ] = await aircon.getOperationalTemperature()
    if not operational_temperature:
        raise Exception()
    return int.from_bytes(bytes(operational_temperature), byteorder="big")


async def getOperationalStatus(aircon) -> int:
    operational_status = await aircon.getOperationalStatus()
    if not operational_status:
        raise Exception()
    return int(operational_status.decode("utf-8"))


if __name__ == "__main__":
    hosts: List[str] = os.environ.get("AIRCON_HOSTS", "").split()
    influx_url: Optional[str] = os.environ.get("INFLUX_URL", None)
    influx_token: Optional[str] = os.environ.get("INFLUX_TOKEN", None)
    influx_bucket: Optional[str] = os.environ.get("INFLUX_BUCKET", None)
    influx_org: Optional[str] = os.environ.get("INFLUX_ORG", None)
    kwargs: Dict[str, Optional[str]] = {
        "influx_url": influx_url,
        "influx_token": influx_token,
        "influx_bucket": influx_bucket,
        "influx_org": influx_org,
    }

    asyncio.run(main(hosts, **kwargs))