# app.py
import asyncio
import socket
import os
from datetime import datetime, timezone
from typing import List, Union, Dict, Optional
from influxdb_client.client.write_api import SYNCHRONOUS, WriteApi
from pychonet.lib.udpserver import UDPServer
from pychonet import ECHONETAPIClient, HomeAirConditioner
from pychonet import ECHONETAPIClient as api
from influxdb_client import InfluxDBClient, Point
async def main(hosts: List[str], **kwargs) -> None:
    """Poll each air conditioner once, print its readings, optionally store them.

    For every entry in *hosts* the name is resolved to an IPv4 address, the
    device is discovered through the ECHONET API client, and its room
    temperature, operational temperature, operational status and cumulative
    power are read and printed.  When all four InfluxDB settings are
    supplied, each reading is also written to InfluxDB as its own
    measurement, tagged with the host name.

    Args:
        hosts: Host names or IP addresses of the air conditioners to poll.
        **kwargs: Optional InfluxDB settings ``influx_url``,
            ``influx_token``, ``influx_bucket`` and ``influx_org``.  Writing
            is enabled only when none of them is ``None``.

    Raises:
        Exception: Propagated from the reader helpers when a device returns
            no value for a property.  Hosts that fail to resolve are
            reported and skipped instead of raising.
    """
    influx_client: Optional[InfluxDBClient] = None
    write_api: Optional[WriteApi] = None
    influx_token: Optional[str] = kwargs.get("influx_token")
    influx_bucket: Optional[str] = kwargs.get("influx_bucket")
    influx_url: Optional[str] = kwargs.get("influx_url")
    influx_org: Optional[str] = kwargs.get("influx_org")

    udp: UDPServer = UDPServer()
    # We are inside a coroutine, so ask for the loop that is actually running
    # rather than relying on get_event_loop()'s implicit lookup.
    loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
    # Listen on all interfaces, UDP port 3610.
    udp.run("0.0.0.0", 3610, loop=loop)
    server: ECHONETAPIClient = api(server=udp)

    try:
        influx_settings = (influx_token, influx_bucket, influx_url, influx_org)
        if all(setting is not None for setting in influx_settings):
            influx_client = InfluxDBClient(
                url=str(influx_url), token=str(influx_token)
            )
            write_api = influx_client.write_api(write_options=SYNCHRONOUS)

        for host in hosts:
            try:
                ip_address: str = socket.gethostbyname(host)
            except socket.gaierror as e:
                # One unresolvable host must not stop the remaining ones.
                print(f"Failed to resolve hostname {host}: {e}")
                continue

            await server.discover(ip_address)
            aircon: HomeAirConditioner = HomeAirConditioner(ip_address, server)

            room_temperature = await getRoomTemperature(aircon)
            print(f"Room temperature for host {host}: {room_temperature}")

            operational_temperature = await getOperationalTemperature(aircon)
            print(
                f"Operational temperature for host {host}: {operational_temperature}"
            )

            operational_status = await getOperationalStatus(aircon)
            # NOTE(review): 0 is reported as "on" and anything else as "off";
            # presumably this mirrors the device's raw status encoding --
            # confirm against the ECHONET Lite specification.
            status = "on" if operational_status == 0 else "off"
            print(f"Operational status for host {host}: {status}")

            cumulative_power = await getCumulativePower(aircon)
            print(f"Cumulative power for host {host}: {cumulative_power}")

            if write_api is None:
                continue

            # (measurement name, field name, value) for each reading; the
            # numeric status is stored, not the "on"/"off" label.
            readings = (
                ("room_temperature", "temperature", room_temperature),
                ("operational_temperature", "temperature", operational_temperature),
                ("operational_status", "status", operational_status),
                ("cumulative_power", "power", cumulative_power),
            )
            for measurement, field_name, value in readings:
                point: Point = (
                    Point(measurement)
                    .tag("host", host)
                    .field(field_name, value)
                    .time(datetime.now(timezone.utc))
                )
                write_api.write(str(influx_bucket), influx_org, point)
                print(point)
    finally:
        # Release the InfluxDB resources even when a device read or a write
        # fails part-way through the host list.
        if write_api is not None:
            write_api.close()
        if influx_client is not None:
            influx_client.close()
async def getOperationModeSetting(aircon) -> int:
    """Fetch property 0xB0 from *aircon* and return it as an integer.

    The reply of ``aircon.getMessage(0xB0)`` is converted with ``bytes()``
    and interpreted as an unsigned big-endian number.
    """
    raw = bytes(await aircon.getMessage(0xB0))
    return int.from_bytes(raw, "big")
async def getCumulativePower(aircon) -> int:
    """Fetch property 0x85 from *aircon* and return it as an integer.

    The reply of ``aircon.getMessage(0x85)`` is converted with ``bytes()``
    and interpreted as an unsigned big-endian number.
    """
    raw = bytes(await aircon.getMessage(0x85))
    return int.from_bytes(raw, "big")
async def getRoomTemperature(aircon) -> int:
    """Read the room temperature reported by *aircon*.

    The reply of ``aircon.getRoomTemperature()`` is decoded as a *signed*
    big-endian integer, so readings below zero come out negative instead of
    wrapping around to large positive numbers (0xFF is -1, not 255).

    Returns:
        The temperature as an integer.
        NOTE(review): presumably degrees Celsius -- confirm against the
        device specification.

    Raises:
        RuntimeError: If the device returned an empty or falsy reply.
    """
    room_temperature: Union[bytes, bool] = await aircon.getRoomTemperature()
    if not room_temperature:
        raise RuntimeError("air conditioner returned no room temperature")
    return int.from_bytes(bytes(room_temperature), byteorder="big", signed=True)
async def getOperationalTemperature(aircon) -> int:
    """Read the operational temperature reported by *aircon*.

    The reply of ``aircon.getOperationalTemperature()`` is interpreted as an
    unsigned big-endian integer.

    Raises:
        Exception: If the device returned an empty or falsy reply.
    """
    reply: Union[bytes, bool] = await aircon.getOperationalTemperature()
    if reply:
        return int.from_bytes(bytes(reply), "big")
    raise Exception()
async def getOperationalStatus(aircon) -> int:
    """Read the operational status reported by *aircon* as an integer.

    The reply of ``aircon.getOperationalStatus()`` is decoded as UTF-8 text
    and that text is parsed as a decimal number.

    Raises:
        Exception: If the device returned an empty or falsy reply.
    """
    raw_status = await aircon.getOperationalStatus()
    if raw_status:
        status_text = raw_status.decode("utf-8")
        return int(status_text)
    raise Exception()
if __name__ == "__main__":
    # Script entry point: all configuration comes from environment variables.
    # AIRCON_HOSTS is a whitespace-separated list; unset means no hosts.
    hosts: List[str] = os.environ.get("AIRCON_HOSTS", "").split()
    # Each InfluxDB setting is read from the upper-cased variable of the same
    # name (INFLUX_URL, INFLUX_TOKEN, ...); unset variables are passed as
    # None, which main() takes as "do not write to InfluxDB".
    kwargs: Dict[str, Optional[str]] = {
        setting: os.environ.get(setting.upper())
        for setting in ("influx_url", "influx_token", "influx_bucket", "influx_org")
    }
    asyncio.run(main(hosts, **kwargs))