# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: keyapis/jwks/v1/keyapis_jwks_key_v1.proto, keyapis/jwks/v1/keyapis_jwks_system_v1.proto
# plugin: python-betterproto
# This file has been @generated

from datetime import datetime
from typing import TYPE_CHECKING

import betterproto
import grpclib
from betterproto.grpc.grpclib_server import ServiceBase
from pydantic import model_validator
from pydantic.dataclasses import (
    dataclass,
    rebuild_dataclass,
)

if TYPE_CHECKING:
    import grpclib.server
    from betterproto.grpc.grpclib_client import MetadataLike
    from grpclib.metadata import Deadline


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class GetSystemStatusRequest(betterproto.Message):
    """Запрос проверки доступности сервиса"""

    pass


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class GetSystemStatusResponse(betterproto.Message):
    """Ответ на запрос проверки доступности сервиса"""

    pass


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class KeyInfo(betterproto.Message):
    """
    Ключ.
     # Описание модели
    """

    id: str = betterproto.string_field(1)
    """
    Идентификатор ключа, соответсвует kid.
     # Тип: Guid
    """

    public_key: "KeyInfoPublicKey" = betterproto.message_field(2)
    """Публичный ключ"""

    created_at: datetime = betterproto.message_field(3)
    """
    Дата создания.
     # Тип: DateTime
    """


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class KeyInfoPublicKey(betterproto.Message):
    """Публичный ключ"""

    use: str = betterproto.string_field(1)
    """
    Вид использования.
     # Диапазон: 2..32
    """

    kty: str = betterproto.string_field(2)
    """
    Тип ключа.
     # Диапазон: 2..32
    """

    kid: str = betterproto.string_field(3)
    """
    Идентификатор.
     # Тип: Guid
    """

    alg: str = betterproto.string_field(4)
    """
    Алгоритм шифрования.
     # Диапазон: 2..32
    """

    n: str = betterproto.string_field(5)
    """Параметр Modulus"""

    e: str = betterproto.string_field(6)
    """Параметр Exponent"""

    x5_c: "list[str]" = betterproto.string_field(7)
    """
    Цепочка сертификатов X.509.
     # Диапазон: 0..100
    """


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class KeyInfoSavingError(betterproto.Message):
    """
    Ошибка сохранения.
     Эти проверки выполняются при работе с базой данных и сторонними сервисами
    """

    key_already_exists: "KeyInfoSavingErrorKeyAlreadyExists | None" = (
        betterproto.message_field(1, optional=True, group="reason")
    )
    """Ключ с таким идентификатором уже существует"""

    @model_validator(mode="after")
    def check_oneof(cls, values):
        return cls._validate_field_groups(values)


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class KeyInfoSavingErrorKeyAlreadyExists(betterproto.Message):
    """
    Ключ с таким идентификатором уже существует.
     Причины:
     - В базе хранится запись с переданнм kid
    """

    pass


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class PostKeyRequest(betterproto.Message):
    """Запрос на сохранение публичного ключа"""

    data: "KeyInfoPublicKey" = betterproto.message_field(1)
    """Ключ"""


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class PostKeyResponse(betterproto.Message):
    """Ответ на запрос на сохранение публичного ключа"""

    error: "PostKeyResponseError | None" = betterproto.message_field(
        1, optional=True, group="type"
    )
    """Ошибка"""

    @model_validator(mode="after")
    def check_oneof(cls, values):
        return cls._validate_field_groups(values)


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class PostKeyResponseError(betterproto.Message):
    """Ошибка"""

    validation: "ValidationError | None" = betterproto.message_field(
        1, optional=True, group="reason"
    )
    """Ошибки валидации"""

    saving: "KeyInfoSavingError | None" = betterproto.message_field(
        2, optional=True, group="reason"
    )
    """Ошибка сохранения"""

    @model_validator(mode="after")
    def check_oneof(cls, values):
        return cls._validate_field_groups(values)


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class DeleteKeyRequest(betterproto.Message):
    """Запрос удаления публичного ключа"""

    id: str = betterproto.string_field(1)
    """
    Идентификатор ключа, соответсвует kid.
     # Тип: Guid
    """


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class DeleteKeyResponse(betterproto.Message):
    """Ответ на запрос удаления публичного ключа"""

    error: "DeleteKeyResponseError | None" = betterproto.message_field(
        1, optional=True, group="type"
    )
    """Ошибка"""

    @model_validator(mode="after")
    def check_oneof(cls, values):
        return cls._validate_field_groups(values)


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class DeleteKeyResponseError(betterproto.Message):
    """Ошибка запроса удаления публичного ключа"""

    validation: "ValidationError | None" = betterproto.message_field(
        1, optional=True, group="reason"
    )
    """Ошибка валидации"""

    @model_validator(mode="after")
    def check_oneof(cls, values):
        return cls._validate_field_groups(values)


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class GetKeyWellKnownJwksJsonRequest(betterproto.Message):
    """Запрос на получение публичных ключей"""

    pass


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class GetKeyWellKnownJwksJsonResponse(betterproto.Message):
    """Ответ на запрос на получение публичных ключей"""

    data: "list[KeyInfoPublicKey]" = betterproto.message_field(1)
    """Список ключей"""


@dataclass(eq=False, repr=False, config={"extra": "forbid"})
class ValidationError(betterproto.Message):
    """
    Ошибки валидации.
     Эти проверки выполняются до обращения в базу данных
    """

    path: str = betterproto.string_field(1)
    """Путь к полю в формате наименования прото"""

    message: str = betterproto.string_field(2)
    """Валидационное сообщение"""


class SystemServiceStub(betterproto.ServiceStub):
    async def get_system_status(
        self,
        get_system_status_request: "GetSystemStatusRequest",
        *,
        timeout: "float | None" = None,
        deadline: "Deadline | None" = None,
        metadata: "MetadataLike | None" = None
    ) -> "GetSystemStatusResponse":
        return await self._unary_unary(
            "/keyapis.jwks.v1.SystemService/GetSystemStatus",
            get_system_status_request,
            GetSystemStatusResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )


class KeyServiceStub(betterproto.ServiceStub):
    async def post_key(
        self,
        post_key_request: "PostKeyRequest",
        *,
        timeout: "float | None" = None,
        deadline: "Deadline | None" = None,
        metadata: "MetadataLike | None" = None
    ) -> "PostKeyResponse":
        return await self._unary_unary(
            "/keyapis.jwks.v1.KeyService/PostKey",
            post_key_request,
            PostKeyResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )

    async def delete_key(
        self,
        delete_key_request: "DeleteKeyRequest",
        *,
        timeout: "float | None" = None,
        deadline: "Deadline | None" = None,
        metadata: "MetadataLike | None" = None
    ) -> "DeleteKeyResponse":
        return await self._unary_unary(
            "/keyapis.jwks.v1.KeyService/DeleteKey",
            delete_key_request,
            DeleteKeyResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )

    async def get_key_well_known_jwks_json(
        self,
        get_key_well_known_jwks_json_request: "GetKeyWellKnownJwksJsonRequest",
        *,
        timeout: "float | None" = None,
        deadline: "Deadline | None" = None,
        metadata: "MetadataLike | None" = None
    ) -> "GetKeyWellKnownJwksJsonResponse":
        return await self._unary_unary(
            "/keyapis.jwks.v1.KeyService/GetKeyWellKnownJwksJson",
            get_key_well_known_jwks_json_request,
            GetKeyWellKnownJwksJsonResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )


class SystemServiceBase(ServiceBase):

    async def get_system_status(
        self, get_system_status_request: "GetSystemStatusRequest"
    ) -> "GetSystemStatusResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_get_system_status(
        self,
        stream: "grpclib.server.Stream[GetSystemStatusRequest, GetSystemStatusResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_system_status(request)
        await stream.send_message(response)

    def __mapping__(self) -> "dict[str, grpclib.const.Handler]":
        return {
            "/keyapis.jwks.v1.SystemService/GetSystemStatus": grpclib.const.Handler(
                self.__rpc_get_system_status,
                grpclib.const.Cardinality.UNARY_UNARY,
                GetSystemStatusRequest,
                GetSystemStatusResponse,
            ),
        }


class KeyServiceBase(ServiceBase):

    async def post_key(self, post_key_request: "PostKeyRequest") -> "PostKeyResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_key(
        self, delete_key_request: "DeleteKeyRequest"
    ) -> "DeleteKeyResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_key_well_known_jwks_json(
        self, get_key_well_known_jwks_json_request: "GetKeyWellKnownJwksJsonRequest"
    ) -> "GetKeyWellKnownJwksJsonResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_post_key(
        self, stream: "grpclib.server.Stream[PostKeyRequest, PostKeyResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.post_key(request)
        await stream.send_message(response)

    async def __rpc_delete_key(
        self, stream: "grpclib.server.Stream[DeleteKeyRequest, DeleteKeyResponse]"
    ) -> None:
        request = await stream.recv_message()
        response = await self.delete_key(request)
        await stream.send_message(response)

    async def __rpc_get_key_well_known_jwks_json(
        self,
        stream: "grpclib.server.Stream[GetKeyWellKnownJwksJsonRequest, GetKeyWellKnownJwksJsonResponse]",
    ) -> None:
        request = await stream.recv_message()
        response = await self.get_key_well_known_jwks_json(request)
        await stream.send_message(response)

    def __mapping__(self) -> "dict[str, grpclib.const.Handler]":
        return {
            "/keyapis.jwks.v1.KeyService/PostKey": grpclib.const.Handler(
                self.__rpc_post_key,
                grpclib.const.Cardinality.UNARY_UNARY,
                PostKeyRequest,
                PostKeyResponse,
            ),
            "/keyapis.jwks.v1.KeyService/DeleteKey": grpclib.const.Handler(
                self.__rpc_delete_key,
                grpclib.const.Cardinality.UNARY_UNARY,
                DeleteKeyRequest,
                DeleteKeyResponse,
            ),
            "/keyapis.jwks.v1.KeyService/GetKeyWellKnownJwksJson": grpclib.const.Handler(
                self.__rpc_get_key_well_known_jwks_json,
                grpclib.const.Cardinality.UNARY_UNARY,
                GetKeyWellKnownJwksJsonRequest,
                GetKeyWellKnownJwksJsonResponse,
            ),
        }


rebuild_dataclass(KeyInfo)  # type: ignore
rebuild_dataclass(KeyInfoSavingError)  # type: ignore
rebuild_dataclass(PostKeyRequest)  # type: ignore
rebuild_dataclass(PostKeyResponse)  # type: ignore
rebuild_dataclass(PostKeyResponseError)  # type: ignore
rebuild_dataclass(DeleteKeyResponse)  # type: ignore
rebuild_dataclass(DeleteKeyResponseError)  # type: ignore
rebuild_dataclass(GetKeyWellKnownJwksJsonResponse)  # type: ignore
