URLhaus is a project operated by abuse.ch. The purpose of the project is to collect, track, and share malware URLs, helping network administrators and security analysts to protect their networks and customers from cyber threats. URLhaus also offers an API to query information about malicious URLs. Integrating this API with Wazuh can help organizations improve their ability to detect threats.
In this article, we will use a Python script to integrate the Wazuh manager with the URLhaus API. Here is a summary of the work done for this example:
- Setting up Suricata as a network IDS to analyze network traffic.
- Configuring the Wazuh integrator component.
- Using a Python script to communicate with the URLhaus API.
- Creating rules for alerting when a malicious URL is identified.
For this integration, we use the following assets:
- Wazuh 4.2.5
- Suricata 6.0.4
- URLhaus API for checking URLs: https://urlhaus-api.abuse.ch/v1/url/
Use case
For this example, we will generate alerts when an HTTP curl request is made to a malicious URL. We install Suricata on an endpoint with a Wazuh agent, to detect curl requests, and configure the Wazuh agent to collect Suricata alerts using the instructions given in this documentation.
We can test Suricata’s integration with Wazuh by trying the following HTTP curl request:
curl -A "BlackSun" www.google.com
The curl request should give the alert seen in the screenshot below:
URLhaus integration and configuration
Now that we have captured an alert with a URL, we can integrate URLhaus to check whether this URL is malicious. To do this, we create a custom integration by adding an integration block to the Wazuh manager configuration file (ossec.conf) with the content below:
<integration>
  <name>custom-urlhaus.py</name>
  <hook_url>https://urlhaus-api.abuse.ch/v1/url/</hook_url>
  <rule_id>86601</rule_id>
  <alert_format>json</alert_format>
</integration>
The parameters used in the integration block are as follows:
- name: The name of the custom script that performs the integration. All custom script names must start with “custom-”.
- hook_url: This is the API URL provided by URLhaus.
- rule_id: The ID of the Wazuh rule that will trigger this integration.
- alert_format: Indicates the format in which the script receives the alerts.
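To make the hand-off from the integrator to the script more concrete, here is a minimal sketch of how a custom integration script might unpack its command-line arguments. It assumes that the first argument is the path to the alert file (as the full script in this article does), the second is an API key slot that is unused here, and the third is the hook_url; the helper name parse_integration_args is hypothetical and not part of Wazuh.

```python
def parse_integration_args(argv):
    """Map the positional arguments to names (hypothetical helper).

    Assumed order: script name, alert file path, API key, hook URL,
    and an optional "debug" flag.
    """
    if len(argv) < 4:
        raise ValueError("expected: script alert_file api_key hook_url [debug]")
    return {
        "alert_file": argv[1],
        "api_key": argv[2],   # assumed to be unused for URLhaus in this article
        "hook_url": argv[3],
        "debug": len(argv) > 4 and argv[4] == "debug",
    }


# Example with a made-up alert path; the hook URL is the one from the article.
example = parse_integration_args([
    "custom-urlhaus.py",
    "/tmp/alert.json",
    "",
    "https://urlhaus-api.abuse.ch/v1/url/",
    "debug",
])
print(example["alert_file"], example["hook_url"], example["debug"])
```

Naming the arguments once, near the entry point, keeps the rest of a script free of bare sys.argv indexes.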
Writing the integration script
On the Wazuh server, we create a file called custom-urlhaus.py in the directory /var/ossec/integrations/. It is important to note that:
- The first line of the integration script must indicate its interpreter or else Wazuh will not know how to read and execute the script.
#!/var/ossec/framework/python/bin/python3
- The function request_urlhaus_info() requests the URL data from URLhaus.
The full script is below:
#!/var/ossec/framework/python/bin/python3
# Copyright (C) 2015-2022, Wazuh Inc.

import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM

try:
    import requests
except Exception as e:
    print("No module 'requests' found. Install: pip install requests")
    sys.exit(1)

# Global vars
debug_enabled = True
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")

# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
socket_addr = '{0}/queue/sockets/queue'.format(pwd)


def main(args):
    debug("# Starting")

    # Read args
    alert_file_location = args[1]

    debug("# File location")
    debug(alert_file_location)

    # Load alert. Parse JSON object.
    with open(alert_file_location) as alert_file:
        json_alert = json.load(alert_file)
    debug("# Processing alert")
    debug(json_alert)

    # Request urlhaus info
    msg = request_urlhaus_info(json_alert)

    # If positive match, send event to Wazuh Manager
    if msg:
        send_event(msg, json_alert["agent"])


def debug(msg):
    # Print the message and append it to the integrations log
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)
        print(msg)
        with open(log_file, "a") as f:
            f.write(msg)


def collect(data):
    # Pick the fields of the URLhaus response that go into the alert
    urlhaus_reference = data['urlhaus_reference']
    url_status = data['url_status']
    url_date_added = data['date_added']
    url_threat = data['threat']
    url_blacklist_spamhaus = data['blacklists']['spamhaus_dbl']
    url_blacklist_surbl = data['blacklists']['surbl']
    url_tags = data['tags']
    return urlhaus_reference, url_status, url_date_added, url_threat, url_blacklist_spamhaus, url_blacklist_surbl, url_tags


def in_database(data, url):
    result = data['query_status']
    debug(result)
    if result == "ok":
        return True
    return False


def query_api(url):
    params = {'url': url}
    response = requests.post('https://urlhaus-api.abuse.ch/v1/url/', params)
    json_response = response.json()
    if json_response['query_status'] == 'ok':
        data = json_response
        debug(data)
        return data
    else:
        alert_output = {}
        alert_output["urlhaus"] = {}
        alert_output["integration"] = "custom-urlhaus"
        debug("# Error: The URLHAUS integration encountered an error")
        alert_output["urlhaus"]["error"] = response.status_code
        # Report the query_status value (for example "no_results") as the
        # description, rather than assuming the response carries an "errors" list
        alert_output["urlhaus"]["description"] = json_response.get("query_status")
        send_event(alert_output)
        sys.exit(0)


def request_urlhaus_info(alert):
    alert_output = {}

    # If there is no url address present in the alert. Exit.
    url = alert.get("data", {}).get("http", {}).get("redirect")
    if url is None:
        return 0

    # Request info using urlhaus API
    data = query_api(url)

    # Create alert
    alert_output["urlhaus"] = {}
    alert_output["integration"] = "custom-urlhaus"
    alert_output["urlhaus"]["found"] = 0
    alert_output["urlhaus"]["source"] = {}
    alert_output["urlhaus"]["source"]["alert_id"] = alert["id"]
    alert_output["urlhaus"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["urlhaus"]["source"]["description"] = alert["rule"]["description"]
    alert_output["urlhaus"]["source"]["url"] = url

    # Check if urlhaus has any info about the url
    if in_database(data, url):
        alert_output["urlhaus"]["found"] = 1

    # Info about the url found in urlhaus
    if alert_output["urlhaus"]["found"] == 1:
        urlhaus_reference, url_status, url_date_added, url_threat, url_blacklist_spamhaus, url_blacklist_surbl, url_tags = collect(data)

        # Populate JSON Output object with urlhaus request
        alert_output["urlhaus"]["urlhaus_reference"] = urlhaus_reference
        alert_output["urlhaus"]["url_status"] = url_status
        alert_output["urlhaus"]["url_date_added"] = url_date_added
        alert_output["urlhaus"]["url_threat"] = url_threat
        alert_output["urlhaus"]["url_blacklist_spamhaus"] = url_blacklist_spamhaus
        alert_output["urlhaus"]["url_blacklist_surbl"] = url_blacklist_surbl
        alert_output["urlhaus"]["url_tags"] = url_tags

    debug(alert_output)

    return alert_output


def send_event(msg, agent=None):
    # Build the event string and write it to the Wazuh queue socket
    if not agent or agent["id"] == "000":
        string = '1:urlhaus:{0}'.format(json.dumps(msg))
    else:
        string = '1:[{0}] ({1}) {2}->urlhaus:{3}'.format(agent["id"], agent["name"], agent["ip"] if "ip" in agent else "any", json.dumps(msg))

    debug(string)
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()


if __name__ == "__main__":
    try:
        # Read arguments
        bad_arguments = False
        if len(sys.argv) >= 4:
            msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '')
            debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug')
        else:
            msg = '{0} Wrong arguments'.format(now)
            bad_arguments = True

        # Logging the call
        with open(log_file, 'a') as f:
            f.write(msg + '\n')

        if bad_arguments:
            debug("# Exiting: Bad arguments.")
            sys.exit(1)

        # Main function
        main(sys.argv)

    except Exception as e:
        debug(str(e))
        raise
This script reads the alerts JSON file and extracts the URL. Then, a request is made to the URLhaus API to check if the URL that triggered the integration script has been flagged for malicious behavior.
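The two steps described above, extracting the URL from the alert and interpreting the API reply, can be sketched without any network access. The snippet below is an illustration only: the helper names extract_url and summarize_response are hypothetical, and the sample dictionaries are made up to mirror the field names used in the script.

```python
def extract_url(alert):
    """Return the URL found under data.http.redirect, or None if absent."""
    data = alert.get("data") or {}
    http = data.get("http") or {}
    return http.get("redirect")


def summarize_response(response):
    """Reduce a URLhaus-style JSON reply to a few fields used for alerting."""
    if response.get("query_status") != "ok":
        # Anything other than "ok" is treated as "not found" in this sketch.
        return {"found": 0}
    return {
        "found": 1,
        "url_status": response.get("url_status"),
        "url_threat": response.get("threat"),
        "url_tags": response.get("tags"),
    }


# Made-up inputs that follow the shapes used in this article.
sample_alert = {"data": {"http": {"redirect": "https://example.com/raw/abc"}}}
sample_reply = {
    "query_status": "ok",
    "url_status": "online",
    "threat": "malware_download",
    "tags": ["example-tag"],
}

print(extract_url(sample_alert))
print(summarize_response(sample_reply))
```

Keeping the parsing separate from the HTTP request makes it possible to test the alert handling with saved JSON samples before pointing the script at the live API.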
Once the script has been created, we change the file owner and group to root:ossec and give it execution permissions:
chmod 750 /var/ossec/integrations/custom-urlhaus.py
chown root:ossec /var/ossec/integrations/custom-urlhaus.py
Note
In Wazuh 4.3.0 and later, the owner and group are root:wazuh.
Proceed to restart the Wazuh manager to apply the changes:
- For systemd-based Linux systems
systemctl restart wazuh-manager
- For SysV init-based Linux systems
service wazuh-manager restart
- For other Unix-based OS
/var/ossec/bin/wazuh-control restart
If <logall> is set to yes in the manager configuration file, we can see the results of the URLhaus integration in /var/ossec/logs/archives/archives.log each time a URL is found in the URLhaus malware database.
2022 Feb 22 09:55:53 (wazuhagent006) 192.168.245.44->urlhaus {"urlhaus": {"found": 1, "source": {"alert_id": "1645541747.1543371", "rule": "86601", "description": "Suricata: Alert - ET POLICY curl User-Agent Outbound", "url": "https://pastebin[.]com/raw/ZkwP7zPF"}, "urlhaus_reference": "https://urlhaus.abuse.ch/url/2045738/", "url_status": "online", "url_date_added": "2022-02-16 21:28:04 UTC", "url_threat": "malware_download", "url_blacklist_spamhaus": "not listed", "url_blacklist_surbl": "not listed", "url_tags": ["PowerShellSMTPInfoStealer"]}, "integration": "custom-urlhaus"}
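A log line like the one above can be split into its header and its JSON payload for quick inspection. The following is a minimal sketch that assumes the "->urlhaus " separator seen in the sample line; the function name parse_urlhaus_line is hypothetical, and the example line is a shortened version of the one shown above.

```python
import json


def parse_urlhaus_line(line):
    """Split an archives.log line into (header, payload dict).

    Returns None when the line does not contain the "->urlhaus " marker.
    """
    header, sep, payload = line.partition("->urlhaus ")
    if not sep:
        return None
    return header, json.loads(payload)


# Shortened version of the sample line shown in this article.
sample = (
    '2022 Feb 22 09:55:53 (wazuhagent006) 192.168.245.44->urlhaus '
    '{"urlhaus": {"found": 1, "url_threat": "malware_download"}, '
    '"integration": "custom-urlhaus"}'
)

header, event = parse_urlhaus_line(sample)
print(header)
print(event["urlhaus"]["url_threat"])
```

The nested keys in the payload (for example urlhaus.url_threat) are the same field names that custom rules can match on.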
Creating rules with URLhaus information
The response obtained from URLhaus can be used to create alerts for various use cases. For example, we can alert about a URL that downloads malware, or one that is linked to a phishing campaign. To do this, we add custom rules to /var/ossec/etc/rules/local_rules.xml and then restart the manager for them to take effect:
<group name="local, suricata,">
  <rule id="100004" level="10">
    <field name="urlhaus.url_threat">malware_download</field>
    <description>URLhaus: An endpoint connected to a url known for deploying malware.</description>
  </rule>
</group>
Testing the integration
Make a curl request like the one below, removing the square brackets around the dot (they are there only to keep the URL from being clickable):
curl http://pastebin[.]com/raw/ZkwP7zPF
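Security write-ups commonly "defang" URLs by wrapping dots in square brackets. As a small illustration, the sketch below reverses that notation; the function name refang is hypothetical, it only handles the "[.]" form used in this article, and the example uses a harmless placeholder domain.

```python
def refang(url):
    """Turn a defanged URL such as http://example[.]com into a usable one."""
    return url.replace("[.]", ".")


# Placeholder domain used purely for illustration.
print(refang("http://example[.]com/raw/abc"))
```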
The curl request containing the malicious URL triggers the Suricata rule, which in turn triggers the URLhaus integration script and finally the Wazuh rule, giving the output seen below:
Conclusion
In this article, we integrated the URLhaus API with Wazuh to check URLs and determine whether they have been associated with malicious activity. This integration allowed us to retrieve information from URLhaus about web requests made on endpoints monitored by Wazuh. The information retrieved was then matched by custom rules to flag malicious activity.