-
-
Notifications
You must be signed in to change notification settings - Fork 37.3k
Expand file tree
/
Copy pathdevice_tracker.py
More file actions
249 lines (207 loc) · 8.53 KB
/
device_tracker.py
File metadata and controls
249 lines (207 loc) · 8.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
"""Support for device tracking of Huawei LTE routers."""
from __future__ import annotations
from dataclasses import dataclass, field
import logging
import re
from typing import Any, cast
from stringcase import snakecase
from homeassistant.components.device_tracker.config_entry import ScannerEntity
from homeassistant.components.device_tracker.const import (
DOMAIN as DEVICE_TRACKER_DOMAIN,
SOURCE_TYPE_ROUTER,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import HuaweiLteBaseEntity, Router
from .const import (
CONF_TRACK_WIRED_CLIENTS,
DEFAULT_TRACK_WIRED_CLIENTS,
DOMAIN,
KEY_LAN_HOST_INFO,
KEY_WLAN_HOST_LIST,
UPDATE_SIGNAL,
)
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Subscriber tag this platform adds to the router's LAN host info and WLAN
# host list subscriptions so that those host lists get polled.
_DEVICE_SCAN = f"{DEVICE_TRACKER_DOMAIN}/device_scan"
# A single host entry as found in the router's host list data.
_HostType = dict[str, Any]
def _get_hosts(
    router: Router, ignore_subscriptions: bool = False
) -> list[_HostType] | None:
    """Return the first host list available in the router data, if any.

    LAN host info is consulted first, then the WLAN host list. Unless
    ``ignore_subscriptions`` is true, a key that is not present in
    ``router.subscriptions`` is skipped. ``None`` is returned when none of
    the consulted keys yields a host list.
    """
    candidate_keys = (KEY_LAN_HOST_INFO, KEY_WLAN_HOST_LIST)
    for data_key in candidate_keys:
        usable = ignore_subscriptions or data_key in router.subscriptions
        if not usable:
            continue
        try:
            host_list = router.data[data_key]["Hosts"]["Host"]
        except KeyError:
            # Data for this key is missing or incomplete; try the next key.
            _LOGGER.debug("%s[%s][%s] not in data", data_key, "Hosts", "Host")
        else:
            return cast(list[_HostType], host_list)
    return None
async def async_setup_entry(
    hass: HomeAssistant,
    config_entry: ConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Set up device tracker entities for a config entry.

    Restores entities already present in the entity registry for this config
    entry, subscribes to the router's host lists, and registers a dispatcher
    listener that adds entities for newly seen hosts on router updates.
    """
    router = hass.data[DOMAIN].routers[config_entry.unique_id]

    # Fetch the hosts list once, ignoring subscriptions, to find out whether the
    # initial fetch produced any host data at all (i.e. host lists are
    # supported). Without it there is nothing to subscribe to or track.
    hosts = _get_hosts(router, True)
    if hosts is None:
        return

    # Unique ids of entities that are already being tracked
    tracked: set[str] = set()
    registry = await entity_registry.async_get_registry(hass)
    known_entities: list[Entity] = []
    track_wired_clients = router.config_entry.options.get(
        CONF_TRACK_WIRED_CLIENTS, DEFAULT_TRACK_WIRED_CLIENTS
    )
    for entry in registry.entities.values():
        if entry.domain != DEVICE_TRACKER_DOMAIN:
            continue
        if entry.config_entry_id != config_entry.entry_id:
            continue
        mac = entry.unique_id.partition("-")[2]
        if not track_wired_clients:
            # Known wired clients are not restored when wired tracking is off
            # (any more). Hosts missing from the current list are still restored.
            known_host = next(
                (item for item in hosts if item.get("MacAddress") == mac), None
            )
            if known_host is not None and not _is_wireless(known_host):
                continue
        tracked.add(entry.unique_id)
        known_entities.append(HuaweiLteScannerEntity(router, mac))
    async_add_entities(known_entities, True)

    # Ask the parent router to poll the host lists so new devices get discovered
    for subscription_key in (KEY_LAN_HOST_INFO, KEY_WLAN_HOST_LIST):
        router.subscriptions[subscription_key].add(_DEVICE_SCAN)

    async def _async_maybe_add_new_entities(unique_id: str) -> None:
        """Add new entities, but only for update signals from our own router."""
        if config_entry.unique_id == unique_id:
            async_add_new_entities(router, async_add_entities, tracked)

    # Listen for router data updates; stop listening when the entry is unloaded
    config_entry.async_on_unload(
        async_dispatcher_connect(hass, UPDATE_SIGNAL, _async_maybe_add_new_entities)
    )

    # Pick up new entities from the initial scan
    async_add_new_entities(router, async_add_entities, tracked)
def _is_wireless(host: _HostType) -> bool:
    """Return whether the host entry is anything other than an Ethernet client."""
    # Only LAN host info entries carry "InterfaceType" ("Ethernet" / "Wireless").
    # WLAN host list entries lack it and are expected to be all wireless, hence
    # the "Wireless" default.
    interface_type = cast(str, host.get("InterfaceType", "Wireless"))
    return interface_type != "Ethernet"
def _is_connected(host: _HostType | None) -> bool:
    """Return whether the host entry is present and not marked inactive."""
    if host is None:
        return False
    # Only LAN host info entries carry "Active" ("1" or "0"). WLAN host list
    # entries lack it, but that call appears to return active hosts only, hence
    # the "1" default.
    active_flag = cast(str, host.get("Active", "1"))
    return active_flag != "0"
def _is_us(host: _HostType) -> bool:
    """Try to tell whether the host entry is this Home Assistant instance."""
    # Only LAN host info entries carry "isLocalDevice" ("1" / "0"); WLAN host
    # list entries lack it and are therefore never considered to be us.
    local_flag = cast(str, host.get("isLocalDevice", "0"))
    return local_flag == "1"
@callback
def async_add_new_entities(
    router: Router,
    async_add_entities: AddEntitiesCallback,
    tracked: set[str],
) -> None:
    """Create entities for eligible hosts that are not tracked yet.

    A host is eligible when it is not this instance itself, is connected, has
    a MAC address, and is wireless unless wired clients are tracked too. The
    unique ids of the added entities are recorded in ``tracked``.
    """
    hosts = _get_hosts(router)
    if not hosts:
        return

    track_wired_clients = router.config_entry.options.get(
        CONF_TRACK_WIRED_CLIENTS, DEFAULT_TRACK_WIRED_CLIENTS
    )

    new_entities: list[Entity] = []
    for host in hosts:
        if _is_us(host) or not _is_connected(host):
            continue
        mac = host.get("MacAddress")
        if not mac:
            continue
        if not (track_wired_clients or _is_wireless(host)):
            continue
        candidate = HuaweiLteScannerEntity(router, mac)
        if candidate.unique_id in tracked:
            continue
        tracked.add(candidate.unique_id)
        new_entities.append(candidate)
    async_add_entities(new_entities, True)
def _better_snakecase(text: str) -> str:
    """Convert text to snake case while keeping runs of capitals together.

    Pre-processes the input before handing it to stringcase's ``snakecase``;
    awaiting https://github.com/okunishinishi/python-stringcase/pull/18
    """
    if text == text.upper():
        # All uppercase becomes all lowercase, to get http for HTTP, not h_t_t_p
        return cast(str, snakecase(text.lower()))

    def _lowercase_middle(match: re.Match[str]) -> str:
        """Lowercase the middle part of a run of three or more capitals."""
        first, middle, last = match.groups()
        return f"{first}{middle.lower()}{last}"

    # Three or more consecutive uppercase letters get their middle part
    # lowercased, to get http_response for HTTPResponse, not h_t_t_p_response
    prepared = re.sub(
        r"([A-Z])([A-Z]+)([A-Z](?:[^A-Z]|$))",
        _lowercase_middle,
        text,
    )
    return cast(str, snakecase(prepared))
@dataclass
class HuaweiLteScannerEntity(HuaweiLteBaseEntity, ScannerEntity):
    """Scanner entity for a single host known to a Huawei LTE router."""

    # MAC address of the tracked host; the only init field declared in this class.
    _mac_address: str

    # State below is excluded from __init__ and refreshed by async_update().
    _ip_address: str | None = field(default=None, init=False)
    _is_connected: bool = field(default=False, init=False)
    _hostname: str | None = field(default=None, init=False)
    _extra_state_attributes: dict[str, Any] = field(default_factory=dict, init=False)

    @property
    def _entity_name(self) -> str:
        """Return the hostname when known, otherwise the MAC address."""
        return self.hostname or self.mac_address

    @property
    def _device_unique_id(self) -> str:
        """Return the MAC address as the device specific unique id part."""
        return self.mac_address

    @property
    def source_type(self) -> str:
        """Return the source type, always SOURCE_TYPE_ROUTER."""
        return SOURCE_TYPE_ROUTER

    @property
    def ip_address(self) -> str | None:
        """Return the primary IP address of the device, if known."""
        return self._ip_address

    @property
    def mac_address(self) -> str:
        """Return the MAC address of the device."""
        return self._mac_address

    @property
    def hostname(self) -> str | None:
        """Return the hostname of the device, if known."""
        return self._hostname

    @property
    def is_connected(self) -> bool:
        """Return whether the device is currently connected."""
        return self._is_connected

    @property
    def extra_state_attributes(self) -> dict[str, Any]:
        """Return additional attributes related to the entity state."""
        return self._extra_state_attributes

    async def async_update(self) -> None:
        """Refresh the entity state from the router's current host list."""
        hosts = _get_hosts(self.router)
        # The entity is available exactly when a host list could be obtained.
        self._available = hosts is not None
        if hosts is None:
            return

        # Find our own entry, the first one whose MAC address matches.
        host = None
        for candidate in hosts:
            if candidate.get("MacAddress") == self._mac_address:
                host = candidate
                break

        self._is_connected = _is_connected(host)
        if host is None:
            # Not listed: keep the previously known address, name and attributes.
            return

        # IpAddress can contain multiple semicolon separated addresses.
        # Pick one for model sanity; e.g. the dhcp component to which it is
        # fed parses it and expects to see just one.
        raw_addresses = host.get("IpAddress") or ""
        self._ip_address = raw_addresses.split(";", 2)[0] or None
        self._hostname = host.get("HostName")

        exposed_keys = {
            "AddressSource",
            "AssociatedSsid",
            "InterfaceType",
        }
        self._extra_state_attributes = {
            _better_snakecase(key): value
            for key, value in host.items()
            if key in exposed_keys
        }