Examples
Basic (sync) Client Example
1""" A basic client demonstrating how to use pysolarmanv5."""
2from pysolarmanv5 import PySolarmanV5
3
4
def main():
    """Connect to a data logger and run a series of example Modbus queries.

    Only the IP address and serial number of the data logger are mandatory
    arguments to ``PySolarmanV5``. If port, mb_slave_id, and verbose are
    omitted, they will default to 8899, 1 and 0 respectively.
    """
    modbus = PySolarmanV5(
        "192.168.1.24", 123456789, port=8899, mb_slave_id=1, verbose=False
    )

    # Always call disconnect(), even if one of the queries below raises.
    try:
        # Query six input registers, results as a list
        print(modbus.read_input_registers(register_addr=33022, quantity=6))

        # Query six holding registers, results as a list
        print(modbus.read_holding_registers(register_addr=43000, quantity=6))

        # Query single input register, result as an int
        print(modbus.read_input_register_formatted(register_addr=33035, quantity=1))

        # Query single input register, apply scaling, result as a float
        print(
            modbus.read_input_register_formatted(
                register_addr=33035, quantity=1, scale=0.1
            )
        )

        # Query two input registers, shift first register up by 16 bits,
        # result as a signed int
        print(
            modbus.read_input_register_formatted(
                register_addr=33079, quantity=2, signed=1
            )
        )

        # Query single holding register, apply bitmask and bitshift right
        # (extract bit1 from register)
        print(
            modbus.read_holding_register_formatted(
                register_addr=43110, quantity=1, bitmask=0x2, bitshift=1
            )
        )
    finally:
        modbus.disconnect()


if __name__ == "__main__":
    main()
Async Client Example
1""" A basic client demonstrating how to use the async version of pysolarmanv5."""
2from pysolarmanv5 import PySolarmanV5Async
3import asyncio
4
5
async def main():
    """Connect to a data logger and run a series of example async queries.

    Only the IP address and serial number of the data logger are mandatory
    arguments to ``PySolarmanV5Async``. If port, mb_slave_id, and verbose are
    omitted, they will default to 8899, 1 and 0 respectively.
    """
    modbus = PySolarmanV5Async(
        "192.168.1.121",
        1234567890,
        port=8899,
        mb_slave_id=1,
        verbose=False,
        auto_reconnect=True,
    )
    await modbus.connect()

    # Always call disconnect(), even if one of the queries below raises.
    try:
        # Query six input registers, results as a list
        print(await modbus.read_input_registers(register_addr=33022, quantity=6))

        # Query six holding registers, results as a list
        print(await modbus.read_holding_registers(register_addr=60, quantity=6))

        # Query single input register, result as an int
        print(
            await modbus.read_input_register_formatted(
                register_addr=33035, quantity=1
            )
        )

        # Query single input register, apply scaling, result as a float
        print(
            await modbus.read_input_register_formatted(
                register_addr=33035, quantity=1, scale=0.1
            )
        )

        # Query two input registers, shift first register up by 16 bits,
        # result as a signed int
        print(
            await modbus.read_input_register_formatted(
                register_addr=33079, quantity=2, signed=1
            )
        )

        # Query single holding register, apply bitmask and bitshift right
        # (extract bit1 from register)
        print(
            await modbus.read_holding_register_formatted(
                register_addr=500, quantity=1, bitmask=0x2, bitshift=1
            )
        )
    finally:
        await modbus.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
Register Scan Example
1""" Scan Modbus registers to find valid registers"""
2from pysolarmanv5 import PySolarmanV5, V5FrameError
3import umodbus.exceptions
4
5
def _scan_registers(read_registers, addresses):
    """Read each address on its own and print the ones that return a value.

    ``read_registers`` is a bound read method of the client (input or holding
    registers); ``addresses`` is an iterable of register addresses to probe.
    """
    for addr in addresses:
        try:
            val = read_registers(register_addr=addr, quantity=1)[0]
        except (V5FrameError, umodbus.exceptions.IllegalDataAddressError):
            # The read was rejected for this address; move on to the next one.
            continue
        print(f"Register: {addr:05}\t\tValue: {val:05} ({val:#06x})")


def main():
    """Scan the input and holding register ranges and print valid registers."""
    modbus = PySolarmanV5(
        "192.168.1.24", 123456789, port=8899, mb_slave_id=1, verbose=False
    )

    # Always call disconnect(), even if the scan is interrupted or fails.
    try:
        print("Scanning input registers")
        # range() excludes its stop value: stop at 40000 so 39999 is scanned.
        _scan_registers(modbus.read_input_registers, range(30000, 40000))
        print("Finished scanning input registers")

        print("Scanning holding registers")
        # Likewise, stop at 50000 so register 49999 is included.
        _scan_registers(modbus.read_holding_registers, range(40000, 50000))
        print("Finished scanning holding registers")
    finally:
        modbus.disconnect()


if __name__ == "__main__":
    main()
Solarman Proxy
1""" Modbus RTU over TCP to Solarman proxy
2
3Can be used with Home Assistant's native Modbus integration using config below:
4
5- name: "solarman-modbus-proxy"
6 type: rtuovertcp
7 host: 192.168.1.20
8 port: 1502
9 delay: 3
10 retry_on_empty: true
11 sensors:
12 [...]
13
14"""
15
16import asyncio
17from pysolarmanv5 import PySolarmanV5Async, V5FrameError, NoSocketAvailableError
18
19
async def handle_client(reader, writer):
    """Relay raw Modbus frames between one TCP client and the data logger.

    Called by the asyncio server once per incoming connection with that
    connection's stream ``reader`` and ``writer``. Each chunk read from the
    client is passed to ``send_raw_modbus_frame`` and the reply is written
    back. The loop ends when the client closes its side of the connection.
    """
    solarmanv5 = PySolarmanV5Async(
        "192.168.1.24", 123456789, verbose=True, auto_reconnect=True
    )
    try:
        await solarmanv5.connect()

        addr = writer.get_extra_info("peername")
        print(f"{addr}: New connection")

        try:
            while True:
                modbus_request = await reader.read(1024)
                if not modbus_request:
                    # Empty read means the client closed the connection.
                    break
                try:
                    reply = await solarmanv5.send_raw_modbus_frame(modbus_request)
                except (V5FrameError, NoSocketAvailableError) as exc:
                    # Best effort: drop this request and keep serving.
                    print(f"{addr}: Request failed: {exc!r}")
                    continue
                except Exception as exc:
                    # Stay best-effort for unexpected errors too, but make them
                    # visible. Unlike a bare except, this lets task
                    # cancellation (a BaseException) propagate.
                    print(f"{addr}: Unexpected error: {exc!r}")
                    continue
                writer.write(reply)
                await writer.drain()
        finally:
            print(f"{addr}: Connection closed")
            await solarmanv5.disconnect()
    finally:
        # Close the client side so the socket is not leaked.
        writer.close()
        await writer.wait_closed()
43
44
async def run_server():
    """Listen on 0.0.0.0:1502 and serve each client with ``handle_client``."""
    server = await asyncio.start_server(handle_client, "0.0.0.0", 1502)
    async with server:
        await server.serve_forever()


# Only start the proxy when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(run_server())