From 6fc4782bd5bbccdc52193f25f454321dd1ea6251 Mon Sep 17 00:00:00 2001 From: Yannick Hilber <57816991+yannickhilber@users.noreply.github.com> Date: Wed, 22 Feb 2023 08:23:11 +0100 Subject: [PATCH 01/11] Add non zero exit 2 when diff is found (#9) --- main.py | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/main.py b/main.py index 98bbfb4..b086439 100755 --- a/main.py +++ b/main.py @@ -876,8 +876,12 @@ def print_diff( header_to_add: str, header_to_remove: str, header_to_change: str, - ) -> None: + ) -> bool: + has_diff = False + if len(self.to_add) > 0: + has_diff = True + print(header_to_add) for entry in self.to_add: print() @@ -886,6 +890,8 @@ def print_diff( print() if len(self.to_remove) > 0: + has_diff = True + print(header_to_remove) for entry in self.to_remove: print() @@ -894,6 +900,8 @@ def print_diff( print() if len(self.to_change) > 0: + has_diff = True + print(header_to_change) for change in self.to_change: print() @@ -904,6 +912,7 @@ def print_diff( print() + return has_diff def print_team_members_diff( *, @@ -911,29 +920,40 @@ def print_team_members_diff( target_fname: str, target_members: Set[TeamMember], actual_members: Set[TeamMember], -) -> None: +) -> bool: + has_diff = False members_diff = Diff.new( target=target_members, actual=actual_members, ) + if len(members_diff.to_remove) > 0: + has_diff = True + print( f"The following members of team '{team_name}' are not specified " f"in {target_fname}, but are present on GitHub:\n" ) for member in sorted(members_diff.to_remove): print(f" {member.user_name}") + print() if len(members_diff.to_add) > 0: + has_diff = True + print( f"The following members of team '{team_name}' are specified " f"in {target_fname}, but are not present on GitHub:\n" ) for member in sorted(members_diff.to_add): print(f" {member.user_name}") + print() + return has_diff + + def main() -> None: if "--help" in sys.argv: @@ -951,6 +971,7 @@ def main() -> None: print("See also --help.") sys.exit(1) + has_changes = False target_fname = sys.argv[1] target = Configuration.from_toml_file(target_fname) org_name = target.organization.name @@ -962,7 +983,7 @@ def main() -> None: target.get_repository_target(r) for r in actual_repos } repos_diff = Diff.new(target=target_repos, actual=actual_repos) - repos_diff.print_diff( + has_changes |= repos_diff.print_diff( f"The following repositories are specified in {target_fname} but not present on GitHub:", # Even though we generate the targets form the actuals using the default # settings, it can happen that we match on repository name but not id @@ -974,6 +995,7 @@ def main() -> None: current_org = client.get_organization(org_name) if current_org != target.organization: + has_changes = True print("The organization-level settings need to be changed as follows:\n") print_simple_diff( actual=current_org.format_toml(), @@ -982,7 +1004,7 @@ def main() -> None: current_members = set(client.get_organization_members(org_name)) members_diff = Diff.new(target=target.members, actual=current_members) - members_diff.print_diff( + has_changes |= members_diff.print_diff( f"The following members are specified in {target_fname} but not a member of the GitHub organization:", f"The following members are not specified in {target_fname} but are a member of the GitHub organization:", f"The following members on GitHub need to be changed to match {target_fname}:", @@ -990,7 +1012,7 @@ def main() -> None: current_teams = set(client.get_organization_teams(org_name)) teams_diff = Diff.new(target=target.teams, actual=current_teams) - teams_diff.print_diff( + has_changes |= teams_diff.print_diff( f"The following teams specified in {target_fname} are not present on GitHub:", f"The following teams are not specified in {target_fname} but are present on GitHub:", f"The following teams on GitHub need to be changed to match {target_fname}:", @@ -1004,7 +1026,7 @@ def main() -> None: team for team in current_teams if team.name in target_team_names ] for team in existing_desired_teams: - print_team_members_diff( + has_changes |= print_team_members_diff( team_name=team.name, target_fname=target_fname, target_members={ @@ -1013,6 +1035,9 @@ def main() -> None: actual_members=set(client.get_team_members(org_name, team)), ) + if has_changes: + sys.exit(2) + if __name__ == "__main__": main() From fbda58b0b750c034e39c337955af8bd0ac305d08 Mon Sep 17 00:00:00 2001 From: Yannick Hilber <57816991+yannickhilber@users.noreply.github.com> Date: Tue, 4 Apr 2023 17:27:22 +0200 Subject: [PATCH 02/11] Bitwarden access manager (#10) Add bitwarden_access_manager.py --- bitwarden_access_manager.py | 715 ++++++++++++++++++++++++++++++++++++ main.py | 2 +- 2 files changed, 716 insertions(+), 1 deletion(-) create mode 100755 bitwarden_access_manager.py diff --git a/bitwarden_access_manager.py b/bitwarden_access_manager.py new file mode 100755 index 0000000..94a91c1 --- /dev/null +++ b/bitwarden_access_manager.py @@ -0,0 +1,715 @@ +#!/usr/bin/env python3 + +# Copyright 2022 Chorus One + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# A copy of the License has been included in the root of the repository. + +""" +Bitwarden Access Manager + +Compare the current state of a Bitwarden organization with a desired state +expressed in a TOML file. Currently this tool only points out the differences, +it does not automatically reconcile them for you. + +USAGE + + ./bitwarden_access_manager.py organization.toml + +ENVIRONMENT + +Requires BITWARDEN_CLIENT_ID and BITWARDEN_CLIENT_SECRET to be set in the environment. +Those must contain OAuth2 client credentials for the organization. Only Bitwarden +members of the organization with OWNER role have access to those credentials. + +You can view the credentials at https://vault.bitwarden.com/#/organizations//settings/account + +CONFIGURATION + +* The access_all key for members and groups is optional, default is false. +* The member_access key for a collection only list members with direct access +to collection. It omits direct access for members with the role +owners or admins because they have implicit access to all collections. + +[[member]] +member_id = "2564c11f-fc1b-4ec7-aa0b-afaf00a9e4a4" +member_name = "yan" +email = "yan.68@hotmail.fr" +type = "member" +groups = ["group1"] + +[[member]] +member_id = "856cba2d-cae1-40e7-96cc-afaf00a8a4cb" +member_name = "yunkel" +email = "yunkel68@hotmail.fr" +type = "owner" +access_all = true +groups = ["group1", "group2"] + +[[group]] +group_id = "c6a13b93-edc1-4c3b-9fc5-afaf00a8d33f" +group_name = "group1" +access_all = true + +[[group]] +group_id = "39b48ab2-81fd-40eb-87e9-afb0000110f3" +group_name = "group2" + +[[collection]] +collection_id = "50351c20-55b4-4ee8-bbe0-afaf00a8f25d" +external_id = "collection1" +member_access = [ + { member_name = "yan", access = "write"}, +] + +group_access = [ + { group_name = "group1", access = "readonly"}, + { group_name = "group2", access = "write" }, +] + +[[collection]] +collection_id = "8e69ce49-85ae-4e09-a52c-afaf00a90a3f" +external_id = "" +member_access = [ + { member_name = "yan", access = "write" }, +] +group_access = [ + { group_name = "group1", access = "readonly" }, +] +""" + +from __future__ import annotations +from collections import defaultdict +from dataclasses import dataclass +from difflib import SequenceMatcher +from enum import Enum +from http.client import HTTPSConnection, HTTPResponse + +import json +import os +import requests +import sys +import tomllib + +from typing import ( + Any, + Dict, + List, + Generic, + Tuple, + NamedTuple, + Set, + Iterable, + Protocol, + TypeVar, +) + + +class MemberType(Enum): + OWNER = 0 + ADMIN = 1 + USER = 2 + MANAGER = 3 + CUSTOM = 4 + + +class GroupAccess(Enum): + READONLY = 0 + WRITE = 1 + + +class Member(NamedTuple): + id: str + name: str + email: str + type: MemberType + access_all: bool + groups: Tuple[str, ...] + + def get_id(self) -> str: + return self.id + + @staticmethod + def from_toml_dict(data: Dict[str, Any]) -> Member: + access_all: bool = False + groups: Tuple[str, ...] = tuple() + + if "access_all" in data: + access_all = data["access_all"] + if "groups" in data: + groups = data["groups"] + groups = tuple(sorted(data["groups"])) + return Member( + id=data["member_id"], + name=data["member_name"], + email=data["email"], + type=MemberType[data["type"].upper()], + access_all=access_all, + groups=groups, + ) + + def format_toml(self) -> str: + result = ( + "[[member]]\n" + f'member_id = "{self.id}"\n' + f'member_name = "{self.name}"\n' + f'email = "{self.email}"\n' + f'type = "{self.type.name.lower()}\n"' + f"access_all = {str(self.access_all).lower()}\n" + ) + + result = result + "groups = [" + ", ".join(json.dumps(g) for g in sorted(self.groups)) + "]" + + return result + + +class GroupMember(NamedTuple): + member_id: str + member_name: str + group_name: str + + def get_id(self) -> str: + # Our generic differ has the ability to turn add/removes into changes + # for pairs with the same id, but this does not apply to memberships, + # which do not themselves have an id, so the identity to group on is + # the value itself. + return f"{self.member_id}@{self.group_name}" + + def format_toml(self) -> str: + # Needed to satisfy Diffable, but not used in this case. + raise Exception( + "Group memberships are not expressed in toml, " + "please print the diffs in some other way." + ) + + +class Group(NamedTuple): + id: str + name: str + access_all: bool + + def get_id(self) -> str: + return self.id + + @staticmethod + def from_toml_dict(data: Dict[str, Any]) -> Group: + access_all: bool = False + if "access_all" in data: + access_all = data["access_all"] + + return Group( + id=data["group_id"], + name=data["group_name"], + access_all=access_all, + ) + + def format_toml(self) -> str: + lines = [ + "[[group]]", + f'group_id = "{self.id}"', + f'group_name = "{self.name}"', + f'access_all = "{str(self.access_all).lower()}"', + ] + return "\n".join(lines) + + +class MemberCollectionAccess(NamedTuple): + member_name: str + access: GroupAccess + + @staticmethod + def from_toml_dict(data: Dict[str, Any]) -> MemberCollectionAccess: + return MemberCollectionAccess( + member_name=data["member_name"], + access=GroupAccess[data["access"].upper()], + ) + + def format_toml(self) -> str: + return ( + '{ member_name = "' + + self.member_name + + '", access = "' + + self.access.name.lower() + + '"}' + ) + + +class GroupCollectionAccess(NamedTuple): + group_name: str + access: GroupAccess + + @staticmethod + def from_toml_dict(data: Dict[str, Any]) -> GroupCollectionAccess: + return GroupCollectionAccess( + group_name=data["group_name"], + access=GroupAccess[data["access"].upper()], + ) + + def format_toml(self) -> str: + return ( + '{ group_name = "' + + self.group_name + + '", access = "' + + self.access.name.lower() + + '" }' + ) + + +class Collection(NamedTuple): + id: str + external_id: str + group_access: Tuple[GroupCollectionAccess, ...] + member_access: Tuple[MemberCollectionAccess, ...] + + def get_id(self) -> str: + return self.id + + @staticmethod + def from_toml_dict(data: Dict[str, Any]) -> Collection: + group_access: Tuple[GroupCollectionAccess, ...] = tuple() + if "group_access" in data: + group_access = tuple( + sorted( + GroupCollectionAccess.from_toml_dict(x) + for x in data["group_access"] + ) + ) + + member_access: Tuple[MemberCollectionAccess, ...] = tuple() + if "member_access" in data: + member_access = tuple( + sorted( + MemberCollectionAccess.from_toml_dict(x) + for x in data["member_access"] + ) + ) + + return Collection( + id=data["collection_id"], + external_id=data["external_id"], + group_access=group_access, + member_access=member_access, + ) + + def format_toml(self) -> str: + result = ( + "[[collection]]\n" + f'collection_id = "{self.id}"\n' + f'external_id = "{self.external_id}"\n' + ) + + member_access_lines = [ + " " + a.format_toml() for a in sorted(self.member_access) + ] + if len(member_access_lines) > 0: + result = ( + result + + "member_access = [\n" + + ",\n".join(member_access_lines) + + ",\n]\n" + ) + else: + result = result + "member_access = []\n" + + group_access_lines = [ + " " + a.format_toml() for a in sorted(self.group_access) + ] + if len(group_access_lines) > 0: + result = ( + result + + "group_access = [\n" + + ",\n".join(group_access_lines) + + ",\n]" + ) + else: + result = result + "group_access = []" + + return result + + +class BitwardenClient(NamedTuple): + connection: HTTPSConnection + bearer_token: str + + @staticmethod + def new(client_id: str, client_secret: str) -> BitwardenClient: + response = requests.post( + "https://identity.bitwarden.com/connect/token", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data={ + "grant_type": "client_credentials", + "scope": "api.organization", + "Accept": "application/json", + }, + auth=(client_id, client_secret), + ) + bearer_token = response.json()["access_token"] + return BitwardenClient(HTTPSConnection("api.bitwarden.com"), bearer_token) + + def _http_get(self, url: str) -> HTTPResponse: + self.connection.request( + method="GET", + url=url, + headers={ + "Accept": "application/json", + "Authorization": f"Bearer {self.bearer_token}", + }, + ) + + return self.connection.getresponse() + + def get_groups(self) -> Iterable[Group]: + groups = json.load(self._http_get(f"/public/groups")) + for group in groups["data"]: + yield Group( + id=group["id"], + name=group["name"], + access_all=group["accessAll"], + ) + + def get_collections( + self, + org_members: Dict[str, Member], + collections_members: Dict[str, List[MemberCollectionAccess]], + ) -> Iterable[Collection]: + collections = json.load(self._http_get(f"/public/collections")) + + for collection in collections["data"]: + group_accesses: Tuple[GroupCollectionAccess, ...] = tuple() + member_accesses: Tuple[MemberCollectionAccess, ...] = tuple() + collection_id = collection["id"] + + collection_data = json.load( + self._http_get(f"/public/collections/{collection_id}") + ) + + group_collection_accesses: List[GroupCollectionAccess] = [] + + for group in collection_data["groups"]: + access = self.map_access(readonly=group["readOnly"]) + group_id = group["id"] + group_collection_accesses.append( + GroupCollectionAccess( + group_name=json.load( + self._http_get(f"/public/groups/{group_id}") + )["name"], + access=access, + ) + ) + + group_accesses = tuple(sorted(group_collection_accesses)) + + if collection_id in collections_members: + member_accesses = tuple(sorted(collections_members[collection_id])) + + yield Collection( + id=collection["id"], + external_id=collection["externalId"], + member_access=member_accesses, + group_access=group_accesses, + ) + + def get_group_members( + self, group_id: str, group_name: str + ) -> Iterable[GroupMember]: + members = json.load(self._http_get(f"/public/groups/{group_id}/member-ids")) + + for member in members: + member = json.load(self._http_get(f"/public/members/{member}")) + yield GroupMember( + member_id=member["id"], + member_name=member["name"], + group_name=group_name, + ) + + def set_member_type(self, type_id: int) -> MemberType: + int_to_member_type: Dict[int, MemberType] = { + variant.value: variant for variant in MemberType + } + + return MemberType(int_to_member_type[type_id]) + + def get_members( + self, member_groups: Dict[str, List[str]] + ) -> Tuple[List[Member], Dict[str, List[MemberCollectionAccess]]]: + data = self._http_get(f"/public/members") + members = json.load(data) + + members_result: List[Member] = [] + collection_access: Dict[str, List[MemberCollectionAccess]] = defaultdict( + lambda: [] + ) + groups: Tuple[str, ...] = tuple() + + for member in members["data"]: + type = self.set_member_type(member["type"]) + groups = tuple(sorted(member_groups[member["id"]])) + m = Member( + id=member["id"], + name=member["name"], + email=member["email"], + type=type, + access_all=member["accessAll"], + groups=groups, + ) + members_result.append(m) + + collections = json.load(self._http_get(f"/public/members/{member['id']}"))[ + "collections" + ] + if type != MemberType.OWNER and type != MemberType.ADMIN: + for collection in collections: + access = self.map_access(readonly=collection["readOnly"]) + + collection_access[collection["id"]].append( + MemberCollectionAccess( + member_name=member["name"], access=access + ) + ) + + return members_result, collection_access + + def map_access(self, *, readonly: bool) -> GroupAccess: + if readonly == True: + return GroupAccess["READONLY"] + else: + return GroupAccess["WRITE"] + + +class Configuration(NamedTuple): + collection: Set[Collection] + members: Set[Member] + groups: Set[Group] + group_memberships: Set[GroupMember] + + @staticmethod + def from_toml_dict(data: Dict[str, Any]) -> Configuration: + collection = {Collection.from_toml_dict(c) for c in data["collection"]} + members = {Member.from_toml_dict(m) for m in data["member"]} + groups = {Group.from_toml_dict(m) for m in data["group"]} + group_memberships = { + GroupMember( + member_id=member["member_id"], + member_name=member["member_name"], + group_name=group, + ) + for member in data["member"] + for group in member.get("groups", []) + } + return Configuration( + collection=collection, + members=members, + groups=groups, + group_memberships=group_memberships, + ) + + @staticmethod + def from_toml_file(fname: str) -> Configuration: + with open(fname, "rb") as f: + data = tomllib.load(f) + return Configuration.from_toml_dict(data) + + +def print_indented(lines: str) -> None: + """Print the input indented by two spaces.""" + for line in lines.splitlines(): + print(f" {line}") + + +def print_simple_diff(actual: str, target: str) -> None: + """ + Print a line-based diff of the two strings, without abbreviating large + chunks of identical lines like a standard unified diff would do. + """ + lines_actual = actual.splitlines() + lines_target = target.splitlines() + line_diff = SequenceMatcher(None, lines_actual, lines_target) + for tag, i1, i2, j1, j2 in line_diff.get_opcodes(): + if tag == "equal": + for line in lines_actual[i1:i2]: + print(" " + line) + elif tag == "replace": + for line in lines_actual[i1:i2]: + print("- " + line) + for line in lines_target[j1:j2]: + print("+ " + line) + elif tag == "delete": + for line in lines_actual[i1:i2]: + print("- " + line) + elif tag == "insert": + for line in lines_target[j1:j2]: + print("+ " + line) + else: + raise Exception("Invalid diff operation.") + + +T = TypeVar("T", bound="Diffable") + + +class Diffable(Protocol): + def __eq__(self: T, other: Any) -> bool: + ... + + def __lt__(self: T, other: T) -> bool: + ... + + def get_id(self: T) -> int | str: + ... + + def format_toml(self: T) -> str: + ... + + +@dataclass(frozen=True) +class DiffEntry(Generic[T]): + actual: T + target: T + + +@dataclass(frozen=True) +class Diff(Generic[T]): + to_add: List[T] + to_remove: List[T] + to_change: List[DiffEntry[T]] + + @staticmethod + def new(target: Set[T], actual: Set[T]) -> Diff[T]: + # A very basic diff is to just look at everything that needs to be added + # and removed, without deeper inspection. + to_add = sorted(target - actual) + to_remove = sorted(actual - target) + + # However, that produces a very rough diff. If we change e.g. the + # description of a group, that would show up as deleting one group and + # adding back another which is almost the same, except with a different + # description. So to improve on this a bit, if entries have ids, and + # the same id needs to be both added and removed, then instead we record + # that as a "change". + to_add_by_id = {x.get_id(): x for x in to_add} + to_remove_by_id = {x.get_id(): x for x in to_remove} + to_change = [ + DiffEntry( + actual=to_remove_by_id[id_], + target=to_add_by_id[id_], + ) + for id_ in sorted(to_add_by_id.keys() & to_remove_by_id.keys()) + ] + + # Now that we turned some add/remove pairs into a "change", we should no + # longer count those as added/removed. + for change in to_change: + to_add.remove(change.target) + to_remove.remove(change.actual) + + return Diff( + to_add=to_add, + to_remove=to_remove, + to_change=to_change, + ) + + def print_diff( + self, + header_to_add: str, + header_to_remove: str, + header_to_change: str, + ) -> None: + if len(self.to_add) > 0: + print(header_to_add) + for entry in self.to_add: + print() + print_indented(entry.format_toml()) + + print() + + if len(self.to_remove) > 0: + print(header_to_remove) + for entry in self.to_remove: + print() + print_indented(entry.format_toml()) + + print() + + if len(self.to_change) > 0: + print(header_to_change) + for change in self.to_change: + print() + print_simple_diff( + actual=change.actual.format_toml(), + target=change.target.format_toml(), + ) + + print() + + +def main() -> None: + if "--help" in sys.argv: + print(__doc__) + sys.exit(0) + + client_id = os.getenv("BITWARDEN_CLIENT_ID") + if client_id is None: + print("Expected BITWARDEN_CLIENT_ID environment variable to be set.") + print("See also --help.") + sys.exit(1) + + client_secret = os.getenv("BITWARDEN_CLIENT_SECRET") + if client_secret is None: + print("Expected BITWARDEN_CLIENT_SECRET environment variable to be set.") + print("See also --help.") + sys.exit(1) + + if len(sys.argv) < 2: + print("Expected file name of config toml as first argument.") + print("See also --help.") + sys.exit(1) + + target_fname = sys.argv[1] + target = Configuration.from_toml_file(target_fname) + client = BitwardenClient.new(client_id, client_secret) + + current_groups = set(client.get_groups()) + groups_diff = Diff.new(target=target.groups, actual=current_groups) + groups_diff.print_diff( + f"The following groups specified in {target_fname} are not present on Bitwarden:", + f"The following groups are not specified in {target_fname} but are present on Bitwarden:", + f"The following groups on Bitwarden need to be changed to match {target_fname}:", + ) + + # For all the groups which we want to exist, and which do actually exist, + # compare their members. + target_groups_names = {group.name for group in target.groups} + existing_desired_groups = [ + group for group in current_groups if group.name in target_groups_names + ] + + member_groups: Dict[str, List[str]] = defaultdict(lambda: []) + + for group in existing_desired_groups: + group_members = set(client.get_group_members(group.id, group.name)) + + # Create a Dict mapping member ids to the groups they are a member of. + for group_member in group_members: + member_groups[group_member.member_id].append(group.name) + + current_members, members_access = client.get_members(member_groups) + current_members_set = set(current_members) + members_diff = Diff.new(target=target.members, actual=current_members_set) + members_diff.print_diff( + f"The following members are specified in {target_fname} but not a member of the Bitwarden organization:", + f"The following members are not specified in {target_fname} but are a member of the Bitwarden organization:", + f"The following members on Bitwarden need to be changed to match {target_fname}:", + ) + + org_members: Dict[str, Member] = {member.id: member for member in current_members} + current_collections = set(client.get_collections(org_members, members_access)) + collections_diff = Diff.new(target=target.collection, actual=current_collections) + collections_diff.print_diff( + f"The following collections are specified in {target_fname} but not a member of the Bitwarden organization:", + f"The following collections are not specified in {target_fname} but are a member of the Bitwarden organization:", + f"The following collections on Bitwarden need to be changed to match {target_fname}:", + ) + + +if __name__ == "__main__": + main() diff --git a/main.py b/main.py index b086439..6f9f134 100755 --- a/main.py +++ b/main.py @@ -9,7 +9,7 @@ """ Github Access Manager -Comare the current state of a GitHub organization against a declarative +Compare the current state of a GitHub organization against a declarative specification of the target state. Currently this tool only points out the differences, it does not automatically reconcile them for you. From 0ea4a428d8581b7870f9666636e4303f538a943d Mon Sep 17 00:00:00 2001 From: Ruud van Asseldonk Date: Thu, 30 Mar 2023 14:53:44 +0200 Subject: [PATCH 03/11] Remove dependency on "requests" We can do the initial authentication request just fine using the standard library. --- bitwarden_access_manager.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/bitwarden_access_manager.py b/bitwarden_access_manager.py index 94a91c1..a317ca7 100755 --- a/bitwarden_access_manager.py +++ b/bitwarden_access_manager.py @@ -85,10 +85,10 @@ from difflib import SequenceMatcher from enum import Enum from http.client import HTTPSConnection, HTTPResponse +from urllib.parse import urlencode import json import os -import requests import sys import tomllib @@ -334,17 +334,22 @@ class BitwardenClient(NamedTuple): @staticmethod def new(client_id: str, client_secret: str) -> BitwardenClient: - response = requests.post( - "https://identity.bitwarden.com/connect/token", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data={ + connection = HTTPSConnection("identity.bitwarden.com") + connection.request( + method="POST", + url="/connect/token", + headers={ + "Content-Type": "application/x-www-form-urlencoded", + }, + body=urlencode({ "grant_type": "client_credentials", "scope": "api.organization", - "Accept": "application/json", - }, - auth=(client_id, client_secret), + "client_id": client_id, + "client_secret": client_secret, + }), ) - bearer_token = response.json()["access_token"] + auth_response: Dict[str, Any] = json.load(connection.getresponse()) + bearer_token = auth_response["access_token"] return BitwardenClient(HTTPSConnection("api.bitwarden.com"), bearer_token) def _http_get(self, url: str) -> HTTPResponse: From 5acc67b9de663c220a84314a3fa3b4bd16946ea8 Mon Sep 17 00:00:00 2001 From: Ruud van Asseldonk Date: Thu, 30 Mar 2023 15:44:02 +0200 Subject: [PATCH 04/11] Ensure we can load member group access Sorting does not work because GroupAccess does not implement __lt__, we can sidestep the issue by providing a comparison key and sorting only by name. --- bitwarden_access_manager.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/bitwarden_access_manager.py b/bitwarden_access_manager.py index a317ca7..5d189c8 100755 --- a/bitwarden_access_manager.py +++ b/bitwarden_access_manager.py @@ -405,8 +405,10 @@ def get_collections( group_accesses = tuple(sorted(group_collection_accesses)) - if collection_id in collections_members: - member_accesses = tuple(sorted(collections_members[collection_id])) + member_accesses = tuple(sorted( + collections_members.get(collection_id, []), + key=lambda ma: ma.member_name, + )) yield Collection( id=collection["id"], From 2acbe679e4617fb821ecb6a76af8e5b149dbf4e5 Mon Sep 17 00:00:00 2001 From: Ruud van Asseldonk Date: Thu, 30 Mar 2023 15:46:19 +0200 Subject: [PATCH 05/11] Make group access_all a bool in toml, not str --- bitwarden_access_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bitwarden_access_manager.py b/bitwarden_access_manager.py index 5d189c8..364b9c6 100755 --- a/bitwarden_access_manager.py +++ b/bitwarden_access_manager.py @@ -209,7 +209,7 @@ def format_toml(self) -> str: "[[group]]", f'group_id = "{self.id}"', f'group_name = "{self.name}"', - f'access_all = "{str(self.access_all).lower()}"', + f'access_all = {str(self.access_all).lower()}', ] return "\n".join(lines) From 09a82c2c35c0e1d8f21ca9ba84f132a77fcbac6a Mon Sep 17 00:00:00 2001 From: Ruud van Asseldonk Date: Thu, 30 Mar 2023 15:51:17 +0200 Subject: [PATCH 06/11] Print status to stderr while fetching --- bitwarden_access_manager.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/bitwarden_access_manager.py b/bitwarden_access_manager.py index 364b9c6..f23e953 100755 --- a/bitwarden_access_manager.py +++ b/bitwarden_access_manager.py @@ -328,6 +328,17 @@ def format_toml(self) -> str: return result +def print_status_stderr(status: str) -> None: + """ + On stderr, clear the current line with an ANSI escape code, jump back to + the start of the line, and print the status, without a newline. This means + that subsequent updates will overwrite each other (if nothing gets printed + to stdout in the meantime). + """ + clear_line = "\x1b[2K\r" + print(f"{clear_line}{status}", end="", file=sys.stderr) + + class BitwardenClient(NamedTuple): connection: HTTPSConnection bearer_token: str @@ -449,7 +460,8 @@ def get_members( ) groups: Tuple[str, ...] = tuple() - for member in members["data"]: + for i, member in enumerate(members["data"]): + print_status_stderr(f"[{i+1}/{len(members['data'])}] Getting member {member['name']} ...") type = self.set_member_type(member["type"]) groups = tuple(sorted(member_groups[member["id"]])) m = Member( @@ -475,6 +487,9 @@ def get_members( ) ) + # Clear the member fetching status updates. + print_status_stderr("") + return members_result, collection_access def map_access(self, *, readonly: bool) -> GroupAccess: @@ -692,13 +707,16 @@ def main() -> None: member_groups: Dict[str, List[str]] = defaultdict(lambda: []) - for group in existing_desired_groups: + for i, group in enumerate(existing_desired_groups): + print_status_stderr(f"[{i+1}/{len(existing_desired_groups)}] Getting group {group.name} ...") group_members = set(client.get_group_members(group.id, group.name)) # Create a Dict mapping member ids to the groups they are a member of. for group_member in group_members: member_groups[group_member.member_id].append(group.name) + print_status_stderr("") + current_members, members_access = client.get_members(member_groups) current_members_set = set(current_members) members_diff = Diff.new(target=target.members, actual=current_members_set) From 4caaa89cff523f154fd95aa61abeb680322814d0 Mon Sep 17 00:00:00 2001 From: Ruud van Asseldonk Date: Thu, 30 Mar 2023 15:58:12 +0200 Subject: [PATCH 07/11] Make member name optional, fix formatting of member Members on Bitwarden don't necessarily have a name. Maybe we should omit it entirely from the config, because it's not how we identify users anyway. --- bitwarden_access_manager.py | 44 +++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/bitwarden_access_manager.py b/bitwarden_access_manager.py index f23e953..8d81ee7 100755 --- a/bitwarden_access_manager.py +++ b/bitwarden_access_manager.py @@ -95,13 +95,14 @@ from typing import ( Any, Dict, - List, Generic, - Tuple, - NamedTuple, - Set, Iterable, + List, + NamedTuple, + Optional, Protocol, + Set, + Tuple, TypeVar, ) @@ -121,7 +122,7 @@ class GroupAccess(Enum): class Member(NamedTuple): id: str - name: str + name: Optional[str] email: str type: MemberType access_all: bool @@ -137,12 +138,14 @@ def from_toml_dict(data: Dict[str, Any]) -> Member: if "access_all" in data: access_all = data["access_all"] + if "groups" in data: groups = data["groups"] groups = tuple(sorted(data["groups"])) + return Member( id=data["member_id"], - name=data["member_name"], + name=data.get("member_name"), email=data["email"], type=MemberType[data["type"].upper()], access_all=access_all, @@ -150,18 +153,25 @@ def from_toml_dict(data: Dict[str, Any]) -> Member: ) def format_toml(self) -> str: - result = ( - "[[member]]\n" - f'member_id = "{self.id}"\n' - f'member_name = "{self.name}"\n' - f'email = "{self.email}"\n' - f'type = "{self.type.name.lower()}\n"' - f"access_all = {str(self.access_all).lower()}\n" - ) + lines = [ + "[[member]]\n", + f'member_id = "{self.id}"\n', + ] - result = result + "groups = [" + ", ".join(json.dumps(g) for g in sorted(self.groups)) + "]" + if self.name is not None: + lines.append(f'member_name = "{self.name}"\n') - return result + lines += [ + f'email = "{self.email}"\n', + f'type = "{self.type.name.lower()}"\n', + ] + + if self.access_all: + lines.append("access_all = true\n") + + lines.append("groups = [" + ", ".join(json.dumps(g) for g in sorted(self.groups)) + "]\n") + + return "".join(lines) class GroupMember(NamedTuple): @@ -461,7 +471,7 @@ def get_members( groups: Tuple[str, ...] = tuple() for i, member in enumerate(members["data"]): - print_status_stderr(f"[{i+1}/{len(members['data'])}] Getting member {member['name']} ...") + print_status_stderr(f"[{i+1}/{len(members['data'])}] Getting member {member['email']} ...") type = self.set_member_type(member["type"]) groups = tuple(sorted(member_groups[member["id"]])) m = Member( From b87000f7455b8256951610af49a5676972a6388e Mon Sep 17 00:00:00 2001 From: Ruud van Asseldonk Date: Thu, 30 Mar 2023 18:48:54 +0200 Subject: [PATCH 08/11] Identify collection members by email As it turns out, members do not always have a name, sometimes it's null. We can use the email instead. --- bitwarden_access_manager.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/bitwarden_access_manager.py b/bitwarden_access_manager.py index 8d81ee7..5d276c9 100755 --- a/bitwarden_access_manager.py +++ b/bitwarden_access_manager.py @@ -60,7 +60,7 @@ collection_id = "50351c20-55b4-4ee8-bbe0-afaf00a8f25d" external_id = "collection1" member_access = [ - { member_name = "yan", access = "write"}, + { email = "yan@chorus.one", access = "write"}, ] group_access = [ @@ -72,7 +72,7 @@ collection_id = "8e69ce49-85ae-4e09-a52c-afaf00a90a3f" external_id = "" member_access = [ - { member_name = "yan", access = "write" }, + { email = "yan@chorus.one", access = "write" }, ] group_access = [ { group_name = "group1", access = "readonly" }, @@ -225,23 +225,21 @@ def format_toml(self) -> str: class MemberCollectionAccess(NamedTuple): - member_name: str + # We identify members by email, name is not guaranteed to be present. + email: str access: GroupAccess @staticmethod def from_toml_dict(data: Dict[str, Any]) -> MemberCollectionAccess: return MemberCollectionAccess( - member_name=data["member_name"], + email=data["email"], access=GroupAccess[data["access"].upper()], ) def format_toml(self) -> str: return ( - '{ member_name = "' - + self.member_name - + '", access = "' - + self.access.name.lower() - + '"}' + f'{{ email = {json.dumps(self.email)}, ' + f'access = "{self.access.name.lower()}" }}' ) @@ -428,7 +426,7 @@ def get_collections( member_accesses = tuple(sorted( collections_members.get(collection_id, []), - key=lambda ma: ma.member_name, + key=lambda ma: ma.email, )) yield Collection( @@ -490,10 +488,10 @@ def get_members( if type != MemberType.OWNER and type != MemberType.ADMIN: for collection in collections: access = self.map_access(readonly=collection["readOnly"]) - collection_access[collection["id"]].append( MemberCollectionAccess( - member_name=member["name"], access=access + email=member["email"], + access=access, ) ) From ec0cd1a930d9c918accd1af72be6390c2aada941 Mon Sep 17 00:00:00 2001 From: Ruud van Asseldonk Date: Thu, 30 Mar 2023 19:17:26 +0200 Subject: [PATCH 09/11] Delete member_name from GroupMember It is not needed, and sometimes not present. --- bitwarden_access_manager.py | 41 ++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/bitwarden_access_manager.py b/bitwarden_access_manager.py index 5d276c9..d1a5ff1 100755 --- a/bitwarden_access_manager.py +++ b/bitwarden_access_manager.py @@ -176,7 +176,6 @@ def format_toml(self) -> str: class GroupMember(NamedTuple): member_id: str - member_name: str group_name: str def get_id(self) -> str: @@ -301,39 +300,37 @@ def from_toml_dict(data: Dict[str, Any]) -> Collection: ) def format_toml(self) -> str: - result = ( - "[[collection]]\n" - f'collection_id = "{self.id}"\n' - f'external_id = "{self.external_id}"\n' - ) + lines = [ + "[[collection]]\n", + f'collection_id = "{self.id}"\n', + f'external_id = "{self.external_id}"\n', + ] member_access_lines = [ " " + a.format_toml() for a in sorted(self.member_access) ] if len(member_access_lines) > 0: - result = ( - result - + "member_access = [\n" - + ",\n".join(member_access_lines) - + ",\n]\n" - ) + lines.extend([ + "member_access = [\n", + ",\n".join(member_access_lines), + ",\n]\n", + ]) else: - result = result + "member_access = []\n" + lines.append("member_access = []\n") group_access_lines = [ " " + a.format_toml() for a in sorted(self.group_access) ] if len(group_access_lines) > 0: - result = ( - result - + "group_access = [\n" - + ",\n".join(group_access_lines) - + ",\n]" - ) + lines.extend([ + "group_access = [\n", + ",\n".join(group_access_lines), + ",\n]", + ]) else: - result = result + "group_access = []" + lines.append("group_access = []") - return result + return "".join(lines) def print_status_stderr(status: str) -> None: @@ -445,7 +442,6 @@ def get_group_members( member = json.load(self._http_get(f"/public/members/{member}")) yield GroupMember( member_id=member["id"], - member_name=member["name"], group_name=group_name, ) @@ -521,7 +517,6 @@ def from_toml_dict(data: Dict[str, Any]) -> Configuration: group_memberships = { GroupMember( member_id=member["member_id"], - member_name=member["member_name"], group_name=group, ) for member in data["member"] From ecc427bf724aca23622b8b897259a9380f4c764c Mon Sep 17 00:00:00 2001 From: Ruud van Asseldonk Date: Thu, 30 Mar 2023 19:23:58 +0200 Subject: [PATCH 10/11] Make collection external_id optional It is optional in Bitwarden, so it needs to be optional here. Also print progress when fetching collections. --- bitwarden_access_manager.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/bitwarden_access_manager.py b/bitwarden_access_manager.py index d1a5ff1..24c68d2 100755 --- a/bitwarden_access_manager.py +++ b/bitwarden_access_manager.py @@ -29,8 +29,11 @@ * The access_all key for members and groups is optional, default is false. * The member_access key for a collection only list members with direct access -to collection. It omits direct access for members with the role -owners or admins because they have implicit access to all collections. + to the collection. It omits direct access for members with the owner or admin + role, because they have implicit access to all collections. +* For collections, external_id is optional, but as the name of the group is + encrypted, this is the only way for this script to identify collections by a + meaningful name. [[member]] member_id = "2564c11f-fc1b-4ec7-aa0b-afaf00a9e4a4" @@ -265,7 +268,7 @@ def format_toml(self) -> str: class Collection(NamedTuple): id: str - external_id: str + external_id: Optional[str] group_access: Tuple[GroupCollectionAccess, ...] member_access: Tuple[MemberCollectionAccess, ...] @@ -294,7 +297,7 @@ def from_toml_dict(data: Dict[str, Any]) -> Collection: return Collection( id=data["collection_id"], - external_id=data["external_id"], + external_id=data.get("external_id"), group_access=group_access, member_access=member_access, ) @@ -303,9 +306,11 @@ def format_toml(self) -> str: lines = [ "[[collection]]\n", f'collection_id = "{self.id}"\n', - f'external_id = "{self.external_id}"\n', ] + if self.external_id is not None: + lines.append(f'external_id = {json.dumps(self.external_id)}\n') + member_access_lines = [ " " + a.format_toml() for a in sorted(self.member_access) ] @@ -396,7 +401,8 @@ def get_collections( ) -> Iterable[Collection]: collections = json.load(self._http_get(f"/public/collections")) - for collection in collections["data"]: + for i, collection in enumerate(collections["data"]): + print_status_stderr(f"[{i+1}/{len(collections['data'])}] Getting collection ...") group_accesses: Tuple[GroupCollectionAccess, ...] = tuple() member_accesses: Tuple[MemberCollectionAccess, ...] = tuple() collection_id = collection["id"] @@ -433,6 +439,8 @@ def get_collections( group_access=group_accesses, ) + print_status_stderr("") + def get_group_members( self, group_id: str, group_name: str ) -> Iterable[GroupMember]: From bfab93e493d8fdb6ae2f5c8ad6f951fc8809ec9c Mon Sep 17 00:00:00 2001 From: Ruud van Asseldonk Date: Thu, 30 Mar 2023 19:34:53 +0200 Subject: [PATCH 11/11] Run autoformatter --- bitwarden_access_manager.py | 74 ++++++++++++++++++++++--------------- 1 file changed, 44 insertions(+), 30 deletions(-) diff --git a/bitwarden_access_manager.py b/bitwarden_access_manager.py index 24c68d2..78b9af2 100755 --- a/bitwarden_access_manager.py +++ b/bitwarden_access_manager.py @@ -172,7 +172,9 @@ def format_toml(self) -> str: if self.access_all: lines.append("access_all = true\n") - lines.append("groups = [" + ", ".join(json.dumps(g) for g in sorted(self.groups)) + "]\n") + lines.append( + "groups = [" + ", ".join(json.dumps(g) for g in sorted(self.groups)) + "]\n" + ) return "".join(lines) @@ -221,7 +223,7 @@ def format_toml(self) -> str: "[[group]]", f'group_id = "{self.id}"', f'group_name = "{self.name}"', - f'access_all = {str(self.access_all).lower()}', + f"access_all = {str(self.access_all).lower()}", ] return "\n".join(lines) @@ -240,7 +242,7 @@ def from_toml_dict(data: Dict[str, Any]) -> MemberCollectionAccess: def format_toml(self) -> str: return ( - f'{{ email = {json.dumps(self.email)}, ' + f"{{ email = {json.dumps(self.email)}, " f'access = "{self.access.name.lower()}" }}' ) @@ -309,29 +311,31 @@ def format_toml(self) -> str: ] if self.external_id is not None: - lines.append(f'external_id = {json.dumps(self.external_id)}\n') + lines.append(f"external_id = {json.dumps(self.external_id)}\n") member_access_lines = [ " " + a.format_toml() for a in sorted(self.member_access) ] if len(member_access_lines) > 0: - lines.extend([ - "member_access = [\n", - ",\n".join(member_access_lines), - ",\n]\n", - ]) + lines.extend( + [ + "member_access = [\n", + ",\n".join(member_access_lines), + ",\n]\n", + ] + ) else: lines.append("member_access = []\n") - group_access_lines = [ - " " + a.format_toml() for a in sorted(self.group_access) - ] + group_access_lines = [" " + a.format_toml() for a in sorted(self.group_access)] if len(group_access_lines) > 0: - lines.extend([ - "group_access = [\n", - ",\n".join(group_access_lines), - ",\n]", - ]) + lines.extend( + [ + "group_access = [\n", + ",\n".join(group_access_lines), + ",\n]", + ] + ) else: lines.append("group_access = []") @@ -362,12 +366,14 @@ def new(client_id: str, client_secret: str) -> BitwardenClient: headers={ "Content-Type": "application/x-www-form-urlencoded", }, - body=urlencode({ - "grant_type": "client_credentials", - "scope": "api.organization", - "client_id": client_id, - "client_secret": client_secret, - }), + body=urlencode( + { + "grant_type": "client_credentials", + "scope": "api.organization", + "client_id": client_id, + "client_secret": client_secret, + } + ), ) auth_response: Dict[str, Any] = json.load(connection.getresponse()) bearer_token = auth_response["access_token"] @@ -402,7 +408,9 @@ def get_collections( collections = json.load(self._http_get(f"/public/collections")) for i, collection in enumerate(collections["data"]): - print_status_stderr(f"[{i+1}/{len(collections['data'])}] Getting collection ...") + print_status_stderr( + f"[{i+1}/{len(collections['data'])}] Getting collection ..." + ) group_accesses: Tuple[GroupCollectionAccess, ...] = tuple() member_accesses: Tuple[MemberCollectionAccess, ...] = tuple() collection_id = collection["id"] @@ -427,10 +435,12 @@ def get_collections( group_accesses = tuple(sorted(group_collection_accesses)) - member_accesses = tuple(sorted( - collections_members.get(collection_id, []), - key=lambda ma: ma.email, - )) + member_accesses = tuple( + sorted( + collections_members.get(collection_id, []), + key=lambda ma: ma.email, + ) + ) yield Collection( id=collection["id"], @@ -473,7 +483,9 @@ def get_members( groups: Tuple[str, ...] = tuple() for i, member in enumerate(members["data"]): - print_status_stderr(f"[{i+1}/{len(members['data'])}] Getting member {member['email']} ...") + print_status_stderr( + f"[{i+1}/{len(members['data'])}] Getting member {member['email']} ..." + ) type = self.set_member_type(member["type"]) groups = tuple(sorted(member_groups[member["id"]])) m = Member( @@ -719,7 +731,9 @@ def main() -> None: member_groups: Dict[str, List[str]] = defaultdict(lambda: []) for i, group in enumerate(existing_desired_groups): - print_status_stderr(f"[{i+1}/{len(existing_desired_groups)}] Getting group {group.name} ...") + print_status_stderr( + f"[{i+1}/{len(existing_desired_groups)}] Getting group {group.name} ..." + ) group_members = set(client.get_group_members(group.id, group.name)) # Create a Dict mapping member ids to the groups they are a member of.