# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: keyapis/kms/v1/keyapis_kms_events_v1.proto, keyapis/kms/v1/keyapis_kms_jwt_v1.proto, keyapis/kms/v1/keyapis_kms_key_v1.proto, keyapis/kms/v1/keyapis_kms_system_v1.proto
# plugin: python-betterproto
# This file has been @generated

from collections.abc import AsyncIterator
from datetime import datetime
from typing import TYPE_CHECKING

import betterproto
import grpclib
from betterproto.grpc.grpclib_server import ServiceBase
from pydantic import model_validator
from pydantic.dataclasses import (
    dataclass,
    rebuild_dataclass,
)

if TYPE_CHECKING:
    import grpclib.server
    from betterproto.grpc.grpclib_client import MetadataLike
    from grpclib.metadata import Deadline


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class PostKmsKeyGenerateRequest(betterproto.Message):
    """Запрос на генерацию пары ключей"""

    pass


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class PostKmsKeyGenerateResponse(betterproto.Message):
    """Ответ на запрос на генерацию пары ключей"""

    data: "str | None" = betterproto.string_field(1, optional=True, group="type")
    """
    Идентификатор.
     # Тип: Guid
    """

    error: "PostKmsKeyGenerateResponseError | None" = betterproto.message_field(
        2, optional=True, group="type"
    )
    """Ошибка"""

    @model_validator(mode="after")
    def check_oneof(cls, values):
        return cls._validate_field_groups(values)


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class PostKmsKeyGenerateResponseError(betterproto.Message):
    """Ошибка"""

    generating: "KeyInfoGeneratingError | None" = betterproto.message_field(
        1, optional=True, group="reason"
    )
    """Ошибка генерации ключей в сервисе KMS"""

    @model_validator(mode="after")
    def check_oneof(cls, values):
        return cls._validate_field_groups(values)


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class GetKmsKeyListRequest(betterproto.Message):
    """Запрос на получение ключей"""

    pass


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class GetKmsKeyListResponse(betterproto.Message):
    """Ответ на запрос на получение ключей"""

    data: "KeyInfo | None" = betterproto.message_field(1, optional=True, group="type")
    """
    Ключ.
     В ключе должен отсутсвовать encrypted_private_key
    """

    @model_validator(mode="after")
    def check_oneof(cls, values):
        return cls._validate_field_groups(values)


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class KeyInfo(betterproto.Message):
    """
    Пара ключей.
     # Описание модели
    """

    id: str = betterproto.string_field(1)
    """
    Идентификатор.
     # Тип: Guid
    """

    public_key: str = betterproto.string_field(2)
    """
    Публичный ключ.
     # Диапазон: 0..800
    """

    encrypted_private_key: str = betterproto.string_field(3)
    """
    Приватный ключ в зашифрованном виде.
     # Диапазон: 0..4000
    """

    created_at: datetime = betterproto.message_field(4)
    """Дата и время создания ключа"""

    expired_at: datetime = betterproto.message_field(5)
    """Дата и время истечения жизни ключа"""


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class KeyInfoGeneratingError(betterproto.Message):
    """Ошибка генерации"""

    transaction: "KeyInfoGeneratingErrorTransactionError | None" = (
        betterproto.message_field(1, optional=True, group="reason")
    )
    """Ошибка генерации ключа в сервисе KMS"""

    publishing: "KeyInfoGeneratingErrorPublishingKeyError | None" = (
        betterproto.message_field(2, optional=True, group="reason")
    )
    """Ошибка сохранения публичного ключа в сервисе JWKS"""

    @model_validator(mode="after")
    def check_oneof(cls, values):
        return cls._validate_field_groups(values)


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class KeyInfoGeneratingErrorTransactionError(betterproto.Message):
    """
    Ошибка генерации ключа в сервисе KMS.
     Причины:
     - Отсутсвует связанность с базой данных
    """

    pass


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class KeyInfoGeneratingErrorPublishingKeyError(betterproto.Message):
    """
    Ошибка сохранения публичного ключа в сервисе JWKS.
     Причины:
     - Отсутсвует связанность с сервисом JWKS
    """

    pass


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class GetSystemStatusRequest(betterproto.Message):
    """Запрос проверки доступности сервиса"""

    pass


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class GetSystemStatusResponse(betterproto.Message):
    """Ответ на запрос проверки доступности сервиса"""

    pass


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class KeyInitializationTask(betterproto.Message):
    """
    Задача на инициализацию ключей.
     Очередь key.kms.key_initialization
    """

    request_id: str = betterproto.string_field(1)
    """Идентификатор запроса"""


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class PostKmsJwtCreateJweRequest(betterproto.Message):
    """Запрос на создание nested JWT токена"""

    data: "PostKmsJwtCreateJweRequestPayloadData" = betterproto.message_field(1)
    """Claims (данные включаемые в тело токена)"""

    public_key: str = betterproto.string_field(2)
    """Публичный ключ для шифрования"""


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class PostKmsJwtCreateJweRequestPayloadData(betterproto.Message):
    """Claims (данные включаемые в тело токена)"""

    values: "dict[str, str]" = betterproto.map_field(
        1, betterproto.TYPE_STRING, betterproto.TYPE_STRING
    )
    """
    Словарь значений.
     Ключи и значения являются строками.
     # Диапазон: 1..128
    """


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class PostKmsJwtCreateJweResponse(betterproto.Message):
    """Ответ на запрос на формирование токена"""

    data: "str | None" = betterproto.string_field(1, optional=True, group="type")
    """Nested JWT токен"""

    error: "PostKmsJwtCreateJweResponseError | None" = betterproto.message_field(
        2, optional=True, group="type"
    )
    """Ошибка"""

    @model_validator(mode="after")
    def check_oneof(cls, values):
        return cls._validate_field_groups(values)


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class PostKmsJwtCreateJweResponseError(betterproto.Message):
    """Ошибка"""

    validation: "ValidationError | None" = betterproto.message_field(
        1, optional=True, group="reason"
    )
    """Ошибка валидации"""

    private_key_not_found: (
        "PostKmsJwtCreateJweResponseErrorPrivateKeyNotFound | None"
    ) = betterproto.message_field(2, optional=True, group="reason")
    """Отсутствует ключ для подписания"""

    @model_validator(mode="after")
    def check_oneof(cls, values):
        return cls._validate_field_groups(values)


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class PostKmsJwtCreateJweResponseErrorPrivateKeyNotFound(betterproto.Message):
    """Отсутствует ключ для подписания"""

    pass


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class ValidationError(betterproto.Message):
    """
    Ошибки валидации.
     Эти проверки выполняются до обращения в базу данных
    """

    path: str = betterproto.string_field(1)
    """Путь к полю в формате наименования прото"""

    message: str = betterproto.string_field(2)
    """Валидационное сообщение"""


class KmsServiceStub(betterproto.ServiceStub):
    async def post_kms_key_generate(
        self,
        post_kms_key_generate_request: "PostKmsKeyGenerateRequest",
        *,
        timeout: "float | None" = None,
        deadline: "Deadline | None" = None,
        metadata: "MetadataLike | None" = None
    ) -> "PostKmsKeyGenerateResponse":
        return await self._unary_unary(
            "/keyapis.kms.v1.KmsService/PostKmsKeyGenerate",
            post_kms_key_generate_request,
            PostKmsKeyGenerateResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )

    async def get_kms_key_list(
        self,
        get_kms_key_list_request: "GetKmsKeyListRequest",
        *,
        timeout: "float | None" = None,
        deadline: "Deadline | None" = None,
        metadata: "MetadataLike | None" = None
    ) -> "AsyncIterator[GetKmsKeyListResponse]":
        async for response in self._unary_stream(
            "/keyapis.kms.v1.KmsService/GetKmsKeyList",
            get_kms_key_list_request,
            GetKmsKeyListResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        ):
            yield response


class SystemServiceStub(betterproto.ServiceStub):
    async def get_system_status(
        self,
        get_system_status_request: "GetSystemStatusRequest",
        *,
        timeout: "float | None" = None,
        deadline: "Deadline | None" = None,
        metadata: "MetadataLike | None" = None
    ) -> "GetSystemStatusResponse":
        return await self._unary_unary(
            "/keyapis.kms.v1.SystemService/GetSystemStatus",
            get_system_status_request,
            GetSystemStatusResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )


class JwtServiceStub(betterproto.ServiceStub):
    async def post_kms_jwt_create_jwe(
        self,
        post_kms_jwt_create_jwe_request: "PostKmsJwtCreateJweRequest",
        *,
        timeout: "float | None" = None,
        deadline: "Deadline | None" = None,
        metadata: "MetadataLike | None" = None
    ) -> "PostKmsJwtCreateJweResponse":
        return await self._unary_unary(
            "/keyapis.kms.v1.JwtService/PostKmsJwtCreateJwe",
            post_kms_jwt_create_jwe_request,
            PostKmsJwtCreateJweResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )


class KmsServiceBase(ServiceBase):

    async def post_kms_key_generate(
        self, post_kms_key_generate_request: "PostKmsKeyGenerateRequest"
    ) -> "PostKmsKeyGenerateResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_kms_key_list(
        self, get_kms_key_list_request: "GetKmsKeyListRequest"
    ) -> "AsyncIterator[GetKmsKeyListResponse]":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
        yield GetKmsKeyListResponse()

    async def __rpc_post_kms_key_generate(
        self,
        stream: "grpclib.server.Stream[PostKmsKeyGenerateRequest, PostKmsKeyGenerateResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.post_kms_key_generate(request)
        await stream.send_message(response)

    async def __rpc_get_kms_key_list(
        self,
        stream: "grpclib.server.Stream[GetKmsKeyListRequest, GetKmsKeyListResponse]",
    ) -> None:
        request = await stream.recv_message()
        await self._call_rpc_handler_server_stream(
            self.get_kms_key_list,
            stream,
            request,
        )

    def __mapping__(self) -> "dict[str, grpclib.const.Handler]":
        return {
            "/keyapis.kms.v1.KmsService/PostKmsKeyGenerate": grpclib.const.Handler(
                self.__rpc_post_kms_key_generate,
                grpclib.const.Cardinality.UNARY_UNARY,
                PostKmsKeyGenerateRequest,
                PostKmsKeyGenerateResponse,
            ),
            "/keyapis.kms.v1.KmsService/GetKmsKeyList": grpclib.const.Handler(
                self.__rpc_get_kms_key_list,
                grpclib.const.Cardinality.UNARY_STREAM,
                GetKmsKeyListRequest,
                GetKmsKeyListResponse,
            ),
        }


class SystemServiceBase(ServiceBase):

    async def get_system_status(
        self, get_system_status_request: "GetSystemStatusRequest"
    ) -> "GetSystemStatusResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_get_system_status(
        self,
        stream: "grpclib.server.Stream[GetSystemStatusRequest, GetSystemStatusResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_system_status(request)
        await stream.send_message(response)

    def __mapping__(self) -> "dict[str, grpclib.const.Handler]":
        return {
            "/keyapis.kms.v1.SystemService/GetSystemStatus": grpclib.const.Handler(
                self.__rpc_get_system_status,
                grpclib.const.Cardinality.UNARY_UNARY,
                GetSystemStatusRequest,
                GetSystemStatusResponse,
            ),
        }


class JwtServiceBase(ServiceBase):

    async def post_kms_jwt_create_jwe(
        self, post_kms_jwt_create_jwe_request: "PostKmsJwtCreateJweRequest"
    ) -> "PostKmsJwtCreateJweResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_post_kms_jwt_create_jwe(
        self,
        stream: "grpclib.server.Stream[PostKmsJwtCreateJweRequest, PostKmsJwtCreateJweResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.post_kms_jwt_create_jwe(request)
        await stream.send_message(response)

    def __mapping__(self) -> "dict[str, grpclib.const.Handler]":
        return {
            "/keyapis.kms.v1.JwtService/PostKmsJwtCreateJwe": grpclib.const.Handler(
                self.__rpc_post_kms_jwt_create_jwe,
                grpclib.const.Cardinality.UNARY_UNARY,
                PostKmsJwtCreateJweRequest,
                PostKmsJwtCreateJweResponse,
            ),
        }


rebuild_dataclass(PostKmsKeyGenerateResponse)  # type: ignore
rebuild_dataclass(PostKmsKeyGenerateResponseError)  # type: ignore
rebuild_dataclass(GetKmsKeyListResponse)  # type: ignore
rebuild_dataclass(KeyInfo)  # type: ignore
rebuild_dataclass(KeyInfoGeneratingError)  # type: ignore
rebuild_dataclass(PostKmsJwtCreateJweRequest)  # type: ignore
rebuild_dataclass(PostKmsJwtCreateJweRequestPayloadData)  # type: ignore
rebuild_dataclass(PostKmsJwtCreateJweResponse)  # type: ignore
rebuild_dataclass(PostKmsJwtCreateJweResponseError)  # type: ignore
