On branch main

Initial commit

 Changes to be committed:
	new file:   README.md
	new file:   abx/apply_nsx_tags_for_tiers/README.md
	new file:   abx/apply_nsx_tags_for_tiers/action.py
	new file:   abx/list_vcenter_vms/README.md
	new file:   abx/list_vcenter_vms/action.py
	new file:   abx/send_email/README.md
	new file:   abx/send_email/action.py
	new file:   blueprints/forms/vdefend-form.json
	new file:   blueprints/vdefend-form-driven.yaml
This commit is contained in:
fultonbr
2025-09-18 09:40:08 -05:00
commit 24ff9fd2fc
9 changed files with 446 additions and 0 deletions

View File

@@ -0,0 +1,14 @@
# ABX: apply_nsx_tags_for_tiers (Python 3)
Applies NSX tags to selected VMs for tier and environment. Use as a **Deployment Completed** subscription so it runs after the Terraform policy is created.
## Inputs
- nsx_manager_url, nsx_username, nsx_password
- env_value: prod|test|dev (or any string)
- vm_web: array of VM names
- vm_app: array of VM names
- vm_db: array of VM names
## Behavior
- Looks up each VM by display_name in NSX Fabric VMs.
- Adds tags: `{scope: 'tier', tag: 'web|app|db'}` and `{scope: 'env', tag: env_value}`.

View File

@@ -0,0 +1,50 @@
import requests, json
def _session(nm, user, pwd):
s = requests.Session()
s.verify = False
s.auth = (user, pwd)
s.headers.update({"Content-Type": "application/json"})
return s
def _find_by_name(s, nm, name):
# A simple scan; for large estates consider filtering parameters & paging
r = s.get(f"{nm}/api/v1/fabric/virtual-machines")
r.raise_for_status()
for vm in r.json().get("results", []):
if vm.get("display_name") == name or vm.get("vm_name") == name:
return vm.get("external_id")
return None
def _apply_tags(s, nm, external_id, tags):
body = {"external_id": external_id, "tags": tags}
r = s.post(f"{nm}/api/v1/fabric/virtual-machines", params={"action": "add_tags"}, data=json.dumps(body))
if r.status_code not in (200,204):
raise Exception(f"Tagging failed: {r.status_code} {r.text}")
def handler(context, inputs):
nm = inputs["nsx_manager_url"]
user = inputs["nsx_username"]
pwd = inputs["nsx_password"]
env_value = inputs["env_value"]
tiers = {
"web": inputs.get("vm_web") or [],
"app": inputs.get("vm_app") or [],
"db": inputs.get("vm_db") or [],
}
s = _session(nm, user, pwd)
applied = []
for tier, names in tiers.items():
for name in names:
extid = _find_by_name(s, nm, name)
if not extid:
applied.append({"name": name, "tier": tier, "status": "not_found"})
continue
tags = [{"scope":"tier","tag":tier},{"scope":"env","tag":env_value}]
_apply_tags(s, nm, extid, tags)
applied.append({"name": name, "tier": tier, "status": "tagged"})
return {"result": applied}

View File

@@ -0,0 +1,11 @@
# ABX: list_vcenter_vms (Python 3)
Returns an array of VM names from vCenter for use in the Custom Form.
## Constants (recommended)
- VCENTER_SERVER (e.g., https://vcsa.lab.legionitgroup.com)
- VCENTER_USERNAME
- VCENTER_PASSWORD
## Output format
Return a JSON array of strings (VM names) or objects `{ "text": "vm-name", "value": "vm-name" }`.

View File

@@ -0,0 +1,33 @@
import os, requests
def _login(session, base):
# Try modern API session (vSphere 8/9)
r = session.post(f"{base}/api/session")
if r.status_code in (200, 204):
return
# Fallback to legacy vAPI
r = session.post(f"{base}/rest/com/vmware/cis/session")
if r.status_code not in (200, 201):
raise Exception(f"vCenter login failed: {r.status_code} {r.text}")
def handler(context, inputs):
base = os.getenv("VCENTER_SERVER") or inputs.get("vcenter_server")
user = os.getenv("VCENTER_USERNAME") or inputs.get("vcenter_username")
pwd = os.getenv("VCENTER_PASSWORD") or inputs.get("vcenter_password")
if not (base and user and pwd):
raise Exception("Missing vCenter credentials/server. Set VCENTER_* constants or pass in inputs.")
s = requests.Session()
s.verify = False
s.auth = (user, pwd)
_login(s, base)
# Try modern inventory endpoint
r = s.get(f"{base}/api/vcenter/vm")
if r.status_code == 200:
vms = [vm.get("name") for vm in r.json() if vm.get("name")]
else:
# Fallback legacy
r = s.get(f"{base}/rest/vcenter/vm")
r.raise_for_status()
vms = [vm.get("name") for vm in r.json().get("value", []) if vm.get("name")]
# Return array for multi-select
return vms

9
abx/send_email/README.md Normal file
View File

@@ -0,0 +1,9 @@
# ABX: send_email (Python 3)
Sends a summary email to the requester with the created section/groups and the chosen ports.
## Inputs
- smtp_host, smtp_port (25 default), smtp_user/smtp_pass (optional), use_tls (bool)
- from_email, to_email, subject, body (text)
This action is generic; blueprint post-processing can construct a friendly body.

34
abx/send_email/action.py Normal file
View File

@@ -0,0 +1,34 @@
import smtplib, ssl, os
from email.mime.text import MIMEText
def handler(context, inputs):
smtp_host = inputs.get("smtp_host") or os.getenv("SMTP_HOST")
smtp_port = int(inputs.get("smtp_port", 25))
smtp_user = inputs.get("smtp_user") or os.getenv("SMTP_USER")
smtp_pass = inputs.get("smtp_pass") or os.getenv("SMTP_PASS")
use_tls = bool(inputs.get("use_tls", False))
from_email = inputs["from_email"]
to_email = inputs["to_email"]
subject = inputs.get("subject", "vDefend policy created")
body = inputs.get("body", "")
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = from_email
msg["To"] = to_email
if use_tls:
context = ssl.create_default_context()
with smtplib.SMTP(smtp_host, smtp_port) as server:
server.starttls(context=context)
if smtp_user and smtp_pass:
server.login(smtp_user, smtp_pass)
server.send_message(msg)
else:
with smtplib.SMTP(smtp_host, smtp_port) as server:
if smtp_user and smtp_pass:
server.login(smtp_user, smtp_pass)
server.send_message(msg)
return {"status": "sent", "to": to_email}