URLhaus is a project operated by abuse.ch. The purpose of the project is to collect, track, and share malware URLs, to help network administrators and security analysts protect their networks from cyber threats. URLhaus also offers an API to query information about malicious URLs. Integrating this API with Wazuh can help organizations improve their ability to detect threats.
In this blog post, we show how to configure Wazuh to check URLs against the URLhaus database of malicious URLs using the Wazuh Integrator module. We also configure Suricata as an IDS to monitor URL connections on the monitored endpoint.
Requirements
- Wazuh 4.9.1 central components (Wazuh server, Wazuh indexer, Wazuh dashboard) installed using the Quickstart guide on an Ubuntu server.
- An Ubuntu 22.04 server. This endpoint has a Wazuh agent installed and enrolled to the Wazuh server.
Configuration
Perform the following steps to install and configure Suricata, integrate URLhaus, and configure custom rules.
Ubuntu endpoint
1. Install Suricata on the Ubuntu endpoint:
# add-apt-repository ppa:oisf/suricata-stable
# apt-get update
# apt-get install suricata -y
2. Download and extract the Emerging Threats Suricata ruleset:
# cd /tmp/ && curl -LO https://rules.emergingthreats.net/open/suricata-6.0.8/emerging.rules.tar.gz
# tar -xvzf emerging.rules.tar.gz && sudo mkdir /etc/suricata/rules && sudo mv rules/*.rules /etc/suricata/rules/
# chmod 640 /etc/suricata/rules/*.rules
3. Modify Suricata settings in the /etc/suricata/suricata.yaml file and set the following variables:
HOME_NET: "<IP>/<subnet>"
EXTERNAL_NET: "any"

default-rule-path: /etc/suricata/rules
rule-files:
  - "*.rules"

# Global stats configuration
stats:
  enabled: yes

# Linux high speed capture support
af-packet:
  - interface: enp0s3
Where:
- <IP>/<subnet> represents the IP address of the Ubuntu endpoint. The subnet mask must also be specified, for example 10.0.2.15/24.
- default-rule-path specifies the path where our Suricata rules are located.
- interface represents the network interface you want to monitor. Replace the value with the interface name of the Ubuntu endpoint, for example enp0s3.
The values for <IP>/<subnet> and interface can be retrieved by running the ifconfig command, as seen below:
# ifconfig
enp0s3: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500
        inet 10.0.2.15 netmask 255.255.255.0 broadcast 10.0.2.255
        inet6 fe80::9ba2:9de3:57ad:64e5 prefixlen 64 scopeid 0x20<link>
        ether 08:00:27:14:65:bd txqueuelen 1000 (Ethernet)
        RX packets 6704315 bytes 1268472541 (1.1 GiB)
        RX errors 0 dropped 0 overruns 0 frame 0
        TX packets 4590192 bytes 569730548 (543.3 MiB)
        TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
4. Restart the Suricata service to apply the changes:
# systemctl restart suricata
5. Add the following configuration to the /var/ossec/etc/ossec.conf file of the Wazuh agent. This allows the Wazuh agent to read the Suricata log file:
<ossec_config>
  <localfile>
    <log_format>json</log_format>
    <location>/var/log/suricata/eve.json</location>
  </localfile>
</ossec_config>
6. Restart the Wazuh agent to apply the changes:
# systemctl restart wazuh-agent
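The HOME_NET value from step 3 combines the inet and netmask fields of the ifconfig output into CIDR notation. As a minimal sketch, the conversion can be done with Python's standard ipaddress module. The helper name to_cidr is hypothetical and is not part of Suricata or Wazuh:

```python
import ipaddress


def to_cidr(address, netmask):
    """Combine an IPv4 address and a dotted netmask into CIDR notation."""
    # ip_interface accepts the "address/netmask" form and keeps the host bits
    interface = ipaddress.ip_interface(f"{address}/{netmask}")
    return interface.with_prefixlen


# Values taken from the ifconfig output shown in step 3
print(to_cidr("10.0.2.15", "255.255.255.0"))  # prints 10.0.2.15/24
```

The printed value is the one to place between the quotes of HOME_NET.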
Wazuh server
1. Create a file called custom-urlhaus.py in the /var/ossec/integrations/ directory. This custom integration script for the Wazuh Integrator module queries URLhaus with a URL reported by Suricata to determine whether it is malicious or benign:
#!/var/ossec/framework/python/bin/python3
# Copyright (C) 2015-2022, Wazuh Inc.

import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM

try:
    import requests
    from requests.auth import HTTPBasicAuth
except Exception as e:
    print("No module 'requests' found. Install: pip install requests")
    sys.exit(1)

# Global vars
debug_enabled = True
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")

# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
socket_addr = '{0}/queue/sockets/queue'.format(pwd)

def main(args):
    debug("# Starting")

    # Read args
    alert_file_location = args[1]
    debug("# File location")
    debug(alert_file_location)

    # Load alert. Parse JSON object.
    with open(alert_file_location) as alert_file:
        json_alert = json.load(alert_file)
    debug("# Processing alert")
    debug(json_alert)

    # Request urlhaus info
    msg = request_urlhaus_info(json_alert)

    # If positive match, send event to Wazuh Manager
    if msg:
        send_event(msg, json_alert["agent"])

def debug(msg):
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)
        print(msg)
        # Append the message to the integrations log
        with open(log_file, "a") as f:
            f.write(msg)

def collect(data):
    urlhaus_reference = data['urlhaus_reference']
    url_status = data['url_status']
    url_date_added = data['date_added']
    url_threat = data['threat']
    url_blacklist_spamhaus = data['blacklists']['spamhaus_dbl']
    url_blacklist_surbl = data['blacklists']['surbl']
    url_tags = data['tags']
    return urlhaus_reference, url_status, url_date_added, url_threat, url_blacklist_spamhaus, url_blacklist_surbl, url_tags

def in_database(data, url):
    result = data['query_status']
    debug(result)
    if result == "ok":
        return True
    return False

def query_api(url):
    params = {'url': url}
    response = requests.post('https://urlhaus-api.abuse.ch/v1/url/', params)
    json_response = response.json()
    if json_response['query_status'] == 'ok':
        data = json_response
        debug(data)
        return data
    else:
        alert_output = {}
        alert_output["urlhaus"] = {}
        alert_output["integration"] = "custom-urlhaus"
        debug("# Error: The URLHAUS integration encountered an error")
        alert_output["urlhaus"]["error"] = response.status_code
        # Prefer the API error detail; fall back to query_status when the
        # response carries no "errors" list, to avoid a KeyError
        errors = json_response.get("errors")
        if errors:
            alert_output["urlhaus"]["description"] = errors[0]["detail"]
        else:
            alert_output["urlhaus"]["description"] = json_response.get("query_status", "unknown")
        send_event(alert_output)
        sys.exit(0)

def request_urlhaus_info(alert):
    alert_output = {}

    # If there is no redirect URL present in the alert, exit.
    # .get() avoids a KeyError when the alert has no data.http.redirect field.
    url = alert.get("data", {}).get("http", {}).get("redirect")
    if url is None:
        return 0

    # Request info using urlhaus API
    data = query_api(url)

    # Create alert
    alert_output["urlhaus"] = {}
    alert_output["integration"] = "custom-urlhaus"
    alert_output["urlhaus"]["found"] = 0
    alert_output["urlhaus"]["source"] = {}
    alert_output["urlhaus"]["source"]["alert_id"] = alert["id"]
    alert_output["urlhaus"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["urlhaus"]["source"]["description"] = alert["rule"]["description"]
    alert_output["urlhaus"]["source"]["url"] = url

    # Check if urlhaus has any info about the url
    if in_database(data, url):
        alert_output["urlhaus"]["found"] = 1

    # Info about the url found in urlhaus
    if alert_output["urlhaus"]["found"] == 1:
        urlhaus_reference, url_status, url_date_added, url_threat, url_blacklist_spamhaus, url_blacklist_surbl, url_tags = collect(data)

        # Populate JSON Output object with urlhaus request
        alert_output["urlhaus"]["urlhaus_reference"] = urlhaus_reference
        alert_output["urlhaus"]["url_status"] = url_status
        alert_output["urlhaus"]["url_date_added"] = url_date_added
        alert_output["urlhaus"]["url_threat"] = url_threat
        alert_output["urlhaus"]["url_blacklist_spamhaus"] = url_blacklist_spamhaus
        alert_output["urlhaus"]["url_blacklist_surbl"] = url_blacklist_surbl
        alert_output["urlhaus"]["url_tags"] = url_tags

    debug(alert_output)
    return alert_output

def send_event(msg, agent = None):
    if not agent or agent["id"] == "000":
        string = '1:urlhaus:{0}'.format(json.dumps(msg))
    else:
        string = '1:[{0}] ({1}) {2}->urlhaus:{3}'.format(agent["id"], agent["name"], agent["ip"] if "ip" in agent else "any", json.dumps(msg))
    debug(string)

    # Send the event to the Wazuh queue socket
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()

if __name__ == "__main__":
    try:
        # Read arguments
        bad_arguments = False
        if len(sys.argv) >= 4:
            msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '')
            debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug')
        else:
            msg = '{0} Wrong arguments'.format(now)
            bad_arguments = True

        # Logging the call
        f = open(log_file, 'a')
        f.write(msg + '\n')
        f.close()

        if bad_arguments:
            debug("# Exiting: Bad arguments.")
            sys.exit(1)

        # Main function
        main(sys.argv)

    except Exception as e:
        debug(str(e))
        raise
2. Change the ownership and permissions of the /var/ossec/integrations/custom-urlhaus.py file:
# chmod 750 /var/ossec/integrations/custom-urlhaus.py
# chown root:wazuh /var/ossec/integrations/custom-urlhaus.py
3. Add the following configuration to the /var/ossec/etc/ossec.conf file to configure the URLhaus integration. The integration runs whenever rule ID 86601 fires. Rule ID 86601 is an out-of-the-box Wazuh rule for detecting Suricata alerts:
<integration>
  <name>custom-urlhaus.py</name>
  <hook_url>https://urlhaus-api.abuse.ch/v1/url/</hook_url>
  <rule_id>86601</rule_id>
  <alert_format>json</alert_format>
</integration>
4. Add the following custom rule to the /var/ossec/etc/rules/local_rules.xml rule file. This rule generates an alert when URLhaus reports the URL as a known malware download:
<group name="local, suricata,">
  <rule id="100004" level="10">
    <field name="urlhaus.url_threat">malware_download</field>
    <description>URLhaus: An endpoint connected to a url known for deploying malware.</description>
  </rule>
</group>
5. Restart the Wazuh manager to apply the configuration changes:
# systemctl restart wazuh-manager
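To exercise the script by hand before waiting for live traffic, it may help to feed it a small alert file. The sketch below writes an alert containing only the fields that custom-urlhaus.py reads (id, rule, agent, and data.http.redirect) and prints a matching command line. All field values are hypothetical. The argument order simply mirrors what the script reads from sys.argv, and the second argument is a placeholder, since the script logs it but does not otherwise use it:

```python
import json
import os
import tempfile


def write_sample_alert(directory):
    """Write a minimal alert with only the fields custom-urlhaus.py reads."""
    alert = {
        "id": "1700000000.12345",  # hypothetical alert ID
        "rule": {"id": "86601", "description": "Sample Suricata alert"},  # hypothetical description
        "agent": {"id": "001", "name": "ubuntu-endpoint", "ip": "10.0.2.15"},  # hypothetical agent
        "data": {"http": {"redirect": "https://pastebin.com/raw/ZkwP7zPF"}},  # assumed redirect target
    }
    path = os.path.join(directory, "sample-alert.json")
    with open(path, "w") as handle:
        json.dump(alert, handle)
    return path


def build_command(alert_path):
    """Return the argument list the script expects: file, placeholder, hook URL, debug."""
    return [
        "/var/ossec/integrations/custom-urlhaus.py",
        alert_path,
        "unused",  # placeholder second argument
        "https://urlhaus-api.abuse.ch/v1/url/",
        "debug",  # enables the script's debug logging
    ]


alert_path = write_sample_alert(tempfile.mkdtemp())
print(" ".join(build_command(alert_path)))
```

Running the printed command on the Wazuh server makes the script query URLhaus and write its debug output to /var/ossec/logs/integrations.log, the log path the script itself defines.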
Testing the integration
We test the Suricata integration by making a curl request using a malicious user agent. Run the command below:
# curl -A "BlackSun" http://pastebin.com/raw/ZkwP7zPF
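The integration script only queries URLhaus when the alert carries a data.http.redirect field, so it can be useful to confirm that Suricata logged a redirect for the request. The following is a rough sketch of filtering EVE JSON lines for that field. The function name redirect_urls and the sample lines are illustrative assumptions, not real log output:

```python
import json


def redirect_urls(lines):
    """Yield http.redirect values from EVE JSON lines, skipping other events."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore partially written or malformed lines
        redirect = event.get("http", {}).get("redirect")
        if redirect:
            yield redirect


# Illustrative sample lines; real entries in eve.json carry many more fields
sample = [
    '{"event_type": "alert", "http": {"hostname": "pastebin.com", "redirect": "https://pastebin.com/raw/ZkwP7zPF"}}',
    '{"event_type": "stats"}',
    'not json',
]
print(list(redirect_urls(sample)))
```

To run it against the live log, pass an open handle to /var/log/suricata/eve.json instead of the sample list.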
The curl request to the malicious URL triggers rule ID 86601, the Wazuh rule for Suricata alerts, as seen below.

This alert triggers the custom-urlhaus.py integration script to check URLhaus for the URL. URLhaus detects the URL as malicious and rule ID 100004 is triggered as seen below.

Conclusion
In this article, we integrated URLhaus API with Wazuh to check URLs and determine if they have been associated with malicious activity. This integration allowed us to retrieve information from URLhaus about browsing activity made on endpoints monitored by Wazuh. The information retrieved was subsequently used with rules to determine malicious activity.
Wazuh is a free and open source SIEM and XDR solution. Wazuh is deployed and managed on-premises, or on Wazuh cloud. Check out our community for support and updates.