Infos:
- Used Zammad version: zammad:6.5.0-15
- Used Zammad installation type: docker-compose
- Operating system: Ubuntu 22.04.5 LTS
- Browser + version: Microsoft Edge 135.0.3179.98 (Official build) (64-bit)
Expected behavior:
I just want the ticket to be created with plain text.
Actual behavior:
Instead, Zammad automatically turns some of the ticket text into URL links (href).
Steps to reproduce the behavior:
#!/usr/bin/env python3
import sys
import json
import logging
import requests
from requests.exceptions import RequestException
# --- CONFIG ---
# Group and customer assigned to every ticket this script creates.
DEFAULT_GROUP = "Users"
DEFAULT_CUSTOMER = "milan.patel@queensu.ca"
# File that receives this script's own log output.
LOG_FILE = "/var/ossec/logs/zammad-integration.log"

# --- LOGGING SETUP ---
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Positional command-line arguments: argv[2] is the API token sent in the
# Authorization header and argv[3] is the URL tickets are POSTed to.
# (argv[1], the alert file path, is read in main().)
ZAMMAD_TOKEN = sys.argv[2]
ZAMMAD_URL = sys.argv[3]
def flatten_dict(d, parent_key='', sep='.'):
    """Flatten a nested dict into a single-level dict with joined keys.

    Args:
        d: Dictionary to flatten; values may themselves be dicts or lists.
        parent_key: Prefix prepended (followed by ``sep``) to every key of
            ``d``. Empty for the top-level call.
        sep: Separator placed between key segments.

    Returns:
        A new dict in which nested dict keys are joined with ``sep``
        (``{"a": {"b": 1}}`` becomes ``{"a.b": 1}``), list values are
        replaced by a ``", "``-joined string of their elements, and every
        other value is kept as-is. An empty nested dict contributes no keys.
    """
    items = {}
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.update(flatten_dict(v, new_key, sep=sep))
        elif isinstance(v, list):
            # Elements are stringified first; those that become "" are dropped.
            items[new_key] = ', '.join(filter(None, map(str, v)))
        else:
            items[new_key] = v
    return items
def load_alerts(file_path):
    """Load alerts from a file containing one JSON document per line.

    Blank lines are skipped. A line that is not valid JSON is logged with
    its line number and skipped; it does not abort the load.

    Args:
        file_path: Path of the alert file to read.

    Returns:
        List of the decoded JSON values, in file order.

    Raises:
        SystemExit: With exit code 1 if the file cannot be read or if it
            contains no valid JSON line at all.
    """
    alerts = []
    try:
        # JSON text is UTF-8; do not depend on the process locale.
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_number, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    alerts.append(json.loads(line))
                except json.JSONDecodeError as je:
                    logging.error(f"Line {line_number}: JSON parse error: {je}")
    except Exception as e:
        logging.error(f"ALERT LOAD ERROR: {e}")
        sys.exit(1)

    if not alerts:
        logging.error(f"No valid alerts found in {file_path}")
        sys.exit(1)

    logging.debug(f"Loaded {len(alerts)} alerts from {file_path}")
    return alerts
def build_ticket_data(alert):
    """Build the ticket-creation payload for a single alert.

    Args:
        alert: Decoded alert dict. The ``rule``, ``agent``, ``timestamp``
            and ``location`` keys are read for the summary; the whole dict
            is flattened for the details section.

    Returns:
        Dict with ``title``, ``group``, ``customer`` and a nested
        ``article`` dict (``subject``, ``body``, ``type``, ``content_type``,
        ``internal``) ready to be sent as the JSON request body.
    """
    rule = alert.get("rule", {})
    agent = alert.get("agent", {})

    # --- derive a description for the ticket ---
    # First non-empty candidate wins; the literal is the final fallback.
    description = (
        rule.get("description")
        or alert.get("data", {}).get("title")
        or alert.get("syscheck", {}).get("path")
        or alert.get("decoder", {}).get("name")
        or "Wazuh Security Alert"
    )

    # --- build the summary lines (Markdown) ---
    summary_lines = ["**Wazuh Alert Summary**"]
    summary_lines.append(
        f"Rule: `{rule.get('id','N/A')}` "
        f"({rule.get('description','No Description')}) (Level {rule.get('level','N/A')})"
    )
    summary_lines.append(
        f"Agent: {agent.get('name','N/A')} "
        f"(ID: {agent.get('id','N/A')}, IP: {agent.get('ip','N/A')})"
    )
    summary_lines.append(f"Time: {alert.get('timestamp','N/A')}")
    if alert.get("location"):
        summary_lines.append(f"Location: `{alert.get('location')}`")
    # (optional additional fields...)

    # --- flatten and build the details section ---
    flat = flatten_dict(alert)
    details_lines = []
    for key in sorted(flat):
        val = str(flat[key])
        if "\n" in val:
            # Multi-line values go in a fenced block so line breaks survive.
            details_lines.append(f"{key}:\n```\n{val}\n```")
        else:
            details_lines.append(f"{key}: `{val}`")

    # --- assemble the full body ---
    body = (
        "\n\n".join(summary_lines)
        + "\n\n---\n\n---------------------------------Full Alert Details--------------------------------\n\n"
        + "\n\n".join(details_lines)
    )

    # --- return the Zammad payload ---
    # Title and subject are truncated to 80 and 60 characters of description.
    # NOTE(review): Zammad's API docs list text/plain and text/html as article
    # content types; confirm that "text/markdown" is actually honoured, since
    # it may explain unexpected rendering of the body (e.g. auto-linked text).
    return {
        "title": f"Wazuh Alert: {description[:80]}",
        "group": DEFAULT_GROUP,
        "customer": DEFAULT_CUSTOMER,
        "article": {
            "subject": f"[L{rule.get('level','?')}] {description[:60]}",
            "body": body,
            "type": "note",
            "content_type": "text/markdown",
            "internal": False
        }
    }
def send_to_zammad(ticket):
    """POST one ticket payload to ZAMMAD_URL using token authentication.

    Args:
        ticket: JSON-serialisable dict sent as the request body.

    Returns:
        True if the request succeeded with a non-error HTTP status, False if
        a ``requests`` exception was raised (connection problem, timeout,
        HTTP error status). Failures are logged, not raised.
    """
    headers = {
        "Authorization": f"Token token={ZAMMAD_TOKEN}",
        "Content-Type": "application/json"
    }
    try:
        resp = requests.post(ZAMMAD_URL, headers=headers, json=ticket, timeout=15)
        # Turn 4xx/5xx responses into an exception handled below.
        resp.raise_for_status()
        ticket_id = resp.json().get("id")
        logging.info(f"Created Zammad ticket ID={ticket_id}")
        return True
    except RequestException as e:
        # e.response is None when no HTTP response was received at all, so
        # it is probed with getattr/hasattr rather than accessed directly.
        logging.error(f"API FAILURE: {e} — status={getattr(e.response,'status_code',None)}")
        if hasattr(e.response, "text"):
            logging.debug(f"Response body: {e.response.text}")
        return False
def main():
    """Read the alert file named by argv[1] and send one ticket per alert.

    A failed send is logged and processing continues with the next alert.
    """
    alert_file = sys.argv[1]
    logging.debug(f"Starting integration for file: {alert_file}")
    alerts = load_alerts(alert_file)
    for i, alert in enumerate(alerts, 1):
        logging.debug(f"Processing alert #{i}")
        ticket = build_ticket_data(alert)
        if not send_to_zammad(ticket):
            logging.error(f"Failed to send alert #{i}")
# Run the integration only when executed as a script, not when imported.
if __name__ == "__main__":
    main()