Skip to content

Instantly share code, notes, and snippets.

@atheiman
Created October 15, 2024 21:52
Show Gist options
  • Save atheiman/1694c272b9d2633610a837ec54aec9a1 to your computer and use it in GitHub Desktop.
Query an AWS Config aggregator for rule compliance and write the results to a CSV file.
import json
import boto3
import botocore
import os
import datetime
import re
import csv
from functools import lru_cache
# AWS service clients used throughout the script.
sts = boto3.client("sts")
cfg = boto3.client("config")
orgs = boto3.client("organizations")

# Print who we are running as, to make permission issues easy to debug.
caller_arn = sts.get_caller_identity()["Arn"]
print(caller_arn)
# Describe the account if we have access via the orgs api
@lru_cache(maxsize=256)
def describe_account(acct_id):
    """Return the Organizations description dict for *acct_id*, or {} on failure.

    Results are memoized (lru_cache) so each account is looked up at most once
    per run. Any ClientError — e.g. AccessDenied when the caller is not in the
    org management account, or AccountNotFoundException — is treated as
    "no details available" rather than fatal; callers fall back to the bare
    account id via ``acct.get("Name", acct_id)``.
    """
    try:
        return orgs.describe_account(AccountId=acct_id)["Account"]
    except botocore.exceptions.ClientError:
        # Best-effort lookup: swallow the error and return an empty mapping.
        return {}
# Load the Config aggregator from env var. Or if only one aggregator is found, use it.
aggreg_name_env_var = "AGGREGATOR"
requested_aggreg = os.environ.get(aggreg_name_env_var)
desc_aggregs_args = {} if requested_aggreg is None else {"ConfigurationAggregatorNames": [requested_aggreg]}
aggregators = cfg.describe_configuration_aggregators(**desc_aggregs_args)["ConfigurationAggregators"]
if not aggregators:
    raise Exception("Found no Config aggregators")
if len(aggregators) > 1:
    # Ambiguous: the user must pick one explicitly via the env var.
    raise Exception(
        f"Found multiple Config aggregators {[a['ConfigurationAggregatorName'] for a in aggregators]}, "
        f"specify an aggregator using env var '{aggreg_name_env_var}'"
    )
aggreg = aggregators[0]
print(json.dumps(aggreg, default=str, indent=2))
aggreg_name = aggreg["ConfigurationAggregatorName"]
# Query the Config aggregator, write results to CSV.
csv_file_path = "config-compliance.csv"
# newline="" lets the csv writer fully control line endings (csv module docs);
# combined with lineterminator="\n" the output is identical on every platform.
with open(csv_file_path, "w", newline="") as csv_file:
    writer = csv.DictWriter(
        csv_file,
        lineterminator="\n",
        fieldnames=[
            "AccountName",
            "AwsRegion",
            "ComplianceType",
            "ConfigRuleName",
            "ResourceType",
            "ResourceId",
            "Annotation",
            "EvaluationMode",
            "ResultRecordedTime",
            "ConfigRuleInvokedTime",
            "AccountId",
            "AccountEmail",
            "AccountStatus",
        ],
    )
    writer.writeheader()
    prev_acct_id = ""
    prev_region = ""
    # Outer pagination: one entry per (account, region, config rule) with its
    # aggregate compliance state. Use a distinct page variable (rules_pg) so the
    # inner paginator below cannot shadow it.
    for rules_pg in cfg.get_paginator("describe_aggregate_compliance_by_config_rules").paginate(
        # SKIP ACCOUNTS, REGIONS, OR CONFIG RULES HERE AS NEEDED
        Filters={
            # 'ConfigRuleName': 'string',
            # 'ComplianceType': 'NON_COMPLIANT' # 'COMPLIANT'|'NON_COMPLIANT',
            # 'AccountId': 'string',
            # 'AwsRegion': 'string'
        },
        ConfigurationAggregatorName=aggreg_name,
        # PaginationConfig={"MaxItems": 100},
    ):
        for aggreg_compl_by_cfg_rule in rules_pg["AggregateComplianceByConfigRules"]:
            # Example:
            # {'AccountId': '111111111111',
            #  'AwsRegion': 'us-east-2',
            #  'Compliance': {'ComplianceContributorCount': {'CapExceeded': False, 'CappedCount': 14},
            #                 'ComplianceType': 'NON_COMPLIANT'},
            #  'ConfigRuleName': 'LambdaInsideVpc-conformance-pack-7rmzgpg6i'}

            # Skip percentage of pages for quicker testing
            #import random
            #if random.random() > 0.01:
            #    continue

            acct = describe_account(aggreg_compl_by_cfg_rule["AccountId"])
            # SKIP ACCOUNTS, REGIONS, OR CONFIG RULES HERE AS NEEDED
            acct_id = aggreg_compl_by_cfg_rule["AccountId"]
            region = aggreg_compl_by_cfg_rule["AwsRegion"]
            # Print a progress header each time we move to a new account/region.
            if prev_acct_id != acct_id or prev_region != region:
                print()
                print(acct.get("Name", acct_id), region, "", end="")
                prev_acct_id = acct_id
                prev_region = region
            # Inner pagination: per-resource evaluation results for this rule.
            for details_pg in cfg.get_paginator("get_aggregate_compliance_details_by_config_rule").paginate(
                ConfigurationAggregatorName=aggreg_name,
                ConfigRuleName=aggreg_compl_by_cfg_rule["ConfigRuleName"],
                AccountId=acct_id,
                AwsRegion=region,
                # ComplianceType='COMPLIANT'|'NON_COMPLIANT',
                # PaginationConfig={"MaxItems": 5},
            ):
                for aggreg_eval_result in details_pg["AggregateEvaluationResults"]:
                    # Example:
                    # {
                    #     "EvaluationResultIdentifier": {
                    #         "EvaluationResultQualifier": {
                    #             "ConfigRuleName": "S3BucketPublicReadProhibited-conformance-pack-7rmzgpg6i",
                    #             "ResourceType": "AWS::S3::Bucket",
                    #             "ResourceId": "my-bucket",
                    #             "EvaluationMode": "DETECTIVE"
                    #         },
                    #         "OrderingTimestamp": "2024-03-31 18:08:21.427000-05:00"
                    #     },
                    #     "ComplianceType": "COMPLIANT",
                    #     "ResultRecordedTime": "2024-03-31 18:08:42.047000-05:00",
                    #     "ConfigRuleInvokedTime": "2024-03-31 18:08:41.583000-05:00",
                    #     "Annotation": "Supplementary information about evaluated compliance",
                    #     "AccountId": "222222222222",
                    #     "AwsRegion": "us-west-2"
                    # }
                    print(".", end="")  # one dot per evaluation result, as progress
                    erq = aggreg_eval_result["EvaluationResultIdentifier"]["EvaluationResultQualifier"]
                    row = {
                        "AccountName": acct.get("Name", aggreg_eval_result["AccountId"]),
                        "AwsRegion": aggreg_eval_result["AwsRegion"],
                        "ComplianceType": aggreg_eval_result["ComplianceType"],
                        "ConfigRuleName": erq["ConfigRuleName"],
                        "ResourceType": erq["ResourceType"],
                        "ResourceId": erq["ResourceId"],
                        "Annotation": aggreg_eval_result.get("Annotation"),
                        "EvaluationMode": erq.get("EvaluationMode"),
                        "ResultRecordedTime": aggreg_eval_result["ResultRecordedTime"],
                        "ConfigRuleInvokedTime": aggreg_eval_result["ConfigRuleInvokedTime"],
                        "AccountId": aggreg_eval_result["AccountId"],
                        "AccountEmail": acct.get("Email"),
                        "AccountStatus": acct.get("Status"),
                    }
                    for k, v in row.items():
                        # Prefix integer strings with ' for rendering in Excel,
                        # otherwise leading zeros (e.g. in account ids) are lost.
                        if re.fullmatch(r"\d+", str(v)):
                            row[k] = "'" + str(v)
                    writer.writerow(row)
print()
print(csv_file_path)
# Save to s3 with timestamp
#timestamp = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d-%H-%M-%S")
#csv_file_name = f"config-compliance-{timestamp}.csv"
#bucket_name = "my-output-bucket"
#prefix = f"path/to/{csv_file_name}"  # full S3 key; os.path.basename here would discard the path/to/ prefix
#print(f"Saving file to S3 s3://{bucket_name}/{prefix}")
#s3 = boto3.resource("s3", region_name="us-east-1")
#s3.Object(bucket_name, prefix).put(Body=open(csv_file_path, "rb"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment