Source code for gssapi.creds

import typing as t

from gssapi.raw import creds as rcreds
from gssapi.raw import named_tuples as tuples
from gssapi.raw import names as rnames
from gssapi.raw import oids as roids
from gssapi._utils import import_gssapi_extension, _encode_dict

from gssapi import names

rcred_imp_exp = import_gssapi_extension('cred_imp_exp')
rcred_s4u = import_gssapi_extension('s4u')
rcred_cred_store = import_gssapi_extension('cred_store')
rcred_rfc5588 = import_gssapi_extension('rfc5588')


[docs]class Credentials(rcreds.Creds): """GSSAPI Credentials This class represents a set of GSSAPI credentials which may be used with and/or returned by other GSSAPI methods. It inherits from the low-level GSSAPI :class:`~gssapi.raw.creds.Creds` class, and thus may used with both low-level and high-level API methods. If your implementation of GSSAPI supports the credentials import-export extension, you may pickle and unpickle this object. The constructor either acquires or imports a set of GSSAPI credentials. If the `base` argument is used, an existing :class:`~gssapi.raw.creds.Creds` object from the low-level API is converted into a high-level object. If the `token` argument is used, the credentials are imported using the token, if the credentials import-export extension is supported (:requires-ext:`cred_imp_exp`). Otherwise, the credentials are acquired as per the :meth:`acquire` method. Raises: ~gssapi.exceptions.BadMechanismError ~gssapi.exceptions.BadNameTypeError ~gssapi.exceptions.BadNameError ~gssapi.exceptions.ExpiredCredentialsError ~gssapi.exceptions.MissingCredentialsError """ __slots__ = () def __new__( cls, base: t.Optional[rcreds.Creds] = None, token: t.Optional[bytes] = None, name: t.Optional[rnames.Name] = None, lifetime: t.Optional[int] = None, mechs: t.Optional[t.Iterable[roids.OID]] = None, usage: str = 'both', store: t.Optional[ t.Dict[t.Union[bytes, str], t.Union[bytes, str]] ] = None, ) -> "Credentials": # TODO(directxman12): this is missing support for password # (non-RFC method) if base is not None: base_creds = base elif token is not None: if rcred_imp_exp is None: raise NotImplementedError("Your GSSAPI implementation does " "not have support for importing and " "exporting creditials") base_creds = rcred_imp_exp.import_cred(token) else: res = cls.acquire(name, lifetime, mechs, usage, store=store) base_creds = res.creds return t.cast("Credentials", super(Credentials, cls).__new__(cls, base_creds)) @property def name(self) -> rnames.Name: """Get the name associated with these credentials""" return t.cast(rnames.Name, self.inquire(name=True, lifetime=False, usage=False, mechs=False).name) @property def lifetime(self) -> int: """Get the remaining lifetime of these credentials, in seconds""" return t.cast(int, self.inquire(name=False, lifetime=True, usage=False, mechs=False).lifetime) @property def mechs(self) -> t.Set[roids.OID]: """Get the mechanisms for these credentials""" return t.cast(t.Set[roids.OID], self.inquire(name=False, lifetime=False, usage=False, mechs=True).mechs) @property def usage(self) -> str: """Get the usage (initiate, accept, or both) of these credentials""" return t.cast(str, self.inquire(name=False, lifetime=False, usage=True, mechs=False).usage)
[docs] @classmethod def acquire( cls, name: t.Optional[rnames.Name] = None, lifetime: t.Optional[int] = None, mechs: t.Optional[t.Iterable[roids.OID]] = None, usage: str = 'both', store: t.Optional[ t.Dict[t.Union[bytes, str], t.Union[bytes, str]] ] = None, ) -> tuples.AcquireCredResult: """Acquire GSSAPI credentials This method acquires credentials. If the `store` argument is used, the credentials will be acquired from the given credential store (if supported). Otherwise, the credentials are acquired from the default store. The credential store information is a dictionary containing mechanisms-specific keys and values pointing to a credential store or stores. Using a non-default store requires support for the credentials store extension. Args: name (~gssapi.names.Name): the name associated with the credentials, or None for the default name lifetime (int): the desired lifetime of the credentials in seconds, or None for indefinite mechs (list): the desired :class:`MechType` OIDs to be used with the credentials, or None for the default set usage (str): the usage for the credentials -- either 'both', 'initiate', or 'accept' store (dict): the credential store information pointing to the credential store from which to acquire the credentials, or None for the default store (:requires-ext:`cred_store`) Returns: AcquireCredResult: the acquired credentials and information about them Raises: ~gssapi.exceptions.BadMechanismError ~gssapi.exceptions.BadNameTypeError ~gssapi.exceptions.BadNameError ~gssapi.exceptions.ExpiredCredentialsError ~gssapi.exceptions.MissingCredentialsError """ if store is None: res = rcreds.acquire_cred(name, lifetime, mechs, usage) else: if rcred_cred_store is None: raise NotImplementedError("Your GSSAPI implementation does " "not have support for manipulating " "credential stores") b_store = _encode_dict(store) res = rcred_cred_store.acquire_cred_from(b_store, name, lifetime, mechs, usage) return tuples.AcquireCredResult(cls(base=res.creds), res.mechs, res.lifetime)
[docs] def store( self, store: t.Optional[ t.Dict[t.Union[bytes, str], t.Union[bytes, str]] ] = None, usage: str = 'both', mech: t.Optional[roids.OID] = None, overwrite: bool = False, set_default: bool = False, ) -> tuples.StoreCredResult: """Store these credentials into the given store This method stores the current credentials into the specified credentials store. If the default store is used, support for :rfc:`5588` is required. Otherwise, support for the credentials store extension is required. :requires-ext:`rfc5588` or :requires-ext:`cred_store` Args: store (dict): the store into which to store the credentials, or None for the default store. usage (str): the usage to store the credentials with -- either 'both', 'initiate', or 'accept' mech (~gssapi.OID): the :class:`MechType` to associate with the stored credentials overwrite (bool): whether or not to overwrite existing credentials stored with the same name, etc set_default (bool): whether or not to set these credentials as the default credentials for the given store. Returns: StoreCredResult: the results of the credential storing operation Raises: ~gssapi.exceptions.GSSError ~gssapi.exceptions.ExpiredCredentialsError ~gssapi.exceptions.MissingCredentialsError ~gssapi.exceptions.OperationUnavailableError ~gssapi.exceptions.DuplicateCredentialsElementError """ if store is None: if rcred_rfc5588 is None: raise NotImplementedError("Your GSSAPI implementation does " "not have support for RFC 5588") return rcred_rfc5588.store_cred(self, usage, mech, overwrite, set_default) else: if rcred_cred_store is None: raise NotImplementedError("Your GSSAPI implementation does " "not have support for manipulating " "credential stores directly") b_store = _encode_dict(store) return rcred_cred_store.store_cred_into(b_store, self, usage, mech, overwrite, set_default)
[docs] def impersonate( self, name: t.Optional[rnames.Name] = None, lifetime: t.Optional[int] = None, mechs: t.Optional[t.Iterable[roids.OID]] = None, usage: str = 'initiate', ) -> "Credentials": """Impersonate a name using the current credentials This method acquires credentials by impersonating another name using the current credentials. :requires-ext:`s4u` Args: name (~gssapi.names.Name): the name to impersonate lifetime (int): the desired lifetime of the new credentials in seconds, or None for indefinite mechs (list): the desired :class:`MechType` OIDs for the new credentials usage (str): the desired usage for the new credentials -- either 'both', 'initiate', or 'accept'. Note that some mechanisms may only support 'initiate'. Returns: Credentials: the new credentials impersonating the given name """ if rcred_s4u is None: raise NotImplementedError("Your GSSAPI implementation does not " "have support for S4U") res = rcred_s4u.acquire_cred_impersonate_name(self, name, lifetime, mechs, usage) return type(self)(base=res.creds)
[docs] def inquire( self, name: bool = True, lifetime: bool = True, usage: bool = True, mechs: bool = True, ) -> tuples.InquireCredResult: """Inspect these credentials for information This method inspects these credentials for information about them. Args: name (bool): get the name associated with the credentials lifetime (bool): get the remaining lifetime for the credentials usage (bool): get the usage for the credentials mechs (bool): get the mechanisms associated with the credentials Returns: InquireCredResult: the information about the credentials, with None used when the corresponding argument was False Raises: ~gssapi.exceptions.MissingCredentialsError ~gssapi.exceptions.InvalidCredentialsError ~gssapi.exceptions.ExpiredCredentialsError """ res = rcreds.inquire_cred(self, name, lifetime, usage, mechs) if res.name is not None: res_name = names.Name(res.name) else: res_name = None return tuples.InquireCredResult(res_name, res.lifetime, res.usage, res.mechs)
[docs] def inquire_by_mech( self, mech: roids.OID, name: bool = True, init_lifetime: bool = True, accept_lifetime: bool = True, usage: bool = True, ) -> tuples.InquireCredByMechResult: """Inspect these credentials for per-mechanism information This method inspects these credentials for per-mechanism information about them. Args: mech (~gssapi.OID): the mechanism for which to retrieve the information name (bool): get the name associated with the credentials init_lifetime (bool): get the remaining initiate lifetime for the credentials in seconds accept_lifetime (bool): get the remaining accept lifetime for the credentials in seconds usage (bool): get the usage for the credentials Returns: InquireCredByMechResult: the information about the credentials, with None used when the corresponding argument was False """ res = rcreds.inquire_cred_by_mech(self, mech, name, init_lifetime, accept_lifetime, usage) if res.name is not None: res_name = names.Name(res.name) else: res_name = None return tuples.InquireCredByMechResult(res_name, res.init_lifetime, res.accept_lifetime, res.usage)
[docs] def add( self, name: rnames.Name, mech: roids.OID, usage: str = 'both', init_lifetime: t.Optional[int] = None, accept_lifetime: t.Optional[int] = None, impersonator: t.Optional[rcreds.Creds] = None, store: t.Optional[ t.Dict[t.Union[bytes, str], t.Union[bytes, str]] ] = None, ) -> "Credentials": """Acquire more credentials to add to the current set This method works like :meth:`acquire`, except that it adds the acquired credentials for a single mechanism to a copy of the current set, instead of creating a new set for multiple mechanisms. Unlike :meth:`acquire`, you cannot pass None desired name or mechanism. If the `impersonator` argument is used, the credentials will impersonate the given name using the impersonator credentials (:requires-ext:`s4u`). If the `store` argument is used, the credentials will be acquired from the given credential store (:requires-ext:`cred_store`). Otherwise, the credentials are acquired from the default store. The credential store information is a dictionary containing mechanisms-specific keys and values pointing to a credential store or stores. Note that the `store` argument is not compatible with the `impersonator` argument. Args: name (~gssapi.names.Name): the name associated with the credentials mech (~gssapi.OID): the desired :class:`MechType` to be used with the credentials usage (str): the usage for the credentials -- either 'both', 'initiate', or 'accept' init_lifetime (int): the desired initiate lifetime of the credentials in seconds, or None for indefinite accept_lifetime (int): the desired accept lifetime of the credentials in seconds, or None for indefinite impersonator (Credentials): the credentials to use to impersonate the given name, or None to not acquire normally (:requires-ext:`s4u`) store (dict): the credential store information pointing to the credential store from which to acquire the credentials, or None for the default store (:requires-ext:`cred_store`) Returns: Credentials: the credentials set containing the current credentials and the newly acquired ones. Raises: ~gssapi.exceptions.BadMechanismError ~gssapi.exceptions.BadNameTypeError ~gssapi.exceptions.BadNameError ~gssapi.exceptions.DuplicateCredentialsElementError ~gssapi.exceptions.ExpiredCredentialsError ~gssapi.exceptions.MissingCredentialsError """ if store is not None and impersonator is not None: raise ValueError('You cannot use both the `impersonator` and ' '`store` arguments at the same time') if store is not None: if rcred_cred_store is None: raise NotImplementedError("Your GSSAPI implementation does " "not have support for manipulating " "credential stores") b_store = _encode_dict(store) res = rcred_cred_store.add_cred_from(b_store, self, name, mech, usage, init_lifetime, accept_lifetime) elif impersonator is not None: if rcred_s4u is None: raise NotImplementedError("Your GSSAPI implementation does " "not have support for S4U") res = rcred_s4u.add_cred_impersonate_name(self, impersonator, name, mech, usage, init_lifetime, accept_lifetime) else: res = rcreds.add_cred(self, name, mech, usage, init_lifetime, accept_lifetime) return Credentials(res.creds)
[docs] def export(self) -> bytes: """Export these credentials into a token This method exports the current credentials to a token that can then be imported by passing the `token` argument to the constructor. This is often used to pass credentials between processes. :requires-ext:`cred_imp_exp` Returns: bytes: the exported credentials in token form """ if rcred_imp_exp is None: raise NotImplementedError("Your GSSAPI implementation does not " "have support for importing and " "exporting creditials") return rcred_imp_exp.export_cred(self)
# pickle protocol support def __reduce__( self, ) -> t.Tuple[t.Type["Credentials"], t.Tuple[None, bytes]]: # the unpickle arguments to new are (base=None, token=self.export()) return (type(self), (None, self.export()))