Last active
January 13, 2024 09:29
-
-
Save michaelrinderle/559e7ec6ba938ef00ad12161bdf4756b to your computer and use it in GitHub Desktop.
Python script to parse OpenCanary log file with broken JSON structure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import socket | |
# Path of the log file that main() passes to parse_logfile().
CANARY_FILE: str = "opencanary.log"

# Values compared against each record's "logtype" field in sort_totals().
# NOTE(review): presumably OpenCanary logtype codes -- confirm against the
# OpenCanary logger definitions.
INFORMATION_LOG: int = 1001
BRUTE_FORCE_LOG: int = 6001
PORT_SCAN_LOG: int = 5001
HTTP_SCAN_LOG: int = 3000  # defined but not referenced by the code visible in this file

# Every record decoded by parse_logfile(), in file order.
log_data: list = []

# Per-category buckets filled by sort_totals() from log_data.
information_logs: list = []
bruteforce_logs: list = []
portscan_logs: list = []
http_logs: list = []  # also receives every record whose logtype matches no other bucket
def parse_logfile(file_path):
    """Load a JSON-lines log file into the module-level ``log_data`` list.

    Each non-blank line is decoded on its own with ``json.loads`` and the
    resulting object is appended to ``log_data`` in file order.

    Blank lines are skipped silently. Lines that are not valid JSON, or that
    decode to something other than a JSON object, are reported on stdout and
    skipped, so a single damaged record does not abort the whole run.

    Args:
        file_path: Path of the log file to read (opened as UTF-8 text).

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        for line_number, line in enumerate(f, start=1):
            raw = line.strip()
            if not raw:
                # json.loads("") raises; blank/trailing lines carry no record.
                continue
            try:
                entry = json.loads(raw)
            except json.JSONDecodeError as err:
                print(f"[!] Skipping malformed JSON on line {line_number}: {err}")
                continue
            if not isinstance(entry, dict):
                # The rest of the script indexes records by key, so only
                # JSON objects are usable.
                print(f"[!] Skipping non-object JSON on line {line_number}")
                continue
            log_data.append(entry)
def sort_totals():
    """Distribute every record in ``log_data`` into its per-category list.

    A record goes to ``information_logs``, ``bruteforce_logs`` or
    ``portscan_logs`` when its ``"logtype"`` equals the matching constant;
    any other logtype falls through to ``http_logs``.
    """
    # Checked in this order; the first matching code wins.
    routes = (
        (INFORMATION_LOG, information_logs),
        (BRUTE_FORCE_LOG, bruteforce_logs),
        (PORT_SCAN_LOG, portscan_logs),
    )
    for entry in log_data:
        kind = entry["logtype"]
        bucket = next(
            (target for code, target in routes if kind == code),
            http_logs,  # catch-all for every unmatched logtype
        )
        bucket.append(entry)
def print_information_logs():
    """Print the number of information records, then one line per record.

    Each line shows the record's ``local_time`` and its message. The message
    at ``record['logdata']['msg']`` is handled by shape:

    * a dict: its ``'logdata'`` entry is printed (the dict itself is printed
      if that key is absent);
    * anything else (e.g. the plain string ``"Canary running!!!"``): printed
      as-is.

    Branching on the type rather than on one exact string means an
    unexpected plain-string message is printed instead of raising
    ``TypeError`` when indexed with ``'logdata'``.
    """
    print(f"[*] Information logs : {len(information_logs)}")
    for info in information_logs:
        msg = info['logdata']['msg']
        if isinstance(msg, dict):
            text = msg.get('logdata', msg)
        else:
            text = msg
        print(f"[*] T: {info['local_time']} > {text}")
def print_bruteforce_logs():
    """Print the number of brute-force records, then one line per record.

    Each line shows the record's ``local_time``, ``dst_port`` and
    ``src_host`` followed by the ``USERNAME`` and ``PASSWORD`` entries of its
    ``logdata``, padded to fixed column widths.
    """
    print(f"[*] Bruteforce logs : {len(bruteforce_logs)}")
    for attempt in bruteforce_logs:
        login = attempt['logdata']
        when = attempt['local_time']
        port = attempt['dst_port']
        source = attempt['src_host']
        user = login['USERNAME']
        secret = login['PASSWORD']
        print(f"[*] T: {when:25} > P: {port} Src: {source:20} U: {user:15} P: {secret:15}")
def print_portscan_logs():
    """Print the number of port-scan records, then one line per record.

    Each line shows the record's ``local_time``, ``dst_port`` and
    ``src_host``, padded to fixed column widths.
    """
    print(f"[*] Portscan logs : {len(portscan_logs)}")
    for probe in portscan_logs:
        when = probe['local_time']
        port = probe['dst_port']
        source = probe['src_host']
        print(f"[*] T: {when:25} > P: {port} Src: {source:20}")
def print_http_logs():
    """Print the number of records in ``http_logs``, then one line per record.

    Each line shows the record's ``local_time``, ``dst_port`` and
    ``src_host`` followed by the ``HOSTNAME`` and ``USERAGENT`` entries of its
    ``logdata``.

    ``sort_totals`` places every record whose logtype matches no other bucket
    into ``http_logs``, so a record here is not guaranteed to carry the HTTP
    fields. A missing ``logdata``, ``HOSTNAME`` or ``USERAGENT`` is printed as
    ``-`` instead of raising ``KeyError``.
    """
    print(f"[*] Miscellaneous logs : {len(http_logs)}")
    for http in http_logs:
        details = http.get('logdata')
        if not isinstance(details, dict):
            details = {}
        # str() keeps the width specs valid for non-string values such as
        # None; string values are printed unchanged.
        hostname = str(details.get('HOSTNAME', '-'))
        user_agent = str(details.get('USERAGENT', '-'))
        print("[*] T: {0:25} > P: {1} Src: {2:20} U: {3:15} P: {4:15}"
              .format(http['local_time'],
                      http['dst_port'],
                      http['src_host'],
                      hostname,
                      user_agent))
def print_bruteforce_analytics():
    """Print the unique sources, usernames and passwords seen in brute-force records.

    Collects the distinct ``src_host`` values and the distinct ``USERNAME`` /
    ``PASSWORD`` entries of each record's ``logdata`` from
    ``bruteforce_logs``, then prints each group in sorted order. For every
    source address a reverse lookup is attempted with
    ``socket.gethostbyaddr``; when it fails, ``host not found`` is printed in
    place of the name.
    """
    # Sets give O(1) de-duplication instead of rescanning a list per record.
    sources = set()
    usernames = set()
    passwords = set()
    for brute in bruteforce_logs:
        creds = brute['logdata']
        sources.add(brute['src_host'])
        usernames.add(creds['USERNAME'])
        passwords.add(creds['PASSWORD'])

    print("[*] Bruteforce IP/Hosts")
    for ip in sorted(sources):
        try:
            # gethostbyaddr returns (hostname, aliaslist, ipaddrlist);
            # only the primary host name is wanted here.
            hostname = socket.gethostbyaddr(ip)[0]
        except (OSError, UnicodeError):
            # Lookup failures (socket.herror / socket.gaierror are OSError
            # subclasses) are expected for many addresses; anything else,
            # including KeyboardInterrupt, is allowed to propagate.
            hostname = 'host not found'
        print(f"IP: {ip} > {hostname}")

    print("[*] Usernames used")
    for username in sorted(usernames):
        print(username)

    print("[*] Passwords used")
    for password in sorted(passwords):
        print(password)
def main():
    """Parse ``CANARY_FILE``, bucket its records and print every report."""
    parse_logfile(CANARY_FILE)
    sort_totals()

    print("[*] Log totals\n")

    # Reports run in this fixed order.
    reports = (
        print_information_logs,
        print_bruteforce_logs,
        print_portscan_logs,
        print_http_logs,
        print_bruteforce_analytics,
    )
    for report in reports:
        report()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment