URLhaus is a project operated by abuse.ch. The purpose of the project is to collect, track, and share malware URLs, to help network administrators and security analysts protect their networks from cyber threats. URLhaus also offers an API to query information about malicious URLs. Integrating this API with Wazuh can help organizations improve their ability to detect threats.
In this blog post, we show how to configure Wazuh to use URLhaus malicious URLs database to detect malicious URLs using the Wazuh Integrator module. We will also configure Suricata as an IDS to monitor URL connections on the monitored endpoint.
Requirements
- Wazuh 4.9.1 central components (Wazuh server, Wazuh indexer, Wazuh dashboard) installed using the Quickstart guide on an Ubuntu server.
- An Ubuntu 22.04 server. This endpoint has a Wazuh agent installed and enrolled to the Wazuh server.
Configuration
Perform the following steps to install and configure Suricata, integrate URLhaus, and configure custom rules.
Ubuntu endpoint
1. Install Suricata on the Ubuntu endpoint:
# add-apt-repository ppa:oisf/suricata-stable # apt-get update # apt-get install suricata -y
2. Download and extract the Emerging Threats Suricata ruleset:
# cd /tmp/ && curl -LO https://rules.emergingthreats.net/open/suricata-6.0.8/emerging.rules.tar.gz # tar -xvzf emerging.rules.tar.gz && sudo mkdir /etc/suricata/rules && sudo mv rules/*.rules /etc/suricata/rules/ # chmod 640 /etc/suricata/rules/*.rules
3. Modify Suricata settings in the /etc/suricata/suricata.yaml
file and set the following variables:
HOME_NET: "<IP>/<subnet>" EXTERNAL_NET: "any" default-rule-path: /etc/suricata/rules rule-files: - "*.rules" # Global stats configuration stats: enabled: yes # Linux high speed capture support af-packet: - interface: enp0s3
Where:
<IP>/<subnet>
represents the IP address of the Ubuntu endpoint. The subnet mask must also be specified, for example10.0.2.15/24
Default-rule-path
specifies the path where our Suricata rules are locatedInterface
represents the network interface you want to monitor. Replace the value with the interface name of the Ubuntu endpoint. For example, enp0s3.
The values for <IP>/<subnet>
and Interface
can be retrieved by running the ifconfig
command as seen below:
# ifconfig
enp0s3: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500 inet 10.0.2.15 netmask 255.255.255.0 broadcast 10.0.2.255 inet6 fe80::9ba2:9de3:57ad:64e5 prefixlen 64 scopeid 0x20<link> ether 08:00:27:14:65:bd txqueuelen 1000 (Ethernet) RX packets 6704315 bytes 1268472541 (1.1 GiB) RX errors 0 dropped 0 overruns 0 frame 0 TX packets 4590192 bytes 569730548 (543.3 MiB) TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
4. Restart the Suricata service to apply the changes:
# systemctl restart suricata
5. Add the following configuration to the /var/ossec/etc/ossec.conf
file of the Wazuh agent. This allows the Wazuh agent to read the Suricata logs file:
<ossec_config> <localfile> <log_format>json</log_format> <location>/var/log/suricata/eve.json</location> </localfile> </ossec_config>
6. Restart the Wazuh agent to apply the changes:
# systemctl restart wazuh-agent
Wazuh server
1. Create a file called custom-urlhaus.py
in the directory /var/ossec/integrations/
. This custom Wazuh integrator module script queries URLhause with a URL reported by Suricata to determine if it is a malicious or benign:
#!/var/ossec/framework/python/bin/python3 # Copyright (C) 2015-2022, Wazuh Inc. import json import sys import time import os from socket import socket, AF_UNIX, SOCK_DGRAM try: import requests from requests.auth import HTTPBasicAuth except Exception as e: print("No module 'requests' found. Install: pip install requests") sys.exit(1) # Global vars debug_enabled = True pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) json_alert = {} now = time.strftime("%a %b %d %H:%M:%S %Z %Y") # Set paths log_file = '{0}/logs/integrations.log'.format(pwd) socket_addr = '{0}/queue/sockets/queue'.format(pwd) def main(args): debug("# Starting") # Read args alert_file_location = args[1] debug("# File location") debug(alert_file_location) # Load alert. Parse JSON object. with open(alert_file_location) as alert_file: json_alert = json.load(alert_file) debug("# Processing alert") debug(json_alert) # Request urlhaus info msg = request_urlhaus_info(json_alert) # If positive match, send event to Wazuh Manager if msg: send_event(msg, json_alert["agent"]) def debug(msg): if debug_enabled: msg = "{0}: {1}\n".format(now, msg) print(msg) f = open(log_file,"a") f.write(msg) f.close() def collect(data): urlhaus_reference = data['urlhaus_reference'] url_status = data['url_status'] url_date_added = data['date_added'] url_threat = data['threat'] url_blacklist_spamhaus = data['blacklists']['spamhaus_dbl'] url_blacklist_surbl = data['blacklists']['surbl'] url_tags = data['tags'] return urlhaus_reference, url_status, url_date_added, url_threat, url_blacklist_spamhaus, url_blacklist_surbl, url_tags def in_database(data, url): result = data['query_status'] debug(result) if result == "ok": return True return False def query_api(url): params = {'url': url} response = requests.post('https://urlhaus-api.abuse.ch/v1/url/', params) json_response = response.json() if json_response['query_status'] == 'ok': data = json_response debug(data) return data else: alert_output = {} alert_output["urlhaus"] = {} alert_output["integration"] = "custom-urlhaus" json_response = response.json() debug("# Error: The URLHAUS integration encountered an error") alert_output["urlhaus"]["error"] = response.status_code alert_output["urlhaus"]["description"] = json_response["errors"][0]["detail"] send_event(alert_output) exit(0) def request_urlhaus_info(alert): alert_output = {} # If there is no url address present in the alert. Exit. if alert["data"]["http"]["redirect"] == None: return(0) # Request info using urlhaus API data = query_api(alert["data"]["http"]["redirect"]) # Create alert alert_output["urlhaus"] = {} alert_output["integration"] = "custom-urlhaus" alert_output["urlhaus"]["found"] = 0 alert_output["urlhaus"]["source"] = {} alert_output["urlhaus"]["source"]["alert_id"] = alert["id"] alert_output["urlhaus"]["source"]["rule"] = alert["rule"]["id"] alert_output["urlhaus"]["source"]["description"] = alert["rule"]["description"] alert_output["urlhaus"]["source"]["url"] = alert["data"]["http"]["redirect"] url = alert["data"]["http"]["redirect"] # Check if urlhaus has any info about the url if in_database(data, url): alert_output["urlhaus"]["found"] = 1 # Info about the url found in urlhaus if alert_output["urlhaus"]["found"] == 1: urlhaus_reference, url_status, url_date_added, url_threat, url_blacklist_spamhaus, url_blacklist_surbl, url_tags = collect(data) # Populate JSON Output object with urlhaus request alert_output["urlhaus"]["urlhaus_reference"] = urlhaus_reference alert_output["urlhaus"]["url_status"] = url_status alert_output["urlhaus"]["url_date_added"] = url_date_added alert_output["urlhaus"]["url_threat"] = url_threat alert_output["urlhaus"]["url_blacklist_spamhaus"] = url_blacklist_spamhaus alert_output["urlhaus"]["url_blacklist_surbl"] = url_blacklist_surbl alert_output["urlhaus"]["url_tags"] = url_tags debug(alert_output) return(alert_output) def send_event(msg, agent = None): if not agent or agent["id"] == "000": string = '1:urlhaus:{0}'.format(json.dumps(msg)) else: string = '1:[{0}] ({1}) {2}->urlhaus:{3}'.format(agent["id"], agent["name"], agent["ip"] if "ip" in agent else "any", json.dumps(msg)) debug(string) sock = socket(AF_UNIX, SOCK_DGRAM) sock.connect(socket_addr) sock.send(string.encode()) sock.close() if __name__ == "__main__": try: # Read arguments bad_arguments = False if len(sys.argv) >= 4: msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '') debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug') else: msg = '{0} Wrong arguments'.format(now) bad_arguments = True # Logging the call f = open(log_file, 'a') f.write(msg +'\n') f.close() if bad_arguments: debug("# Exiting: Bad arguments.") sys.exit(1) # Main function main(sys.argv) except Exception as e: debug(str(e)) raise
2. Change the ownership and permission for the /var/ossec/integrations/custom-urlhaus.py
file:
# chmod 750 /var/ossec/integrations/custom-urlhaus.py # chown root:wazuh /var/ossec/integrations/custom-urlhaus.py
3. Add the following configuration to the /var/ossec/etc/ossec.conf
file to configure the URLhaus integration. This integration is triggered when rule ID 86601
triggers. Rule ID 86601
is an out-of-the-box Wazuh rule for detecting Suricata alerts:
<integration> <name>custom-urlhaus.py</name> <hook_url>https://urlhaus-api.abuse.ch/v1/url/</hook_url> <rule_id>86601</rule_id> <alert_format>json</alert_format> </integration>
4. Add the following custom rule to the /var/ossec/etc/rules/local_rules.xml
rule file. This rule generates an alert when URLhaus detects a malicious URL connection:
<group name="local, suricata,"> <rule id="100004" level="10"> <field name="urlhaus.url_threat">malware_download</field> <description>URLhaus: An endpoint connected to a url known for deploying malware.</description> </rule> </group>
5. Restart the Wazuh manager to apply the configuration changes:
# systemctl restart wazuh-manager
Testing the integration
We test the Suricata integration by making a curl request using a malicious user agent. Run the command below:
# curl -A "BlackSun" http://pastebin.com/raw/ZkwP7zPF
The curl request containing the malicious URL will trigger the Suricata rule with ID 86601
as seen below.
This alert triggers the custom-urlhaus.py
integration script to check URLhaus for the URL. URLhaus detects the URL as malicious and rule ID 100004
is triggered as seen below.
Conclusion
In this article, we integrated URLhaus API with Wazuh to check URLs and determine if they have been associated with malicious activity. This integration allowed us to retrieve information from URLhaus about browsing activity made on endpoints monitored by Wazuh. The information retrieved was subsequently used with rules to determine malicious activity.
Wazuh is a free and open source SIEM and XDR solution. Wazuh is deployed and managed on-premises, or on Wazuh cloud. Check out our community for support and updates.
References