dhcp_monitor_2.py
from __future__ import print_function
from scapy.all import sniff
import time
import sqlalchemy
import sqlite3
import datetime
import pandas as pd
import requests
import json
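def pushbullet_message(title, body, token):
    """Send a Pushbullet "note" push and echo *body* to stdout on success.

    Args:
        title: Title of the note.
        body: Text of the note. Callers in this file build it from a DHCP
            hostname, which is chosen by the client device and is therefore
            untrusted; it is JSON-encoded before it is sent.
        token: Pushbullet access token, sent as a Bearer credential.

    Raises:
        Exception: ``('Error', status_code)`` when the API does not answer 200.
        requests.exceptions.RequestException: on connection failure or timeout.
    """
    msg = {"type": "note", "title": title, "body": body}
    headers = {
        'Authorization': 'Bearer ' + token,
        'Content-Type': 'application/json',
    }
    # A timeout is required here: this runs inside the packet-sniffing
    # callback, and requests waits indefinitely by default, so one stalled
    # HTTPS call would stop the monitor from processing further packets.
    resp = requests.post(
        'https://api.pushbullet.com/v2/pushes',
        data=json.dumps(msg),
        headers=headers,
        timeout=10,
    )
    if resp.status_code != 200:
        # Only the status code is reported, so the bearer token and the
        # response body never end up in a traceback or log line.
        raise Exception('Error', resp.status_code)
    print(body)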
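def store_in_db(df: pd.DataFrame, db_name, dir=''):
    """Append the rows of *df* to the SQLite table named *db_name*.

    The database file is ``<db_name>.db``, created in *dir* when one is given
    and in the current working directory otherwise. The table is created on
    first use and has the same name as the database.

    Args:
        df: Rows to append; its column names must match the table's columns.
        db_name: Used both as the file stem and as the table name. It ends up
            in a file path and in a DDL statement, so callers should pass a
            fixed name rather than anything derived from network input.
        dir: Optional directory that holds the database file.
    """
    db_path = f'{dir}/{db_name}.db' if dir else f'{db_name}.db'

    # The table name is spliced into DDL text (identifiers cannot be bound as
    # parameters), so quote it and double any embedded quote character.
    table = '"' + str(db_name).replace('"', '""') + '"'

    # "PRIMARY KEY" must be two words: SQLite parses "INTEGER PRIMARY_KEY" as
    # a type name, which left p_id an ordinary column that was always NULL.
    # Tables already created with the old statement keep their old schema.
    create_q = f"""
    CREATE TABLE IF NOT EXISTS {table}(
        p_id INTEGER PRIMARY KEY,
        hostname VARCHAR(100) NOT NULL,
        requested_addr VARCHAR(20) NOT NULL,
        server_id VARCHAR(20) NOT NULL,
        vendor_class_id VARCHAR(20),
        vendor VARCHAR(10) NOT NULL,
        date DATETIME NOT NULL
    )
    """

    # One connection does both the DDL and the insert. The earlier version
    # opened a second SQLAlchemy engine whose URL resolved a relative *dir*
    # to a different (absolute) file than the one the table was created in.
    con = sqlite3.connect(db_path)
    try:
        con.execute(create_q)
        con.commit()
        # pandas binds the row values as parameters, so the client-supplied
        # hostname and vendor strings are never interpolated into SQL text.
        df.to_sql(db_name, con, if_exists='append', index=False)
        con.commit()
    finally:
        # Close the handle even when the create or the insert fails.
        con.close()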
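def _decode_option(value):
    """Return a DHCP option value as text without raising on odd bytes."""
    if isinstance(value, bytes):
        # Option bytes are chosen by the client; undecodable sequences are
        # replaced instead of raising UnicodeDecodeError.
        return value.decode('utf-8', errors='replace')
    return str(value)


def handle_dhcp_packet(packet, pb_token):
    """Record a DHCP request in the database and send a push notification.

    Packets without a DHCP layer, and DHCP messages whose message type is not
    3 (request), are ignored. Every field read from the options is supplied by
    the requesting device and must be treated as untrusted: the hostname and
    vendor class are free-form strings that a client can set to anything.

    Args:
        packet: A sniffed packet supporting ``'DHCP' in packet`` and
            ``packet['DHCP'].options``.
        pb_token: Pushbullet access token passed on to pushbullet_message.
    """
    if 'DHCP' not in packet:
        return

    options = packet['DHCP'].options

    # Look the message type up by name rather than assuming it is the first
    # option: an empty list or a leading marker string such as 'pad'/'end'
    # would otherwise raise inside the sniff callback and stop the monitor.
    msg_type = next(
        (opt[1] for opt in options
         if isinstance(opt, tuple) and len(opt) > 1 and opt[0] == 'message-type'),
        None,
    )
    if msg_type != 3:
        return

    print('package entered')
    print(options)

    hostname = ''
    requested_addr = ''
    server_id = ''
    vendor_class_id = ''
    pad_list = []

    for item in options:
        if item == 'pad':
            pad_list.append(item)
            continue
        # Remaining marker strings (e.g. 'end') carry no value.
        if not isinstance(item, tuple) or len(item) < 2:
            continue
        name, value = item[0], item[1]
        if name == 'hostname':
            hostname = _decode_option(value)
        elif name == 'requested_addr':
            requested_addr = value
        elif name == 'server_id':
            server_id = value
        elif name == 'vendor_class_id':
            vendor_class_id = _decode_option(value)

    # Vendor guess. The string matches rely on a self-reported vendor class,
    # and the pad-count fallback is a heuristic; neither identifies a device
    # reliably, so do not use this value for access decisions.
    l_vendor = vendor_class_id.lower()
    if 'msft' in l_vendor:
        vendor = 'Microsoft'
    elif 'cisco' in l_vendor:
        vendor = 'Cisco Systems'
    elif 'alcatel' in l_vendor:
        vendor = 'Alcatel'
    elif 'android' in l_vendor:
        vendor = 'Android'
    else:
        # The former trailing 'other' branch could never run: the two
        # pad-count tests together cover every possible length.
        vendor = 'Apple' if len(pad_list) <= 9 else 'Linux'

    # Placeholder stored when the client sent no vendor class.
    if not vendor_class_id:
        vendor_class_id = 'NVT'

    date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(date)

    # add to DHCP db
    data = {
        'hostname': [hostname],
        'requested_addr': [requested_addr],
        'server_id': [server_id],
        'vendor_class_id': [vendor_class_id],
        'vendor': [vendor],
        'date': [date]
    }
    df = pd.DataFrame(data)
    store_in_db(df=df, db_name='DHCP')

    # Send message
    # NOTE(review): an exception raised by the notification call propagates
    # out of the sniff callback; confirm whether the monitor should survive a
    # failed push.
    title = "DHCP"
    message = f"{hostname} connected to home."
    pushbullet_message(title, message, pb_token)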
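if __name__ == "__main__":
    import os

    # Prefer a token supplied through the environment so the credential does
    # not have to be written into the source file. PUSHBULLET_TOKEN is the
    # variable name chosen here; the literal remains as the fallback for
    # setups that still edit it in place.
    pb_token = os.environ.get('PUSHBULLET_TOKEN', 'INSERT TOKEN')

    try:
        # store=False: by default scapy keeps every captured packet in memory,
        # which grows without bound in a monitor that runs for days.
        host = sniff(
            filter="udp and (port 67 or 68)",
            prn=lambda x: handle_dhcp_packet(x, pb_token),
            store=False,
        )
    except KeyboardInterrupt:
        print("interrupted")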