Detecting known bad actors with Wazuh and AbuseIPDB

| by Chris Bassey
Post icon

AbuseIPDB is a project that helps systems administrators, webmasters, and security analysts check and report IP addresses involved in various categories of malicious attacks. It provides an API to check and report an IP address for malicious activity.

Wazuh supports integrating with external software using the integrator tool. Integrations are done by connecting the Wazuh manager with APIs of the software products through scripts. We currently support integrations with VirusTotal, Slack, and PagerDuty out of the box, while providing an option for creating custom integrations.

In this article, we take a look at configuring Wazuh to communicate with the AbuseIPDB API using integrator. The following are examined in this write up:

  • Configuring the integrator tool for a custom integration.
  • Preparing a python script to process alerts and perform AbuseIPDB API checks against the IP address in the source log.
  • Creating rules based on the Confidence of Abuse rating.

For this integration, we use the following assets:

  1. Wazuh 4.2.5
  2. AbuseIPDB API for checking IP addresses: Access to this API requires an API key. Get it by registering on the AbuseIPDB website. Keep in mind that the free tier has a limit of 1,000 checks per day.

Use case

The specific use case we look at is generating alerts with additional AbuseIPDB information when public IP addresses perform SSH authentication against an endpoint in the network.

Based on this scenario, we add rules to our Wazuh server in the /var/ossec/etc/rules/local_rules.xml file to trigger when there is a failed or successful SSH authentication attempt from a public IP.

<group name="local,syslog,sshd,">
  <rule id="100002" level="5">
    <if_sid>5716</if_sid>
    <match type="pcre2">\b(?!(10)|192\.168|172\.(2[0-9]|1[6-9]|3[0-1])|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match>
    <description>sshd: Authentication failed from a public IP address $(srcip).</description>
    <group>authentication_failed,authentication_success,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>

  <rule id="100003" level="5">
    <if_sid>5715</if_sid>
    <match type="pcre2">\b(?!(10)|192\.168|172\.(2[0-9]|1[6-9]|3[0-1])|(25[6-9]|2[6-9][0-9]|[3-9][0-9][0-9]|99[1-9]))[0-9]{1,3}\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)</match>
    <description>sshd: Authentication succeeded from a public IP address $(srcip).</description>
    <group>authentication_failed,authentication_success,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>
</group>

Note: Make sure that the PCRE2 package is installed on the Wazuh manager so the rule regex matches.

Once the rule IDs 100002 or 100003 triggers an alert, the Wazuh integration script makes a request to the AbuseIPDB Check IP API endpoint and returns information about the IP address in the alert. This is subsequently used in a rule created based on the Confidence of Abuse score.

AbuseIPDB integration configuration

To create a custom integration, the Wazuh manager configuration file ossec.conf has to be modified to add the integration block with the content below:

<integration>
  <name>custom-abuseipdb.py</name>
  <hook_url>https://api.abuseipdb.com/api/v2/check</hook_url>
  <api_key><YOUR_ABUSEIPDB_API_KEY></api_key>
  <rule_id>100002,100003</rule_id>
  <alert_format>json</alert_format>
</integration>

The parameters used in the integration block are as follows:

  • name: The name of the custom script that performs the integration. All custom script names must start with “custom-“.
  • hook_url: This is the API URL provided by AbuseIPDB. This parameter is optional as it can be included in the integrator script.
  • api_key: Key of the API that enables us to use it. This parameter is also optional for the same reason the use of the hook_url is optional.
  • rule_id: Sets the rules that will trigger this integration. In this article, we use the rule ID of our use case.
  • alert_format: Indicates the format the script receives the alerts. The JSON format is recommended. When this parameter is not set, the script will receive the alerts in full_log format.

Writing the integration script

On the Wazuh server, we proceed to create a file called custom-abuseipdb.py in /var/ossec/integrations/. It is important to note that:

  • The first line of the integration script must indicate its interpreter or else Wazuh will not know how to read and execute the script.
#!/var/ossec/framework/python/bin/python3
  •  Arguments for the API key, hook URL, and the alerts file can be hardcoded. If not, they are retrieved from the custom integration block.
# Read args
alert_file_location = args[1]
apikey = args[2]
hook_url = args[3]
  •  The function request_abuseipdb_info() is the function that requests the IP address abuse information.

The full script is below:

#!/var/ossec/framework/python/bin/python3
# Copyright (C) 2015-2022, Wazuh Inc.

import json
import sys
import time
import os
from socket import socket, AF_UNIX, SOCK_DGRAM

try:
    import requests
    from requests.auth import HTTPBasicAuth
except Exception as e:
    print("No module 'requests' found. Install: pip install requests")
    sys.exit(1)

# Global vars

debug_enabled = False
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")

# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
socket_addr = '{0}/queue/sockets/queue'.format(pwd)

def main(args):
    debug("# Starting")

    # Read args
    alert_file_location = args[1]
    apikey = args[2]

    debug("# API Key")
    debug(apikey)

    debug("# File location")
    debug(alert_file_location)

    # Load alert. Parse JSON object.
    with open(alert_file_location) as alert_file:
        json_alert = json.load(alert_file)
    debug("# Processing alert")
    debug(json_alert)

    # Request AbuseIPDB info
    msg = request_abuseipdb_info(json_alert,apikey)

    # If positive match, send event to Wazuh Manager
    if msg:
        send_event(msg, json_alert["agent"])

def debug(msg):
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)

        print(msg)

        f = open(log_file,"a")
        f.write(msg)
        f.close()

def collect(data):
  abuse_confidence_score = data['abuseConfidenceScore']
  country_code = data['countryCode']
  usage_type = data['usageType']
  isp = data['isp']
  domain = data['domain']
  total_reports = data['totalReports']
  last_reported_at = data['lastReportedAt']
  return abuse_confidence_score, country_code, usage_type, isp, domain, total_reports, last_reported_at

def in_database(data, srcip):
  result = data['totalReports']
  if result == 0:
    return False
  return True

def query_api(srcip, apikey):
  params = {'maxAgeInDays': '90', 'ipAddress': srcip,}
  headers = {
  "Accept-Encoding": "gzip, deflate",
  'Accept': 'application/json',
  "Key": apikey
  }
  response = requests.get('https://api.abuseipdb.com/api/v2/check',params=params, headers=headers)
  if response.status_code == 200:
      json_response = response.json()
      data = json_response["data"]
      return data
  else:
      alert_output = {}
      alert_output["abuseipdb"] = {}
      alert_output["integration"] = "custom-abuseipdb"
      json_response = response.json()
      debug("# Error: The AbuseIPDB encountered an error")
      alert_output["abuseipdb"]["error"] = response.status_code
      alert_output["abuseipdb"]["description"] = json_response["errors"][0]["detail"]
      send_event(alert_output)
      exit(0)

def request_abuseipdb_info(alert, apikey):
    alert_output = {}
    # If there is no source ip address present in the alert. Exit.
    if not "srcip" in alert["data"]:

      return(0)

    # Request info using AbuseIPDB API
    data = query_api(alert["data"]["srcip"], apikey)

    # Create alert
    alert_output["abuseipdb"] = {}
    alert_output["integration"] = "custom-abuseipdb"
    alert_output["abuseipdb"]["found"] = 0
    alert_output["abuseipdb"]["source"] = {}
    alert_output["abuseipdb"]["source"]["alert_id"] = alert["id"]
    alert_output["abuseipdb"]["source"]["rule"] = alert["rule"]["id"]
    alert_output["abuseipdb"]["source"]["description"] = alert["rule"]["description"]
    alert_output["abuseipdb"]["source"]["full_log"] = alert["full_log"]
    alert_output["abuseipdb"]["source"]["srcip"] = alert["data"]["srcip"]
    srcip = alert["data"]["srcip"]
    # Check if AbuseIPDB has any info about the srcip
    if in_database(data, srcip):
      alert_output["abuseipdb"]["found"] = 1

    # Info about the IP found in AbuseIPDB
    if alert_output["abuseipdb"]["found"] == 1:
        abuse_confidence_score, country_code, usage_type, isp, domain, total_reports, last_reported_at = collect(data)

        # Populate JSON Output object with AbuseIPDB request
        alert_output["abuseipdb"]["abuse_confidence_score"] = abuse_confidence_score
        alert_output["abuseipdb"]["country_code"] = country_code
        alert_output["abuseipdb"]["usage_type"] = usage_type
        alert_output["abuseipdb"]["isp"] = isp
        alert_output["abuseipdb"]["domain"] = domain
        alert_output["abuseipdb"]["total_reports"] = total_reports
        alert_output["abuseipdb"]["last_reported_at"] = last_reported_at

    debug(alert_output)

    return(alert_output)

def send_event(msg, agent = None):
    if not agent or agent["id"] == "000":
        string = '1:abuseipdb:{0}'.format(json.dumps(msg))
    else:
        string = '1:[{0}] ({1}) {2}->abuseipdb:{3}'.format(agent["id"], agent["name"], agent["ip"] if "ip" in agent else "any", json.dumps(msg))

    debug(string)
    sock = socket(AF_UNIX, SOCK_DGRAM)
    sock.connect(socket_addr)
    sock.send(string.encode())
    sock.close()

if __name__ == "__main__":
    try:
        # Read arguments
        bad_arguments = False
        if len(sys.argv) >= 4:
            msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '')
            debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug')
        else:
            msg = '{0} Wrong arguments'.format(now)
            bad_arguments = True

        # Logging the call
        f = open(log_file, 'a')
        f.write(msg +'\n')
        f.close()

        if bad_arguments:
            debug("# Exiting: Bad arguments.")
            sys.exit(1)

        # Main function
        main(sys.argv)

    except Exception as e:
        debug(str(e))
        raise

This script reads the alerts JSON file and extracts the source IP. Then, a request is made to the AbuseIPDB API to check the reputation of the IP address that triggered the integration script.

Once the script has been created, the file owner and group are changed to root:ossec and execution permissions are given.

chmod 750 /var/ossec/integrations/custom-abuseipdb.pychown
root:ossec /var/ossec/integrations/custom-abuseipdb.py

Note: In versions of Wazuh above 4.2.5, the owner and group will be root:wazuh

Proceed to restart the Wazuh manager to apply the changes:

  • For systemd-based Linux systems
systemctl restart wazuh-manager
  • For SysV init-based Linux systems
service wazuh-manager restart
  • For other Unix-based OS
/var/ossec/bin/wazuh-control restart

If <logall> is set to yes in the manager configuration file, we can see the results of the AbuseIPDB integration in /var/ossec/logs/archives/archives.log each time the SSH authentication rules are triggered.

2022 Feb 07 11:28:34 (agent-name) x.x.x.x->abuseipdb {"abuseipdb": {"found": 1, "source": {"alert_id": "1644233310.887354", "rule": "100003", "description": "sshd: Authentication succeeded from a public IP address 64.62.197.132.", "full_log": "Dec 10 01:02:02 host sshd[1234]: Accepted none for root from 64.62.197.132 port 1066 ssh2", "srcip": "64.62.197.132"}, "abuse_confidence_score": 100, "country_code": "US", "usage_type": "Data Center/Web Hosting/Transit", "isp": "Synoptek", "domain": "synoptek.com", "total_reports": 97, "last_reported_at": "2022-02-05T12:36:30+00:00"}, "integration": "custom-abuseipdb"}
2022 Feb 07 11:28:36 (agent-name) x.x.x.x->abuseipdb {"abuseipdb": {"found": 1, "source": {"alert_id": "1644233310.887762", "rule": "100002", "description": "sshd: Authentication failed from a public IP address 212.192.241.132.", "full_log": "Dec 10 01:02:02 host sshd[1234]: Failed none for root from 212.192.241.132 port 1066 ssh2", "srcip": "212.192.241.132"}, "abuse_confidence_score": 100, "country_code": "NL", "usage_type": "Data Center/Web Hosting/Transit", "isp": "Des Capital B.V.", "domain": "des.capital", "total_reports": 141, "last_reported_at": "2022-01-14T14:30:13+00:00"}, "integration": "custom-abuseipdb"}

Creating rules with AbuseIPDB information

The response obtained from AbuseIPDB can be used to enrich alerts information. For example, we can alert about a public IP address that performed an SSH authentication and has an abuse confidence score that is not zero. We can create the following custom rules in /var/ossec/etc/rules/local_rules.xml to do this, restarting the manager to make it effective:

<group name="local,syslog,sshd,">
.
.
.
  <rule id="100004" level="10">
    <field name="abuseipdb.source.rule" type="pcre2">^100002$</field>
    <field name="abuseipdb.abuse_confidence_score" type="pcre2" negate="yes">^0$</field>
    <description>AbuseIPDB: SSH Authentication failed from a public IP address $(srcip) with $(abuseipdb.abuse_confidence_score)% confidence of abuse.</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>

  <rule id="100005" level="14">
    <field name="abuseipdb.source.rule" type="pcre2">^100003$</field>
    <field name="abuseipdb.abuse_confidence_score" type="pcre2" negate="yes">^0$</field>
    <description>AbuseIPDB: SSH Authentication succeeded from a public IP address $(srcip) with $(abuseipdb.abuse_confidence_score)% confidence of abuse.</description>
    <group>authentication_failed,pci_dss_10.2.4,pci_dss_10.2.5,</group>
  </rule>

</group>

Testing the integration

The logs used for testing are:

Dec 10 01:02:02 host sshd[1234]: Failed none for root from 212.192.241.132 port 1066 ssh2
Dec 10 01:02:02 host sshd[1234]: Accepted none for root from 64.62.197.132 port 1066 ssh2

These rules can be triggered in a test via log injection on an endpoint enrolled to the Wazuh manager.

The following steps can be taken to inject logs on a Linux endpoint:

  • Create a log file in /var/log
touch /var/log/test.log
  • Add the test log file to the agent configuration file (ossec.conf) to monitor it.
<localfile>
  <log_format>syslog</log_format>
  <location>/var/log/test.log</location>
</localfile>
  • Restart the agent.
/var/ossec/bin/wazuh-control restart
  • Save the test log into an injector file.
echo "Dec 10 01:02:02 host sshd[1234]: Failed none for root from 212.192.241.132 port 1066 ssh2" >> injector
echo "Dec 10 01:02:02 host sshd[1234]: Accepted none for root from 64.62.197.132 port 1066 ssh2" >> injector
  • Inject the log into /var/log/test.log to trigger the alert.
cat injector >> /var/log/test.log

Once the logs have been injected, we see the result of the log test on the Wazuh dashboard.

Wazuh dashboard

Conclusion

In this article, we integrated AbuseIPDB API with Wazuh to check IP addresses associated with malicious activity. This integration allowed us to retrieve information from AbuseIPDB about public IP addresses that attempted SSH authentication. The information retrieved was subsequently used with rules to improve the detection of known bad actors.