Detecting compromised accounts with HIBP and Wazuh

Wazuh 4.10.0

Data breaches and leaked credentials have become a recurring threat in the cybersecurity landscape, exposing sensitive information such as usernames, passwords, and email addresses. When attackers gain access to this data, they can exploit it for unauthorized access, phishing attacks, or identity theft. The risk to businesses and individuals is significant, whether the exposure comes from credentials leaked by a third-party service or from compromised internal data. Detecting and responding to compromised accounts quickly is critical to minimizing potential damage, preventing unauthorized access, and safeguarding sensitive information.

Wazuh, an open source security platform, offers capabilities for monitoring and detecting compromised accounts in real-time. By integrating with external threat intelligence sources, such as breach databases and dark web monitoring tools, Wazuh can proactively alert security teams when an account is suspected to be compromised. 

This post explores how Wazuh detects compromised accounts using the Have I Been Pwned (HIBP) platform. HIBP is an online platform that allows individuals and organizations to check whether their email addresses or passwords have been exposed in known data breaches. Here, we will learn how to set up automated breach detection and view the resulting alerts on the Wazuh dashboard.

Infrastructure

The following infrastructure is used to demonstrate the detection of compromised accounts with Wazuh:

  • A pre-built, ready-to-use Wazuh OVA 4.10.0, which includes the Wazuh central components (the Wazuh server, indexer, and dashboard). Follow this guide to download and set up Wazuh in a virtual environment.
  • A Have I Been Pwned API key. This requires a subscription to one of the available services offered.

Configuration

Python dependencies

Note: We use Python 3.8 as it is considered one of the most stable versions.

1. We install the Python dependencies on the Wazuh server using the commands below:

# sudo amazon-linux-extras enable python3.8
# sudo yum install python3.8

2. Confirm that Python 3.8 or later is installed:

# python3.8 --version
Python 3.8.20

3. Install Pip:

# curl -O https://bootstrap.pypa.io/get-pip.py
# sudo python3.8 get-pip.py

4. Install requests and downgrade urllib3:

# python3.8 -m pip install requests
# python3.8 -m pip uninstall urllib3
# python3.8 -m pip install urllib3==1.26.15

We downgrade urllib3 because version 2.x requires OpenSSL 1.1.1 or later, while the Wazuh server ships an older version of OpenSSL (1.0.2). Upgrading OpenSSL on the server instead may cause compatibility issues for our Python script.
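Before pinning urllib3, it can help to confirm which OpenSSL build the Python interpreter is actually linked against, since that is what urllib3 checks. The snippet below is a minimal sketch; the helper name needs_urllib3_v1 is hypothetical, and the 1.1.1 threshold is taken from the urllib3 2.x requirement described above.

```python
import ssl


def needs_urllib3_v1(openssl_version_info=None):
    """Return True when the linked OpenSSL is older than 1.1.1.

    Hypothetical helper: urllib3 2.x requires OpenSSL 1.1.1+, so an older
    build suggests staying on the urllib3 1.26.x series.
    """
    if openssl_version_info is None:
        openssl_version_info = ssl.OPENSSL_VERSION_INFO
    # OPENSSL_VERSION_INFO is a tuple: (major, minor, fix, patch, status)
    return tuple(openssl_version_info[:3]) < (1, 1, 1)


if __name__ == "__main__":
    print(ssl.OPENSSL_VERSION)
    print("Pin urllib3 below 2.x:", needs_urllib3_v1())
```

Run it with python3.8 so that the result reflects the interpreter the script will use.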

Python script

1. Create a script named hibp.py in the /home/wazuh-user/ directory of the Wazuh server:

import requests
import time
import json
import os
from datetime import datetime, timedelta

# Configuration
API_KEY = "<YOUR_HIBP_API_KEY>"  # Replace with your HIBP API key
EMAIL_LIST_FILE = "/home/wazuh-user/email_list.txt"  # File containing email addresses (one per line)
OUTPUT_LOG_FILE = "/var/log/hibp_breach_checks.log"  # Log file path
CACHE_FILE = "hibp_cache.json"  # To store recently checked emails and avoid redundant checks
BREACH_DETAILS_CACHE_FILE = "breach_details_cache.json"  # Cache for breach descriptions
RATE_LIMIT_DELAY = 60  # Seconds to wait between API calls to respect rate limits
CACHE_EXPIRATION_DAYS = 6  # How long to consider cached results valid

# Ensure the output log file directory exists
os.makedirs(os.path.dirname(OUTPUT_LOG_FILE), exist_ok=True)

# Load cache (if exists and valid)
if os.path.exists(CACHE_FILE):
    try:
        with open(CACHE_FILE, "r") as f:
            cache = json.load(f)
    except (json.JSONDecodeError, ValueError):
        print("Cache file is empty or corrupted. Initializing empty cache.")
        cache = {}
else:
    cache = {}

# Load breach details cache (if exists and valid)
if os.path.exists(BREACH_DETAILS_CACHE_FILE):
    try:
        with open(BREACH_DETAILS_CACHE_FILE, "r") as f:
            breach_details_cache = json.load(f)
    except (json.JSONDecodeError, ValueError):
        print("Breach details cache file is empty or corrupted. Initializing empty cache.")
        breach_details_cache = {}
else:
    breach_details_cache = {}

def save_cache():
    """Save the email check cache to a file."""
    with open(CACHE_FILE, "w") as f:
        json.dump(cache, f)

def save_breach_details_cache():
    """Save the breach details cache to a file."""
    with open(BREACH_DETAILS_CACHE_FILE, "w") as f:
        json.dump(breach_details_cache, f)

def is_recently_checked(email):
    """Check if an email has been checked recently."""
    if email in cache:
        last_checked = datetime.strptime(cache[email]["last_checked"], "%Y-%m-%dT%H:%M:%S")
        if datetime.now() - last_checked < timedelta(days=CACHE_EXPIRATION_DAYS):
            return True
    return False

def get_breach_details(breach_name):
    """Fetch detailed information about a breach, using cache if available."""
    if breach_name in breach_details_cache:
        return breach_details_cache[breach_name]

    url = f"https://haveibeenpwned.com/api/v3/breach/{breach_name}"
    headers = {"hibp-api-key": API_KEY, "User-Agent": "HIBP-Wazuh-Integration"}
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        breach_data = response.json()
        description = breach_data.get("Description", "No description available")
        breach_details_cache[breach_name] = description
        save_breach_details_cache()
        return description
    else:
        return "No description available"

def log_breach_info(email, breach, description):
    """Log each breach separately with its description to a file."""
    log_entry = {
        "email": email,
        "breaches": {breach: description},
        "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
        "Source": "hibpwned"
    }
    with open(OUTPUT_LOG_FILE, "a") as log_file:
        log_file.write(json.dumps(log_entry) + "\n")

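def check_email_breaches(email):
    """Check if an email has been breached using the HIBP API."""
    from urllib.parse import quote  # HIBP expects the account to be URL encoded

    url = f"https://haveibeenpwned.com/api/v3/breachedaccount/{quote(email)}"
    headers = {"hibp-api-key": API_KEY, "User-Agent": "HIBP-Wazuh-Integration"}
    # A timeout keeps the cron job from hanging on a stalled connection
    response = requests.get(url, headers=headers, timeout=30)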

    if response.status_code == 200:
        breaches = response.json()
        breach_names = [breach['Name'] for breach in breaches]  # Extract breach names
        cache[email] = {"breaches": breach_names, "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}

        for breach in breach_names:
            description = get_breach_details(breach)
            log_breach_info(email, breach, description)

        print(f"{email} found in breaches: {breach_names}")
    elif response.status_code == 404:
        print(f"{email} not found in any breaches.")
        cache[email] = {"breaches": [], "last_checked": datetime.now().strftime("%Y-%m-%dT%H:%M:%S")}
    else:
        print(f"Error checking {email}: {response.status_code} - {response.text}")

def main():
    """Main function to read emails and check breaches."""
    with open(EMAIL_LIST_FILE, "r") as f:
        # Skip blank lines so no request is sent for an empty address
        emails = [line.strip() for line in f if line.strip()]

    for email in emails:
        if is_recently_checked(email):
            print(f"Skipping recently checked email: {email}")
            continue

        check_email_breaches(email)
        save_cache()
        time.sleep(RATE_LIMIT_DELAY)

if __name__ == "__main__":
    main()

You can customize the following values according to your requirements:

  • API_KEY = "<YOUR_HIBP_API_KEY>": Replace <YOUR_HIBP_API_KEY> with your HIBP API key.
  • EMAIL_LIST_FILE = "/home/wazuh-user/email_list.txt": This file contains the referenced email addresses (one per line).
  • OUTPUT_LOG_FILE = "/var/log/hibp_breach_checks.log": This is the log file path. It will be included in the Wazuh /var/ossec/etc/ossec.conf configuration file for monitoring. 
  • CACHE_FILE = "hibp_cache.json": This file stores recently checked emails to avoid redundant checks.
  • BREACH_DETAILS_CACHE_FILE = "breach_details_cache.json": This cache stores breach descriptions.
  • RATE_LIMIT_DELAY = 60: The number of seconds to wait between API calls. Note that HIBP enforces a rate limit on API calls (10 per minute here; the exact limit depends on your subscription).
  • CACHE_EXPIRATION_DAYS = 6: The number of days a cached result is considered valid. Keeping it just under 7 days means an address checked during one weekly run is not skipped by the next.
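The script waits a fixed RATE_LIMIT_DELAY between calls and only prints an error on any other status code. A possible refinement is to honor the wait time the API suggests when a request is throttled. The sketch below assumes that HIBP answers a throttled request with HTTP 429 and a Retry-After header expressed in seconds; the function name seconds_to_wait is hypothetical and not part of the original script.

```python
def seconds_to_wait(status_code, headers, default_delay=60):
    """Return how long to pause before the next HIBP request.

    Assumption: a throttled request returns HTTP 429 with a Retry-After
    header in seconds. Any other case falls back to the fixed delay.
    """
    if status_code == 429:
        try:
            return max(int(headers.get("Retry-After", default_delay)), 1)
        except (TypeError, ValueError):
            # Header present but not a plain number of seconds
            return default_delay
    return default_delay


if __name__ == "__main__":
    print(seconds_to_wait(429, {"Retry-After": "6"}))
    print(seconds_to_wait(200, {}))
```

In hibp.py, this could replace the constant passed to time.sleep() by calling seconds_to_wait(response.status_code, response.headers, RATE_LIMIT_DELAY), provided check_email_breaches is changed to return the response.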

2. Set the correct permissions:

# sudo chown root:wazuh-user /home/wazuh-user/hibp.py
# sudo chmod 750 /home/wazuh-user/hibp.py

3. Create a file named email_list.txt in the /home/wazuh-user/ directory. In this file, list the email addresses you want to check routinely, one per line.
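Because each line of email_list.txt results in one API call, it is worth cleaning the list before it is used. The following is a minimal sketch of one way to do that; the load_emails helper and the convention of treating lines that start with # as comments are assumptions for illustration, not part of the original script.

```python
def load_emails(lines):
    """Return unique, lower-cased addresses in their original order.

    Blank lines and lines starting with '#' (an assumed comment
    convention) are skipped so they never reach the API.
    """
    seen = set()
    emails = []
    for line in lines:
        address = line.strip().lower()
        if not address or address.startswith("#"):
            continue
        if address not in seen:
            seen.add(address)
            emails.append(address)
    return emails


if __name__ == "__main__":
    sample = ["Alice@example.com\n", "\n", "# finance team\n", "alice@example.com\n"]
    print(load_emails(sample))
```

Lower-casing before deduplication avoids spending two API calls on the same mailbox written with different capitalization.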

Cron job

Create a cron job to execute this script approximately every 7 days (weekly).

1. Run the following commands to open the crontab for the current user:

# export VISUAL=nano
# export EDITOR=nano
# crontab -e

2. Add the following line to the crontab:

0 0 */7 * * /usr/bin/python3.8 /home/wazuh-user/hibp.py

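The above cron job runs the script /home/wazuh-user/hibp.py at midnight (00:00) on every 7th day of the month (days 1, 8, 15, 22, and 29). Because the day-of-month step restarts each month, the gap between the last run of one month and the first run of the next is shorter than 7 days. If you need a strict weekly schedule, use 0 0 * * 0 instead, which runs every Sunday at midnight.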

Save and exit.
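To see how the */7 step in the day-of-month field plays out across a month boundary, the schedule can be reproduced with a few lines of Python. This is an illustrative sketch only; step_days and gaps_between_runs are hypothetical helper names, and the calculation assumes standard cron semantics where */7 in the day-of-month field starts from day 1.

```python
import calendar
from datetime import date


def step_days(year, month, step=7):
    """Days of the month matched by */step in the day-of-month field."""
    last_day = calendar.monthrange(year, month)[1]
    return list(range(1, last_day + 1, step))


def gaps_between_runs(year, month, step=7):
    """Gaps in days between consecutive runs, including the run on the
    first day of the following month."""
    runs = [date(year, month, day) for day in step_days(year, month, step)]
    # First run of the next month (December rolls over to January)
    runs.append(date(year + (month == 12), month % 12 + 1, 1))
    return [(later - earlier).days for earlier, later in zip(runs, runs[1:])]


if __name__ == "__main__":
    print(step_days(2025, 1))
    print(gaps_between_runs(2025, 1))
```

For a 31-day month, the last gap is 3 days. With CACHE_EXPIRATION_DAYS set to 6, the run on the 1st would skip addresses that were checked on the 29th, so those addresses are next checked on the 8th.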

Rules

1. Add the following rules to the /var/ossec/etc/rules/local_rules.xml file:

<group name="hibp, compromised_accounts, security">
  <rule id="100100" level="10">
      <decoded_as>json</decoded_as>
      <match>hibpwned</match>
      <description>Potential data breach detected for monitored email</description>
  </rule>
</group>
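The rule relies on two properties of each log line: it must decode as JSON, and it must contain the string hibpwned. The sketch below rebuilds a log line the same way log_breach_info does in hibp.py, so you can see the shape of the event the rule matches. The build_log_entry wrapper, the sample address, the breach name, and the description are placeholders for illustration.

```python
import json
from datetime import datetime


def build_log_entry(email, breach, description, now=None):
    """Build one JSON log line with the same fields hibp.py writes."""
    now = now or datetime.now()
    return json.dumps({
        "email": email,
        "breaches": {breach: description},
        "last_checked": now.strftime("%Y-%m-%dT%H:%M:%S"),
        "Source": "hibpwned",  # The string rule 100100 matches on
    })


if __name__ == "__main__":
    # Placeholder values, not real breach data
    line = build_log_entry(
        "user@example.com", "ExampleBreach", "Sample description",
        datetime(2025, 1, 1, 0, 0, 0),
    )
    print(line)
```

Since the match is a plain substring test on the whole event, any JSON log line containing hibpwned would trigger the rule, which is acceptable here because the monitored file is only written by hibp.py.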

Log collection

1. Add the following configuration to the <ossec_config> block of the Wazuh server /var/ossec/etc/ossec.conf configuration file:

  <localfile>
    <log_format>json</log_format>
    <location>/var/log/hibp_breach_checks.log</location>
  </localfile>

Where:

<log_format>: specifies the format of the monitored log, JSON in this case.

<location>: specifies the path of the log file written by the hibp.py Python script.
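With the json log format, each line of the monitored file is expected to be a single JSON object. If alerts do not appear, one thing worth checking is whether every line in the log file parses cleanly. The following is a minimal sketch of such a check; invalid_json_lines is a hypothetical helper, not a Wazuh tool.

```python
import json


def invalid_json_lines(lines):
    """Return the 1-based numbers of lines that are not a JSON object."""
    bad = []
    for number, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # Ignore empty lines
        try:
            if not isinstance(json.loads(line), dict):
                bad.append(number)
        except json.JSONDecodeError:
            bad.append(number)
    return bad


if __name__ == "__main__":
    sample = ['{"Source": "hibpwned"}\n', "not json\n"]
    print(invalid_json_lines(sample))
```

To run it against the real file, pass an open file handle for /var/log/hibp_breach_checks.log instead of the sample list.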

2. Restart the Wazuh manager service:

# systemctl restart wazuh-manager

Visualizing the alerts on your Wazuh dashboard

The cron job created earlier automatically executes the hibp.py script on schedule, using the email address list in email_list.txt as a reference. You can modify the execution frequency and add more email addresses to the list to extend coverage.

1. Navigate to the Threat Hunting page on the Wazuh dashboard after executing the script.

2. Select the Events tab and click + Add filter. Select rule.id in the Field field, is in the Operator field, and enter 100100 in the Value field.

3. Click Save to enable the filter.

Compromised Accounts alerts

4. Select Inspect document details to view the details of the logs.

Compromised Accounts alerts Wazuh dashboard

Conclusion

Detecting compromised accounts is important to any organization’s cybersecurity strategy. By integrating external breach intelligence with Wazuh, businesses can actively monitor for leaked credentials and respond swiftly to potential threats. 

Wazuh is a free and open source security platform built with several capabilities to detect and protect your digital assets from attacks. You can also integrate your Wazuh deployment with external platforms for expanded coverage. Please join our community to interact with other professionals and learn more about our product.