URLhaus is a project operated by abuse.ch. The purpose of the project is to collect, track, and share malware URLs, to help network administrators and security analysts protect their networks from cyber threats. URLhaus also offers an API to query information about malicious URLs. Integrating this API with Wazuh can help organizations improve their ability to detect threats.

In this blog post, we show how to configure the Wazuh Integrator module to check URLs against the URLhaus malicious URL database. We also configure Suricata as an IDS to monitor URL connections on the monitored endpoint.

Requirements

  • Wazuh 4.9.1 central components (Wazuh server, Wazuh indexer, Wazuh dashboard) installed using the Quickstart guide on an Ubuntu server.
  • An Ubuntu 22.04 server. This endpoint has a Wazuh agent installed and enrolled to the Wazuh server.

Configuration

Perform the following steps to install and configure Suricata, integrate URLhaus, and configure custom rules.

Ubuntu endpoint

1. Install Suricata on the Ubuntu endpoint:

# add-apt-repository ppa:oisf/suricata-stable
# apt-get update
# apt-get install suricata -y

2. Download and extract the Emerging Threats Suricata ruleset:

# cd /tmp/ && curl -LO https://rules.emergingthreats.net/open/suricata-6.0.8/emerging.rules.tar.gz
# tar -xvzf emerging.rules.tar.gz && sudo mkdir -p /etc/suricata/rules && sudo mv rules/*.rules /etc/suricata/rules/
# chmod 640 /etc/suricata/rules/*.rules

3. Modify Suricata settings in the /etc/suricata/suricata.yaml file and set the following variables:

HOME_NET: "<IP>/<subnet>"
EXTERNAL_NET: "any"

default-rule-path: /etc/suricata/rules
rule-files:
- "*.rules"

# Global stats configuration
stats:
  enabled: yes

# Linux high speed capture support
af-packet:
  - interface: enp0s3

Where:

  • <IP>/<subnet> represents the IP address of the Ubuntu endpoint. The subnet mask must also be specified, for example 10.0.2.15/24.
  • default-rule-path specifies the path where the Suricata rules are located.
  • interface represents the network interface you want to monitor. Replace the value with the interface name of the Ubuntu endpoint, for example enp0s3.

The values for <IP>/<subnet> and interface can be retrieved by running the ifconfig command, as seen below:

# ifconfig
enp0s3: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 10.0.2.15  netmask 255.255.255.0  broadcast 10.0.2.255
        inet6 fe80::9ba2:9de3:57ad:64e5  prefixlen 64  scopeid 0x20<link>
        ether 08:00:27:14:65:bd  txqueuelen 1000  (Ethernet)
        RX packets 6704315  bytes 1268472541 (1.1 GiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 4590192  bytes 569730548 (543.3 MiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0
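
The ifconfig output reports the address and the netmask separately, while HOME_NET expects CIDR notation. As a minimal sketch, the conversion can be done with Python's standard ipaddress module. The helper name home_net is only illustrative, and the values are the ones from the output above:

```python
import ipaddress


def home_net(ip: str, netmask: str) -> str:
    """Combine an address and a dotted netmask into CIDR notation."""
    return str(ipaddress.ip_interface(f"{ip}/{netmask}"))


# Values taken from the ifconfig output above
print(home_net("10.0.2.15", "255.255.255.0"))  # 10.0.2.15/24
```

If you prefer to cover the whole subnet rather than the single host, ipaddress.ip_interface(...).network gives the network address in the same notation.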

4. Restart the Suricata service to apply the changes:

# systemctl restart suricata

5. Add the following configuration to the /var/ossec/etc/ossec.conf file of the Wazuh agent. This allows the Wazuh agent to read the Suricata log file:

<ossec_config>
  <localfile>
    <log_format>json</log_format>
    <location>/var/log/suricata/eve.json</location>
  </localfile>
</ossec_config>

6. Restart the Wazuh agent to apply the changes:

# systemctl restart wazuh-agent
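
Before moving to the Wazuh server, it can help to confirm that Suricata is writing HTTP events to eve.json. The following is a minimal sketch, not part of the original setup: it assumes each line of the file is one JSON record with an event_type key, and the sample records are hypothetical, trimmed-down stand-ins for real log lines.

```python
import json


def http_events(lines):
    """Yield parsed EVE records whose event_type is 'http'."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip lines that are not complete JSON
        if isinstance(event, dict) and event.get("event_type") == "http":
            yield event


# Hypothetical records for illustration only, not real Suricata output
sample = [
    '{"event_type": "dns", "src_ip": "10.0.2.15"}',
    '{"event_type": "http", "http": {"hostname": "example.com", '
    '"url": "/", "redirect": "https://example.com/"}}',
    'not json',
]

for event in http_events(sample):
    http = event.get("http", {})
    print(http.get("hostname"), http.get("redirect"))
```

To run the same filter against the live log, pass an open file handle for /var/log/suricata/eve.json to http_events instead of the sample list.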

Wazuh server

1. Create a file called custom-urlhaus.py in the /var/ossec/integrations/ directory. This custom Wazuh Integrator script queries URLhaus with a URL reported by Suricata to determine whether it is malicious or benign:

#!/var/ossec/framework/python/bin/python3
# Copyright (C) 2015-2022, Wazuh Inc.

import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM

try:
    import requests
except ImportError:
    print("No module 'requests' found. Install: pip install requests")
    sys.exit(1)

# Global vars

debug_enabled = True
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")

# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
socket_addr = '{0}/queue/sockets/queue'.format(pwd)

def main(args):
    debug("# Starting")

    # Read args
    alert_file_location = args[1]

    debug("# File location")
    debug(alert_file_location)

    # Load alert. Parse JSON object.
    with open(alert_file_location) as alert_file:
        json_alert = json.load(alert_file)
    debug("# Processing alert")
    debug(json_alert)

    # Request urlhaus info
    msg = request_urlhaus_info(json_alert)

    # If positive match, send event to Wazuh Manager
    if msg:
        send_event(msg, json_alert["agent"])

def debug(msg):
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)

        print(msg)

        f = open(log_file,"a")
        f.write(msg)
        f.close()

def collect(data):
  urlhaus_reference = data['urlhaus_reference']
  url_status = data['url_status']
  url_date_added = data['date_added']
  url_threat = data['threat']
  url_blacklist_spamhaus = data['blacklists']['spamhaus_dbl']
  url_blacklist_surbl = data['blacklists']['surbl']
  url_tags = data['tags']
  return urlhaus_reference, url_status, url_date_added, url_threat, url_blacklist_spamhaus, url_blacklist_surbl, url_tags

def in_database(data, url):
  result = data['query_status']
  debug(result)
  if result == "ok":
    return True
  return False

def query_api(url):
  # URLhaus expects the URL as form data in a POST request
  params = {'url': url}
  response = requests.post('https://urlhaus-api.abuse.ch/v1/url/', params)
  json_response = response.json()
  # 'ok' means the URL is listed and 'no_results' means it is not; neither is an error
  if json_response['query_status'] in ('ok', 'no_results'):
      data = json_response
      debug(data)
      return data
  else:
      # Any other status (for example an invalid URL) is reported as an integration error
      alert_output = {}
      alert_output["urlhaus"] = {}
      alert_output["integration"] = "custom-urlhaus"
      debug("# Error: The URLHAUS integration encountered an error")
      alert_output["urlhaus"]["error"] = response.status_code
      alert_output["urlhaus"]["description"] = json_response['query_status']
      send_event(alert_output)
      sys.exit(0)

def request_urlhaus_info(alert):
    alert_output = {}
    # If the alert carries no redirect URL, there is nothing to look up. Exit.
    if alert.get("data", {}).get("http", {}).get("redirect") is None:
      return(0)

    # Request info using urlhaus API
    data = query_api(alert["data"]["http"]["redirect"])

    # Create alert
    alert_output["urlhaus"] = {}
    alert_output["integration"] = "custom-urlhaus"
    alert_output["urlhaus"]["found"] = 0
    alert_output["urlhaus"]["source"] = {}
    alert_output["urlhaus"]["source"]["alert_id"] = alert["id"]
    alert_output["urlhaus"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["urlhaus"]["source"]["description"] = alert["rule"]["description"]
    alert_output["urlhaus"]["source"]["url"] = alert["data"]["http"]["redirect"]
    url = alert["data"]["http"]["redirect"]
    # Check if urlhaus has any info about the url
    if in_database(data, url):
      alert_output["urlhaus"]["found"] = 1

    # Info about the url found in urlhaus
    if alert_output["urlhaus"]["found"] == 1:
        urlhaus_reference, url_status, url_date_added, url_threat, url_blacklist_spamhaus, url_blacklist_surbl, url_tags = collect(data)

        # Populate JSON Output object with urlhaus request
        alert_output["urlhaus"]["urlhaus_reference"] = urlhaus_reference
        alert_output["urlhaus"]["url_status"] = url_status
        alert_output["urlhaus"]["url_date_added"] = url_date_added
        alert_output["urlhaus"]["url_threat"] = url_threat
        alert_output["urlhaus"]["url_blacklist_spamhaus"] = url_blacklist_spamhaus
        alert_output["urlhaus"]["url_blacklist_surbl"] = url_blacklist_surbl
        alert_output["urlhaus"]["url_tags"] = url_tags


    debug(alert_output)

    return(alert_output)

def send_event(msg, agent = None):
    if not agent or agent["id"] == "000":
        string = '1:urlhaus:{0}'.format(json.dumps(msg))
    else:
        string = '1:[{0}] ({1}) {2}->urlhaus:{3}'.format(agent["id"], agent["name"], agent["ip"] if "ip" in agent else "any", json.dumps(msg))

    debug(string)
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()

if __name__ == "__main__":
    try:
        # Read arguments
        bad_arguments = False
        if len(sys.argv) >= 4:
            msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '')
            debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug')
        else:
            msg = '{0} Wrong arguments'.format(now)
            bad_arguments = True

        # Logging the call
        f = open(log_file, 'a')
        f.write(msg +'\n')
        f.close()

        if bad_arguments:
            debug("# Exiting: Bad arguments.")
            sys.exit(1)

        # Main function
        main(sys.argv)

    except Exception as e:
        debug(str(e))
        raise
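
To exercise the script without waiting for a live Suricata alert, one option is to hand it a hand-written alert file. The sketch below writes a minimal alert containing only the keys that custom-urlhaus.py reads (id, rule.id, rule.description, agent, and data.http.redirect). The helper name write_sample_alert and all values in the alert are placeholders, not real Wazuh output.

```python
import json
import os
import tempfile


def write_sample_alert(redirect_url, directory=None):
    """Write a minimal alert file and return its path."""
    alert = {
        "id": "0000000000.0000000",  # placeholder alert ID
        "rule": {"id": "86601", "description": "placeholder description"},
        "agent": {"id": "001", "name": "ubuntu", "ip": "10.0.2.15"},
        "data": {"http": {"redirect": redirect_url}},
    }
    fd, path = tempfile.mkstemp(suffix=".alert", dir=directory)
    with os.fdopen(fd, "w") as handle:
        json.dump(alert, handle)
    return path


path = write_sample_alert("https://example.com/")
print(path)
```

The script expects at least three arguments after its own name and enables debug output when the fourth is debug, so a manual run on the Wazuh server would look like /var/ossec/integrations/custom-urlhaus.py <path> "" https://urlhaus-api.abuse.ch/v1/url/ debug. Note that such a run queries URLhaus and writes to the Wazuh queue socket, so it needs network access and sufficient privileges.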

2. Change the ownership and permissions of the /var/ossec/integrations/custom-urlhaus.py file:

# chmod 750 /var/ossec/integrations/custom-urlhaus.py
# chown root:wazuh /var/ossec/integrations/custom-urlhaus.py

3. Add the following configuration to the /var/ossec/etc/ossec.conf file to configure the URLhaus integration. The integration runs whenever rule ID 86601, an out-of-the-box Wazuh rule for detecting Suricata alerts, triggers:

<integration>
  <name>custom-urlhaus.py</name>
  <hook_url>https://urlhaus-api.abuse.ch/v1/url/</hook_url>
  <rule_id>86601</rule_id>
  <alert_format>json</alert_format>
</integration>

4. Add the following custom rule to the /var/ossec/etc/rules/local_rules.xml rule file. This rule generates an alert when URLhaus detects a malicious URL connection:

<group name="local, suricata,">
  <rule id="100004" level="10">
    <field name="urlhaus.url_threat">malware_download</field>
    <description>URLhaus: An endpoint connected to a url known for deploying malware.</description>
  </rule>
</group>
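
The field name urlhaus.url_threat in the rule above is a dotted path into the JSON event that custom-urlhaus.py sends back to the manager. The sketch below only illustrates that addressing on a trimmed-down event of the shape the script builds. The helper get_field is hypothetical, and the actual matching is performed by the Wazuh analysis engine, not by code like this.

```python
def get_field(event, dotted_path):
    """Follow a dotted path such as 'urlhaus.url_threat' through nested dicts."""
    current = event
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


# Trimmed-down event in the shape produced by request_urlhaus_info()
event = {
    "integration": "custom-urlhaus",
    "urlhaus": {"found": 1, "url_threat": "malware_download"},
}

print(get_field(event, "urlhaus.url_threat"))
print(get_field(event, "urlhaus.url_tags"))  # None, since the key is absent here
```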

5. Restart the Wazuh manager to apply the configuration changes:

# systemctl restart wazuh-manager

Testing the integration

To test the integration, make a curl request with a malicious user agent from the Ubuntu endpoint. Run the command below:

# curl -A "BlackSun" http://pastebin.com/raw/ZkwP7zPF

The curl request to the malicious URL makes Suricata log an alert, which triggers Wazuh rule ID 86601 as seen below.

[Figure: URLhaus Suricata integration]

This alert triggers the custom-urlhaus.py integration script to check URLhaus for the URL. URLhaus detects the URL as malicious and rule ID 100004 is triggered as seen below.

[Figure: URLhaus detect]
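
The same check can be made from the command line on the Wazuh server. As a rough sketch, the function below counts how often rule IDs 86601 and 100004 appear in a stream of JSON alerts. It assumes one JSON alert per line with the rule ID stored as a string under rule.id, which is how the default /var/ossec/logs/alerts/alerts.json file is laid out. The sample lines are hypothetical.

```python
import json
from collections import Counter


def count_rule_hits(lines, rule_ids=("86601", "100004")):
    """Count alerts per rule ID, ignoring lines that are not JSON objects."""
    hits = Counter()
    for line in lines:
        try:
            alert = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(alert, dict):
            continue
        rule_id = str(alert.get("rule", {}).get("id"))
        if rule_id in rule_ids:
            hits[rule_id] += 1
    return hits


# Hypothetical alert lines for illustration only
sample_alerts = [
    '{"rule": {"id": "86601", "level": 3}}',
    '{"rule": {"id": "100004", "level": 10}}',
    '{"rule": {"id": "5501", "level": 3}}',
    'truncated line',
]

print(dict(count_rule_hits(sample_alerts)))
```

To run it against real data, pass an open file handle for /var/ossec/logs/alerts/alerts.json in place of sample_alerts.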

Conclusion

In this article, we integrated the URLhaus API with Wazuh to check URLs and determine whether they have been associated with malicious activity. The integration retrieves information from URLhaus about URLs requested by endpoints that Wazuh monitors, and custom rules then use that information to flag malicious activity.

Wazuh is a free and open source SIEM and XDR solution. It can be deployed and managed on-premises or on Wazuh Cloud. Check out our community for support and updates.
