Utilities

Solarman Scan Utility

""" Scan local network for Solarman data loggers """
 2
 3import socket
 4
 5
 6def main():
 7    """Solarman data logger scanner"""
 8    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
 9    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
10    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
11    sock.settimeout(1.0)
12
13    request = "WIFIKIT-214028-READ"
14    address = ("<broadcast>", 48899)
15
16    sock.sendto(request.encode(), address)
17    while True:
18        try:
19            data = sock.recv(1024)
20        except socket.timeout:
21            break
22        keys = dict.fromkeys(["ipaddress", "mac", "serial"])
23        values = data.decode().split(",")
24        result = dict(zip(keys, values))
25        print(result)
26
27
# Run the scan only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

Solarman Unicast Scanner

Solarman Decoder

""" Parse and decode V5 frames passed via argv

user@host:~/src/pysolarmanv5$ ./venv/bin/python utils/solarman_decoder.py a5 17 00 10 45 bb 00 b2 6e 3c 6a 02 00 00 00 00 00 00 00 00 00 00 00 00 00 00 01 03 00 03 00 05 75 c9 39 15
Frame start: a5 (valid: True)
V5 Checksum: 39 (valid: True)
Length: 23
Control Code: V5Request (hex: 4510)
Sequence numbers: (187, 0) (hex: bb 00)
Serial Hex: 6a3c6eb2
Serial: 1782345394
Frame Type (Inverter): 2
Frame Status: 0
Total Time: 0
PowerOn Time: 0
Offset Time: 0
Frame Time: 1970-01-01 00:00:00+00:00
Checksum: 30153 hex: 75c9 - RTU start at: 0103000300
========== RTU Payload - [Request] ==========
	Slave address: 1
	Function code: 3
	CRC: 75c9 (valid: True)
	Request Start Addr: 3 (03)
	Request Quantity: 5 (05)

"""
 26
 27import datetime
 28import enum
 29import sys
 30
 31from umodbus.client.serial.redundancy_check import get_crc
 32
 33
 34class V5Definitions:
 35
 36    Start = 0xA5
 37    End = 0x15
 38
 39    ReqFrameLen = 15
 40    RespFrameLen = 14
 41
 42
 43class V5CtrlCode(int, enum.Enum):
 44    V5Request = 0x4510
 45    V5Response = 0x1510
 46    LoggerPing = 0x4710
 47    LoggerResponse = 0x4210
 48
 49    Unknown = 0xDEADC0DE
 50
 51    @classmethod
 52    def _missing_(cls, value):
 53        return V5CtrlCode.Unknown
 54
 55
 56class V5FrameType(int, enum.Enum):
 57
 58    KeepAlive = 0
 59    Logger = 1
 60    Inverter = 2
 61    Unknown = -1
 62
 63    @classmethod
 64    def _missing_(cls, value):
 65        return V5FrameType.Unknown
 66
 67
 68def unsigned_int(x: bytes) -> int:
 69    return int.from_bytes(x, byteorder="little", signed=False)
 70
 71
 72def rtu_unsigned_int(x: bytes) -> int:
 73    return int.from_bytes(x, byteorder="big", signed=False)
 74
 75
 76class V5Frame:
 77
 78    def __init__(self, hex_frame: str):
 79        self._frame = bytes.fromhex(hex_frame)
 80
 81    @property
 82    def frame_start(self) -> int:
 83        return self._frame[0]
 84
 85    @property
 86    def frame_start_valid(self) -> bool:
 87        return self.frame_start == V5Definitions.Start
 88
 89    @property
 90    def v5_checksum(self) -> int:
 91        flen = len(self._frame) - 2
 92        check = 0
 93        for i in range(1, flen):
 94            check = (check + self._frame[i]) & 0xFF
 95        return check
 96
 97    @property
 98    def v5_checksum_valid(self) -> bool:
 99        return self._frame[-2] == self.v5_checksum
100
101    @property
102    def v5_length(self) -> int:
103        return unsigned_int(self._frame[1:3])
104
105    @property
106    def control_code(self) -> V5CtrlCode:
107        return V5CtrlCode(unsigned_int(self._frame[3:5]))
108
109    @property
110    def sequence_numbers(self) -> (int, int):
111        return self._frame[5], self._frame[6]
112
113    @property
114    def serial(self) -> int:
115        return unsigned_int(self._frame[7:11])
116
117    @property
118    def frame_type(self) -> V5FrameType:
119        return V5FrameType(self._frame[11])
120
121    @property
122    def frame_status(self) -> int:
123        return self._frame[12]
124
125    @property
126    def total_work_time(self) -> int:
127        if self.frame_type == V5FrameType.KeepAlive:
128            return 0
129        return unsigned_int(self._frame[13:17])
130
131    @property
132    def power_on_time(self) -> int:
133        if self.frame_type == V5FrameType.KeepAlive:
134            return 0
135        return unsigned_int(self._frame[17:21])
136
137    @property
138    def offset_time(self) -> int:
139        if self.frame_type == V5FrameType.KeepAlive:
140            return 0
141        return unsigned_int(self._frame[21:25])
142
143    @property
144    def frame_crc(self) -> int:
145        return rtu_unsigned_int(self._frame[-4:-2])
146
147    @property
148    def calculated_crc(self) -> int:
149        rtu_frame = self._frame[self.rtu_start_at : -4]
150        rtu_crc = get_crc(rtu_frame)
151        return rtu_unsigned_int(rtu_crc)
152
153    @property
154    def rtu_crc_valid(self) -> bool:
155        return self.frame_crc == self.calculated_crc
156
157    @property
158    def rtu_start_at(self) -> int:
159        """
160        RTU start position.
161
162        A bit clunky, but it works (for now)
163
164        :return:
165        """
166        if self.control_code in (V5CtrlCode.V5Request, V5CtrlCode.LoggerResponse):
167            return 26
168        return 25
169
170    @property
171    def rtu_head(self) -> str:
172        head = self.rtu_start_at
173        return self._frame[head : head + 5].hex()
174
175    @property
176    def rtu(self) -> bytes:
177        return self._frame[self.rtu_start_at : -2]
178
179    @property
180    def double_crc_frame(self) -> bool:
181        real_crc = self.rtu[-4:-2]
182        calculated = get_crc(self.rtu[:-4])
183
184        return real_crc == calculated
185
186    def payload_string(self) -> str:
187        start = self.rtu_start_at
188        if self.control_code == V5CtrlCode.V5Request:
189            payload_t = "Request"
190        elif self.control_code == V5CtrlCode.V5Response:
191            payload_t = "Response"
192        else:
193            payload_t = "Unknown"
194
195        msg = "=" * 10 + f" RTU Payload - [{payload_t}] " + "=" * 10 + "\n\t"
196        msg += f"Slave address: {self._frame[start]}\n\t"
197        msg += f"Function code: {self._frame[start+1]}\n\t"
198        msg += f"CRC: {self.calculated_crc:02x} (valid: {self.rtu_crc_valid})\n\t"
199        if self.double_crc_frame:
200            real_crc = self.rtu[-4:-2].hex()
201            msg += f"DOUBLE CRC FRAME DETECTED - REAL CRC: {real_crc}\n\t"
202
203        if self.control_code == V5CtrlCode.V5Response:
204            reported_size = self.v5_length - V5Definitions.RespFrameLen
205            msg += f"Quantity: {reported_size}\n\t"
206            msg += f"Data: {self._frame[start:-2].hex()}\n\t"
207        elif self.control_code == V5CtrlCode.V5Request:
208            addr = rtu_unsigned_int(self._frame[start + 2 : start + 4])
209            qty = rtu_unsigned_int(self._frame[start + 4 : start + 6])
210            msg += f"Request Start Addr: {addr} ({addr:02x})\n\t"
211            msg += f"Request Quantity: {qty} ({qty:02x})\n\t"
212
213        return msg
214
215
def main(argv=None):
    """Decode the V5 frame given as hex arguments and print its fields.

    :param argv: hex strings that are concatenated to form the frame;
        defaults to ``sys.argv[1:]``
    """
    args = sys.argv[1:] if argv is None else argv
    if not args:
        # Without any bytes every field access would fail with IndexError.
        sys.exit(f"usage: {sys.argv[0]} <hex byte> [<hex byte> ...]")

    solarman = V5Frame("".join(args))

    print(
        f"Frame start: {solarman.frame_start:02x} (valid: {solarman.frame_start_valid})"
    )
    print(
        f"V5 Checksum: {solarman.v5_checksum:02x} (valid: {solarman.v5_checksum_valid})"
    )
    print(f"Length: {solarman.v5_length}")
    # Multi-byte values are padded to their full width so that leading
    # zeros are not dropped from the hex output.
    ctrl = solarman.control_code
    print(f"Control Code: {ctrl.name} (hex: {ctrl.value:04x})")
    seq_no = solarman.sequence_numbers
    print(f"Sequence numbers: {seq_no} (hex: {seq_no[0]:02x} {seq_no[1]:02x})")
    print(f"Serial Hex: {solarman.serial:08x}")
    print(f"Serial: {solarman.serial}")
    ftype = solarman.frame_type
    print(f"Frame Type ({ftype.name}): {ftype.value}")
    print(f"Frame Status: {solarman.frame_status}")
    print(f"Total Time: {solarman.total_work_time}")
    print(f"PowerOn Time: {solarman.power_on_time}")
    print(f"Offset Time: {solarman.offset_time}")

    # The three time fields are summed and shown as a UTC timestamp.
    frame_time = (
        solarman.total_work_time + solarman.power_on_time + solarman.offset_time
    )
    print(
        f"Frame Time: {datetime.datetime.fromtimestamp(frame_time, datetime.timezone.utc)}"
    )
    # The RTU payload summary is skipped for keep-alive frames.
    if solarman.frame_type != V5FrameType.KeepAlive:
        print(
            f"Checksum: {solarman.frame_crc} hex: {solarman.frame_crc:04x} - RTU start at: {solarman.rtu_head}"
        )
        print(solarman.payload_string())


if __name__ == "__main__":
    main()