diff --git a/ExtensiveRoleCheck.py b/ExtensiveRoleCheck.py index ccd2b37..3b5d065 100644 --- a/ExtensiveRoleCheck.py +++ b/ExtensiveRoleCheck.py @@ -47,6 +47,7 @@ def _generate(self): if not rule.get('resources', None): continue self.get_read_secrets(rule, role_name) + self.get_read_configmaps(rule,role_name) self.clusteradmin_role(rule, role_name) self.any_resources(rule, role_name) self.any_verb(rule, role_name) @@ -64,6 +65,17 @@ def get_read_secrets(self, rule, role_name): if filtered_name: self._role.warning(f'{Fore.GREEN}{filtered_name}' + f'{Fore.RED} Has permission to list secrets!') self.add_result(filtered_name, 'Has permission to list secrets!') + + + # Read configmaps, devs store creds in configmaps all the time + def get_read_configmaps(self, rule, role_name): + verbs = ['*','get','list'] + if ('configmaps' in rule['resources'] and any([sign for sign in verbs if sign in rule['verbs']])): + filtered_name = self.get_non_default_name(role_name) + if filtered_name: + self._role.warning(f'{Fore.GREEN}{filtered_name}' + f'{Fore.RED} Has permission to list configmaps!') + self.add_result(filtered_name, 'Has permission to list configmaps!') + #Any Any roles def clusteradmin_role(self, rule, role_name): @@ -76,6 +88,7 @@ def clusteradmin_role(self, rule, role_name): #get ANY verbs: def any_verb(self, rule, role_name): resources = ['secrets', + 'configmaps', 'pods', 'deployments', 'daemonsets', @@ -112,7 +125,8 @@ def any_resources(self, rule, role_name): def high_risk_roles(self, rule, role_name): verb_actions = ['create','update'] - resources_attributes = ['deployments','daemonsets','statefulsets','replicationcontrollers','replicasets','jobs','cronjobs'] + # adding pods to cover privilege pod scenarios and pods being used for mining example + resources_attributes = ['pods', 'deployments','daemonsets','statefulsets','replicationcontrollers','replicasets','jobs','cronjobs'] found_attribute = [attribute for attribute in resources_attributes if attribute in rule['resources']] if not (found_attribute): return @@ -163,7 +177,7 @@ def get_non_default_name(name): return name -class roleBingingChecker(object): +class roleBindingChecker(object): def __init__(self, json_file, extensive_roles, bind_kind): self._json_file = json_file self._extensive_roles = extensive_roles @@ -172,26 +186,25 @@ def __init__(self, json_file, extensive_roles, bind_kind): self.bindsCheck() def bindsCheck(self): - _rolebiding_found = [] + _rolebinding_found = [] for entity in self._json_file['items']: _role_name = entity['metadata']['name'] _rol_ref = entity['roleRef']['name'] if not entity.get('subjects', None): continue if _rol_ref in self._extensive_roles: - _rolebiding_found.append(_rol_ref) + _rolebinding_found.append(_rol_ref) for sub in entity['subjects']: if not sub.get('name', None): continue self.print_rolebinding_results(sub, _role_name, self._bind_kind) - return _rolebiding_found + return _rolebinding_found def print_rolebinding_results(self, sub, role_name, bind_kind): if sub['kind'] == 'ServiceAccount': - print(f'{Fore.YELLOW}[!][{bind_kind}]{Fore.WHITE}\u2192 ' + f'{Fore.GREEN}{role_name}{Fore.RED} is binded to {sub["name"]} ServiceAccount.') + print(f'{Fore.YELLOW}[!][{bind_kind}]{Fore.WHITE}\u2192 ' + f'{Fore.GREEN}{role_name}{Fore.RED} is bound to {sub["name"]} ServiceAccount.') else: - print(f'{Fore.YELLOW}[!][{bind_kind}]{Fore.WHITE}\u2192 ' + f'{Fore.GREEN}{role_name}{Fore.RED} is binded to the {sub["kind"]}: {sub["name"]}!') - + print(f'{Fore.YELLOW}[!][{bind_kind}]{Fore.WHITE}\u2192 ' + f'{Fore.GREEN}{role_name}{Fore.RED} is bound to the {sub["kind"]}: {sub["name"]}!') if __name__ == '__main__': @@ -215,10 +228,10 @@ def print_rolebinding_results(self, sub, role_name, bind_kind): print(f'{Fore.WHITE}[*] Started enumerating risky ClusterRoleBinding:') bind_kind = 'ClusterRoleBinding' clusterRoleBinding_json_file = open_file(args.cluseterolebindings) - extensive_clusteRoleBindings = roleBingingChecker(clusterRoleBinding_json_file, extensive_roles, bind_kind) + extensive_clusteRoleBindings = roleBindingChecker(clusterRoleBinding_json_file, extensive_roles, bind_kind) if args.rolebindings: print(f'{Fore.WHITE}[*] Started enumerating risky RoleRoleBindings:') bind_kind = 'RoleBinding' RoleBinding_json_file = open_file(args.rolebindings) - extensive_RoleBindings = roleBingingChecker(RoleBinding_json_file, extensive_roles, bind_kind) + extensive_RoleBindings = roleBindingChecker(RoleBinding_json_file, extensive_roles, bind_kind)