Organizations face challenges connecting Cyber Threat Intelligence (CTI) and Digital Forensics and Incident Response (DFIR) efforts. Effective collaboration between these domains is necessary for addressing threats proactively and efficiently.
Yeti (Your Everyday Threat Intelligence) is an open source Forensics Intelligence platform that helps bridge the gap between CTI and DFIR efforts. It provides DFIR teams with a pipeline to tackle challenges such as tracking artifact origins or finding Indicators of Compromise (IOCs) linked to threats. The Yeti platform ingests various observables/insights, including malware hashes, phishing databases, IP reputation, and blocklists for services like Apache, FTP, SSH, SIP, and IMAP.
Wazuh helps organizations detect and respond to threats using its capabilities and actionable insights from platforms like Yeti. In this blog post, we explore the following use cases for integrating Yeti with Wazuh:
- Detecting malware using the Malware Bazaar feed.
- Identifying network activities involving abused IP addresses.
Infrastructure
We use the following infrastructure for this integration.
- A pre-built, ready-to-use Wazuh OVA 4.12.0. Follow this guide to download the virtual machine.
- An Ubuntu 24.04 endpoint with:
- The Wazuh agent 4.12.0 installed and enrolled into the Wazuh server.
- Docker installed. This is a prerequisite for Yeti deployment.
- The Yeti platform deployed following these steps.
 
Note
Yeti uses ArangoDB for data storage, and versions from v3.9 onward require CPUs with AVX support. Check AVX compatibility with the command lscpu | grep -i avx. If unsupported, update the docker-compose.yml file to use ArangoDB v3.8.9.
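The AVX check can also be scripted, for example before choosing which ArangoDB image tag to use. The snippet below is a minimal sketch, assuming an x86 Linux host that lists CPU flags in /proc/cpuinfo. The function names has_avx and host_has_avx are hypothetical and are not part of Yeti or Wazuh.

```python
from pathlib import Path


def has_avx(cpuinfo_text: str) -> bool:
    """Return True if any 'flags' line in the cpuinfo text lists the avx flag."""
    for line in cpuinfo_text.splitlines():
        key, _, value = line.partition(":")
        # Only inspect the 'flags' entries; other keys may contain unrelated text.
        if key.strip() == "flags" and "avx" in value.split():
            return True
    return False


def host_has_avx(path: str = "/proc/cpuinfo") -> bool:
    """Read the cpuinfo file (assumed Linux path) and check it for AVX support."""
    return has_avx(Path(path).read_text())
```

If host_has_avx() returns False on the Ubuntu endpoint, pin ArangoDB to v3.8.9 as described in the note above.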
Configuration
To demonstrate this integration, we enable threat intelligence feeds on the Yeti platform to enrich our database of observables. Next, we configure the Wazuh server and Ubuntu endpoint for the different use cases.
Yeti platform
- Click SYSTEM > Users, fill in the username and password, then click ADD USER. We will retrieve the API key from this user account to enable the integration:

- Click MANAGE API KEYS on the newly created user account:

- Click NEW API KEY, fill in the Key name and select all under Scopes, then click CREATE:

- Save the API key for later use, then close this view:

- Click AUTOMATION > Feeds, then use the toggle button to enable the necessary feeds for our use cases. Feeds in Yeti represent sources from which we can ingest observables.
- Enable the AbuseCHMalwareBazaaar feed. This feed contains a list of malware hashes used to detect and identify the presence of malicious files on the system.

- Enable the AlienVaultIPReputation feed. This feed contains a list of abused IP addresses used to identify potentially malicious network activity from suspicious or compromised hosts.

Note
Yeti may take a few moments to retrieve data from the feeds. The process is finished once the status displays Completed.
Wazuh server
Follow these steps on the Wazuh server to configure the integration module and script.
- Create a file called custom-yeti.py in the /var/ossec/integrations/ directory and insert the script below:
#!/var/ossec/framework/python/bin/python3
import json
import os
import re
import sys
import requests
import ipaddress
from requests.exceptions import Timeout
from socket import AF_UNIX, SOCK_DGRAM, socket
# Exit error codes
ERR_NO_REQUEST_MODULE = 1
ERR_BAD_ARGUMENTS = 2
ERR_BAD_MD5_SUM = 3
ERR_NO_RESPONSE_YETI = 4
ERR_SOCKET_OPERATION = 5
ERR_FILE_NOT_FOUND = 6
ERR_INVALID_JSON = 7
# Global vars
debug_enabled = True
timeout = 10
retries = 3
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
# Log and socket path
LOG_FILE = f'{pwd}/logs/integrations.log'
SOCKET_ADDR = f'{pwd}/queue/sockets/queue'
# Constants
ALERT_INDEX = 1
APIKEY_INDEX = 2
TIMEOUT_INDEX = 6
RETRIES_INDEX = 7
YETI_INSTANCE = 'http://<YETI_IP_ADDRESS>'
def debug(msg: str) -> None:
    """Print the message and append it to the log file if the debug flag
    is enabled."""
    if debug_enabled:
        print(msg)
        with open(LOG_FILE, 'a') as f:
            f.write(msg + '\n')
def main(args):
    global debug_enabled
    global timeout
    global retries
    try:
        # Read arguments
        bad_arguments: bool = False
        msg = ''
        if len(args) >= 4:
            debug_enabled = len(args) > 4 and args[4] == 'debug'
            if len(args) > TIMEOUT_INDEX:
                timeout = int(args[TIMEOUT_INDEX])
            if len(args) > RETRIES_INDEX:
                retries = int(args[RETRIES_INDEX])
        else:
            msg = '# Error: Wrong arguments\n'
            bad_arguments = True
        # Logging the call
        with open(LOG_FILE, 'a') as f:
            f.write(msg)
        if bad_arguments:
            debug('# Error: Exiting, bad arguments. Inputted: %s' % args)
            sys.exit(ERR_BAD_ARGUMENTS)
        # Read args
        apikey: str = args[APIKEY_INDEX]
        # Obtain the access token
        access_token = getAccessToken(apikey)
        # Core function
        process_args(args, access_token)
    except Exception as e:
        debug(str(e))
        raise
def getAccessToken(apikey):
    """Exchange API key for a JWT access token."""
    url = f"{YETI_INSTANCE}/api/v2/auth/api-token"
    headers = {"x-yeti-apikey": apikey}
    try:
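        # Use the configured timeout so a stalled Yeti instance cannot hang the integration
        response = requests.post(url, headers=headers, timeout=timeout)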
        response.raise_for_status()
        access_token = response.json().get("access_token")
        if not access_token:
            raise ValueError("Access token missing in the response.")
        return access_token
    except requests.exceptions.RequestException as e:
        debug(f"Error obtaining access token from API: {e}")
        sys.exit(1)
def process_args(args, access_token: str) -> None:
    """Core function: load the alert, query Yeti for the relevant observable,
    and send the enriched message to the Wazuh queue."""
    debug('# Running Yeti script')
    # Read args
    alert_file_location: str = args[ALERT_INDEX]
    # Load alert. Parse JSON object.
    json_alert = get_json_alert(alert_file_location)
    debug(f"# Opening alert file at '{alert_file_location}' with '{json_alert}'")
    # Determine the type of alert and process accordingly
    if 'data' in json_alert and 'srcip' in json_alert['data']:
        debug('# Detected an SSH-related alert')
        wazuh_info = json_alert['data']['srcip']
        msg: any = request_ssh_info(json_alert, access_token, wazuh_info)
    elif 'syscheck' in json_alert and 'md5_after' in json_alert['syscheck']:
        debug('# Detected a file integrity alert (md5 check)')
        wazuh_info = json_alert['syscheck']['md5_after']
        msg: any = request_md5_info(json_alert, access_token, wazuh_info)
    else:
        debug('# Alert does not match known types (SSH or MD5). Skipping processing.')
        return None
    # If a valid message is generated, send it
    if msg:
        send_msg(msg, json_alert['agent'])
    else:
        debug('# No valid message generated. Skipping sending.')
def request_md5_info(alert: any, access_token: str, wazuh_info: str):
    """Generate the JSON object with the message to be sent."""
    alert_output = {'yeti': {}, 'integration': 'yeti'}
    # Validate md5 hash
    if not isinstance(wazuh_info, str) or not re.fullmatch(r'[a-fA-F\d]{32}', wazuh_info):
        debug(f"# Invalid md5_after value: '{wazuh_info}'")
        return None
    # Request info using Yeti API
    yeti_response_data = request_info_from_api(alert_output, access_token, wazuh_info)
    if not yeti_response_data:
        debug("No data returned from the Yeti API.")
        return None
   
    alert_output['yeti']['source'] = {
        'alert_id': alert['id'],
        'file': alert['syscheck']['path'],
        'md5': alert['syscheck']['md5_after'],
        'sha1': alert['syscheck']['sha1_after'],
    }
    # Check Yeti info about the hash
    if any("source" in ctx and ctx["source"].strip() for ctx in yeti_response_data.get("context", [])):
    # The source field is present and not empty
        alert_output['yeti'].update(
            {
            'info': {
                'name' : yeti_response_data.get('tags', [{}])[0].get("name"),
                'last_seen' : yeti_response_data.get('tags', [{}])[0].get("last_seen"),
                'Created' : yeti_response_data.get("created"),
                'type' : yeti_response_data.get("type"),    
                'source' : yeti_response_data.get('context', [{}])[0].get("source"),
            }})
        return alert_output
   
    else:
    # The source field is missing or empty
        alert_output['yeti'].update(
        {
        'info': {
            'name' : yeti_response_data.get('tags', [{}])[0].get("name"),
            'last_seen' : yeti_response_data.get('tags', [{}])[0].get("last_seen"),
            'Created' : yeti_response_data.get("created"),
            'type' : yeti_response_data.get("type"),
            'source' : "AbuseCHMalwareBazaaar",
        } })
        return alert_output
def is_valid_ip(ip_str):
    try:
        ip_obj = ipaddress.ip_address(ip_str)
        # Reject private, loopback, or reserved addresses
        if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_reserved or ip_obj.is_multicast:
            debug(f"# IP address '{ip_str}' is private, loopback, or reserved.")
            return False
        return True
    except ValueError:
        debug(f"# IP address '{ip_str}' is not valid.")
        return False
   
def request_ssh_info(alert: any, access_token: str, wazuh_info):
    """Generate the JSON object with the message to be sent."""
    alert_output = {'yeti': {}, 'integration': 'yeti'}
    # Inline validation of the source IP
    if not is_valid_ip(wazuh_info):
        return None
    # Request info using Yeti API
    yeti_response_data = request_info_from_api(alert_output, access_token, wazuh_info)
    if not yeti_response_data:
        debug("No data returned from the Yeti API.")
        return None
   
    alert_output['yeti']['source'] = {
        'alert_id': alert['id'],
        'src_ip': alert['data']['srcip'],
        # Not every sshd alert carries these fields, so avoid a KeyError
        'src_port': alert['data'].get('srcport'),
        'dst_user': alert['data'].get('dstuser'),
    }
    # Check Yeti  info about the source IP
    if any("source" in ctx and ctx["source"].strip() for ctx in yeti_response_data.get("context", [])):
        # The source field is present and not empty
        alert_output['yeti'].update({
            'info': {
                'country_code': yeti_response_data.get('context', [{}])[0].get("country"),
                'threat':  yeti_response_data.get('context', [{}])[0].get("threat"),
                'reliability': yeti_response_data.get('context', [{}])[0].get("reliability"),
                'risk':  yeti_response_data.get('context', [{}])[0].get("risk"),
                'name': yeti_response_data.get('tags', [{}])[0].get("name"),
                'source': yeti_response_data.get('context', [{}])[0].get("source"),
                'created': yeti_response_data.get("created"),
                'type' : yeti_response_data.get("type"),
            }    
            })
        return alert_output
   
    else:
        # The source field is missing or empty
        alert_output['yeti'].update({
            'info': {
                'country_code': yeti_response_data.get('context', [{}])[0].get("country"),
                'threat':  yeti_response_data.get('context', [{}])[0].get("threat"),
                'reliability': yeti_response_data.get('context', [{}])[0].get("reliability"),
                'risk':  yeti_response_data.get('context', [{}])[0].get("risk"),
                'name': yeti_response_data.get('tags', [{}])[0].get("name"),
                'source': "AlienVaultIPReputation",
                'created': yeti_response_data.get("created"),
                'type' : yeti_response_data.get("type"),
            }    
            })
        return alert_output
def request_info_from_api(alert_output, access_token, wazuh_info):
    """Request information from Yeti API."""
    for attempt in range(retries + 1):
        try:
            yeti_response_data = query_api(access_token, wazuh_info)
            return yeti_response_data
        except Timeout:
            debug('# Error: Request timed out. Remaining retries: %s' % (retries - attempt))
            continue
        except Exception as e:
            debug(str(e))
            sys.exit(ERR_NO_RESPONSE_YETI)
    debug('# Error: Request timed out and maximum number of retries was exceeded')
    alert_output['yeti']['error'] = 408
    alert_output['yeti']['description'] = 'Error: API request timed out'
    send_msg(alert_output)
    sys.exit(ERR_NO_RESPONSE_YETI)
def query_api(access_token: str, wazuh_info: str) -> any:
    """Query the API for observables."""
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    debug('# Querying Yeti API')
    response = requests.get(
        f'{YETI_INSTANCE}/api/v2/observables/?value={wazuh_info}', headers=headers, timeout=timeout
    )
    if response.status_code == 200:
        return response.json()
    else:
        handle_api_error(response.status_code)
def handle_api_error(status_code):
    """Handle errors from the Yeti API."""
    alert_output = {}
    alert_output['yeti'] = {}
    alert_output['integration'] = 'yeti'
    if status_code == 401:
        alert_output['yeti']['error'] = status_code
        alert_output['yeti']['description'] = 'Error: Unauthorized. Check your API key.'
        send_msg(alert_output)
        raise Exception('# Error: Yeti credentials, required privileges error')
    elif status_code == 404:
        alert_output['yeti']['error'] = status_code
        alert_output['yeti']['description'] = 'Error: Resource not found.'
    elif status_code == 500:
        alert_output['yeti']['error'] = status_code
        alert_output['yeti']['description'] = 'Error: Internal server error.'
    else:
        alert_output['yeti']['error'] = status_code
        alert_output['yeti']['description'] = 'Error: API request failed.'
    send_msg(alert_output)
    raise Exception(f'# Error: Yeti API request failed with status code {status_code}')
def send_msg(msg: any, agent: any = None) -> None:
    if not agent or agent['id'] == '000':
        string = '1:yeti:{0}'.format(json.dumps(msg))
    else:
        location = '[{0}] ({1}) {2}'.format(agent['id'], agent['name'], agent['ip'] if 'ip' in agent else 'any')
        location = location.replace('|', '||').replace(':', '|:')
        string = '1:{0}->yeti:{1}'.format(location, json.dumps(msg))
    debug('# Request result from Yeti server: %s' % string)
    try:
        sock = socket(AF_UNIX, SOCK_DGRAM)
        sock.connect(SOCKET_ADDR)
        sock.send(string.encode())
        sock.close()
    except FileNotFoundError:
        debug('# Error: Unable to open socket connection at %s' % SOCKET_ADDR)
        sys.exit(ERR_SOCKET_OPERATION)
def get_json_alert(file_location: str) -> any:
    """Read JSON alert object from file."""
    try:
        with open(file_location) as alert_file:
            return json.load(alert_file)
    except FileNotFoundError:
        debug("# JSON file for alert %s doesn't exist" % file_location)
        sys.exit(ERR_FILE_NOT_FOUND)
    except json.decoder.JSONDecodeError as e:
        debug('Failed getting JSON alert. Error: %s' % e)
        sys.exit(ERR_INVALID_JSON)
if __name__ == '__main__':
    main(sys.argv)
Replace <YETI_IP_ADDRESS> with the IP address of the Ubuntu endpoint.
The script fetches observables from the Yeti API endpoint api/v2/observables/.
- request_md5_info: This function processes file integrity alerts by validating the MD5 hash and querying the Yeti API for matches. If found, it collects details like the name, last seen, and source.
- request_ssh_info: This function handles SSH-related alerts by validating the source IP and querying the Yeti API for matches. It collects details like country code, threat level, reliability, and risk score if a match exists.
Both functions enrich alerts with threat intelligence for incident response.
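Both functions read the first element of the tags and context lists, which raises an IndexError if Yeti returns an empty list. The helper below is a minimal sketch of a more tolerant way to build the info block for hash matches. It assumes the same response shape the script already relies on (a dictionary with optional tags and context lists of dictionaries), and the names first_or_empty and build_md5_info are hypothetical. Unlike the script, it only inspects the first context entry when deciding whether a source is present.

```python
def first_or_empty(items):
    """Return the first dict in a list, or {} when the list is missing or empty."""
    if isinstance(items, list) and items and isinstance(items[0], dict):
        return items[0]
    return {}


def build_md5_info(yeti_response: dict,
                   default_source: str = "AbuseCHMalwareBazaaar") -> dict:
    """Build the 'info' block for a hash match without indexing empty lists."""
    tag = first_or_empty(yeti_response.get("tags"))
    context = first_or_empty(yeti_response.get("context"))
    # Fall back to the feed name when the context has no usable source value.
    source = str(context.get("source") or "").strip() or default_source
    return {
        "name": tag.get("name"),
        "last_seen": tag.get("last_seen"),
        "Created": yeti_response.get("created"),
        "type": yeti_response.get("type"),
        "source": source,
    }
```

The same pattern could be applied to the IP reputation fields (country, threat, reliability, and risk) in request_ssh_info.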
- Assign permissions and ownership to the script:
# chmod 750 /var/ossec/integrations/custom-yeti.py
# chown root:wazuh /var/ossec/integrations/custom-yeti.py
- Add the following configuration within the <ossec_config> block in the /var/ossec/etc/ossec.conf file to create a custom integration.
Replace <YETI_API_KEY> with the API key you obtained from the previous step.
<integration>
    <name>custom-yeti.py</name>
    <api_key><YETI_API_KEY></api_key>
    <group>syscheck,sshd</group>
    <alert_format>json</alert_format>
</integration>
The parameters used in the integration block are as follows:
- name: The name of the custom script that performs the integration. All custom script names must start with “custom-”.
- api_key: This is the Yeti platform API key obtained in the previous step.
- group: Specifies the rule groups that trigger this integration. We use the syscheck and sshd groups for use cases 1 and 2, respectively.
- alert_format: Specifies the format in which alerts are received by the script. The JSON format is recommended. When this parameter is not set, the script receives the alerts in full_log format.
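The script receives these values as positional command-line arguments. Based on the index constants in the script above, position 1 is the alert file, position 2 is the API key, position 4 is an optional debug flag, and positions 6 and 7 are the optional timeout and retries. The snippet below is a minimal sketch that isolates this parsing. The IntegrationArgs class and parse_integration_args function are hypothetical names used only for illustration.

```python
from dataclasses import dataclass


@dataclass
class IntegrationArgs:
    alert_file: str
    api_key: str
    debug: bool = False
    timeout: int = 10   # same default as the script
    retries: int = 3    # same default as the script


def parse_integration_args(argv: list) -> IntegrationArgs:
    """Mirror the argument positions used by custom-yeti.py."""
    if len(argv) < 4:
        raise ValueError(f"expected at least 4 arguments, got {len(argv)}")
    parsed = IntegrationArgs(alert_file=argv[1], api_key=argv[2])
    parsed.debug = len(argv) > 4 and argv[4] == "debug"
    if len(argv) > 6:
        parsed.timeout = int(argv[6])
    if len(argv) > 7:
        parsed.retries = int(argv[7])
    return parsed
```

Keeping the parsing in one function makes it easier to test the script outside the Wazuh manager by passing a hand-built argument list.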
- Create a file called yeti_rules.xml in the /var/ossec/etc/rules/ directory and insert the following custom rules:
<group name="yeti,">
    <rule id="100500" level="0">
        <decoded_as>json</decoded_as>
        <field name="integration">yeti</field>
        <description>yeti integration messages.</description>
        <options>no_full_log</options>
    </rule>
    <rule id="100501" level="12">
        <if_sid>100500</if_sid>
        <field name="yeti.info.source">AbuseCHMalwareBazaaar</field>
        <description>"Yeti Alert - " $(yeti.info.source) detected this file: $(yeti.source.file) </description>
        <group>pci_dss_10.6.1,pci_dss_11.4,gdpr_IV_35.7.d,</group>
        <options>no_full_log</options>
        <mitre>
            <id>T1203</id>
        </mitre>
    </rule>
    <rule id="100502" level="12">
        <if_sid>100500</if_sid>
        <field name="yeti.info.source">AlienVaultIPReputation</field>
        <description>"Yeti Alert - " $(yeti.info.source) detected IP address: $(yeti.source.src_ip) </description>
        <group>pci_dss_10.2.4,pci_dss_10.2.5,</group>
        <options>no_full_log</options>
    </rule>
    <rule id="100503" level="12">
        <if_sid>100500</if_sid>
        <field name="yeti.info.source" type="pcre2">\w</field>
        <description>"Yeti Alert - " $(yeti.info.source) has detected a potential malicious activity </description>
        <options>no_full_log</options>
    </rule>
</group>
Where:
- 100500: Serves as the base rule, triggered when an alert with the integration value set to yeti is generated.
- 100501: Triggered when a file’s MD5 hash matches an entry from the AbuseCHMalwareBazaaar source, indicating malware detection.
- 100502: Triggered when a network activity involves an IP address that matches an entry from the AlienVaultIPReputation source, indicating the detection of an abused IP address.
- 100503: Triggered when an IP address or MD5 hash matches a Yeti source not covered by the more specific rules. It indicates suspicious activity.
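The rule descriptions above can be summarized as a small lookup. The snippet below is a sketch of the intended mapping only and does not reproduce the Wazuh rule engine: it uses exact string comparison where Wazuh applies pattern matching, and the function name expected_rule_id is hypothetical.

```python
def expected_rule_id(event: dict):
    """Return the custom rule ID described above for a decoded Yeti event, or None."""
    if event.get("integration") != "yeti":
        return None  # not a Yeti integration message
    source = event.get("yeti", {}).get("info", {}).get("source")
    if source == "AbuseCHMalwareBazaaar":
        return 100501  # malware hash match
    if source == "AlienVaultIPReputation":
        return 100502  # abused IP address match
    if isinstance(source, str) and any(ch.isalnum() or ch == "_" for ch in source):
        return 100503  # any other non-empty source
    return 100500  # base rule only, for example an error message without a source
```

For example, an error message sent by the script has no yeti.info.source field, so only the level 0 base rule applies and no alert is shown.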
- Restart the Wazuh manager to apply the changes:
# systemctl restart wazuh-manager
Ubuntu endpoint
Perform the following steps to prepare your Ubuntu endpoint for the integration with Yeti.
Use case 1: Detecting malware using the Malware Bazaar feed.
- Add the following configuration inside the <syscheck> block of the /var/ossec/etc/ossec.conf file to enable the Wazuh File Integrity Monitoring (FIM) module for your home directory. Replace <USER_NAME> with your actual username.
<directories realtime="yes">/home/<USER_NAME></directories>
Use case 2: Identifying network activities involving abused IP addresses.
- Create a test.log file in the /var/log folder:
# touch /var/log/test.log
- Insert the configuration below in the /var/ossec/etc/ossec.conf file within the <ossec_config> block for log collection:
<localfile>
    <log_format>syslog</log_format>
    <location>/var/log/test.log</location>
</localfile>
- Restart the Wazuh agent to apply the changes:
# systemctl restart wazuh-agent
Testing the integration
Follow these steps to test the integration.
Use case 1: Detecting malware using the Malware Bazaar feed.
- Download malware samples from the MalwareBazaar Database, extract them, and save them in the current user’s home folder on the Ubuntu endpoint.
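The integration looks up the MD5 hash that the FIM module reports for each new file. To confirm in advance that a downloaded sample's hash is the one you expect to find in Yeti, you can compute it locally. The snippet below is a minimal sketch using the Python standard library, and the function name md5_of_file is hypothetical.

```python
import hashlib


def md5_of_file(path: str, chunk_size: int = 65536) -> str:
    """Return the hex MD5 digest of a file, reading it in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        # iter() with a sentinel stops when read() returns an empty bytes object.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

The returned value can be compared with the md5 field of the resulting alert or searched for in the Yeti web interface.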
Use case 2: Identifying network activities involving abused IP addresses.
- Append sample entries to the test.log file using the commands below. This simulates SSH login attempts from malicious or abused IP addresses for testing purposes, as we do not have control over real malicious IP activity.
# echo "Jan 25 10:05:30 ubuntu sshd[3434]: Accepted password for anonymous from 180.151.24.60 port 55094 ssh2" >> /var/log/test.log
# echo "Jan 24 04:58:38 ubuntu sshd[2788]: Failed password for anonymous from 222.77.181.28 port 54647 ssh2" >> /var/log/test.log
# echo "Jan 24 04:58:47 ubuntu sshd[2788]: Connection reset by authenticating user anonymous 49.143.32.6 port 54647 [preauth]" >> /var/log/test.log
# echo "Jan 24 09:49:35 ubuntu sshd[5620]: Connection reset by invalid user test 100.27.42.244 port 56739 [preauth]" >> /var/log/test.log
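Only public source addresses are sent to Yeti, because the script's is_valid_ip function rejects private, loopback, reserved, and multicast addresses. The snippet below is a minimal sketch that applies the same checks to a raw sshd log line, which can help when choosing test addresses. In the actual pipeline the source IP is extracted by Wazuh's sshd decoders, so the regular expression and the function name extract_public_src_ip are illustrative assumptions only.

```python
import ipaddress
import re

# Matches "<IPv4> port <number>", as found in the sample sshd lines above.
SSH_SRC_IP = re.compile(r"(\d{1,3}(?:\.\d{1,3}){3}) port \d+")


def extract_public_src_ip(log_line: str):
    """Return the source IP from an sshd log line if it is a public address."""
    match = SSH_SRC_IP.search(log_line)
    if not match:
        return None
    try:
        ip = ipaddress.ip_address(match.group(1))
    except ValueError:
        return None  # e.g. an octet above 255
    if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_multicast:
        return None  # same rejection criteria as is_valid_ip in the script
    return str(ip)
```

A log line with a private address such as 192.168.1.5 returns None here, which matches the script skipping the Yeti lookup for that alert.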
Visualize the alerts on the Wazuh dashboard
The alerts below are generated after performing the integration tests for our use cases. Perform the following steps to visualize the alerts on the Wazuh dashboard:
- Navigate to Threat intelligence > Threat Hunting > Events on the Wazuh dashboard.
- In the search bar, type rule.groups:yeti, and click Update.

Conclusion
Integrating Yeti with Wazuh combines threat intelligence with real-time monitoring, enabling efficient detection of and response to cyber threats.
This article demonstrated how to configure your Wazuh environment to detect malware using the Malware Bazaar feed and monitor network activities involving abused IP addresses. The integration streamlines threat detection and enhances incident response by bridging Cyber Threat Intelligence and DFIR workflows.
References
- Yeti platform documentation
- How to integrate external software using Integrator
- Detecting and removing malware using VirusTotal integration
