Data breaches and leaked credentials have become a recurring threat in the cybersecurity landscape, exposing sensitive information such as usernames, passwords, and email addresses. When attackers gain access to this data, they can exploit it for unauthorized access, phishing attacks, or identity theft. The risk to businesses and individuals is significant, whether the credentials leak from a third-party service or internal sensitive data is compromised. Detecting and responding to compromised accounts quickly is critical to minimizing potential damage, preventing unauthorized access, and safeguarding sensitive information.
Wazuh, an open source security platform, offers capabilities for monitoring and detecting compromised accounts in real-time. By integrating with external threat intelligence sources, such as breach databases and dark web monitoring tools, Wazuh can proactively alert security teams when an account is suspected to be compromised.
This post explores how Wazuh detects compromised accounts using the Have I Been Pwned (HIBP) platform. HIBP is an online platform that allows individuals and organizations to check whether their email addresses or passwords have been exposed in known data breaches. Here, we will learn how to set up automated breach detection and best practices for responding to such incidents.
Infrastructure
The following infrastructure is used to demonstrate the detection of compromised accounts with Wazuh:
- A pre-built, ready-to-use Wazuh OVA 4.10.0, which includes the Wazuh central components (the Wazuh server, indexer, and dashboard). Follow this guide to download and set up Wazuh in a virtual environment.
- A Have I Been Pwned API key. This requires a subscription to one of the available services offered.
Configuration
Python dependencies
Note: We use Python 3.8 as it is considered one of the most stable versions.
1. We install the Python dependencies on the Wazuh server using the commands below:
# sudo amazon-linux-extras enable python3.8
# sudo yum install python3.8
2. Confirm that Python 3.8 is installed:
# python3.8 --version
Python 3.8.20
3. Install pip:
# curl -O https://bootstrap.pypa.io/get-pip.py
# sudo python3.8 get-pip.py
4. Install requests and downgrade urllib3:
# python3.8 -m pip install requests
# python3.8 -m pip uninstall urllib3
# python3.8 -m pip install urllib3==1.26.15
We need to downgrade urllib3 because the current version requires OpenSSL 1.1.1+, while the Wazuh server has an older version of OpenSSL (1.0.2). Upgrading OpenSSL may cause some compatibility issues for our Python script.
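To confirm which OpenSSL build the interpreter is linked against before pinning urllib3, a minimal sketch using the standard ssl module is shown below. The helper name needs_urllib3_v1 is hypothetical, and the 1.1.1 threshold is taken from the requirement described above.

```python
import ssl


def needs_urllib3_v1(openssl_version_info):
    """Return True when OpenSSL is older than 1.1.1 (hypothetical helper).

    openssl_version_info is a tuple such as ssl.OPENSSL_VERSION_INFO;
    only the first three fields (major, minor, fix) are compared.
    """
    return tuple(openssl_version_info[:3]) < (1, 1, 1)


if __name__ == "__main__":
    # Human-readable version string of the linked OpenSSL library
    print(ssl.OPENSSL_VERSION)
    if needs_urllib3_v1(ssl.OPENSSL_VERSION_INFO):
        print("OpenSSL is older than 1.1.1: keep urllib3 pinned to 1.26.x")
    else:
        print("OpenSSL is 1.1.1 or newer: the urllib3 pin may not be needed")
```

Save it under any file name and run it with python3.8 so that it reports the OpenSSL build used by the same interpreter that runs the integration script.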
Python script
1. Create the script hibp.py in the /home/wazuh-user/ directory of the Wazuh server:
import requests
import time
import json
import os
from datetime import datetime, timedelta

# Configuration
API_KEY = "<YOUR_HIBP_API_KEY>"  # Replace with your HIBP API key
EMAIL_LIST_FILE = "/home/wazuh-user/email_list.txt"  # File containing email addresses (one per line)
OUTPUT_LOG_FILE = "/var/log/hibp_breach_checks.log"  # Log file path
CACHE_FILE = "hibp_cache.json"  # To store recently checked emails and avoid redundant checks
BREACH_DETAILS_CACHE_FILE = "breach_details_cache.json"  # Cache for breach descriptions
RATE_LIMIT_DELAY = 60  # Seconds to wait between API calls to respect rate limits
CACHE_EXPIRATION_DAYS = 6  # How long to consider cached results valid

# Ensure the output log file directory exists
os.makedirs(os.path.dirname(OUTPUT_LOG_FILE), exist_ok=True)

# Load cache (if exists and valid)
if os.path.exists(CACHE_FILE):
    try:
        with open(CACHE_FILE, "r") as f:
            cache = json.load(f)
    except (json.JSONDecodeError, ValueError):
        print("Cache file is empty or corrupted. Initializing empty cache.")
        cache = {}
else:
    cache = {}

# Load breach details cache (if exists and valid)
if os.path.exists(BREACH_DETAILS_CACHE_FILE):
    try:
        with open(BREACH_DETAILS_CACHE_FILE, "r") as f:
            breach_details_cache = json.load(f)
    except (json.JSONDecodeError, ValueError):
        print("Breach details cache file is empty or corrupted. Initializing empty cache.")
        breach_details_cache = {}
else:
    breach_details_cache = {}


def save_cache():
    """Save the email check cache to a file."""
    with open(CACHE_FILE, "w") as f:
        json.dump(cache, f)


def save_breach_details_cache():
    """Save the breach details cache to a file."""
    with open(BREACH_DETAILS_CACHE_FILE, "w") as f:
        json.dump(breach_details_cache, f)


def is_recently_checked(email):
    """Check if an email has been checked recently."""
    if email in cache:
        last_checked = datetime.strptime(cache[email]["last_checked"], "%Y-%m-%dT%H:%M:%S")
        if datetime.now() - last_checked < timedelta(days=CACHE_EXPIRATION_DAYS):
            return True
    return False


def get_breach_details(breach_name):
    """Fetch detailed information about a breach, using cache if available."""
    if breach_name in breach_details_cache:
        return breach_details_cache[breach_name]

    url = f"https://haveibeenpwned.com/api/v3/breach/{breach_name}"
    headers = {"hibp-api-key": API_KEY, "User-Agent": "HIBP-Wazuh-Integration"}
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        breach_data = response.json()
        description = breach_data.get("Description", "No description available")
        breach_details_cache[breach_name] = description
        save_breach_details_cache()
        return description
    else:
        return "No description available"


def log_breach_info(email, breach, description):
    """Log each breach separately with its description to a file."""
    log_entry = {
        "email": email,
        "breaches": {breach: description},
        "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
        "Source": "hibpwned"
    }
    with open(OUTPUT_LOG_FILE, "a") as log_file:
        log_file.write(json.dumps(log_entry) + "\n")


def check_email_breaches(email):
    """Check if an email has been breached using the HIBP API."""
    url = f"https://haveibeenpwned.com/api/v3/breachedaccount/{email}"
    headers = {"hibp-api-key": API_KEY, "User-Agent": "HIBP-Wazuh-Integration"}
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        breaches = response.json()
        breach_names = [breach['Name'] for breach in breaches]  # Extract breach names
        cache[email] = {"breaches": breach_names, "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}
        for breach in breach_names:
            description = get_breach_details(breach)
            log_breach_info(email, breach, description)
        print(f"{email} found in breaches: {breach_names}")
    elif response.status_code == 404:
        print(f"{email} not found in any breaches.")
        cache[email] = {"breaches": [], "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}
    else:
        print(f"Error checking {email}: {response.status_code} - {response.text}")


def main():
    """Main function to read emails and check breaches."""
    with open(EMAIL_LIST_FILE, "r") as f:
        # Skip blank lines so that no request is sent for an empty address
        emails = [line.strip() for line in f if line.strip()]

    for email in emails:
        if is_recently_checked(email):
            print(f"Skipping recently checked email: {email}")
            continue
        check_email_breaches(email)
        save_cache()
        time.sleep(RATE_LIMIT_DELAY)


if __name__ == "__main__":
    main()
You can customize the following values according to your requirements:
- API_KEY = "<YOUR_HIBP_API_KEY>": Replace <YOUR_HIBP_API_KEY> with your HIBP API key.
- EMAIL_LIST_FILE = "/home/wazuh-user/email_list.txt": This file contains the email addresses to check (one per line).
- OUTPUT_LOG_FILE = "/var/log/hibp_breach_checks.log": This is the log file path. It will be included in the Wazuh /var/ossec/etc/ossec.conf configuration file for monitoring.
- CACHE_FILE = "hibp_cache.json": This file stores recently checked emails to avoid redundant checks.
- BREACH_DETAILS_CACHE_FILE = "breach_details_cache.json": This cache stores breach descriptions.
- RATE_LIMIT_DELAY = 60: The number of seconds to wait between API calls. Adjust it to suit your subscription. Please note that HIBP has a rate limit of 10 API calls per minute.
- CACHE_EXPIRATION_DAYS = 6: The number of days for which cached results are considered valid. An email checked within this window is skipped on the next run.
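If the API still answers with HTTP 429 (too many requests), the script above only prints the error and moves on. The sketch below shows one way to choose a wait time from the response instead. It assumes the response carries a Retry-After header expressed in seconds, and the function name seconds_to_wait is hypothetical and not part of the original script.

```python
def seconds_to_wait(status_code, headers, default_delay=60):
    """Pick how long to sleep before the next API call (hypothetical helper).

    status_code: HTTP status of the last response.
    headers: mapping of response headers (any key casing).
    default_delay: fallback, playing the role of RATE_LIMIT_DELAY.
    """
    if status_code != 429:
        return default_delay

    # Normalize header names so the lookup works with plain dicts too
    lowered = {str(key).lower(): value for key, value in headers.items()}
    try:
        # Assumption: the header holds a whole number of seconds
        return max(int(lowered["retry-after"]), 0)
    except (KeyError, TypeError, ValueError):
        # Header missing or not numeric: fall back to the default delay
        return default_delay
```

In the script, the returned value could replace the fixed time.sleep(RATE_LIMIT_DELAY) call, for example time.sleep(seconds_to_wait(response.status_code, response.headers, RATE_LIMIT_DELAY)).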
2. Set the correct permissions:
# sudo chown root:wazuh-user /home/wazuh-user/hibp.py
# sudo chmod 750 /home/wazuh-user/hibp.py
3. Create an email list for your script in the /home/wazuh-user/ directory and save the file as email_list.txt. In this file, save the email addresses of the users you want to check routinely, one address per line.
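Before scheduling the script, it can help to sanity-check the list, since every entry costs one API call and a 60-second wait. The following is a minimal sketch: the function name clean_email_list and the very loose format check (one @ sign and a dot in the domain) are assumptions made for illustration, not a full email validator.

```python
def clean_email_list(lines):
    """Split raw lines into usable addresses and rejected entries.

    Blank lines are ignored, duplicates are dropped case-insensitively,
    and entries failing a very loose format check are rejected.
    """
    seen = set()
    valid, rejected = [], []
    for raw in lines:
        entry = raw.strip()
        if not entry:
            continue  # ignore blank lines
        local, sep, domain = entry.partition("@")
        if not sep or not local or "@" in domain or "." not in domain:
            rejected.append(entry)
            continue
        key = entry.lower()
        if key not in seen:
            seen.add(key)
            valid.append(entry)
    return valid, rejected


if __name__ == "__main__":
    with open("/home/wazuh-user/email_list.txt", "r") as f:
        valid, rejected = clean_email_list(f)
    print(f"{len(valid)} usable addresses, {len(rejected)} rejected: {rejected}")
```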
Cron job
Create a cron job to execute this script roughly every 7 days (weekly).
1. Run the following commands to open the crontab for the current user:
# export VISUAL=nano
# export EDITOR=nano
# crontab -e
2. Add the following line to the crontab:
0 0 */7 * * /usr/bin/python3.8 /home/wazuh-user/hibp.py
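The above cron entry runs the script /home/wazuh-user/hibp.py at midnight (00:00) on days 1, 8, 15, 22, and 29 of each month, which is roughly every 7 days. Note that the */7 step restarts at the beginning of every month.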
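The day-of-month field */7 steps from day 1 of each month, so the interval between the last run of one month and the first run of the next is usually shorter than 7 days. The short sketch below works this out; the function names are hypothetical and only model the standard cron step syntax.

```python
def cron_step_days(step, days_in_month):
    """Days of the month matched by a '*/step' day-of-month field."""
    return list(range(1, days_in_month + 1, step))


def gap_to_next_month(step, days_in_month):
    """Days between the last run of a month and day 1 of the next month."""
    last_run = cron_step_days(step, days_in_month)[-1]
    return days_in_month - last_run + 1


if __name__ == "__main__":
    for length in (28, 29, 30, 31):
        print(length, cron_step_days(7, length), gap_to_next_month(7, length))
```

With CACHE_EXPIRATION_DAYS = 6, a run that follows the previous one by fewer than 6 days skips the emails that are still cached, so the short gap at a month boundary does not trigger duplicate API calls.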
Save and exit.
Rules
1. Add the following rules to the /var/ossec/etc/rules/local_rules.xml file:
<group name="hibp, compromised_accounts, security">
  <rule id="100100" level="10">
    <decoded_as>json</decoded_as>
    <match>hibpwned</match>
    <description>Potential data breach detected for monitored email</description>
  </rule>
</group>
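Rule 100100 fires on any JSON-decoded event that contains the string hibpwned, which the script writes into the Source field of every log entry. The sketch below rebuilds one such entry so you can see the single-line JSON format the rule expects. The email address, breach name, and description are made-up sample values, and build_log_line is a hypothetical helper that mirrors log_breach_info from the script.

```python
import json
from datetime import datetime


def build_log_line(email, breach, description, now=None):
    """Return one log entry as a single-line JSON string."""
    now = now or datetime.now()
    entry = {
        "email": email,
        "breaches": {breach: description},
        "last_checked": now.strftime("%Y-%m-%dT%H:%M:%S"),
        "Source": "hibpwned",  # the string matched by rule 100100
    }
    return json.dumps(entry)


if __name__ == "__main__":
    # Sample values for illustration only
    print(build_log_line("user@example.com", "ExampleBreach", "Sample description"))
```

A line produced this way can be pasted into the /var/ossec/bin/wazuh-logtest tool on the Wazuh server to check whether the rule matches before waiting for a real detection.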
Log collection
1. Add the following configuration to the <ossec_config> block of the Wazuh server /var/ossec/etc/ossec.conf configuration file:
<localfile>
  <log_format>json</log_format>
  <location>/var/log/hibp_breach_checks.log</location>
</localfile>
Where:
- <log_format>: represents the received log format.
- <location>: represents the output log path for the hibp.py Python script.
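Because the json log format reads one JSON object per line, a malformed line in the log file would not be decoded as expected. The following sketch reports such lines. The function name find_invalid_json_lines is hypothetical, and the check assumes every entry should be a JSON object, as written by the script.

```python
import json


def find_invalid_json_lines(lines):
    """Return 1-based numbers of lines that are not single JSON objects.

    Blank lines are ignored.
    """
    bad = []
    for number, raw in enumerate(lines, start=1):
        text = raw.strip()
        if not text:
            continue
        try:
            parsed = json.loads(text)
        except ValueError:  # json.JSONDecodeError is a subclass of ValueError
            bad.append(number)
            continue
        if not isinstance(parsed, dict):
            bad.append(number)
    return bad


if __name__ == "__main__":
    with open("/var/log/hibp_breach_checks.log", "r") as f:
        print("Invalid lines:", find_invalid_json_lines(f))
```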
2. Restart the Wazuh manager service:
# systemctl restart wazuh-manager
Visualizing the alerts on your Wazuh dashboard
The cron job created earlier will automatically execute the hibp.py script weekly using the email address list email_list.txt as a reference. You can modify the execution frequency and add more email addresses to the existing list to extend coverage.
1. Navigate to the Threat Hunting page on the Wazuh dashboard after executing the script.
2. Select the Events tab and click + Add filter. Select rule.id in the Field field, is in the Operator field, and enter 100100 in the Value field.
3. Click Save to enable the filter.
4. Select Inspect document details to view the details of the logs.
Conclusion
Detecting compromised accounts is important to any organization’s cybersecurity strategy. By integrating external breach intelligence with Wazuh, businesses can actively monitor for leaked credentials and respond swiftly to potential threats.
Wazuh is a free and open source security platform built with several capabilities to detect and protect your digital assets from attacks. You can also integrate your Wazuh deployment with external platforms for expanded coverage. Please join our community to interact with other professionals and learn more about our product.