-
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathexample_api_msg_async.py
More file actions
62 lines (47 loc) · 2.05 KB
/
example_api_msg_async.py
File metadata and controls
62 lines (47 loc) · 2.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import asyncio
from datetime import datetime
import logging
import sys
from pystuderxcom import AsyncXcomApiTcp, XcomApiTcp, XcomApiTcpMode
from pystuderxcom import XcomData, XcomValues, XcomValuesItem
from pystuderxcom import XcomVoltage, XcomAggregationType, XcomFormat
from helper import RunHelper
# Route all log records (DEBUG and above) to standard output
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
logger = logging.getLogger(__name__)
async def main():
    """Connect to the Xcom device, then log its installation GUID and messages.

    The API object is created in TCP server mode on port 4001. If ``start()``
    reports failure the function returns early. Any exception raised while
    talking to the device is logged with its traceback instead of propagating,
    and ``api.stop()`` is always awaited before the function returns.
    """
    # When Moxa is configured as TCP Client (preferred mode):
    #   api = AsyncXcomApiTcp(mode=XcomApiTcpMode.SERVER, listen_port=4001)
    #
    # When Moxa is configured as TCP Server:
    #   api = AsyncXcomApiTcp(mode=XcomApiTcpMode.CLIENT, remote_ip=<moxa_ip>, remote_port=<moxa_port>)
    #
    # When Moxa is configured as UDP:
    #   api = AsyncXcomApiUdp(remote_ip=<moxa_ip>, remote_port=<moxa_port>, local_port=4001)
    api = AsyncXcomApiTcp(mode=XcomApiTcpMode.SERVER, listen_port=4001)  # port number configured in Xcom-LAN/Moxa NPort

    try:
        if not await api.start():
            logger.info("Did not connect to Xcom")
            return

        # Retrieve unique guid for this installation
        logger.info("")
        logger.info("Retrieve unique guid of this installation")
        value = await api.request_guid()
        logger.info(f"Installation Guid: {value}")

        # Retrieve messages
        logger.info("")
        logger.info("Retrieve messages")

        # Retrieving message #0 returns the last saved message.
        # But be aware that it will also erase the flag that there are new messages
        for idx in range(0xFFFFFFFF):
            msg = await api.request_message(idx)
            logger.info(f"msg #{idx} from {msg.source_address} at {datetime.fromtimestamp(msg.timestamp)}: {msg.message_string}")

            # Stop once the device reports at most one message.
            # NOTE(review): presumably message_total counts the messages still
            # available from this index on - confirm against the library docs.
            if msg.message_total <= 1:
                break

    except Exception as e:
        # Top-level boundary of the example: log at ERROR level with the full
        # traceback so failures are not mistaken for ordinary info output.
        logger.exception("Unexpected exception: %s", e)
    finally:
        logger.info("")
        await api.stop()
# Script entry point: pass the `main` coroutine function to RunHelper from the
# local `helper` module. NOTE(review): presumably RunHelper.run drives the
# asyncio event loop until `main` completes - confirm in helper.py.
RunHelper.run(main) # main loop