Created
October 15, 2024 21:52
-
-
Save atheiman/1694c272b9d2633610a837ec54aec9a1 to your computer and use it in GitHub Desktop.
Query an AWS Config aggregator for rule compliance and write the results to a CSV file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import boto3 | |
import botocore | |
import os | |
import datetime | |
import re | |
import csv | |
from functools import lru_cache | |
# Shared AWS service clients used throughout the script (created in the same order as before).
sts, cfg, orgs = (boto3.client(service) for service in ("sts", "config", "organizations"))

# Print the ARN of the calling principal so the operator can see which credentials are in use.
print(sts.get_caller_identity()["Arn"])
@lru_cache(maxsize=256)
def describe_account(acct_id):
    """Look up an account through the Organizations API, if we have access to it.

    Args:
        acct_id: AWS account id to describe.

    Returns:
        The "Account" dict from the ``describe_account`` response, or an empty
        dict when the call fails with a ``ClientError``. Results (including the
        empty fallback) are memoized per account id by ``lru_cache``.
    """
    try:
        response = orgs.describe_account(AccountId=acct_id)
    except botocore.exceptions.ClientError:
        # Best effort: callers fall back to the bare account id when no details are available.
        return {}
    return response["Account"]
# Choose the Config aggregator: the one named by the env var if it is set, otherwise the
# only aggregator returned by the API. Zero or several candidates is an error.
aggreg_name_env_var = "AGGREGATOR"
requested_aggreg = os.environ.get(aggreg_name_env_var)
desc_aggregs_args = (
    {} if requested_aggreg is None else {"ConfigurationAggregatorNames": [requested_aggreg]}
)
aggregators = cfg.describe_configuration_aggregators(**desc_aggregs_args)["ConfigurationAggregators"]

if not aggregators:
    raise Exception("Found no Config aggregators")
if len(aggregators) > 1:
    found_names = [a["ConfigurationAggregatorName"] for a in aggregators]
    raise Exception(
        f"Found multiple Config aggregators {found_names}, "
        f"specify an aggregator using env var '{aggreg_name_env_var}'"
    )

# Exactly one aggregator remains; echo its description and remember its name.
aggreg = aggregators[0]
print(json.dumps(aggreg, default=str, indent=2))
aggreg_name = aggreg["ConfigurationAggregatorName"]
# Query the Config aggregator and write one CSV row per evaluation result.
#
# Outer loop: every (account, region, rule) compliance summary known to the aggregator.
# Inner loop: the per-resource evaluation results for that account/region/rule.
csv_file_path = "config-compliance.csv"

# Compiled once instead of per cell; matches values made up entirely of digits (e.g. account ids).
integer_string_re = re.compile(r"^\d+$")

# newline="" hands line-ending control to the csv writer, as the csv module documentation
# requires for file objects; otherwise the platform's newline translation is applied on top
# of `lineterminator` and embedded newlines inside fields can be rewritten.
with open(csv_file_path, "w", newline="") as csv_file:
    writer = csv.DictWriter(
        csv_file,
        lineterminator="\n",
        fieldnames=[
            "AccountName",
            "AwsRegion",
            "ComplianceType",
            "ConfigRuleName",
            "ResourceType",
            "ResourceId",
            "Annotation",
            "EvaluationMode",
            "ResultRecordedTime",
            "ConfigRuleInvokedTime",
            "AccountId",
            "AccountEmail",
            "AccountStatus",
        ],
    )
    writer.writeheader()

    # Track the last account/region printed so the progress header appears once per pair.
    prev_acct_id = ""
    prev_region = ""

    rule_pages = cfg.get_paginator("describe_aggregate_compliance_by_config_rules").paginate(
        # SKIP ACCOUNTS, REGIONS, OR CONFIG RULES HERE AS NEEDED
        Filters={
            # 'ConfigRuleName': 'string',
            # 'ComplianceType': 'NON_COMPLIANT' # 'COMPLIANT'|'NON_COMPLIANT',
            # 'AccountId': 'string',
            # 'AwsRegion': 'string'
        },
        ConfigurationAggregatorName=aggreg_name,
        # PaginationConfig={"MaxItems": 100},
    )
    details_paginator = cfg.get_paginator("get_aggregate_compliance_details_by_config_rule")

    for rule_pg in rule_pages:
        for aggreg_compl_by_cfg_rule in rule_pg["AggregateComplianceByConfigRules"]:
            # Example:
            # {'AccountId': '111111111111',
            #  'AwsRegion': 'us-east-2',
            #  'Compliance': {'ComplianceContributorCount': {'CapExceeded': False, 'CappedCount': 14},
            #                 'ComplianceType': 'NON_COMPLIANT'},
            #  'ConfigRuleName': 'LambdaInsideVpc-conformance-pack-7rmzgpg6i'}

            # Skip percentage of pages for quicker testing
            # import random
            # if random.random() > 0.01:
            #     continue

            acct_id = aggreg_compl_by_cfg_rule["AccountId"]
            region = aggreg_compl_by_cfg_rule["AwsRegion"]
            acct = describe_account(acct_id)

            # SKIP ACCOUNTS, REGIONS, OR CONFIG RULES HERE AS NEEDED

            if prev_acct_id != acct_id or prev_region != region:
                # Start a new progress line for each new account/region pair.
                print()
                print(acct.get("Name", acct_id), region, "", end="", flush=True)
                prev_acct_id = acct_id
                prev_region = region

            # A distinct loop variable is used here so the outer page is not rebound mid-iteration.
            for details_pg in details_paginator.paginate(
                ConfigurationAggregatorName=aggreg_name,
                ConfigRuleName=aggreg_compl_by_cfg_rule["ConfigRuleName"],
                AccountId=acct_id,
                AwsRegion=region,
                # ComplianceType='COMPLIANT'|'NON_COMPLIANT',
                # PaginationConfig={"MaxItems": 5},
            ):
                for aggreg_eval_result in details_pg["AggregateEvaluationResults"]:
                    # Example:
                    # {
                    #   "EvaluationResultIdentifier": {
                    #     "EvaluationResultQualifier": {
                    #       "ConfigRuleName": "S3BucketPublicReadProhibited-conformance-pack-7rmzgpg6i",
                    #       "ResourceType": "AWS::S3::Bucket",
                    #       "ResourceId": "my-bucket",
                    #       "EvaluationMode": "DETECTIVE"
                    #     },
                    #     "OrderingTimestamp": "2024-03-31 18:08:21.427000-05:00"
                    #   },
                    #   "ComplianceType": "COMPLIANT",
                    #   "ResultRecordedTime": "2024-03-31 18:08:42.047000-05:00",
                    #   "ConfigRuleInvokedTime": "2024-03-31 18:08:41.583000-05:00",
                    #   "Annotation": "Supplementary information about evaluated compliance",
                    #   "AccountId": "222222222222",
                    #   "AwsRegion": "us-west-2"
                    # }

                    # One dot per result; flushed so progress is visible before the line ends.
                    print(".", end="", flush=True)

                    erq = aggreg_eval_result["EvaluationResultIdentifier"]["EvaluationResultQualifier"]
                    row = {
                        "AccountName": acct.get("Name", aggreg_eval_result["AccountId"]),
                        "AwsRegion": aggreg_eval_result["AwsRegion"],
                        "ComplianceType": aggreg_eval_result["ComplianceType"],
                        "ConfigRuleName": erq["ConfigRuleName"],
                        "ResourceType": erq["ResourceType"],
                        "ResourceId": erq["ResourceId"],
                        "Annotation": aggreg_eval_result.get("Annotation"),
                        "EvaluationMode": erq.get("EvaluationMode"),
                        "ResultRecordedTime": aggreg_eval_result["ResultRecordedTime"],
                        "ConfigRuleInvokedTime": aggreg_eval_result["ConfigRuleInvokedTime"],
                        "AccountId": aggreg_eval_result["AccountId"],
                        "AccountEmail": acct.get("Email"),
                        "AccountStatus": acct.get("Status"),
                    }
                    # Prefix integer strings with ' for rendering in Excel, otherwise leading
                    # zeros are lost. A new dict is built rather than mutating `row` while
                    # iterating over it.
                    writer.writerow(
                        {
                            k: "'" + str(v) if integer_string_re.search(str(v)) else v
                            for k, v in row.items()
                        }
                    )

# Finish the progress line, then report where the CSV was written.
print()
print(csv_file_path)
# Optional: save the CSV to S3 under a timestamped name. Uncomment the lines below and set
# bucket_name to enable.
# NOTE(review): os.path.basename() drops the "path/to/" portion, so `prefix` ends up being
# just the file name - confirm whether a key prefix was intended.
# NOTE(review): datetime.UTC presumably needs a recent Python version - confirm before enabling.
#timestamp = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d-%H-%M-%S")
#csv_file_name = f"config-compliance-{timestamp}.csv"
#bucket_name = "my-output-bucket"
#prefix = os.path.basename(f"path/to/{csv_file_name}")
#print(f"Saving file to S3 s3://{bucket_name}/{prefix}")
#s3 = boto3.resource("s3", region_name="us-east-1")
#s3.Object(bucket_name, prefix).put(Body=open(csv_file_path, "rb"))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment