Last active
December 15, 2016 19:48
-
-
Save seanmhanson/71a36f4605bf35a4c62e to your computer and use it in GitHub Desktop.
Spectra DS CMV.INI Cleanup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# CMV.INI Cleanup Script | |
# @author = Sean Hanson (github: seanmhanson) | |
# | |
# Script used to clean the "CMV.INI" initailization file used by | |
# MediaOcean Spectra DS to associate MediaOcean sessions with Active Directory Users | |
# | |
# INPUT: Flag and path as needed to the cmv.ini file. Default value if left blank is "/cmv.ini". | |
# See also parseArgs below. | |
# OUTPUT: None, writes files "newcmv_<datestring>.ini" and "removal_report_<datestring>.csv" | |
# where the date string is in the format YYYYMMDDHHMMSS | |
import argparse | |
import csv | |
import configparser | |
import getpass | |
import sys | |
from argparse import RawTextHelpFormatter | |
from collections import OrderedDict | |
from datetime import datetime | |
from sys import stdout | |
from ldap3 import Server, Connection, SEARCH_SCOPE_WHOLE_SUBTREE | |
from win32security import LogonUser, LOGON32_LOGON_NETWORK, LOGON32_PROVIDER_DEFAULT, error | |
# GLOBAL VARIABLES | |
#################### | |
# | |
# CMV.INI Variables (READ) | |
INI_HEADER_KEYS = ["CCOVERRIDE", "GATEWAY", "SERVER", "PORT", "RESOURCENAME"] # Keys used in headers in INI | |
OPEN_PLACEHOLDER = "OPEN" # Placeholder for unused, available LUIDs | |
OTHER_OPEN_PLACEHOLDERS = ["EXLONLY"] | |
CLOSED_PLACEHOLDER = "DONOTUSE" # Placeholder for unused, unavailable LUIDs | |
OTHER_CLOSED_PLACEHOLDERS = [] | |
PLACEHOLDER_KEYS = [OPEN_PLACEHOLDER] + OTHER_OPEN_PLACEHOLDERS + [CLOSED_PLACEHOLDER] + OTHER_CLOSED_PLACEHOLDERS | |
# LDAP Variables | |
DC_NAME = None # Domain Controller DNS Name (String required) | |
LDAP_PORT = 389 # SSL not enabled by default | |
SEARCH_BASE = None # LDAP Query Search Base (String required) | |
# CMV.INI Variables (Write) | |
# | |
# If you require a write function for your CMV to maintain structure and commenting, declare below. | |
# The write function is called with luid_list as a parameter (a list of LUID/Username Pairs, note the order) | |
# If "None", the default function uses the below values | |
CMV_WRITE_FUNCTION = None | |
CMV_CCOveride = None # CCOveride Value (Int wrapped in String required) | |
CMV_Gateway = None # MediaOcean Gateway (String required) | |
CMV_Server = None # MediaOcean Server (String required) | |
CMV_Port = None # MediaOcean Port (Int wrapped in String required) | |
CMV_ResourceName = None #MediaOcean ResourceName field (String required) | |
CMV_A_Header = None # Multiline string with new line characters to write as commented header for A section | |
# Extension of ConfigParser to allow for duplicate key values | |
# | |
# NEW BEHAVIOR: | |
# When a duplicate key is detected, the value is turned into a list of values | |
# and each value is appended to the list of values | |
class ConfigParserMultiples(configparser.RawConfigParser): | |
def __init__(self): | |
configparser.RawConfigParser.__init__(self, empty_lines_in_values=False, strict=False) | |
def _read(self, fp, fpname): | |
'''Identical to RawConfigParser except for additions | |
commented in multiline string quotes. | |
Modifications by @Praetorian on StackOverflow''' | |
elements_added = set() | |
cursect = None # None, or a dictionary | |
sectname = None | |
optname = None | |
lineno = 0 | |
indent_level = 0 | |
e = None # None, or an exception | |
for lineno, line in enumerate(fp, start=1): | |
comment_start = sys.maxsize | |
# strip inline comments | |
inline_prefixes = {p: -1 for p in self._inline_comment_prefixes} | |
while comment_start == sys.maxsize and inline_prefixes: | |
next_prefixes = {} | |
for prefix, index in inline_prefixes.items(): | |
index = line.find(prefix, index+1) | |
if index == -1: | |
continue | |
next_prefixes[prefix] = index | |
if index == 0 or (index > 0 and line[index-1].isspace()): | |
comment_start = min(comment_start, index) | |
inline_prefixes = next_prefixes | |
# strip full line comments | |
for prefix in self._comment_prefixes: | |
if line.strip().startswith(prefix): | |
comment_start = 0 | |
break | |
if comment_start == sys.maxsize: | |
comment_start = None | |
value = line[:comment_start].strip() | |
if not value: | |
if self._empty_lines_in_values: | |
# add empty line to the value, but only if there was no | |
# comment on the line | |
if (comment_start is None and | |
cursect is not None and | |
optname and | |
cursect[optname] is not None): | |
cursect[optname].append('') # newlines added at join | |
else: | |
# empty line marks end of value | |
indent_level = sys.maxsize | |
continue | |
# continuation line? | |
first_nonspace = self.NONSPACECRE.search(line) | |
cur_indent_level = first_nonspace.start() if first_nonspace else 0 | |
if (cursect is not None and optname and | |
cur_indent_level > indent_level): | |
cursect[optname].append(value) | |
# a section header or option header? | |
else: | |
indent_level = cur_indent_level | |
# is it a section header? | |
mo = self.SECTCRE.match(value) | |
if mo: | |
sectname = mo.group('header') | |
if sectname in self._sections: | |
if self._strict and sectname in elements_added: | |
raise DuplicateSectionError(sectname, fpname, | |
lineno) | |
cursect = self._sections[sectname] | |
elements_added.add(sectname) | |
elif sectname == self.default_section: | |
cursect = self._defaults | |
else: | |
cursect = self._dict() | |
self._sections[sectname] = cursect | |
self._proxies[sectname] = configparser.SectionProxy(self, sectname) | |
elements_added.add(sectname) | |
# So sections can't start with a continuation line | |
optname = None | |
# no section header in the file? | |
elif cursect is None: | |
raise MissingSectionHeaderError(fpname, lineno, line) | |
# an option line? | |
else: | |
mo = self._optcre.match(value) | |
if mo: | |
optname, vi, optval = mo.group('option', 'vi', 'value') | |
if not optname: | |
e = self._handle_error(e, fpname, lineno, line) | |
optname = self.optionxform(optname.rstrip()) | |
if (self._strict and | |
(sectname, optname) in elements_added): | |
raise DuplicateOptionError(sectname, optname, | |
fpname, lineno) | |
elements_added.add((sectname, optname)) | |
# This check is fine because the OPTCRE cannot | |
# match if it would set optval to None | |
if optval is not None: | |
optval = optval.strip() | |
'''Modified: if optname exists, make optval a tuple if not | |
already one and append''' | |
if (optname in cursect) and (cursect[optname] is not None): | |
if not isinstance(cursect[optname], tuple): | |
cursect[optname] = tuple(cursect[optname]) | |
cursect[optname] = cursect[optname] + tuple([optval]) | |
else: | |
cursect[optname] = [optval] | |
else: | |
# valueless option handling | |
cursect[optname] = None | |
else: | |
# a non-fatal parsing error occurred. set up the | |
# exception but keep going. the exception will be | |
# raised at the end of the file and will contain a | |
# list of all bogus lines | |
e = self._handle_error(e, fpname, lineno, line) | |
# if any parsing errors occurred, raise an exception | |
if e: | |
raise e | |
self._join_multiline_values() | |
# Parse Command-line Arguments and provide help text | |
# | |
# INPUT: None (pulls from sys.argv) | |
# OUTPUT: List of (1) argument, containing the path of the cmv.ini file | |
def parseArgs(): | |
parser = argparse.ArgumentParser( | |
description="Checks all usernames in cmv.ini file" | |
" and removes any not found in Active Directory\n" | |
"Outputs a CSV file of removed users and LUIDs, and" | |
" a modified INI file (newcmv.ini)", | |
formatter_class=RawTextHelpFormatter) | |
parser.add_argument('-i', '--input', | |
default="cmv.ini", | |
help="relative path of cmv.ini file\n" | |
"(default: cmv.ini)") | |
return parser.parse_args() | |
# MAIN FUNCTION | |
# | |
# Parse arguments, read the CMV.INI file, looks up values against Active Directory, | |
# then writes removal reports and updated CMV.INI | |
def main(): | |
ARGS = parseArgs() | |
luid_list = read_cmv(ARGS.input) | |
credentials = get_credentials() | |
luid_list, removed_users = LDAP_lookups(luid_list, credentials) | |
write_removal_report(removed_users) | |
write_cmv(luid_list) | |
# Reads CMV.INI file and pulls username/LUID pairs | |
# For duplicate keys, add one pair for each distinct value | |
# | |
# NB: Ignores [General] section and parses [A] section | |
# Set to ignore header material in [A] and [GENERAL] | |
# | |
# INPUT: Path to CMV.INI file | |
# OUTPUT: List of (LUID, Username) pairs as duples | |
def read_cmv(path): | |
config = ConfigParserMultiples() | |
config.optionxform=str | |
config.read(path) | |
luid_list = [] | |
for pair in config.items('A'): | |
if pair[0].upper().replace(" ","") not in INI_HEADER_KEYS: | |
if isinstance(pair[1], tuple): | |
for value in pair[1]: | |
luid_list.append([value, pair[0]]) | |
else: | |
luid_list.append([pair[1], pair[0]]) | |
return sorted(luid_list) | |
# Writes a CMV file from the updated list of LUIDs and Usernames | |
# | |
# INPUT: List of (LUID, Username) pairs as duples | |
# OUTPUT: None (writes newcmv_<datestring>.ini file by default) | |
def write_cmv(luid_list): | |
stdout.write('Writing CMV file...') | |
if CMV_WRITE_FUNCTION != None: | |
CMV_WRITE_FUNCTION(luid_list) | |
else: | |
config = configparser.ConfigParser(strict=False, allow_no_value=True, dict_type=OrderedDict) | |
config.optionxform=str | |
config.add_section("General") | |
config.set("General", "CCoverride", CMV_CCOveride) | |
config.add_section("A") | |
for pair in [("Gateway", CMV_Gateway), | |
("Server", CMV_Server), | |
("Port", CMV_Port), | |
("Resourcename", CMV_ResourceName)]: | |
config.set("A", pair[0], pair[1]) | |
config.set('A', CMV_A_Header) | |
for pair in luid_list: | |
config.set("A", pair[1], pair[0]) | |
now = datetime.now() | |
filename = "newcmv_{number}.ini".format(number=now.strftime("%Y%m%d%H%M%S")) | |
with open(filename, 'w') as configfile: | |
config.write(configfile) | |
stdout.write("written.\n") | |
# Prompts user for Active Directory credentials and verifies against | |
# Logon provider for Windows Machines | |
# No support for non-Windows, as the software in question is WIN86x/64x only | |
# | |
# INPUT: None | |
# OUTPUT: Triple of Domain, Username, and Password | |
# RAISES: Exception if user fails logon three times | |
def get_credentials(): | |
tries = 0 | |
while (tries < 3): | |
print("Please enter logon information:") | |
domain = input("Domain: ") | |
username = input("Username: ") | |
password = getpass.getpass ("Password: ") | |
try: | |
hUser = LogonUser( | |
username, domain, password, LOGON32_LOGON_NETWORK, LOGON32_PROVIDER_DEFAULT) | |
except error: | |
print("Invalid credentials\n") | |
tries += 1 | |
else: | |
return (domain, username, password) | |
raise Exception | |
# Look up each username in LDAP environment and set to a unique open placeholder if not found | |
# | |
# INPUT: List of (LUID, Username) pairs as duples, | |
# Credential Triple (domain, username, password) | |
# OUTPUT: Updated list of (LUID, Username) pairs as duples) | |
# List of (LUID, Username) pairs that were removed | |
def LDAP_lookups(luid_list, credentials): | |
removed_users = [] | |
s = Server(DC_NAME, port=LDAP_PORT) | |
c = Connection(s, user=credentials[0]+"\\"+credentials[1], password=credentials[2], auto_bind=True) | |
i = 0 | |
for duple in luid_list: | |
if duple[1].upper().replace(" ","") not in PLACEHOLDER_KEYS: | |
ldap_filter = "(&(objectClass=User)(sAMAccountName={user}))".format(user=duple[1]) | |
c.search(search_base=SEARCH_BASE, search_filter=ldap_filter, attributes="sAMAccountName", | |
search_scope=SEARCH_SCOPE_WHOLE_SUBTREE) | |
if len(c.response) <= 1: | |
removed_users.append((duple[1], duple[0])) | |
luid_list[i][1] = OPEN_PLACEHOLDER + "-" + duple[0] | |
elif duple[1].upper().replace(" ","") in OTHER_OPEN_PLACEHOLDERS + [OPEN_PLACEHOLDER]: | |
luid_list[i][1] = OPEN_PLACEHOLDER + "-" + duple[0] | |
else: | |
luid_list[i][1] = CLOSED_PLACEHOLDER + "-" + duple[0] | |
i += 1 | |
stdout.write("\rProcessed %i users" % i) | |
stdout.flush() | |
stdout.write("\n") | |
return luid_list, removed_users | |
# Writes a CSV file of all removed users and their LUIDs | |
# | |
# INPUT: List of (LUID, Username) pairs | |
# OUTPUT: None (writes removed_report_<datestring>.csv) | |
def write_removal_report(removed_users): | |
print(str(len(removed_users)) + " users removed") | |
stdout.write("Writing removal report...") | |
now = datetime.now() | |
filename = "removal_report_{number}.csv".format(number=now.strftime("%Y%m%d%H%M%S")) | |
with open(filename, 'w', newline='') as f: | |
writer = csv.writer(f) | |
writer.writerow(["Removed User","Opened LUID"]) | |
for duple in removed_users: | |
writer.writerow(duple) | |
stdout.write("written.\n") | |
# Calls main function if evoked from command line | |
if __name__=="__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment