AbuseIPDB is a project that helps systems administrators, webmasters, and security analysts check and report IP addresses involved in various categories of malicious attacks. It provides an API to check and report an IP address for malicious activity.
Wazuh supports integrating with external software using the integrator tool. Integrations are done by connecting the Wazuh manager with APIs of the software products through scripts. We currently support integrations with VirusTotal, Slack, and PagerDuty out of the box, while providing an option for creating custom integrations.
In this article, we take a look at configuring Wazuh to communicate with the AbuseIPDB API using integrator. The following are examined in this write up:
- Configuring the integrator tool for a custom integration.
- Preparing a python script to process alerts and perform AbuseIPDB API checks against the IP address in the source log.
- Creating rules based on the Confidence of Abuse rating.
For this integration, we use the following assets:
- Wazuh 4.2.5
- AbuseIPDB API for checking IP addresses: Access to this API requires an API key. Get it by registering on the AbuseIPDB website. Keep in mind that the free tier has a limit of 1,000 checks per day.
Use case
The specific use case we look at is generating alerts with additional AbuseIPDB information when public IP addresses perform SSH authentication against an endpoint in the network.
Based on this scenario, we add rules to our Wazuh server in the /var/ossec/etc/rules/local_rules.xml
file to trigger when there is a failed or successful SSH authentication attempt from a public IP.
<group name="local,syslog,sshd,"> <rule id="100002" level="5"> <if_sid>5716</if_sid> <match type="pcre2">\b(?!(10)|192\.168|172\.(2[0-9]|1[6-9]|3[0-1])|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match> <description>sshd: Authentication failed from a public IP address $(srcip).</description> <group>authentication_failed,authentication_success,pci_dss_10.2.4,pci_dss_10.2.5,</group> </rule> <rule id="100003" level="5"> <if_sid>5715</if_sid> <match type="pcre2">\b(?!(10)|192\.168|172\.(2[0-9]|1[6-9]|3[0-1])|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match> <description>sshd: Authentication succeeded from a public IP address $(srcip).</description> <group>authentication_failed,authentication_success,pci_dss_10.2.4,pci_dss_10.2.5,</group> </rule> </group>
Note
Make sure that the PCRE2 package is installed on the Wazuh manager so the rule regex matches.
Once the rule IDs 100002 or 100003 triggers an alert, the Wazuh integration script makes a request to the AbuseIPDB Check IP API endpoint and returns information about the IP address in the alert. This is subsequently used in a rule created based on the Confidence of Abuse score.
AbuseIPDB integration configuration
To create a custom integration, the Wazuh manager configuration file ossec.conf
has to be modified to add the integration block with the content below:
<integration> <name>custom-abuseipdb.py</name> <hook_url>https://api.abuseipdb.com/api/v2/check</hook_url> <api_key><YOUR_ABUSEIPDB_API_KEY></api_key> <rule_id>100002,100003</rule_id> <alert_format>json</alert_format> </integration>
The parameters used in the integration block are as follows:
- name: The name of the custom script that performs the integration. All custom script names must start with “custom-“.
- hook_url: This is the API URL provided by AbuseIPDB. This parameter is optional as it can be included in the integrator script.
- api_key: Key of the API that enables us to use it. This parameter is also optional for the same reason the use of the
hook_url
is optional. - rule_id: Sets the rules that will trigger this integration. In this article, we use the rule ID of our use case.
- alert_format: Indicates the format the script receives the alerts. The JSON format is recommended. When this parameter is not set, the script will receive the alerts in
full_log
format.
Writing the integration script
On the Wazuh server, we proceed to create a file called custom-abuseipdb.py
in /var/ossec/integrations/
. It is important to note that:
- The first line of the integration script must indicate its interpreter or else Wazuh will not know how to read and execute the script.
#!/var/ossec/framework/python/bin/python3
- Arguments for the API key, hook URL, and the alerts file can be hardcoded. If not, they are retrieved from the custom integration block.
# Read args alert_file_location = args[1] apikey = args[2] hook_url = args[3]
- The function
request_abuseipdb_info()
is the function that requests the IP address abuse information.
The full script is below:
#!/var/ossec/framework/python/bin/python3 # Copyright (C) 2015-2022, Wazuh Inc. import json import sys import time import os from socket import socket, AF_UNIX, SOCK_DGRAM try: import requests from requests.auth import HTTPBasicAuth except Exception as e: print("No module 'requests' found. Install: pip install requests") sys.exit(1) # Global vars debug_enabled = False pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) json_alert = {} now = time.strftime("%a %b %d %H:%M:%S %Z %Y") # Set paths log_file = '{0}/logs/integrations.log'.format(pwd) socket_addr = '{0}/queue/sockets/queue'.format(pwd) def main(args): debug("# Starting") # Read args alert_file_location = args[1] apikey = args[2] debug("# API Key") debug(apikey) debug("# File location") debug(alert_file_location) # Load alert. Parse JSON object. with open(alert_file_location) as alert_file: json_alert = json.load(alert_file) debug("# Processing alert") debug(json_alert) # Request AbuseIPDB info msg = request_abuseipdb_info(json_alert,apikey) # If positive match, send event to Wazuh Manager if msg: send_event(msg, json_alert["agent"]) def debug(msg): if debug_enabled: msg = "{0}: {1}\n".format(now, msg) print(msg) f = open(log_file,"a") f.write(msg) f.close() def collect(data): abuse_confidence_score = data['abuseConfidenceScore'] country_code = data['countryCode'] usage_type = data['usageType'] isp = data['isp'] domain = data['domain'] total_reports = data['totalReports'] last_reported_at = data['lastReportedAt'] return abuse_confidence_score, country_code, usage_type, isp, domain, total_reports, last_reported_at def in_database(data, srcip): result = data['totalReports'] if result == 0: return False return True def query_api(srcip, apikey): params = {'maxAgeInDays': '90', 'ipAddress': srcip,} headers = { "Accept-Encoding": "gzip, deflate", 'Accept': 'application/json', "Key": apikey } response = requests.get('https://api.abuseipdb.com/api/v2/check',params=params, headers=headers) if response.status_code == 200: json_response = response.json() data = json_response["data"] return data else: alert_output = {} alert_output["abuseipdb"] = {} alert_output["integration"] = "custom-abuseipdb" json_response = response.json() debug("# Error: The AbuseIPDB encountered an error") alert_output["abuseipdb"]["error"] = response.status_code alert_output["abuseipdb"]["description"] = json_response["errors"][0]["detail"] send_event(alert_output) exit(0) def request_abuseipdb_info(alert, apikey): alert_output = {} # If there is no source ip address present in the alert. Exit. if not "srcip" in alert["data"]: return(0) # Request info using AbuseIPDB API data = query_api(alert["data"]["srcip"], apikey) # Create alert alert_output["abuseipdb"] = {} alert_output["integration"] = "custom-abuseipdb" alert_output["abuseipdb"]["found"] = 0 alert_output["abuseipdb"]["source"] = {} alert_output["abuseipdb"]["source"]["alert_id"] = alert["id"] alert_output["abuseipdb"]["source"]["rule"] = alert["rule"]["id"] alert_output["abuseipdb"]["source"]["description"] = alert["rule"]["description"] alert_output["abuseipdb"]["source"]["full_log"] = alert["full_log"] alert_output["abuseipdb"]["source"]["srcip"] = alert["data"]["srcip"] srcip = alert["data"]["srcip"] # Check if AbuseIPDB has any info about the srcip if in_database(data, srcip): alert_output["abuseipdb"]["found"] = 1 # Info about the IP found in AbuseIPDB if alert_output["abuseipdb"]["found"] == 1: abuse_confidence_score, country_code, usage_type, isp, domain, total_reports, last_reported_at = collect(data) # Populate JSON Output object with AbuseIPDB request alert_output["abuseipdb"]["abuse_confidence_score"] = abuse_confidence_score alert_output["abuseipdb"]["country_code"] = country_code alert_output["abuseipdb"]["usage_type"] = usage_type alert_output["abuseipdb"]["isp"] = isp alert_output["abuseipdb"]["domain"] = domain alert_output["abuseipdb"]["total_reports"] = total_reports alert_output["abuseipdb"]["last_reported_at"] = last_reported_at debug(alert_output) return(alert_output) def send_event(msg, agent = None): if not agent or agent["id"] == "000": string = '1:abuseipdb:{0}'.format(json.dumps(msg)) else: string = '1:[{0}] ({1}) {2}->abuseipdb:{3}'.format(agent["id"], agent["name"], agent["ip"] if "ip" in agent else "any", json.dumps(msg)) debug(string) sock = socket(AF_UNIX, SOCK_DGRAM) sock.connect(socket_addr) sock.send(string.encode()) sock.close() if __name__ == "__main__": try: # Read arguments bad_arguments = False if len(sys.argv) >= 4: msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '') debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug') else: msg = '{0} Wrong arguments'.format(now) bad_arguments = True # Logging the call f = open(log_file, 'a') f.write(msg +'\n') f.close() if bad_arguments: debug("# Exiting: Bad arguments.") sys.exit(1) # Main function main(sys.argv) except Exception as e: debug(str(e)) raise
This script reads the alerts JSON file and extracts the source IP. Then, a request is made to the AbuseIPDB API to check the reputation of the IP address that triggered the integration script.
Once the script has been created, the file owner and group are changed to root:ossec
and execution permissions are given.
chmod 750 /var/ossec/integrations/custom-abuseipdb.py chown root:ossec /var/ossec/integrations/custom-abuseipdb.py
Note
In versions of Wazuh above 4.2.5, the owner and group will be root:wazuh
Proceed to restart the Wazuh manager to apply the changes:
- For systemd-based Linux systems
systemctl restart wazuh-manager
- For SysV init-based Linux systems
service wazuh-manager restart
- For other Unix-based OS
/var/ossec/bin/wazuh-control restart
If <logall>
is set to yes
in the manager configuration file, we can see the results of the AbuseIPDB integration in /var/ossec/logs/archives/archives.log
each time the SSH authentication rules are triggered.
2022 Feb 07 11:28:34 (agent-name) x.x.x.x->abuseipdb {"abuseipdb": {"found": 1, "source": {"alert_id": "1644233310.887354", "rule": "100003", "description": "sshd: Authentication succeeded from a public IP address 64.62.197.132.", "full_log": "Dec 10 01:02:02 host sshd[1234]: Accepted none for root from 64.62.197.132 port 1066 ssh2", "srcip": "64.62.197.132"}, "abuse_confidence_score": 100, "country_code": "US", "usage_type": "Data Center/Web Hosting/Transit", "isp": "Synoptek", "domain": "synoptek.com", "total_reports": 97, "last_reported_at": "2022-02-05T12:36:30+00:00"}, "integration": "custom-abuseipdb"} 2022 Feb 07 11:28:36 (agent-name) x.x.x.x->abuseipdb {"abuseipdb": {"found": 1, "source": {"alert_id": "1644233310.887762", "rule": "100002", "description": "sshd: Authentication failed from a public IP address 212.192.241.132.", "full_log": "Dec 10 01:02:02 host sshd[1234]: Failed none for root from 212.192.241.132 port 1066 ssh2", "srcip": "212.192.241.132"}, "abuse_confidence_score": 100, "country_code": "NL", "usage_type": "Data Center/Web Hosting/Transit", "isp": "Des Capital B.V.", "domain": "des.capital", "total_reports": 141, "last_reported_at": "2022-01-14T14:30:13+00:00"}, "integration": "custom-abuseipdb"}
Creating rules with AbuseIPDB information
The response obtained from AbuseIPDB can be used to enrich alerts information. For example, we can alert about a public IP address that performed an SSH authentication and has an abuse confidence score that is not zero. We can create the following custom rules in /var/ossec/etc/rules/local_rules.xml
to do this, restarting the manager to make it effective:
<group name="local,syslog,sshd,"> . . . <rule id="100004" level="10"> <field name="abuseipdb.source.rule" type="pcre2">^100002$</field> <field name="abuseipdb.abuse_confidence_score" type="pcre2" negate="yes">^0$</field> <description>AbuseIPDB: SSH Authentication failed from a public IP address $(abuseipdb.source.srcip) with $(abuseipdb.abuse_confidence_score)% confidence of abuse.</description> <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group> </rule> <rule id="100005" level="14"> <field name="abuseipdb.source.rule" type="pcre2">^100003$</field> <field name="abuseipdb.abuse_confidence_score" type="pcre2" negate="yes">^0$</field> <description>AbuseIPDB: SSH Authentication succeeded from a public IP address $(abuseipdb.source.srcip) with $(abuseipdb.abuse_confidence_score)% confidence of abuse.</description> <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group> </rule> </group>
Testing the integration
The logs used for testing are:
Dec 10 01:02:02 host sshd[1234]: Failed none for root from 212.192.241.132 port 1066 ssh2 Dec 10 01:02:02 host sshd[1234]: Accepted none for root from 64.62.197.132 port 1066 ssh2
These rules can be triggered in a test via log injection on an endpoint enrolled to the Wazuh manager.
The following steps can be taken to inject logs on a Linux endpoint:
1. Create a log file in /var/log
touch /var/log/test.log
2. Add the test log file to the agent configuration file (ossec.conf) to monitor it.
<localfile> <log_format>syslog</log_format> <location>/var/log/test.log</location> </localfile>
3. Restart the agent.
/var/ossec/bin/wazuh-control restart
4. Save the test log into an injector file.
echo "Dec 10 01:02:02 host sshd[1234]: Failed none for root from 45.159.112.120 port 1066 ssh2" >> injector echo "Dec 10 01:02:02 host sshd[1234]: Accepted none for root from 64.62.197.132 port 1066 ssh2" >> injector
5. Inject the log into /var/log/test.log
to trigger the alert.
cat injector >> /var/log/test.log
Once the logs have been injected, we see the result of the log test on the Wazuh dashboard.
Conclusion
In this article, we integrated AbuseIPDB API with Wazuh to check IP addresses associated with malicious activity. This integration allowed us to retrieve information from AbuseIPDB about public IP addresses that attempted SSH authentication. The information retrieved was subsequently used with rules to improve the detection of known bad actors.