Skip to content

Instantly share code, notes, and snippets.

@sivel
Created November 28, 2023 14:25
Show Gist options
  • Save sivel/7b012ae2ebf4ab4cb6225379a72cc502 to your computer and use it in GitHub Desktop.
Save sivel/7b012ae2ebf4ab4cb6225379a72cc502 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) 2023 Matt Martz <[email protected]>
# GNU General Public License v3.0+
# (see https://www.gnu.org/licenses/gpl-3.0.txt)
import argparse
import os
from functools import partial
from ansible.template import Templar
import yaml
# Prefer the libyaml-backed safe loader when this PyYAML build provides it;
# otherwise fall back to the pure-Python safe loader.
Loader = getattr(yaml, 'CSafeLoader', yaml.SafeLoader)
# Cache of generated subclasses, keyed by the class they were derived from.
_TYPES = {}


def make_type(obj):
    """Return a ``Pos``-prefixed subclass of the class of *obj*.

    The subclass adds no behaviour of its own; unlike instances of builtin
    types such as ``str`` or ``dict``, its instances accept arbitrary
    attributes, so a value rebuilt with it can carry extra metadata.

    One subclass is created per original class and then reused on every
    later call.

    :param obj: any object; only its class is inspected.
    :returns: the cached or newly created subclass.
    :raises TypeError: when the class of *obj* cannot be subclassed
        (for example ``bool`` or ``NoneType``).
    """
    base = obj.__class__
    if (typ := _TYPES.get(base)):
        return typ
    typ = _TYPES[base] = type(
        f'Pos{base.__name__.title()}',
        (base,),
        {}
    )
    return typ
class LocationLoader(Loader):
    """YAML loader whose constructed values remember where they started.

    Every value built by :meth:`construct_object` is re-created as an
    instance of the matching ``make_type`` subclass and tagged with a
    ``_pos`` attribute holding a ``(line, column)`` tuple. Values whose
    class cannot be subclassed or re-created that way are returned as-is,
    without ``_pos``.
    """

    def _add_pos(self, node, value):
        """Store the start position of *node* on *value* and return it.

        Both numbers are the node's ``start_mark`` offsets plus one, so the
        reported position is 1-based (assuming PyYAML's 0-based marks).
        """
        mark = node.start_mark
        value._pos = (mark.line + 1, mark.column + 1)
        return value

    def construct_object(self, node, deep=True):
        """Construct *node* and wrap the result in a position-aware type.

        ``deep`` defaults to ``True`` here so that containers are fully
        populated before they are copied into the wrapper type.
        NOTE(review): the PyYAML base implementation defaults to
        ``deep=False`` - confirm against the installed version.
        """
        value = super().construct_object(node, deep=deep)
        try:
            wrapped = make_type(value)(value)
        except TypeError:
            # Either the class cannot be subclassed or its constructor does
            # not accept an instance of itself; keep the plain value.
            return value
        return self._add_pos(node, wrapped)
# Parse a YAML stream with LocationLoader, so loaded values carry ``_pos``
# wherever the loader was able to attach it.
safe_load = partial(yaml.load, Loader=LocationLoader)
# Module-wide Templar; within this file it is only used for ``is_template``
# checks in walk(). NOTE(review): ``None`` is passed as its single
# positional argument (presumably the data loader) - confirm against the
# installed ansible version.
templar = Templar(None)
class Unsafe(yaml.YAMLObject):
    """Stand-in for values tagged ``!unsafe``.

    The class is registered on ``LocationLoader`` through ``yaml_loader``
    so the tag has a constructor; the node's raw value is stored unchanged.
    """

    yaml_tag = '!unsafe'
    yaml_loader = [LocationLoader]

    def __init__(self, value):
        self.value = value

    def __repr__(self):
        return f'{type(self).__name__}({self.value!r})'

    @classmethod
    def from_yaml(cls, loader, node):
        """Build an instance from *node*, keeping ``node.value`` verbatim."""
        # Use ``cls`` rather than the hard-coded class name so subclasses
        # construct instances of themselves.
        return cls(node.value)

    @classmethod
    def to_yaml(cls, dumper, data):
        """Represent *data* as a scalar carrying this class's tag."""
        return dumper.represent_scalar(cls.yaml_tag, data.value)
class Vault(yaml.YAMLObject):
    """Stand-in for values tagged ``!vault``.

    The class is registered on ``LocationLoader`` through ``yaml_loader``
    so the tag has a constructor; the node's raw value is stored unchanged
    (nothing is decrypted).
    """

    yaml_tag = '!vault'
    yaml_loader = [LocationLoader]

    def __init__(self, value):
        self.value = value

    def __repr__(self):
        return f'{type(self).__name__}({self.value!r})'

    @classmethod
    def from_yaml(cls, loader, node):
        """Build an instance from *node*, keeping ``node.value`` verbatim."""
        # Use ``cls`` rather than the hard-coded class name so subclasses
        # construct instances of themselves.
        return cls(node.value)

    @classmethod
    def to_yaml(cls, dumper, data):
        """Represent *data* as a scalar carrying this class's tag."""
        return dumper.represent_scalar(cls.yaml_tag, data.value)
def get_tasks(obj):
    """Yield the leaf entries of a playbook / task-list data structure.

    Lists are flattened. For an object with a ``get`` method, every truthy
    value found under one of the task-container keys (``tasks``,
    ``pre_tasks``, ``post_tasks``, ``handlers``, ``block``, ``rescue``,
    ``always``) is descended into, in that order. An object that has none
    of those keys populated - or that is not mapping-like at all - is
    yielded itself.

    :param obj: parsed YAML/JSON data (list, mapping or scalar).
    :returns: generator over the leaf objects, in document order.
    """
    if isinstance(obj, list):
        for item in obj:
            yield from get_tasks(item)
        return

    # Duck-type on ``get`` instead of wrapping the whole lookup-and-recurse
    # step in ``try/except AttributeError``, which would also hide
    # unrelated AttributeErrors raised further down.
    getter = getattr(obj, 'get', None)
    if getter is None:
        yield obj
        return

    found = False
    for key in ('tasks', 'pre_tasks', 'post_tasks', 'handlers',
                'block', 'rescue', 'always'):
        if (children := getter(key)):
            found = True
            yield from get_tasks(children)
    if not found:
        yield obj
def walk(data, filename):
    """Print every templated ``that`` condition of the assert tasks in *data*.

    Each task yielded by ``get_tasks`` is checked for an ``assert``,
    ``ansible.builtin.assert`` or ``ansible.legacy.assert`` key. For every
    entry of its ``that`` argument that ``templar.is_template`` reports as a
    template, one line is printed in the form
    ``<filename>:<line>:<col>: [<task name>] - <condition>``.

    :param data: parsed playbook / task file content.
    :param filename: path used as the prefix of each printed line.
    """
    for task in get_tasks(data):
        if not isinstance(task, dict):
            continue
        args = (
            task.get('assert') or
            task.get('ansible.builtin.assert') or
            task.get('ansible.legacy.assert')
        )
        # Free-form (string) or otherwise non-mapping arguments have no
        # ``that`` key to look up; skip them instead of failing on ``.get``.
        if not isinstance(args, dict):
            continue
        that = args.get('that')
        if that is None:
            continue
        if not isinstance(that, list):
            that = [that]
        name_or_action = task.get('name', 'assert')
        for cond in that:
            if not templar.is_template(cond):
                continue
            # Values not produced by the position-aware loader carry no
            # ``_pos``; report them at 0:0 rather than raising.
            line, col = getattr(cond, '_pos', (0, 0))
            print(f'{filename}:{line}:{col}: [{name_or_action}] - {cond}')
def _scan_file(path):
    """Load one file and report its templated assert conditions.

    Files that cannot be decoded as UTF-8 or parsed as YAML are skipped
    silently, so a single bad file does not abort a directory scan.
    """
    with open(path, encoding='utf-8') as f:
        try:
            data = safe_load(f)
        except (yaml.YAMLError, UnicodeDecodeError):
            return
    walk(data, path)


def main():
    """Parse the command line and scan the given file or directory tree."""
    extensions = {'.yml', '.yaml', '.json'}

    parser = argparse.ArgumentParser()
    parser.add_argument(
        'path',
        help='Path to directory containing task files and playbooks, or an '
             'individual file'
    )
    args = parser.parse_args()

    if os.path.isfile(args.path):
        # An explicitly named file is scanned whatever its extension.
        _scan_file(args.path)
        return

    if not os.path.isdir(args.path):
        # os.walk() silently yields nothing for a missing path, which would
        # look like a clean result.
        parser.error(f'{args.path}: no such file or directory')

    for root, dirs, files in os.walk(args.path):
        # Sort in place so the traversal (and the report) is deterministic.
        dirs.sort()
        for filename in sorted(files):
            if os.path.splitext(filename)[1].lower() not in extensions:
                continue
            _scan_file(os.path.join(root, filename))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment