# -*- coding: utf-8 -*-
import typing as T
import dataclasses
import sayt.api as sayt
import aws_arns.api as arns
import aws_console_url.api as acu
from .. import res_lib as rl
if T.TYPE_CHECKING:
from ..ars_def import ARS
[docs]@dataclasses.dataclass
class SqsQueue(rl.ResourceDocument):
# fmt: off
queue_arn: str = dataclasses.field(metadata={"field": sayt.StoredField(name="queue_arn")})
# fmt: on
@property
def queue_url(self) -> str:
return self.raw_data
@property
def queue_name(self) -> str:
q = arns.res.SqsQueue.from_queue_url(url=self.queue_url)
return q.queue_name
@property
def is_fifo(self) -> bool:
return self.queue_name.endswith(".fifo")
[docs] @classmethod
def from_resource(cls, resource, bsm, boto_kwargs):
q = arns.res.SqsQueue.from_queue_url(url=resource)
return cls(
raw_data=resource,
id=q.queue_name,
name=q.queue_name,
queue_arn=q.to_arn(),
)
@property
def title(self) -> str:
return rl.format_key_value("name", self.name)
@property
def autocomplete(self) -> str:
return self.name
@property
def arn(self) -> str:
return self.queue_arn
[docs] def get_console_url(self, console: acu.AWSConsole) -> str:
return console.sqs.get_queue(name_or_arn_or_url=self.arn)
[docs] @classmethod
def get_list_resources_console_url(cls, console: acu.AWSConsole) -> str:
return console.sqs.queues
[docs] def get_details(self, ars: "ARS") -> T.List[rl.DetailItem]:
from_detail = rl.DetailItem.from_detail
url = self.get_console_url(console=ars.aws_console)
detail_items = rl.DetailItem.get_initial_detail_items(doc=self, ars=ars)
detail_items.extend(
[
from_detail("queue_url", self.queue_url, url=url),
]
)
# fmt: off
with rl.DetailItem.error_handling(detail_items):
res = ars.bsm.sqs_client.get_queue_attributes(
QueueUrl=self.queue_url,
AttributeNames=["All"],
)
attrs = res.get("Attributes", {})
is_fifo = self.is_fifo
n_msg = attrs.get("ApproximateNumberOfMessages", "NA")
n_msg_delayed = attrs.get("ApproximateNumberOfMessagesDelayed", "NA")
n_msg_invisible = attrs.get("ApproximateNumberOfMessagesNotVisible", "NA")
policy = attrs.get("Policy")
redrive_policy = attrs.get("RedrivePolicy")
redrive_allow_policy = attrs.get("RedriveAllowPolicy")
detail_items.extend([
from_detail("is_fifo", is_fifo, url=url),
from_detail("n_msg", n_msg, url=url),
from_detail("n_msg_delayed", n_msg_delayed, url=url),
from_detail("n_msg_invisible", n_msg_invisible, url=url),
from_detail("policy", policy, value_text=self.one_line(policy), url=url),
from_detail("redrive_policy", redrive_policy, value_text=self.one_line(redrive_policy), url=url),
from_detail("redrive_allow_policy", redrive_allow_policy, value_text=self.one_line(redrive_allow_policy), url=url),
])
# fmt: on
with rl.DetailItem.error_handling(detail_items):
res = ars.bsm.sqs_client.list_queue_tags(QueueUrl=self.queue_url)
tags = rl.extract_tags(res)
detail_items.extend(rl.DetailItem.from_tags(tags, url))
return detail_items
[docs]class SqsQueueSearcher(rl.BaseSearcher[SqsQueue]):
pass
sqs_queue_searcher = SqsQueueSearcher(
# list resources
service="sqs",
method="list_queues",
is_paginator=True,
default_boto_kwargs={
"PaginationConfig": {
"MaxItems": 9999,
"PageSize": 1000,
},
},
result_path=rl.ResultPath("QueueUrls"),
# extract document
doc_class=SqsQueue,
# search
resource_type=rl.SearcherEnum.sqs_queue.value,
fields=SqsQueue.get_dataset_fields(),
cache_expire=rl.config.get_cache_expire(rl.SearcherEnum.sqs_queue.value),
more_cache_key=None,
)