Integrating Wazuh with Yeti platform

Wazuh 4.10.0

Organizations often struggle to connect their Cyber Threat Intelligence (CTI) and Digital Forensics and Incident Response (DFIR) efforts. Effective collaboration between these domains is necessary to address threats proactively and efficiently.

Yeti (Your Everyday Threat Intelligence) is an open source Forensics Intelligence platform that helps bridge the gap between CTI and DFIR efforts. It provides DFIR teams with a pipeline to tackle challenges such as tracking artifact origins or finding Indicators of Compromise (IOCs) linked to threats. The Yeti platform ingests a wide range of observables, including malware hashes, phishing databases, IP reputation data, and blocklists for services like Apache, FTP, SSH, SIP, and IMAP.

Wazuh helps organizations detect and respond to threats, and its alerts become more actionable when enriched with insights from threat intelligence platforms like Yeti. In this blog post, we explore the following use cases for integrating Yeti with Wazuh:

  • Detecting malware using the Malware Bazaar feed.
  • Identifying network activities involving abused IP addresses.

Infrastructure

We use the following infrastructure for this integration.

  • A pre-built, ready-to-use Wazuh OVA 4.10.1. Follow this guide to download the virtual machine.
  • An Ubuntu 24.04 endpoint with:
    • The Wazuh agent 4.10.1 installed and enrolled into the Wazuh server.
    • Docker installed. This is a prerequisite for Yeti deployment.
    • The Yeti platform deployed following these steps.

Note: Yeti uses ArangoDB for data storage, and versions from v3.9 onward require CPUs with AVX support. Check AVX compatibility with the command lscpu | grep -i avx. If unsupported, update the docker-compose.yml file to use ArangoDB v3.8.9.
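If you prefer to script the AVX check, the following Python sketch reads /proc/cpuinfo directly. It assumes an x86 Linux host, where CPU features are listed on lines that start with flags; the function names are illustrative. Unlike lscpu | grep -i avx, which also matches avx2 or avx512f, it looks for the exact avx flag.

```python
from pathlib import Path


def cpu_flags(cpuinfo_text: str) -> set[str]:
    """Collect the CPU feature flags listed in /proc/cpuinfo-formatted text."""
    flags: set[str] = set()
    for line in cpuinfo_text.splitlines():
        # On x86 Linux, each logical CPU has a line such as "flags : fpu vme ... avx avx2"
        key, _, value = line.partition(":")
        if key.strip() == "flags":
            flags.update(value.split())
    return flags


def has_avx(cpuinfo_text: str) -> bool:
    """Return True if the exact 'avx' flag is present."""
    return "avx" in cpu_flags(cpuinfo_text)


if __name__ == "__main__":
    cpuinfo = Path("/proc/cpuinfo")
    if not cpuinfo.exists():
        print("/proc/cpuinfo not found; run this on the Linux host that runs Docker")
    elif has_avx(cpuinfo.read_text()):
        print("AVX supported: ArangoDB v3.9 or later should run")
    else:
        print("No AVX: pin ArangoDB to v3.8.9 in docker-compose.yml")
```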

Configuration

To demonstrate this integration, we enable threat intelligence feeds on the Yeti platform to enrich our database of observables. Next, we configure the Wazuh server and Ubuntu endpoint for the different use cases.

Yeti platform

1. Log in to your Yeti server, click ADMIN > Users, and copy the API key. The API key is required to configure the Wazuh integration module.

2. Click AUTOMATION > Feeds, then use the toggle button to enable the necessary feeds for our use cases. Feeds in Yeti represent sources from which we can ingest observables.

  • Enable the AbuseCHMalwareBazaaar feed. This feed contains a list of malware hashes used to detect and identify the presence of malicious files on the system.
Figure 1: Toggle AbuseCHMalwareBazaaar feed.
  • Enable the AlienVaultIPReputation feed. This feed contains a list of abused IP addresses used to identify potentially malicious network activity from suspicious or compromised hosts.
Figure 2: Toggle AlienVaultIPReputation feed.

Wazuh server

Follow these steps on the Wazuh server to configure the integration module and script.

1. Create a file called custom-yeti.py in the /var/ossec/integrations/ directory and insert the script below:

#!/var/ossec/framework/python/bin/python3
import json
import os
import re
import sys
import requests
from requests.exceptions import Timeout
from socket import AF_UNIX, SOCK_DGRAM, socket

# Exit error codes
ERR_NO_REQUEST_MODULE = 1
ERR_BAD_ARGUMENTS = 2
ERR_BAD_MD5_SUM = 3
ERR_NO_RESPONSE_YETI = 4
ERR_SOCKET_OPERATION = 5
ERR_FILE_NOT_FOUND = 6
ERR_INVALID_JSON = 7

# Global vars
debug_enabled = True
timeout = 10
retries = 3
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}

# Log and socket path
LOG_FILE = f'{pwd}/logs/integrations.log'
SOCKET_ADDR = f'{pwd}/queue/sockets/queue'

# Constants
ALERT_INDEX = 1
APIKEY_INDEX = 2
TIMEOUT_INDEX = 6
RETRIES_INDEX = 7
YETI_INSTANCE = 'http://<YETI_IP_ADDRESS>'

def debug(msg: str) -> None:
    """Print the message and append it to the log file if the debug flag
    is enabled."""
    if debug_enabled:
        print(msg)
        with open(LOG_FILE, 'a') as f:
            f.write(msg + '\n')

def main(args):
    global debug_enabled
    global timeout
    global retries
    try:
        # Read arguments
        bad_arguments: bool = False
        msg = ''
        if len(args) >= 4:
            debug_enabled = len(args) > 4 and args[4] == 'debug'
            if len(args) > TIMEOUT_INDEX:
                timeout = int(args[TIMEOUT_INDEX])
            if len(args) > RETRIES_INDEX:
                retries = int(args[RETRIES_INDEX])
        else:
            msg = '# Error: Wrong arguments\n'
            bad_arguments = True

        # Logging the call
        with open(LOG_FILE, 'a') as f:
            f.write(msg)

        if bad_arguments:
            debug('# Error: Exiting, bad arguments. Inputted: %s' % args)
            sys.exit(ERR_BAD_ARGUMENTS)

        # Read args
        apikey: str = args[APIKEY_INDEX]

        # Obtain the access token
        access_token = getAccessToken(apikey)

        # Core function
        process_args(args, access_token)


    except Exception as e:
        debug(str(e))
        raise

def getAccessToken(apikey):
    """Exchange API key for a JWT access token."""

    url = f"{YETI_INSTANCE}/api/v2/auth/api-token"
    headers = {"x-yeti-apikey": apikey}
    try:
        # Use the same timeout as the observables query so a hung server cannot block the script
        response = requests.post(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        access_token = response.json().get("access_token")
        if not access_token:
            raise ValueError("Access token missing in the response.")
        return access_token
    except requests.exceptions.RequestException as e:
        debug(f"Error obtaining access token from API: {e}")
        sys.exit(ERR_NO_RESPONSE_YETI)

def process_args(args, access_token: str) -> None:
    """Core function: load the alert, route it to the SSH or MD5 handler,
    and forward any enriched result to Wazuh."""
    debug('# Running Yeti script')

    # Read args
    alert_file_location: str = args[ALERT_INDEX]

    # Load alert. Parse JSON object.
    json_alert = get_json_alert(alert_file_location)
    debug(f"# Opening alert file at '{alert_file_location}' with '{json_alert}'")

    # Determine the type of alert and process accordingly. The <group> filter
    # in ossec.conf already limits input to syscheck and sshd alerts.
    if 'srcip' in json_alert.get('data', {}):
        debug('# Detected an SSH-related alert')
        msg = request_ssh_info(json_alert, access_token)

    elif 'md5_after' in json_alert.get('syscheck', {}):
        debug('# Detected a file integrity alert (MD5 check)')
        msg = request_md5_info(json_alert, access_token)

    else:
        debug('# Alert does not match known types (SSH or MD5). Skipping processing.')
        return None

    # If a valid message is generated, send it
    if msg:
        send_msg(msg, json_alert['agent'])
    else:
        debug('# No valid message generated. Skipping sending.')

def request_md5_info(alert: any, access_token: str):
    """Generate the JSON object with the message to be sent."""
    alert_output = {'yeti': {}, 'integration': 'yeti'}

    # Extract MD5 hash
    md5_hash = alert['syscheck'].get('md5_after')

    # Validate MD5 hash
    if not isinstance(md5_hash, str) or len(re.findall(r'\b([a-f\d]{32}|[A-F\d]{32})\b', md5_hash)) != 1:
        debug(f"# Invalid md5_after value: '{md5_hash}'")
        return None

    # Request info using Yeti API
    yeti_response_data = request_info_from_api(alert_output, access_token)

    if not yeti_response_data:
        debug("No data returned from the Yeti API.")
        return None

    alert_output['yeti']['source'] = {
        'alert_id': alert['id'],
        'file': alert['syscheck']['path'],
        'md5': md5_hash,
        'sha1': alert['syscheck'].get('sha1_after'),
    }

    # Filter Yeti results for entries with source 'AbuseCHMalwareBazaaar'
    for observable in yeti_response_data:
        if "context" in observable:
            for context_entry in observable["context"]:
                if context_entry.get("source") == "AbuseCHMalwareBazaaar" and context_entry.get("md5") == md5_hash:
                    alert_output['yeti'].update(
                        {
                        'info': {
                            'md5': md5_hash,
                            'filename': context_entry.get("filename"),
                            'first_seen': context_entry.get("first_seen"),
                            'reporter': context_entry.get("reporter"),
                            'date_added': context_entry.get("date_added"),
                            'source': "AbuseCHMalwareBazaaar",
                            }
                        }
                    )

                    return alert_output

    debug(f"No matching MD5 hash '{md5_hash}' found in YETI API for source 'AbuseCHMalwareBazaaar'.")
    return None

def request_ssh_info(alert: any, access_token: str):
    """Generate the JSON object with the message to be sent."""
    alert_output = {'yeti': {}, 'integration': 'yeti'}

    # Extract source IP
    src_ip = alert['data']['srcip']

    # Inline validation of the source IP
    if not isinstance(src_ip, str):
        debug(f"# Invalid src_ip: '{src_ip}' is not a string")
        return None

    octets = src_ip.split('.')
    if len(octets) != 4 or not all(octet.isdigit() for octet in octets):
        debug(f"# Invalid src_ip format: '{src_ip}'")
        return None

    octets = list(map(int, octets))
    if (
        any(octet < 0 or octet > 255 for octet in octets) or  # Octet range validation
        octets[0] in [10, 127] or  # Exclude private (10.x.x.x) and loopback (127.x.x.x)
        (octets[0] == 192 and octets[1] == 168) or  # Exclude private (192.168.x.x)
        (octets[0] == 172 and 16 <= octets[1] <= 31) or  # Exclude private (172.16.x.x to 172.31.x.x)
        octets[0] >= 240  # Exclude reserved and invalid ranges (240.x.x.x and above)
    ):
        debug(f"# Invalid src_ip: '{src_ip}' is private, reserved, or out of range")
        return None

    # Request info using Yeti API
    yeti_response_data = request_info_from_api(alert_output, access_token)

    if not yeti_response_data:
        debug("No data returned from the Yeti API.")
        return None

    # srcport and dstuser may be missing from some sshd alerts, so read them with .get()
    alert_output['yeti']['source'] = {
        'alert_id': alert['id'],
        'src_ip': src_ip,
        'src_port': alert['data'].get('srcport'),
        'dst_user': alert['data'].get('dstuser'),
    }

    # Filter Yeti results for entries with source 'AlienVaultIPReputation'
    for observable in yeti_response_data:
        for context_entry in observable.get("context", []):
            if context_entry.get("source") == "AlienVaultIPReputation":
                observable_value = observable.get("value")
                if observable_value == src_ip:
                    alert_output['yeti'].update(
                        {
                        'info': {
                            'country_code': context_entry.get("country"),
                            'threat': context_entry.get("threat"),
                            'reliability': context_entry.get("reliability"),
                            'risk': context_entry.get("risk"),
                            'source': "AlienVaultIPReputation",
                            }
                        }
                    )

                    return alert_output

    debug(f"No matching IP address '{src_ip}' found in YETI API for source 'AlienVaultIPReputation'.")
    return None

def request_info_from_api(alert_output, access_token):
    """Request information from Yeti API."""
    for attempt in range(retries + 1):
        try:
            yeti_response_data = query_api(access_token)
            return yeti_response_data
        except Timeout:
            debug('# Error: Request timed out. Remaining retries: %s' % (retries - attempt))
            continue
        except Exception as e:
            debug(str(e))
            sys.exit(ERR_NO_RESPONSE_YETI)

    debug('# Error: Request timed out and maximum number of retries was exceeded')
    alert_output['yeti']['error'] = 408
    alert_output['yeti']['description'] = 'Error: API request timed out'
    send_msg(alert_output)
    sys.exit(ERR_NO_RESPONSE_YETI)

def query_api(access_token: str) -> any:
    """Query the API for observables."""
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    debug('# Querying Yeti API')
    response = requests.get(
        f'{YETI_INSTANCE}/api/v2/observables/', headers=headers, timeout=timeout
    )
    if response.status_code == 200:
        return response.json()
    else:
        handle_api_error(response.status_code)

def handle_api_error(status_code):
    """Handle errors from the Yeti API."""
    alert_output = {}
    alert_output['yeti'] = {}
    alert_output['integration'] = 'yeti'

    if status_code == 401:
        alert_output['yeti']['error'] = status_code
        alert_output['yeti']['description'] = 'Error: Unauthorized. Check your API key.'
        send_msg(alert_output)
        raise Exception('# Error: Yeti credentials, required privileges error')
    elif status_code == 404:
        alert_output['yeti']['error'] = status_code
        alert_output['yeti']['description'] = 'Error: Resource not found.'
    elif status_code == 500:
        alert_output['yeti']['error'] = status_code
        alert_output['yeti']['description'] = 'Error: Internal server error.'
    else:
        alert_output['yeti']['error'] = status_code
        alert_output['yeti']['description'] = 'Error: API request failed.'

    send_msg(alert_output)
    raise Exception(f'# Error: Yeti API request failed with status code {status_code}')

def send_msg(msg: any, agent: any = None) -> None:
    if not agent or agent['id'] == '000':
        string = '1:yeti:{0}'.format(json.dumps(msg))
    else:
        location = '[{0}] ({1}) {2}'.format(agent['id'], agent['name'], agent['ip'] if 'ip' in agent else 'any')
        location = location.replace('|', '||').replace(':', '|:')
        string = '1:{0}->yeti:{1}'.format(location, json.dumps(msg))

    debug('# Request result from Yeti server: %s' % string)
    try:
        sock = socket(AF_UNIX, SOCK_DGRAM)
        sock.connect(SOCKET_ADDR)
        sock.send(string.encode())
        sock.close()
    except FileNotFoundError:
        debug('# Error: Unable to open socket connection at %s' % SOCKET_ADDR)
        sys.exit(ERR_SOCKET_OPERATION)

def get_json_alert(file_location: str) -> any:
    """Read JSON alert object from file."""
    try:
        with open(file_location) as alert_file:
            return json.load(alert_file)
    except FileNotFoundError:
        debug("# JSON file for alert %s doesn't exist" % file_location)
        sys.exit(ERR_FILE_NOT_FOUND)
    except json.decoder.JSONDecodeError as e:
        debug('Failed getting JSON alert. Error: %s' % e)
        sys.exit(ERR_INVALID_JSON)

if __name__ == '__main__':
    main(sys.argv)

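Replace <YETI_IP_ADDRESS> in the YETI_INSTANCE variable with the IP address of your Yeti instance.

The script exchanges the Yeti API key for an access token through the /api/v2/auth/api-token endpoint, then fetches observables from the /api/v2/observables/ endpoint. Two functions handle the enrichment: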

  • request_md5_info: This function processes file integrity alerts by validating the MD5 hash and querying the Yeti API for matches from ‘AbuseCHMalwareBazaaar’. If found, it collects details like the filename, first seen date, and reporter.
  • request_ssh_info: This function handles SSH-related alerts by validating the source IP and querying the Yeti API for matches from ‘AlienVaultIPReputation’. If a match exists, it collects details like the country code, threat, reliability, and risk.

Both functions enrich alerts with threat intelligence for incident response.
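Both functions follow the same pattern: download the observables, then keep only the context entries from one feed that match the alert value. The sketch below isolates that filter as a pure function so it can be tested offline. It assumes the response shape the script itself relies on (a list of observables, each with a value and a context list whose entries carry a source key); find_context is a hypothetical helper, not part of the script or the Yeti API.

```python
from typing import Any, Optional


def find_context(observables: list[dict[str, Any]], value: str, source: str,
                 context_key: Optional[str] = None) -> Optional[dict[str, Any]]:
    """Return the first context entry from `source` that matches `value`.

    With context_key (for example "md5"), the value is compared against that key
    inside the context entry, as request_md5_info() does. Without it, the value is
    compared against the observable's own "value", as request_ssh_info() does.
    """
    for observable in observables:
        for entry in observable.get("context", []):
            if entry.get("source") != source:
                continue
            candidate = entry.get(context_key) if context_key else observable.get("value")
            if candidate == value:
                return entry
    return None
```

Keeping the filter separate from the HTTP call makes it easy to check the matching rules against a saved API response before enabling the integration.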

2. Assign permissions and ownership to the script:

# chmod 750 /var/ossec/integrations/custom-yeti.py
# chown root:wazuh /var/ossec/integrations/custom-yeti.py

3. Add the following configuration within the <ossec_config> block in the /var/ossec/etc/ossec.conf file to create a custom integration.

Replace <YETI_API_KEY> with the API key you copied from ADMIN > Users on the Yeti platform.

<integration>
  <name>custom-yeti.py</name>
  <api_key><YETI_API_KEY></api_key>
  <group>syscheck,sshd</group>
  <alert_format>json</alert_format>
</integration>

The parameters used in the integration block are as follows:

  • name: The name of the custom script that performs the integration. All custom script names must start with “custom-”.
  • api_key: The Yeti API key you copied from ADMIN > Users on the Yeti platform.
  • group: Specifies the rule groups that trigger this integration. We use the syscheck and sshd groups for use cases 1 and 2, respectively.
  • alert_format: Specifies the format in which alerts are received by the script. The JSON format is recommended. When this parameter is not set, the script will receive the alerts in full_log format.
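Wazuh hands the alert file and the <api_key> value to the script as positional command-line arguments. The sketch below restates how custom-yeti.py reads sys.argv, using its ALERT_INDEX, APIKEY_INDEX, TIMEOUT_INDEX, and RETRIES_INDEX positions. Treating argv[3] as the (unused) hook URL and argv[4] as an optional debug flag reflects the script's own assumptions about the Wazuh integrator; IntegrationArgs and parse_args are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class IntegrationArgs:
    alert_file: str
    api_key: str
    debug: bool = False
    timeout: int = 10
    retries: int = 3


def parse_args(argv: list[str]) -> IntegrationArgs:
    """Mirror the positional layout custom-yeti.py expects from sys.argv."""
    if len(argv) < 4:
        raise ValueError(f"expected at least 4 arguments, got {len(argv)}")
    return IntegrationArgs(
        alert_file=argv[1],                                # ALERT_INDEX
        api_key=argv[2],                                   # APIKEY_INDEX: <api_key> from ossec.conf
        # argv[3] is assumed to be the hook URL, which this integration does not use
        debug=len(argv) > 4 and argv[4] == "debug",
        timeout=int(argv[6]) if len(argv) > 6 else 10,     # TIMEOUT_INDEX
        retries=int(argv[7]) if len(argv) > 7 else 3,      # RETRIES_INDEX
    )
```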

4. Create a file called yeti_rules.xml in the /var/ossec/etc/rules/ directory and insert the following custom rules:

<group name="yeti,">

    <rule id="100500" level="0">
        <decoded_as>json</decoded_as>
        <field name="integration">yeti</field>
        <description>yeti integration messages.</description>
        <options>no_full_log</options>
    </rule>

    <rule id="100501" level="12">
        <if_sid>100500</if_sid>
        <field name="yeti.info.source">AbuseCHMalwareBazaaar</field>
        <description>Yeti Alert - $(yeti.info.source) detected this file: $(yeti.source.file)</description>
        <group>pci_dss_10.6.1,pci_dss_11.4,gdpr_IV_35.7.d,</group>
        <options>no_full_log</options>
        <mitre>
            <id>T1203</id>
        </mitre>
    </rule>

    <rule id="100502" level="12">
        <if_sid>100500</if_sid>
        <field name="yeti.info.source">AlienVaultIPReputation</field>
        <description>Yeti Alert - $(yeti.info.source) detected IP address: $(yeti.source.src_ip)</description>
        <group>pci_dss_10.2.4,pci_dss_10.2.5,</group>
        <options>no_full_log</options>
    </rule>
</group>

Where:

  • 100500: Serves as the base rule, triggered when an alert with the integration field set to yeti is generated.
  • 100501: Triggered when a file’s MD5 hash matches an entry from the AbuseCHMalwareBazaaar source, indicating malware detection.
  • 100502: Triggered when network activity involves an IP address that matches an entry from the AlienVaultIPReputation source, indicating the detection of an abused IP address.
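Rule 100500 can match on the integration field because the script sends its enriched result back to Wazuh as JSON. The sketch below restates the string that send_msg() writes to the /var/ossec/queue/sockets/queue socket: a 1: prefix, the escaped agent location, the ->yeti: marker, and the JSON body that the json decoder turns into fields such as yeti.info.source. build_queue_message is a hypothetical name for this standalone copy, and the values in the test are placeholders.

```python
import json
from typing import Any, Optional


def build_queue_message(msg: dict[str, Any], agent: Optional[dict[str, str]] = None) -> str:
    """Build the string that custom-yeti.py's send_msg() writes to the Wazuh queue socket."""
    payload = json.dumps(msg)
    # Alerts from the manager itself (agent 000) or with no agent use the short form
    if not agent or agent["id"] == "000":
        return f"1:yeti:{payload}"
    location = f"[{agent['id']}] ({agent['name']}) {agent.get('ip', 'any')}"
    # Escape '|' and ':' in the location, as send_msg() does
    location = location.replace("|", "||").replace(":", "|:")
    return f"1:{location}->yeti:{payload}"
```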

5. Restart the Wazuh manager to apply the changes:

# systemctl restart wazuh-manager
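To exercise the script before a real alert fires, you can hand it a minimal alert file. The Python sketch below writes a hypothetical syscheck alert that contains only the fields custom-yeti.py reads (id, agent, syscheck.path, md5_after, sha1_after); every value is a placeholder and write_test_alert is an illustrative name.

```python
import json
from pathlib import Path


def write_test_alert(path: Path, md5: str, file_path: str = "/home/user/sample.bin") -> dict:
    """Write a minimal, hypothetical syscheck alert with only the fields custom-yeti.py reads."""
    alert = {
        "id": "1700000000.12345",                  # placeholder alert ID
        "agent": {"id": "001", "name": "ubuntu"},   # placeholder agent; "ip" is optional
        "syscheck": {
            "path": file_path,
            "md5_after": md5,
            "sha1_after": "",
        },
    }
    path.write_text(json.dumps(alert))
    return alert


if __name__ == "__main__":
    write_test_alert(Path("/tmp/yeti-test-alert.json"), "900150983cd24fb0d6963f7d28e17f72")
```

As an assumed test procedure, you could then run /var/ossec/integrations/custom-yeti.py /tmp/yeti-test-alert.json <YETI_API_KEY> "" debug, where the empty third argument fills the hook URL position because main() requires at least four arguments, and read the debug output in /var/ossec/logs/integrations.log.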

Ubuntu endpoint

Perform the following steps to prepare your Ubuntu endpoint for the integration with Yeti.

Use case 1: Detecting malware using the Malware Bazaar feed.

1. Add the following configuration inside the <syscheck> block of the /var/ossec/etc/ossec.conf file to monitor your home directory in real time with the Wazuh File Integrity Monitoring (FIM) module. Replace <USER_NAME> with your actual username.

<directories realtime="yes">/home/<USER_NAME></directories>
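FIM hashes the files in the monitored directory and reports the results in the alert as syscheck.md5_after and syscheck.sha1_after, which request_md5_info() then looks up in Yeti. To see the MD5 the integration will search for, you can hash a file yourself with the sketch below; it relies only on Python's hashlib, and file_hashes is an illustrative name.

```python
import hashlib
import sys
from pathlib import Path


def file_hashes(path: Path, chunk_size: int = 65536) -> dict[str, str]:
    """Compute the MD5 and SHA1 of a file, reading it in chunks to bound memory use."""
    md5 = hashlib.md5()
    sha1 = hashlib.sha1()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            md5.update(chunk)
            sha1.update(chunk)
    return {"md5": md5.hexdigest(), "sha1": sha1.hexdigest()}


if __name__ == "__main__":
    # Usage: python3 file_hashes.py <file> [<file> ...]
    for name in sys.argv[1:]:
        print(name, file_hashes(Path(name))["md5"])
```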

Use case 2: Identifying network activities involving abused IP addresses. 

1. Create a test.log file in the /var/log directory:

# touch /var/log/test.log

2. Insert the configuration below in the /var/ossec/etc/ossec.conf file within the <ossec_config> block for log collection:

<localfile>
  <log_format>syslog</log_format>
  <location>/var/log/test.log</location>
</localfile>

3. Restart the Wazuh agent to apply the changes for both use cases:

# systemctl restart wazuh-agent

Testing the integration

Follow these steps to test the integration.

Use case 1: Detecting malware using the Malware Bazaar feed.

1. Download malware samples from the MalwareBazaar database, extract them, and save them in the monitored /home/<USER_NAME> directory on the Ubuntu endpoint.

Use case 2: Identifying network activities involving abused IP addresses.

1. Append the following log entries to the test.log file. They simulate SSH login attempts from malicious or abused IP addresses, since we have no control over real malicious IP activity.

# echo "Jan 25 10:05:30 ubuntu sshd[3434]: Accepted password for anonymous from 61.0.168.149 port 55094 ssh2" >> /var/log/test.log
# echo "Jan 24 04:58:38 ubuntu sshd[2788]: Failed password for anonymous from 49.143.32.6 port 54647 ssh2" >> /var/log/test.log
# echo "Jan 24 04:58:47 ubuntu sshd[2788]: Connection reset by authenticating user anonymous 176.111.173.139 port 54647 [preauth]" >> /var/log/test.log
# echo "Jan 24 09:49:35 ubuntu sshd[5620]: Connection reset by invalid user test 110.89.11.143 port 56739 [preauth]" >> /var/log/test.log
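All four source addresses above are public, so they pass the IP validation in request_ssh_info(), which skips private, loopback, and reserved ranges. If you adapt the script, Python's ipaddress module offers a more complete test. The sketch below swaps the manual octet checks for the is_global property; it is an alternative, not what the script currently does, and is_public_ipv4 is a hypothetical name.

```python
import ipaddress


def is_public_ipv4(value: str) -> bool:
    """Return True only for globally routable IPv4 addresses.

    Unlike the manual octet checks in request_ssh_info(), is_global also rejects
    ranges such as link-local 169.254.0.0/16 and shared 100.64.0.0/10.
    """
    try:
        ip = ipaddress.ip_address(value)
    except ValueError:
        # Not a valid IP address at all
        return False
    return ip.version == 4 and ip.is_global
```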

Visualize the alerts on the Wazuh dashboard

The alerts below were generated by the integration tests for both use cases. Perform the following steps to visualize them on the Wazuh dashboard:

1. Navigate to Threat intelligence > Threat Hunting.

2. Click + Add filter, then select rule.groups in the Field field.

3. Select is in the Operator field.

4. Enter yeti in the Value field.

5. Click Save to enable the filter.

Figure 3: Alerts generated.

Conclusion

Integrating Yeti with Wazuh enables efficient detection of and response to cyber threats by combining threat intelligence with real-time monitoring.

This article demonstrated how to configure your Wazuh environment to detect malware using the Malware Bazaar feed and monitor network activities involving abused IP addresses. The integration streamlines threat detection and enhances incident response by bridging Cyber Threat Intelligence and DFIR workflows.

References