Detecting malicious URLs using Wazuh and URLhaus

Wazuh 4.2

URLhaus is a project operated by abuse.ch. The purpose of the project is to collect, track, and share malware URLs, helping network administrators and security analysts to protect their networks and customers from cyber threats. URLhaus also offers an API to query information about malicious URLs. Integrating this API with Wazuh can help organizations improve their ability to detect threats.

In this article, we will use a Python script to integrate the Wazuh manager with the URLhaus API. Here is a summary of the work done for this example:

  • Setting up Suricata as a network IDS to analyze network traffic.
  • Configuring the Wazuh integrator component.
  • Using a Python script to communicate with the URLhaus API.
  • Creating rules that alert when a malicious URL is identified.

For this integration, we use the following assets:

Use case

For this example, we will generate alerts when an HTTP curl request is made to a malicious URL. To detect curl requests, we install Suricata on an endpoint that runs a Wazuh agent, and we configure the agent to collect Suricata alerts using the instructions given in this documentation.

We can test Suricata’s integration with Wazuh by trying the following HTTP curl request:

curl -A "BlackSun" www.google.com

The curl request should give the alert seen in the screenshot below:

URLhaus integration and configuration

Now that we have captured an alert with a URL, we can integrate URLhaus to perform a check on this URL to determine whether it is malicious or not. To do this, we create a custom integration by modifying the integration block of the Wazuh manager configuration file (ossec.conf) with the content below:

<integration>
  <name>custom-urlhaus.py</name>
  <hook_url>https://urlhaus-api.abuse.ch/v1/url/</hook_url>
  <rule_id>86601</rule_id>
  <alert_format>json</alert_format>
</integration>

The parameters used in the integration block are as follows:

  • name: The name of the custom script that performs the integration. All custom script names must start with "custom-".
  • hook_url: This is the API URL provided by URLhaus.
  • rule_id: The ID of the Wazuh rule that will trigger this integration.
  • alert_format: Indicates the format in which the script receives the alerts.
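To make the link between this block and the script more concrete: the integrator runs the script with the path to a file containing the alert as the first argument, followed by the api_key and hook_url values. The sketch below assumes that argument order plus an optional trailing "debug" flag, and the helper name parse_integration_args is hypothetical. It is an illustration, not part of the integration itself.

```python
def parse_integration_args(argv):
    """Unpack the arguments in the order assumed here:
    script name, alert file, API key, hook URL, optional 'debug' flag."""
    if len(argv) < 4:
        raise ValueError("expected: <script> <alert_file> <api_key> <hook_url> [debug]")
    return {
        "alert_file": argv[1],
        "api_key": argv[2],   # not used by the URLhaus integration in this article
        "hook_url": argv[3],
        "debug": len(argv) > 4 and argv[4] == "debug",
    }

# Example call with made-up values
example_argv = ["custom-urlhaus.py", "/tmp/alert.json", "",
                "https://urlhaus-api.abuse.ch/v1/url/"]
print(parse_integration_args(example_argv))
```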

Writing the integration script

On the Wazuh server, we proceed to create a file called custom-urlhaus.py in the directory /var/ossec/integrations/. It is important to note that:

  • The first line of the integration script must indicate its interpreter or else Wazuh will not know how to read and execute the script.
#!/var/ossec/framework/python/bin/python3
  • The function request_urlhaus_info() is the function that requests the URL data from URLhaus.

The full script is below:

#!/var/ossec/framework/python/bin/python3
# Copyright (C) 2015-2022, Wazuh Inc.

import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM

try:
    import requests
    from requests.auth import HTTPBasicAuth
except Exception as e:
    print("No module 'requests' found. Install: pip install requests")
    sys.exit(1)

# Global vars

debug_enabled = True
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")

# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
socket_addr = '{0}/queue/sockets/queue'.format(pwd)

def main(args):
    debug("# Starting")

    # Read args
    alert_file_location = args[1]

    debug("# File location")
    debug(alert_file_location)

    # Load alert. Parse JSON object.
    with open(alert_file_location) as alert_file:
        json_alert = json.load(alert_file)
    debug("# Processing alert")
    debug(json_alert)

    # Request urlhaus info
    msg = request_urlhaus_info(json_alert)

    # If positive match, send event to Wazuh Manager
    if msg:
        send_event(msg, json_alert["agent"])

def debug(msg):
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)

        print(msg)

        f = open(log_file,"a")
        f.write(msg)
        f.close()

def collect(data):
  urlhaus_reference = data['urlhaus_reference']
  url_status = data['url_status']
  url_date_added = data['date_added']
  url_threat = data['threat']
  url_blacklist_spamhaus = data['blacklists']['spamhaus_dbl']
  url_blacklist_surbl = data['blacklists']['surbl']
  url_tags = data['tags']
  return urlhaus_reference, url_status, url_date_added, url_threat, url_blacklist_spamhaus, url_blacklist_surbl, url_tags

def in_database(data, url):
  result = data['query_status']
  debug(result)
  if result == "ok":
    return True
  return False

def query_api(url):
  # URLhaus expects the URL as form data in a POST request
  params = {'url': url}
  response = requests.post('https://urlhaus-api.abuse.ch/v1/url/', params)
  json_response = response.json()
  # 'ok' means the URL is listed; 'no_results' means URLhaus does not know it
  if json_response.get('query_status') in ('ok', 'no_results'):
      debug(json_response)
      return json_response
  else:
      # Any other status is treated as an error and reported as an event
      alert_output = {}
      alert_output["urlhaus"] = {}
      alert_output["integration"] = "custom-urlhaus"
      debug("# Error: The URLHAUS integration encountered an error")
      alert_output["urlhaus"]["error"] = response.status_code
      alert_output["urlhaus"]["description"] = json_response.get("query_status", "unknown error")
      send_event(alert_output)
      sys.exit(0)

def request_urlhaus_info(alert):
    alert_output = {}
    # If there is no URL present in the alert, return without sending an event.
    # .get() avoids a KeyError when the alert has no http or redirect field.
    redirect_url = alert.get("data", {}).get("http", {}).get("redirect")
    if redirect_url is None:
      return None

    # Request info using urlhaus API
    data = query_api(redirect_url)

    # Create alert
    alert_output["urlhaus"] = {}
    alert_output["integration"] = "custom-urlhaus"
    alert_output["urlhaus"]["found"] = 0
    alert_output["urlhaus"]["source"] = {}
    alert_output["urlhaus"]["source"]["alert_id"] = alert["id"]
    alert_output["urlhaus"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["urlhaus"]["source"]["description"] = alert["rule"]["description"]
    alert_output["urlhaus"]["source"]["url"] = alert["data"]["http"]["redirect"]
    url = alert["data"]["http"]["redirect"]
    # Check if urlhaus has any info about the url
    if in_database(data, url):
      alert_output["urlhaus"]["found"] = 1

    # Info about the url found in urlhaus
    if alert_output["urlhaus"]["found"] == 1:
        urlhaus_reference, url_status, url_date_added, url_threat, url_blacklist_spamhaus, url_blacklist_surbl, url_tags = collect(data)

        # Populate JSON Output object with urlhaus request
        alert_output["urlhaus"]["urlhaus_reference"] = urlhaus_reference
        alert_output["urlhaus"]["url_status"] = url_status
        alert_output["urlhaus"]["url_date_added"] = url_date_added
        alert_output["urlhaus"]["url_threat"] = url_threat
        alert_output["urlhaus"]["url_blacklist_spamhaus"] = url_blacklist_spamhaus
        alert_output["urlhaus"]["url_blacklist_surbl"] = url_blacklist_surbl
        alert_output["urlhaus"]["url_tags"] = url_tags


    debug(alert_output)

    return(alert_output)

def send_event(msg, agent = None):
    if not agent or agent["id"] == "000":
        string = '1:urlhaus:{0}'.format(json.dumps(msg))
    else:
        string = '1:[{0}] ({1}) {2}->urlhaus:{3}'.format(agent["id"], agent["name"], agent["ip"] if "ip" in agent else "any", json.dumps(msg))

    debug(string)
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()

if __name__ == "__main__":
    try:
        # Read arguments
        bad_arguments = False
        if len(sys.argv) >= 4:
            msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '')
            debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug')
        else:
            msg = '{0} Wrong arguments'.format(now)
            bad_arguments = True

        # Logging the call
        f = open(log_file, 'a')
        f.write(msg +'\n')
        f.close()

        if bad_arguments:
            debug("# Exiting: Bad arguments.")
            sys.exit(1)

        # Main function
        main(sys.argv)

    except Exception as e:
        debug(str(e))
        raise

This script reads the alerts JSON file and extracts the URL. Then, a request is made to the URLhaus API to check if the URL that triggered the integration script has been flagged for malicious behavior.
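The two steps described above can be tried in isolation, without touching the network. In the sketch below, the helper names extract_url and summarize_response are hypothetical, the alert and reply dictionaries are trimmed-down samples built from the fields the script reads, and the "no_results" status is an assumption about what URLhaus returns for a URL it does not know.

```python
def extract_url(alert):
    """Return the redirect URL from a Suricata alert, or None if it is absent."""
    return alert.get("data", {}).get("http", {}).get("redirect")

def summarize_response(reply):
    """Reduce a URLhaus reply to a few of the fields the integration forwards."""
    if reply.get("query_status") != "ok":
        return {"found": 0}
    return {
        "found": 1,
        "url_status": reply.get("url_status"),
        "url_threat": reply.get("threat"),
        "url_tags": reply.get("tags"),
    }

# Made-up sample data for illustration only
sample_alert = {"rule": {"id": "86601"},
                "data": {"http": {"redirect": "https://example.com/payload"}}}
sample_reply = {"query_status": "ok", "url_status": "online",
                "threat": "malware_download", "tags": ["example"]}

print(extract_url(sample_alert))
print(summarize_response(sample_reply))
print(summarize_response({"query_status": "no_results"}))
```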

Once the script has been created, the file owner and group are changed to root:ossec, and execution permissions are given.

chmod 750 /var/ossec/integrations/custom-urlhaus.py
chown root:ossec /var/ossec/integrations/custom-urlhaus.py

Note: In Wazuh 4.3.0 and later, the owner and group are root:wazuh.

Proceed to restart the Wazuh manager to apply the changes:

  • For systemd-based Linux systems
systemctl restart wazuh-manager
  • For SysV init-based Linux systems
service wazuh-manager restart
  • For other Unix-based OS
/var/ossec/bin/wazuh-control restart

If <logall> is set to yes in the manager configuration file, we can see the results of the URLhaus integration in /var/ossec/logs/archives/archives.log each time a URL is found in the URLhaus malware DB.

2022 Feb 22 09:55:53 (wazuhagent006) 192.168.245.44->urlhaus {"urlhaus": {"found": 1, "source": {"alert_id": "1645541747.1543371", "rule": "86601", "description": "Suricata: Alert - ET POLICY curl User-Agent Outbound", "url": "https://pastebin[.]com/raw/ZkwP7zPF"}, "urlhaus_reference": "https://urlhaus.abuse.ch/url/2045738/", "url_status": "online", "url_date_added": "2022-02-16 21:28:04 UTC", "url_threat": "malware_download", "url_blacklist_spamhaus": "not listed", "url_blacklist_surbl": "not listed", "url_tags": ["PowerShellSMTPInfoStealer"]}, "integration": "custom-urlhaus"}
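For quick inspection, a line like this can be split into its header and its JSON payload. The following is a minimal sketch that assumes the "->urlhaus " separator seen in the sample line above. The helper name parse_urlhaus_line is hypothetical and the sample payload is shortened.

```python
import json

def parse_urlhaus_line(line):
    """Split an archives.log line at '->urlhaus ' and decode the JSON payload.
    Returns None for lines that do not come from the URLhaus integration."""
    header, separator, payload = line.partition("->urlhaus ")
    if not separator:
        return None
    return header, json.loads(payload)

sample_line = ('2022 Feb 22 09:55:53 (wazuhagent006) 192.168.245.44->urlhaus '
               '{"urlhaus": {"found": 1, "url_threat": "malware_download"}, '
               '"integration": "custom-urlhaus"}')

header, event = parse_urlhaus_line(sample_line)
print(header)
print(event["urlhaus"]["url_threat"])
```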

Creating rules with URLhaus information

The response obtained from URLhaus can be used to create alerts for various use cases. For example, we can alert about a URL that downloads malware, or one that is linked to a phishing campaign. To do this, we add custom rules to /var/ossec/etc/rules/local_rules.xml and restart the manager so that they take effect:

<group name="local, suricata,">

<rule id="100004" level="10">
  <field name="urlhaus.url_threat">malware_download</field>
  <description>URLhaus: An endpoint connected to a url known for deploying malware.</description>
</rule>

</group>
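To check which events a rule like this is meant to catch, the field test can be approximated outside Wazuh. This is only a sketch of the idea and not how the Wazuh analysis engine is implemented: it resolves the dotted field name against the decoded JSON event and searches the value with a regular expression. The helper names get_field and rule_matches are hypothetical.

```python
import re

def get_field(event, dotted_name):
    """Walk a nested dict using a dotted path such as 'urlhaus.url_threat'."""
    value = event
    for key in dotted_name.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value

def rule_matches(event, field, pattern):
    """Return True when the field exists and the pattern is found in its value."""
    value = get_field(event, field)
    return value is not None and re.search(pattern, str(value)) is not None

# Shortened sample events for illustration only
listed = {"urlhaus": {"found": 1, "url_threat": "malware_download"},
          "integration": "custom-urlhaus"}
unlisted = {"urlhaus": {"found": 0}, "integration": "custom-urlhaus"}

print(rule_matches(listed, "urlhaus.url_threat", "malware_download"))
print(rule_matches(unlisted, "urlhaus.url_threat", "malware_download"))
```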

Testing the integration

Make a curl request like the one below, removing the square brackets from the URL first:

curl http://pastebin[.]com/raw/ZkwP7zPF
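The URL above is written in defanged form so that it cannot be followed by accident. If the test is scripted, the brackets can be stripped with a small helper such as the hypothetical refang function below, which only handles the [.] notation used in this article and is shown with a harmless example domain.

```python
def refang(url):
    """Turn a defanged URL (dots written as '[.]') back into a usable one."""
    return url.replace("[.]", ".")

print(refang("http://example[.]com/raw/abc123"))
```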

The curl request containing the malicious URL triggers the Suricata rules, which in turn trigger the URLhaus integration script and finally the Wazuh rule, giving the output seen below:


Conclusion

In this article, we integrated URLhaus API with Wazuh to check URLs and determine if they have been associated with malicious activity. This integration allowed us to retrieve information from URLhaus about browsing activity made on endpoints monitored by Wazuh. The information retrieved was subsequently used with rules to determine malicious activity.