<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0-alpha1/dist/css/bootstrap.min.css" rel="stylesheet"
        integrity="sha384-GLhlTQ8iRABdZLl6O3oVMWSktQOp6b7In1Zl3/Jr59b6EGGoI1aFkw7cmDA6j6gD" crossorigin="anonymous">
    <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.3.0/css/all.min.css"
        integrity="sha512-SzlrxWUlpfuzQ+pcUCosxcglQRNAq/DZjVsC0lE40xsADsfeQoEypE+enwcOiGjk/bSuGGKHEyjSoQ1zVisanQ=="
        crossorigin="anonymous" referrerpolicy="no-referrer" />
</head>
</html>
# Copyright 2021 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from nats.aio.subscription import Subscription


class Error(Exception):
    """Common base class for the NATS client errors defined in this module.

    It adds nothing to ``Exception``; it exists so that callers can catch
    every error class deriving from it with a single ``except Error``.
    """


class TimeoutError(Error, asyncio.TimeoutError):
    """Timeout error whose string form is ``nats: timeout``.

    It derives from both ``Error`` and ``asyncio.TimeoutError``, so an
    ``except`` clause for either base catches it. Within this module the
    name shadows the builtin ``TimeoutError``.
    """

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: timeout"
        return message


class NoRespondersError(Error):
    """Error whose string form is ``nats: no responders available for request``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: no responders available for request"
        return message


class StaleConnectionError(Error):
    """Error whose string form is ``nats: stale connection``.

    Also serves as the base class of ``UnexpectedEOF`` in this module.
    """

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: stale connection"
        return message


class OutboundBufferLimitError(Error):
    """Error whose string form is ``nats: outbound buffer limit exceeded``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: outbound buffer limit exceeded"
        return message


class UnexpectedEOF(StaleConnectionError):
    """Stale-connection error whose string form is ``nats: unexpected EOF``.

    Because it subclasses ``StaleConnectionError``, handlers for that class
    catch this one too.
    """

    def __str__(self) -> str:
        # Replaces the parent's "stale connection" text with the EOF text.
        message = "nats: unexpected EOF"
        return message


class FlushTimeoutError(TimeoutError):
    """Timeout error whose string form is ``nats: flush timeout``.

    It subclasses ``TimeoutError``, so handlers for that class catch it.
    """

    def __str__(self) -> str:
        # Replaces the parent's text with the flush-specific message.
        message = "nats: flush timeout"
        return message


class ConnectionClosedError(Error):
    """Error whose string form is ``nats: connection closed``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: connection closed"
        return message


class SecureConnRequiredError(Error):
    """Error whose string form is ``nats: secure connection required``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: secure connection required"
        return message


class SecureConnWantedError(Error):
    """Error whose string form is ``nats: secure connection not available``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: secure connection not available"
        return message


class SecureConnFailedError(Error):
    """Error whose string form is ``nats: secure connection failed``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: secure connection failed"
        return message


class BadSubscriptionError(Error):
    """Error whose string form is ``nats: invalid subscription``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: invalid subscription"
        return message


class BadSubjectError(Error):
    """Error whose string form is ``nats: invalid subject``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: invalid subject"
        return message


class SlowConsumerError(Error):
    """Slow-consumer error carrying details of the affected subscription.

    The constructor stores its four arguments as the attributes
    ``subject``, ``reply``, ``sid`` and ``sub``. The string form reports
    ``subject``, ``sid`` and ``sub``; ``reply`` is kept on the instance but
    is not included in the message.
    """

    def __init__(self, subject: str, reply: str, sid: int, sub: Subscription) -> None:
        # Targets are assigned left to right, the same order as four
        # separate assignment statements.
        self.subject, self.reply, self.sid, self.sub = subject, reply, sid, sub

    def __str__(self) -> str:
        text = f"nats: slow consumer, messages dropped subject: {self.subject}, sid: {self.sid}, sub: {self.sub}"
        return text


class BadTimeoutError(Error):
    """Error whose string form is ``nats: timeout invalid``.

    Unlike ``TimeoutError`` in this module, it derives from ``Error`` only.
    """

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: timeout invalid"
        return message


class AuthorizationError(Error):
    """Error whose string form is ``nats: authorization failed``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: authorization failed"
        return message


class NoServersError(Error):
    """Error whose string form is ``nats: no servers available for connection``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: no servers available for connection"
        return message


class JsonParseError(Error):
    """Error whose string form is ``nats: connect message, json parse err``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: connect message, json parse err"
        return message


class MaxPayloadError(Error):
    """Error whose string form is ``nats: maximum payload exceeded``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: maximum payload exceeded"
        return message


class DrainTimeoutError(TimeoutError):
    """Timeout error whose string form is ``nats: draining connection timed out``.

    It subclasses ``TimeoutError``, so handlers for that class catch it.
    """

    def __str__(self) -> str:
        # Replaces the parent's text with the drain-specific message.
        message = "nats: draining connection timed out"
        return message


class ConnectionDrainingError(Error):
    """Error whose string form is ``nats: connection draining``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: connection draining"
        return message


class ConnectionReconnectingError(Error):
    """Error whose string form is ``nats: connection reconnecting``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: connection reconnecting"
        return message


class InvalidUserCredentialsError(Error):
    """Error whose string form is ``nats: invalid user credentials``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: invalid user credentials"
        return message


class InvalidCallbackTypeError(Error):
    """Error whose string form is ``nats: callbacks must be coroutine functions``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: callbacks must be coroutine functions"
        return message


class ProtocolError(Error):
    """Error whose string form is ``nats: protocol error``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: protocol error"
        return message


class ServerNotInPoolError(Error):
    """Error whose string form is ``nats: selected server is not in the pool``."""

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: selected server is not in the pool"
        return message


class NotJSMessageError(Error):
    """
    Error for an attempt to use a JetStream-only API on a message that
    does not belong to a stream.

    Its string form is ``nats: not a JetStream message``.
    """

    def __str__(self) -> str:
        # Fixed text; constructor arguments are not part of the message.
        message = "nats: not a JetStream message"
        return message


class MsgAlreadyAckdError(Error):
    """Error reporting that a message was already acknowledged.

    The optional ``msg`` argument is kept on the instance as ``_msg`` and
    interpolated into the string form. When ``msg`` is omitted the text
    ends with ``None``.
    """

    def __init__(self, msg=None) -> None:
        # Keep whatever was passed (possibly None) for use in __str__.
        self._msg = msg

    def __str__(self) -> str:
        acked = self._msg
        return f"nats: message was already acknowledged: {acked}"
