# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: keyapis/kms/v1/keyapis_kms_events_v1.proto, keyapis/kms/v1/keyapis_kms_jwt_v1.proto, keyapis/kms/v1/keyapis_kms_key_v1.proto, keyapis/kms/v1/keyapis_kms_system_v1.proto
# plugin: python-betterproto
from dataclasses import dataclass
from datetime import datetime
from typing import (
    TYPE_CHECKING,
    AsyncIterator,
    Dict,
    Optional,
)

import betterproto
import grpclib
from betterproto.grpc.grpclib_server import ServiceBase

if TYPE_CHECKING:
    import grpclib.server
    from betterproto.grpc.grpclib_client import MetadataLike
    from grpclib.metadata import Deadline


@dataclass(eq=False, repr=False)
class PostKmsKeyGenerateRequest(betterproto.Message):
    """Запрос на генерацию пары ключей"""

    pass


@dataclass(eq=False, repr=False)
class PostKmsKeyGenerateResponse(betterproto.Message):
    """Ответ на запрос на генерацию пары ключей"""

    data: str = betterproto.string_field(1, group="type")
    """Идентификатор. # Тип: Guid"""

    error: "PostKmsKeyGenerateResponseError" = betterproto.message_field(
        2, group="type"
    )
    """Ошибка"""


@dataclass(eq=False, repr=False)
class PostKmsKeyGenerateResponseError(betterproto.Message):
    """Ошибка"""

    generating: "KeyInfoGeneratingError" = betterproto.message_field(1, group="reason")
    """Ошибка генерации ключей в сервисе KMS"""


@dataclass(eq=False, repr=False)
class GetKmsKeyListRequest(betterproto.Message):
    """Запрос на получение ключей"""

    pass


@dataclass(eq=False, repr=False)
class GetKmsKeyListResponse(betterproto.Message):
    """Ответ на запрос на получение ключей"""

    data: "KeyInfo" = betterproto.message_field(1, group="type")
    """Ключ. В ключе должен отсутсвовать encrypted_private_key"""


@dataclass(eq=False, repr=False)
class KeyInfo(betterproto.Message):
    """Пара ключей. # Описание модели"""

    id: str = betterproto.string_field(1)
    """Идентификатор. # Тип: Guid"""

    public_key: str = betterproto.string_field(2)
    """Публичный ключ. # Диапазон: 0..800"""

    encrypted_private_key: str = betterproto.string_field(3)
    """Приватный ключ в зашифрованном виде. # Диапазон: 0..4000"""

    created_at: datetime = betterproto.message_field(4)
    """Дата и время создания ключа"""

    expired_at: datetime = betterproto.message_field(5)
    """Дата и время истечения жизни ключа"""


@dataclass(eq=False, repr=False)
class KeyInfoGeneratingError(betterproto.Message):
    """Ошибка генерации"""

    transaction: "KeyInfoGeneratingErrorTransactionError" = betterproto.message_field(
        1, group="reason"
    )
    """Ошибка генерации ключа в сервисе KMS"""

    publishing: "KeyInfoGeneratingErrorPublishingKeyError" = betterproto.message_field(
        2, group="reason"
    )
    """Ошибка сохранения публичного ключа в сервисе JWKS"""


@dataclass(eq=False, repr=False)
class KeyInfoGeneratingErrorTransactionError(betterproto.Message):
    """
    Ошибка генерации ключа в сервисе KMS. Причины: - Отсутсвует связанность с
    базой данных
    """

    pass


@dataclass(eq=False, repr=False)
class KeyInfoGeneratingErrorPublishingKeyError(betterproto.Message):
    """
    Ошибка сохранения публичного ключа в сервисе JWKS. Причины: - Отсутсвует
    связанность с сервисом JWKS
    """

    pass


@dataclass(eq=False, repr=False)
class GetSystemStatusRequest(betterproto.Message):
    """Запрос проверки доступности сервиса"""

    pass


@dataclass(eq=False, repr=False)
class GetSystemStatusResponse(betterproto.Message):
    """Ответ на запрос проверки доступности сервиса"""

    pass


@dataclass(eq=False, repr=False)
class KeyInitializationTask(betterproto.Message):
    """Задача на инициализацию ключей. Очередь key.kms.key_initialization"""

    request_id: str = betterproto.string_field(1)
    """Идентификатор запроса"""


@dataclass(eq=False, repr=False)
class PostKmsJwtCreateJweRequest(betterproto.Message):
    """Запрос на создание nested JWT токена"""

    data: "PostKmsJwtCreateJweRequestPayloadData" = betterproto.message_field(1)
    """Claims (данные включаемые в тело токена)"""

    public_key: str = betterproto.string_field(2)
    """Публичный ключ для шифрования"""


@dataclass(eq=False, repr=False)
class PostKmsJwtCreateJweRequestPayloadData(betterproto.Message):
    """Claims (данные включаемые в тело токена)"""

    values: Dict[str, str] = betterproto.map_field(
        1, betterproto.TYPE_STRING, betterproto.TYPE_STRING
    )
    """
    Словарь значений. Ключи и значения являются строками. # Диапазон: 1..128
    """


@dataclass(eq=False, repr=False)
class PostKmsJwtCreateJweResponse(betterproto.Message):
    """Ответ на запрос на формирование токена"""

    data: str = betterproto.string_field(1, group="type")
    """Nested JWT токен"""

    error: "PostKmsJwtCreateJweResponseError" = betterproto.message_field(
        2, group="type"
    )
    """Ошибка"""


@dataclass(eq=False, repr=False)
class PostKmsJwtCreateJweResponseError(betterproto.Message):
    """Ошибка"""

    validation: "ValidationError" = betterproto.message_field(1, group="reason")
    """Ошибка валидации"""

    private_key_not_found: "PostKmsJwtCreateJweResponseErrorPrivateKeyNotFound" = (
        betterproto.message_field(2, group="reason")
    )
    """Отсутствует ключ для подписания"""


@dataclass(eq=False, repr=False)
class PostKmsJwtCreateJweResponseErrorPrivateKeyNotFound(betterproto.Message):
    """Отсутствует ключ для подписания"""

    pass


@dataclass(eq=False, repr=False)
class ValidationError(betterproto.Message):
    """
    Ошибки валидации. Эти проверки выполняются до обращения в базу данных
    """

    path: str = betterproto.string_field(1)
    """Путь к полю в формате наименования прото"""

    message: str = betterproto.string_field(2)
    """Валидационное сообщение"""


class KmsServiceStub(betterproto.ServiceStub):
    async def post_kms_key_generate(
        self,
        post_kms_key_generate_request: "PostKmsKeyGenerateRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None
    ) -> "PostKmsKeyGenerateResponse":
        return await self._unary_unary(
            "/keyapis.kms.v1.KmsService/PostKmsKeyGenerate",
            post_kms_key_generate_request,
            PostKmsKeyGenerateResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )

    async def get_kms_key_list(
        self,
        get_kms_key_list_request: "GetKmsKeyListRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None
    ) -> AsyncIterator["GetKmsKeyListResponse"]:
        async for response in self._unary_stream(
            "/keyapis.kms.v1.KmsService/GetKmsKeyList",
            get_kms_key_list_request,
            GetKmsKeyListResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        ):
            yield response


class SystemServiceStub(betterproto.ServiceStub):
    async def get_system_status(
        self,
        get_system_status_request: "GetSystemStatusRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None
    ) -> "GetSystemStatusResponse":
        return await self._unary_unary(
            "/keyapis.kms.v1.SystemService/GetSystemStatus",
            get_system_status_request,
            GetSystemStatusResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )


class JwtServiceStub(betterproto.ServiceStub):
    async def post_kms_jwt_create_jwe(
        self,
        post_kms_jwt_create_jwe_request: "PostKmsJwtCreateJweRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None
    ) -> "PostKmsJwtCreateJweResponse":
        return await self._unary_unary(
            "/keyapis.kms.v1.JwtService/PostKmsJwtCreateJwe",
            post_kms_jwt_create_jwe_request,
            PostKmsJwtCreateJweResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )


class KmsServiceBase(ServiceBase):

    async def post_kms_key_generate(
        self, post_kms_key_generate_request: "PostKmsKeyGenerateRequest"
    ) -> "PostKmsKeyGenerateResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_kms_key_list(
        self, get_kms_key_list_request: "GetKmsKeyListRequest"
    ) -> AsyncIterator["GetKmsKeyListResponse"]:
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_post_kms_key_generate(
        self,
        stream: "grpclib.server.Stream[PostKmsKeyGenerateRequest, PostKmsKeyGenerateResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.post_kms_key_generate(request)
        await stream.send_message(response)

    async def __rpc_get_kms_key_list(
        self,
        stream: "grpclib.server.Stream[GetKmsKeyListRequest, GetKmsKeyListResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_kms_key_list,
            stream,
            request,
        )

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        return {
            "/keyapis.kms.v1.KmsService/PostKmsKeyGenerate": grpclib.const.Handler(
                self.__rpc_post_kms_key_generate,
                grpclib.const.Cardinality.UNARY_UNARY,
                PostKmsKeyGenerateRequest,
                PostKmsKeyGenerateResponse,
            ),
            "/keyapis.kms.v1.KmsService/GetKmsKeyList": grpclib.const.Handler(
                self.__rpc_get_kms_key_list,
                grpclib.const.Cardinality.UNARY_STREAM,
                GetKmsKeyListRequest,
                GetKmsKeyListResponse,
            ),
        }


class SystemServiceBase(ServiceBase):

    async def get_system_status(
        self, get_system_status_request: "GetSystemStatusRequest"
    ) -> "GetSystemStatusResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_get_system_status(
        self,
        stream: "grpclib.server.Stream[GetSystemStatusRequest, GetSystemStatusResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_system_status(request)
        await stream.send_message(response)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        return {
            "/keyapis.kms.v1.SystemService/GetSystemStatus": grpclib.const.Handler(
                self.__rpc_get_system_status,
                grpclib.const.Cardinality.UNARY_UNARY,
                GetSystemStatusRequest,
                GetSystemStatusResponse,
            ),
        }


class JwtServiceBase(ServiceBase):

    async def post_kms_jwt_create_jwe(
        self, post_kms_jwt_create_jwe_request: "PostKmsJwtCreateJweRequest"
    ) -> "PostKmsJwtCreateJweResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_post_kms_jwt_create_jwe(
        self,
        stream: "grpclib.server.Stream[PostKmsJwtCreateJweRequest, PostKmsJwtCreateJweResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.post_kms_jwt_create_jwe(request)
        await stream.send_message(response)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        return {
            "/keyapis.kms.v1.JwtService/PostKmsJwtCreateJwe": grpclib.const.Handler(
                self.__rpc_post_kms_jwt_create_jwe,
                grpclib.const.Cardinality.UNARY_UNARY,
                PostKmsJwtCreateJweRequest,
                PostKmsJwtCreateJweResponse,
            ),
        }
