Utilities
Solarman Scan Utility
1""" Scan local network for Solarman data loggers """
2
3import socket
4
5
def main():
    """Broadcast a discovery probe and print every data logger that answers.

    Sends the ``WIFIKIT-214028-READ`` request as a UDP broadcast to port
    48899, then prints one dict per received datagram until no datagram
    arrives within the one-second socket timeout. Each reply is decoded,
    split on commas and paired, in order, with the keys ``ipaddress``,
    ``mac`` and ``serial``.
    """
    request = "WIFIKIT-214028-READ"
    address = ("<broadcast>", 48899)
    # Field names for the comma-separated reply, in reply order.
    keys = ("ipaddress", "mac", "serial")

    # The context manager guarantees the socket is closed on every exit path,
    # including exceptions raised while sending or decoding a reply.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.settimeout(1.0)

        sock.sendto(request.encode(), address)
        while True:
            try:
                data = sock.recv(1024)
            except socket.timeout:
                # A full timeout period without a reply ends the scan.
                break
            values = data.decode().split(",")
            print(dict(zip(keys, values)))


if __name__ == "__main__":
    main()
Solarman Unicast Scanner
Solarman Decoder
1""" Parse and decode V5 frames passed via argv
2
3user@host:~/src/pysolarmanv5$ ./venv/bin/python utils/solarman_decoder.py a5 17 00 10 45 bb 00 b2 6e 3c 6a 02 00 00 00 00 00 00 00 00 00 00 00 00 00 00 01 03 00 03 00 05 75 c9 39 15
4Frame start: a5 (valid: True)
5V5 Checksum: 39 (valid: True)
6Length: 23
7Control Code: V5Request (hex: 4510)
8Sequence numbers: (187, 0) (hex: bb 00)
9Serial Hex: 6a3c6eb2
10Serial: 1782345394
11Frame Type (Inverter): 2
12Frame Status: 0
13Total Time: 0
14PowerOn Time: 0
15Offset Time: 0
16Frame Time: 1970-01-01 00:00:00+00:00
17Checksum: 30153 hex: 75c9 - RTU start at: 0103000300
18========== RTU Payload - [Request] ==========
19 Slave address: 1
20 Function code: 3
21 CRC: 75c9 (valid: True)
22 Request Start Addr: 3 (03)
23 Request Quantity: 5 (05)
24
25"""
26
27import datetime
28import enum
29import sys
30
31from umodbus.client.serial.redundancy_check import get_crc
32
33
class V5Definitions:
    """Fixed values used while decoding V5 frames."""

    # Start is the value the first frame byte is compared against.
    # End is not referenced elsewhere in this file; presumably the trailing
    # frame byte -- confirm against the protocol description.
    Start, End = 0xA5, 0x15

    # RespFrameLen is subtracted from the reported V5 length when a response
    # payload is summarised. ReqFrameLen is not referenced in this file.
    ReqFrameLen, RespFrameLen = 15, 14
41
42
class V5CtrlCode(int, enum.Enum):
    """Control codes recognised in a V5 frame.

    Constructing the enum from a value that is not listed yields
    ``Unknown`` instead of raising ``ValueError``.
    """

    V5Request = 0x4510
    V5Response = 0x1510
    LoggerPing = 0x4710
    LoggerResponse = 0x4210

    # Placeholder member returned for every unlisted value.
    Unknown = 0xDEADC0DE

    @classmethod
    def _missing_(cls, value):
        """Resolve any unlisted ``value`` to the ``Unknown`` member."""
        return cls.Unknown
54
55
class V5FrameType(int, enum.Enum):
    """Frame type values recognised in a V5 frame.

    Constructing the enum from a value that is not listed yields
    ``Unknown`` instead of raising ``ValueError``.
    """

    KeepAlive = 0
    Logger = 1
    Inverter = 2
    # Placeholder member returned for every unlisted value.
    Unknown = -1

    @classmethod
    def _missing_(cls, value):
        """Resolve any unlisted ``value`` to the ``Unknown`` member."""
        return cls.Unknown
66
67
def unsigned_int(x: bytes) -> int:
    """Decode ``x`` as an unsigned little-endian integer."""
    return int.from_bytes(x, "little")
70
71
def rtu_unsigned_int(x: bytes) -> int:
    """Decode ``x`` as an unsigned big-endian integer."""
    return int.from_bytes(x, "big")
74
75
class V5Frame:
    """Read-only view over one V5 frame given as a hex string.

    Every field is exposed as a property that slices the raw frame bytes on
    access. Nothing is validated up front, so a frame that is too short for
    a given field raises ``IndexError`` when that field is read.
    """

    def __init__(self, hex_frame: str):
        """Store the bytes encoded by ``hex_frame``.

        :param hex_frame: hexadecimal digits of the whole frame.
        :raises ValueError: if ``hex_frame`` is not valid hexadecimal.
        """
        self._frame = bytes.fromhex(hex_frame)

    @property
    def frame_start(self) -> int:
        """First byte of the frame."""
        return self._frame[0]

    @property
    def frame_start_valid(self) -> bool:
        """Whether the first byte equals ``V5Definitions.Start``."""
        return self.frame_start == V5Definitions.Start

    @property
    def v5_checksum(self) -> int:
        """Sum, modulo 256, of every byte after the first and before the last two."""
        return sum(self._frame[1:-2]) & 0xFF

    @property
    def v5_checksum_valid(self) -> bool:
        """Whether the second-to-last byte equals the computed checksum."""
        return self._frame[-2] == self.v5_checksum

    @property
    def v5_length(self) -> int:
        """Little-endian length field stored in bytes 1-2."""
        return unsigned_int(self._frame[1:3])

    @property
    def control_code(self) -> V5CtrlCode:
        """Control code from bytes 3-4 (little-endian)."""
        return V5CtrlCode(unsigned_int(self._frame[3:5]))

    @property
    def sequence_numbers(self) -> "tuple[int, int]":
        """The two sequence bytes at offsets 5 and 6, in frame order."""
        return self._frame[5], self._frame[6]

    @property
    def serial(self) -> int:
        """Serial number from bytes 7-10 (little-endian)."""
        return unsigned_int(self._frame[7:11])

    @property
    def frame_type(self) -> V5FrameType:
        """Frame type decoded from byte 11."""
        return V5FrameType(self._frame[11])

    @property
    def frame_status(self) -> int:
        """Raw status byte at offset 12."""
        return self._frame[12]

    @property
    def total_work_time(self) -> int:
        """Bytes 13-16 as a little-endian integer; 0 for keep-alive frames."""
        if self.frame_type == V5FrameType.KeepAlive:
            return 0
        return unsigned_int(self._frame[13:17])

    @property
    def power_on_time(self) -> int:
        """Bytes 17-20 as a little-endian integer; 0 for keep-alive frames."""
        if self.frame_type == V5FrameType.KeepAlive:
            return 0
        return unsigned_int(self._frame[17:21])

    @property
    def offset_time(self) -> int:
        """Bytes 21-24 as a little-endian integer; 0 for keep-alive frames."""
        if self.frame_type == V5FrameType.KeepAlive:
            return 0
        return unsigned_int(self._frame[21:25])

    @property
    def frame_crc(self) -> int:
        """CRC carried in the frame: the two bytes before the last two, read big-endian."""
        return rtu_unsigned_int(self._frame[-4:-2])

    @property
    def calculated_crc(self) -> int:
        """CRC computed by ``get_crc`` over the RTU bytes, excluding the last four frame bytes.

        The bytes returned by ``get_crc`` are read big-endian so the value is
        directly comparable with :attr:`frame_crc`.
        """
        rtu_frame = self._frame[self.rtu_start_at : -4]
        return rtu_unsigned_int(get_crc(rtu_frame))

    @property
    def rtu_crc_valid(self) -> bool:
        """Whether the CRC carried in the frame matches the computed one."""
        return self.frame_crc == self.calculated_crc

    @property
    def rtu_start_at(self) -> int:
        """Offset of the first RTU byte.

        A bit clunky, but it works (for now): 26 for ``V5Request`` and
        ``LoggerResponse`` frames, 25 for everything else.
        """
        if self.control_code in (V5CtrlCode.V5Request, V5CtrlCode.LoggerResponse):
            return 26
        return 25

    @property
    def rtu_head(self) -> str:
        """Hex string of the first five bytes starting at the RTU offset."""
        head = self.rtu_start_at
        return self._frame[head : head + 5].hex()

    @property
    def rtu(self) -> bytes:
        """Frame bytes from the RTU offset up to, but excluding, the last two."""
        return self._frame[self.rtu_start_at : -2]

    @property
    def double_crc_frame(self) -> bool:
        """Whether ``rtu[-4:-2]`` is itself the ``get_crc`` value of ``rtu[:-4]``.

        When true, the RTU section carries a second CRC in front of its
        trailing one.
        """
        rtu = self.rtu
        return rtu[-4:-2] == get_crc(rtu[:-4])

    def payload_string(self) -> str:
        """Build the multi-line, human-readable summary of the RTU payload.

        Lines are separated by ``"\\n\\t"`` and the text also ends with that
        separator. Request frames list start address and quantity, response
        frames list the reported size and raw data; other control codes get
        only the common header lines.

        :return: the formatted summary.
        """
        start = self.rtu_start_at
        code = self.control_code
        if code == V5CtrlCode.V5Request:
            payload_t = "Request"
        elif code == V5CtrlCode.V5Response:
            payload_t = "Response"
        else:
            payload_t = "Unknown"

        lines = [
            "=" * 10 + f" RTU Payload - [{payload_t}] " + "=" * 10,
            f"Slave address: {self._frame[start]}",
            f"Function code: {self._frame[start + 1]}",
            # The CRC is 16 bits wide: pad to four hex digits so a leading
            # zero byte is not dropped from the output.
            f"CRC: {self.calculated_crc:04x} (valid: {self.rtu_crc_valid})",
        ]
        if self.double_crc_frame:
            real_crc = self.rtu[-4:-2].hex()
            lines.append(f"DOUBLE CRC FRAME DETECTED - REAL CRC: {real_crc}")

        if code == V5CtrlCode.V5Response:
            reported_size = self.v5_length - V5Definitions.RespFrameLen
            lines.append(f"Quantity: {reported_size}")
            lines.append(f"Data: {self._frame[start:-2].hex()}")
        elif code == V5CtrlCode.V5Request:
            addr = rtu_unsigned_int(self._frame[start + 2 : start + 4])
            qty = rtu_unsigned_int(self._frame[start + 4 : start + 6])
            lines.append(f"Request Start Addr: {addr} ({addr:02x})")
            lines.append(f"Request Quantity: {qty} ({qty:02x})")

        # Every line, including the last, is terminated by "\n\t".
        return "\n\t".join(lines) + "\n\t"
214
215
if __name__ == "__main__":
    # Each command-line argument is a chunk of hex digits; together they
    # form one frame.
    frame = sys.argv[1:]
    if not frame:
        # Without arguments the frame would be empty and the first field
        # access would fail with an IndexError; report the usage instead.
        sys.exit(f"usage: {sys.argv[0]} <hex byte> [<hex byte> ...]")
    solarman = V5Frame("".join(frame))

    print(
        f"Frame start: {solarman.frame_start:02x} (valid: {solarman.frame_start_valid})"
    )
    print(
        f"V5 Checksum: {solarman.v5_checksum:02x} (valid: {solarman.v5_checksum_valid})"
    )
    print(f"Length: {solarman.v5_length}")
    # Control code and CRC are two bytes wide and the serial four: pad the
    # hex output to the full width so leading zeros are not dropped.
    print(
        f"Control Code: {solarman.control_code.name} (hex: {solarman.control_code.value:04x})"
    )
    seq_no = solarman.sequence_numbers
    print(f"Sequence numbers: {seq_no} (hex: {seq_no[0]:02x} {seq_no[1]:02x})")
    print(f"Serial Hex: {solarman.serial:08x}")
    print(f"Serial: {solarman.serial}")
    ftype = solarman.frame_type
    print(f"Frame Type ({ftype.name}): {ftype.value}")
    print(f"Frame Status: {solarman.frame_status}")
    print(f"Total Time: {solarman.total_work_time}")
    print(f"PowerOn Time: {solarman.power_on_time}")
    print(f"Offset Time: {solarman.offset_time}")

    # The three time fields are summed and shown as a UTC timestamp.
    frame_time = (
        solarman.total_work_time + solarman.power_on_time + solarman.offset_time
    )
    print(
        f"Frame Time: {datetime.datetime.fromtimestamp(frame_time, datetime.timezone.utc)}"
    )
    # The CRC and payload summary are skipped for keep-alive frames.
    if solarman.frame_type != V5FrameType.KeepAlive:
        print(
            f"Checksum: {solarman.frame_crc} hex: {solarman.frame_crc:04x} - RTU start at: {solarman.rtu_head}"
        )
        print(solarman.payload_string())