python实现黑铁砖安防接警中心

前言

最近搞了一套安防系统,大致就是一堆传感器,检测然后报警。原来的功能是报警就喇叭一直响三分钟,实在刺耳,虽然原系统也支持微信告警,但是感觉不太好用。最后决定自己研究一下,原系统支持接警中心配置,安定宝协议(ademco)(关于这个没有过多研究)其实就是tcp通信。

主要实现

这家公司的实现是c的

https://github.com/captainwong/ademco_hb

参考代码用python实现

重写CRC校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# /**
# * @brief 计算一个字节(char)的crc16值
# * @param c 要计算crc16的字节
# * @param crc 初始crc值
# * @return crc16
# */
def CalculateCRC_char(c, crc):
crcTable = [
# /* DEFINE THE FIRST ORDER POLYINOMIAL TABLE */
0x0000, 0xC0C1, 0xC181, 0x0140, 0xC301, 0x03C0, 0x0280, 0xC241,
0xC601, 0x06C0, 0x0780, 0xC741, 0x0500, 0xC5C1, 0xC481, 0x0440,
0xCC01, 0x0CC0, 0x0D80, 0xCD41, 0x0F00, 0xCFC1, 0xCE81, 0x0E40,
0x0A00, 0xCAC1, 0xCB81, 0x0B40, 0xC901, 0x09C0, 0x0880, 0xC841,
0xD801, 0x18C0, 0x1980, 0xD941, 0x1B00, 0xDBC1, 0xDA81, 0x1A40,
0x1E00, 0xDEC1, 0xDF81, 0x1F40, 0xDD01, 0x1DC0, 0x1C80, 0xDC41,
0x1400, 0xD4C1, 0xD581, 0x1540, 0xD701, 0x17C0, 0x1680, 0xD641,
0xD201, 0x12C0, 0x1380, 0xD341, 0x1100, 0xD1C1, 0xD081, 0x1040,
0xF001, 0x30C0, 0x3180, 0xF141, 0x3300, 0xF3C1, 0xF281, 0x3240,
0x3600, 0xF6C1, 0xF781, 0x3740, 0xF501, 0x35C0, 0x3480, 0xF441,
0x3C00, 0xFCC1, 0xFD81, 0x3D40, 0xFF01, 0x3FC0, 0x3E80, 0xFE41,
0xFA01, 0x3AC0, 0x3B80, 0xFB41, 0x3900, 0xF9C1, 0xF881, 0x3840,
0x2800, 0xE8C1, 0xE981, 0x2940, 0xEB01, 0x2BC0, 0x2A80, 0xEA41,
0xEE01, 0x2EC0, 0x2F80, 0xEF41, 0x2D00, 0xEDC1, 0xEC81, 0x2C40,
0xE401, 0x24C0, 0x2580, 0xE541, 0x2700, 0xE7C1, 0xE681, 0x2640,
0x2200, 0xE2C1, 0xE381, 0x2340, 0xE101, 0x21C0, 0x2080, 0xE041,
0xA001, 0x60C0, 0x6180, 0xA141, 0x6300, 0xA3C1, 0xA281, 0x6240,
0x6600, 0xA6C1, 0xA781, 0x6740, 0xA501, 0x65C0, 0x6480, 0xA441,
0x6C00, 0xACC1, 0xAD81, 0x6D40, 0xAF01, 0x6FC0, 0x6E80, 0xAE41,
0xAA01, 0x6AC0, 0x6B80, 0xAB41, 0x6900, 0xA9C1, 0xA881, 0x6840,
0x7800, 0xB8C1, 0xB981, 0x7940, 0xBB01, 0x7BC0, 0x7A80, 0xBA41,
0xBE01, 0x7EC0, 0x7F80, 0xBF41, 0x7D00, 0xBDC1, 0xBC81, 0x7C40,
0xB401, 0x74C0, 0x7580, 0xB541, 0x7700, 0xB7C1, 0xB681, 0x7640,
0x7200, 0xB2C1, 0xB381, 0x7340, 0xB101, 0x71C0, 0x7080, 0xB041,
0x5000, 0x90C1, 0x9181, 0x5140, 0x9301, 0x53C0, 0x5280, 0x9241,
0x9601, 0x56C0, 0x5780, 0x9741, 0x5500, 0x95C1, 0x9481, 0x5440,
0x9C01, 0x5CC0, 0x5D80, 0x9D41, 0x5F00, 0x9FC1, 0x9E81, 0x5E40,
0x5A00, 0x9AC1, 0x9B81, 0x5B40, 0x9901, 0x59C0, 0x5880, 0x9841,
0x8801, 0x48C0, 0x4980, 0x8941, 0x4B00, 0x8BC1, 0x8A81, 0x4A40,
0x4E00, 0x8EC1, 0x8F81, 0x4F40, 0x8D01, 0x4DC0, 0x4C80, 0x8C41,
0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641,
0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040,
]
CRC = crc
tmp = (CRC >> 8) ^ (crcTable[ord(c) ^ (CRC & 0xFF)])
CRC = tmp
return CRC


#
# /**
# * @brief 计算一段数据包的crc16值
# * @param buff 要计算crc16的数据对象
# * @param crc 初始crc值
# * @return crc16
# */
def CalculateCRC(buff, crc=0):
CRC = crc
for i in buff:
CRC = CalculateCRC_char(i, CRC)
return str(hex(CRC)).upper().replace("0X", "")

解读客户端发过来的包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def handle_data(data):
status = True
try:
sdata = str(data, encoding="utf-8")
if len(sdata) > 100:
print("bigdata")
sdata = sdata.split('\r\n')[0] + '\r'
length = int(sdata[5:9], 16)
body = sdata[9:length + 9]
head, foot = body.split('R000000L000000', 1)
dic = {
"LF": sdata[0:1],
"crc16": sdata[1:5],
"length": sdata[5:9],
"id": head[:-4],
"seq": head[-4:],
"sn": foot[0:15],
"data": foot[15:-20],
"timestamp": sdata[-21:-1],
"CR": sdata[-1:]
}
if dic['id'] == '"HENG-BO"' or dic['id'] == '"ADM-CID"':
dic['data_event'] = dic['data'][9:13]
#
dic['data_gg'] = dic['data'][14:16]
# 防区号 000 为主机事件
dic['data_zone'] = dic['data'][17:20]
except Exception as e:
import traceback
traceback.print_exc()
status = False
dic = {"error": str(e)}
print_exc()
return dic, status

构建回复包

1
2
3
4
5
6
7
8
def gen_data(id_type, seq: str = '0000', sn: str = '#11220133108688'):
# 构建心跳包回复和
# ret = '\n'+f""+'\r'
# < "id" > < seq > < Rrcvr > < Lpref > < # acct> <data> <*xdata> <timestamp>
ret = id_type + f"{seq}R123ABCL456DEF{sn}[]" + utc_now()
crc16 = CalculateCRC(ret)
length = get_hex_upper(len(ret))
return bytes('\n' + crc16 + length.rjust(4, '0') + ret + '\r', encoding='utf-8')

主要代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# !usr/bin/env python
# -*- coding:utf-8 -*-
# Echo server program

import socketserver
from datetime import datetime
from traceback import print_exc
from win10toast import ToastNotifier
from playsound import playsound

HOST = ''
PORT = 12345


def utc_now():
return datetime.now().strftime("_%H:%M:%S,%m-%d-%Y")


def get_hex_upper(ten_num: int):
return hex(ten_num).upper().replace('0X', '')

class MyTCPHandler(socketserver.BaseRequestHandler):

def handle(self):
toast = ToastNotifier()
toast.show_toast(title="开始监控", msg=datetime.now().strftime('%H:%M:%S'),
icon_path='not.ico',
duration=10, threaded=True)
while True:
# self.request is the TCP socket connected to the client
try:
# 等待连接
self.data = self.request.recv(1024)
except Exception as e:
import traceback
traceback.print_exc()
print(str(e))
continue
# print("{} wrote:".format(self.client_address[0]))
# print(self.data)
try:
ret, status = handle_data(self.data)
# NULL, ACK, DUH, ADM - CID, HENG - BO,分别为心跳(客户端)、回应(服务器)、错误、主机事件、恒博事件
if ret['id'] == '"NULL"':
# 心跳
send_data = gen_data('"ACK"', sn=ret['sn'])
# print("心跳")
print(".", end="")
elif ret['id'] == '"HENG-BO"':
# 回应
if ret['seq'] == '0000':
# 心跳 或 事件
send_data = gen_data('"ACK"', sn=ret['sn'])
# if ret['data_event'] != '3400' and ret['data_event'] != '1400':
# # print(ret['data_event'],type(ret['data_event']))
print("事件", ret['data_event'], ret['data_gg'], ret['data_zone'])
# else:
# print(".",end="")
else:
# 告警
send_data = gen_data('"ACK"', seq=ret['seq'], sn=ret['sn'])
# 处理 声音 and 弹窗
try:
playsound('m1.wav',block=False)
except Exception as e:
print("播放语音失败!")
print("\n收到警报!序号-", ret['seq'], "类型:", ret['data_event'], ret['data_gg'], "防区:",
ret['data_zone'])
toast.show_toast(title="有人经过", msg=datetime.now().strftime('%H:%M:%S'),
icon_path='not.ico',
duration=10, threaded=True)
# print("应答包:", send_data, type(send_data))
self.request.sendall(send_data)
except Exception as e:
import traceback
traceback.print_exc()
print(e)
break


if __name__ == "__main__":
print("start")
server = socketserver.TCPServer((HOST, PORT), MyTCPHandler)
server.serve_forever()
input("over!")

实现了报警在win端播放声音 和 弹出win通知。

写在后面

最近还买了个usb继电器和一个usb灯,准备在报警的时候实现灯条闪烁,但是因为疫情暂时还没到货,后面到了再看看

python实现黑铁砖安防接警中心

https://blog.rui.plus/2022/2022-3-24/

作者

Rui plus

发布于

2022-03-24

更新于

2022-03-24

许可协议

评论