-
-
Save sivel/7b012ae2ebf4ab4cb6225379a72cc502 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# (c) 2023 Matt Martz <[email protected]> | |
# GNU General Public License v3.0+ | |
# (see https://www.gnu.org/licenses/gpl-3.0.txt) | |
import argparse | |
import os | |
from functools import partial | |
from ansible.template import Templar | |
import yaml | |
try: | |
Loader = yaml.CSafeLoader | |
except AttributeError: | |
Loader = yaml.SafeLoader | |
_TYPES = {} | |
def make_type(obj): | |
if (typ := _TYPES.get(obj.__class__)): | |
return typ | |
typ = _TYPES[obj.__class__] = type( | |
f'Pos{obj.__class__.__name__.title()}', | |
(obj.__class__,), | |
{} | |
) | |
return typ | |
class LocationLoader(Loader): | |
def _add_pos(self, node, value): | |
line = node.start_mark.line + 1 | |
col = node.start_mark.column + 1 | |
value._pos = (line, col) | |
return value | |
def construct_object(self, node, deep=True): | |
value = super().construct_object(node, deep=deep) | |
try: | |
typ = make_type(value) | |
value = typ(value) | |
except TypeError: | |
return value | |
return self._add_pos(node, value) | |
safe_load = partial(yaml.load, Loader=LocationLoader) | |
templar = Templar(None) | |
class Unsafe(yaml.YAMLObject): | |
yaml_tag = '!unsafe' | |
yaml_loader = [LocationLoader] | |
def __init__(self, value): | |
self.value = value | |
@classmethod | |
def from_yaml(cls, loader, node): | |
return Unsafe(node.value) | |
@classmethod | |
def to_yaml(cls, dumper, data): | |
return dumper.represent_scalar(cls.yaml_tag, data.value) | |
class Vault(yaml.YAMLObject): | |
yaml_tag = '!vault' | |
yaml_loader = [LocationLoader] | |
def __init__(self, value): | |
self.value = value | |
@classmethod | |
def from_yaml(cls, loader, node): | |
return Vault(node.value) | |
@classmethod | |
def to_yaml(cls, dumper, data): | |
return dumper.represent_scalar(cls.yaml_tag, data.value) | |
def get_tasks(obj): | |
if isinstance(obj, list): | |
for o in obj: | |
yield from get_tasks(o) | |
else: | |
found = False | |
for key in ('tasks', 'pre_tasks', 'post_tasks', 'handlers', | |
'block', 'rescue', 'always'): | |
try: | |
if obj.get(key): | |
found = True | |
yield from get_tasks(obj.get(key)) | |
except AttributeError: | |
pass | |
if not found: | |
yield obj | |
def walk(data, filename): | |
for ds in get_tasks(data): | |
if not isinstance(ds, dict): | |
continue | |
args = ( | |
ds.get('assert') or | |
ds.get('ansible.builtin.assert') or | |
ds.get('ansible.legacy.assert') | |
) | |
if not args: | |
continue | |
that = args.get('that') | |
if not isinstance(that, list): | |
that = [that] | |
name_or_action = ds.get('name', 'assert') | |
for c in that: | |
if templar.is_template(c): | |
line, col = c._pos | |
print(f'{filename}:{line}:{col}: [{name_or_action}] - {c}') | |
if __name__ == '__main__': | |
extensions = {'.yml', '.yaml', '.json'} | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
'path', | |
help='Path to directory containing task files and playbooks, or an ' | |
'individual file' | |
) | |
args = parser.parse_args() | |
if os.path.isfile(args.path): | |
with open(args.path) as f: | |
try: | |
data = safe_load(f) | |
except yaml.YAMLError: | |
data = {} | |
walk(data, args.path) | |
else: | |
for root, dirs, files in os.walk(args.path): | |
for filename in files: | |
if os.path.splitext(filename)[1] not in extensions: | |
continue | |
path = os.path.join(root, filename) | |
with open(path) as f: | |
try: | |
data = safe_load(f) | |
except yaml.YAMLError: | |
continue | |
walk(data, path) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment