Source code for aws_resource_search.res.glue

# -*- coding: utf-8 -*-

import typing as T
import dataclasses
from datetime import datetime

import sayt.api as sayt
import aws_arns.api as arns
import aws_console_url.api as acu

from .. import res_lib as rl

if T.TYPE_CHECKING:
    from ..ars_def import ARS


class GlueMixin:
    @property
    def description(self: rl.ResourceDocument) -> str:
        return rl.get_description(self.raw_data, "Description")


[docs]@dataclasses.dataclass class GlueDatabase(rl.ResourceDocument, GlueMixin): # fmt: off database_arn: str = dataclasses.field(metadata={"field": sayt.StoredField(name="database_arn")}) # fmt: on @property def catalog_id(self) -> str: return self.raw_data["CatalogId"] @property def database(self) -> str: return self.raw_data["Name"]
[docs] @classmethod def from_resource(cls, resource, bsm, boto_kwargs): return cls( raw_data=resource, id=resource["Name"], name=resource["Name"], database_arn="arn:aws:glue:{aws_region}:{aws_account_id}:database/{database}".format( aws_region=bsm.aws_region, aws_account_id=bsm.aws_account_id, database=resource["Name"], ), )
@property def title(self) -> str: return rl.format_key_value("name", self.database) @property def subtitle(self) -> str: return "{}, {}".format( self.description, self.short_subtitle, ) @property def autocomplete(self) -> str: return self.name @property def arn(self) -> str: return self.database_arn
[docs] def get_console_url(self, console: acu.AWSConsole) -> str: return console.glue.get_database(database_or_arn=self.arn)
[docs] @classmethod def get_list_resources_console_url(cls, console: acu.AWSConsole) -> str: return console.glue.databases
[docs]class GlueDatabaseSearcher(rl.BaseSearcher[GlueDatabase]): pass
glue_database_searcher = GlueDatabaseSearcher( # list resources service="glue", method="get_databases", is_paginator=True, default_boto_kwargs={"PaginationConfig": {"MaxItems": 9999, "PageSize": 1000}}, result_path=rl.ResultPath("DatabaseList"), # extract document doc_class=GlueDatabase, # search resource_type=rl.SearcherEnum.glue_database.value, fields=GlueDatabase.get_dataset_fields(), cache_expire=rl.config.get_cache_expire(rl.SearcherEnum.glue_database.value), more_cache_key=None, )
[docs]@dataclasses.dataclass class GlueTable(rl.ResourceDocument, GlueMixin): # fmt: off table_arn: str = dataclasses.field(metadata={"field": sayt.StoredField(name="table_arn")}) # fmt: on @property def catalog_id(self) -> str: return self.raw_data["CatalogId"] @property def database(self) -> str: return self.raw_data["DatabaseName"] @property def table(self) -> str: return self.raw_data["Name"]
[docs] @classmethod def from_resource(cls, resource, bsm, boto_kwargs): return cls( raw_data=resource, id="{}.{}".format(resource["DatabaseName"], resource["Name"]), name="{}.{}".format(resource["DatabaseName"], resource["Name"]), table_arn="arn:aws:glue:{aws_region}:{aws_account_id}:table/{database}/{table}".format( aws_region=bsm.aws_region, aws_account_id=bsm.aws_account_id, database=resource["DatabaseName"], table=resource["Name"], ), )
@property def title(self) -> str: return rl.format_key_value("fullname", f"{self.database}.{self.table}") @property def subtitle(self) -> str: return "{}, {}".format( self.description, self.short_subtitle, ) @property def autocomplete(self) -> str: return f"{self.database}@{self.table}" @property def arn(self) -> str: return self.table_arn
[docs] def get_console_url(self, console: acu.AWSConsole) -> str: return console.glue.get_table(table_or_arn=self.arn)
[docs] @classmethod def get_list_resources_console_url(cls, console: acu.AWSConsole) -> str: return console.glue.tables
[docs]class GlueTableSearcher(rl.BaseSearcher[GlueTable]): pass
glue_table_searcher = GlueTableSearcher( # list resources service="glue", method="get_tables", is_paginator=True, default_boto_kwargs={"PaginationConfig": {"MaxItems": 9999, "PageSize": 1000}}, result_path=rl.ResultPath("TableList"), # extract document doc_class=GlueTable, # search resource_type=rl.SearcherEnum.glue_database_table.value, fields=GlueTable.get_dataset_fields(), cache_expire=rl.config.get_cache_expire(rl.SearcherEnum.glue_database_table.value), more_cache_key=lambda boto_kwargs: [boto_kwargs["DatabaseName"]], )
[docs]@dataclasses.dataclass class GlueJob(rl.ResourceDocument, GlueMixin): # fmt: off job_arn: str = dataclasses.field(metadata={"field": sayt.StoredField(name="job_arn")}) # fmt: on
[docs] @classmethod def from_resource(cls, resource, bsm, boto_kwargs): return cls( raw_data=resource, id=resource["Name"], name=resource["Name"], job_arn="arn:aws:glue:{aws_region}:{aws_account_id}:job/{job}".format( aws_region=bsm.aws_region, aws_account_id=bsm.aws_account_id, job=resource["Name"], ), )
@property def title(self) -> str: return rl.format_key_value("name", self.name) @property def subtitle(self) -> str: return "{}, {}".format( self.description, self.short_subtitle, ) @property def autocomplete(self) -> str: return self.name @property def arn(self) -> str: return self.job_arn
[docs] def get_console_url(self, console: acu.AWSConsole) -> str: return console.glue.get_job(name_or_arn=self.arn)
[docs] @classmethod def get_list_resources_console_url(cls, console: acu.AWSConsole) -> str: return console.glue.jobs
# fmt: off
[docs] def get_details(self, ars: "ARS") -> T.List[rl.DetailItem]: from_detail = rl.DetailItem.from_detail url = self.get_console_url(console=ars.aws_console) detail_items = rl.DetailItem.get_initial_detail_items(doc=self, ars=ars) with rl.DetailItem.error_handling(detail_items): res = ars.bsm.glue_client.get_job(JobName=self.name) job_dct = res["Job"] description = job_dct.get("Description", "NA") role_arn = job_dct["Role"] glue_version = job_dct.get("GlueVersion", "NA") worker_type = job_dct.get("WorkerType", "NA") number_of_workers = job_dct.get("NumberOfWorkers", "NA") max_concurrent_runs = job_dct.get("ExecutionProperty", {}).get("MaxConcurrentRuns", "unknown") max_retries = job_dct.get("MaxRetries", "NA") execution_class = job_dct.get("ExecutionClass", "NA") script_location = job_dct.get("Command", {}).get("ScriptLocation", "NA") detail_items.extend([ from_detail("description", description, url=url), from_detail("role_arn", role_arn, url=ars.aws_console.iam.get_role(role_arn)), from_detail("glue_version", glue_version, url=url), from_detail("worker_type", worker_type, url=url), from_detail("number_of_workers", number_of_workers, url=url), from_detail("max_concurrent_runs", max_concurrent_runs, url=url), from_detail("max_retries", max_retries, url=url), from_detail("execution_class", execution_class, url=url), from_detail("script_location", script_location, url=ars.aws_console.s3.get_console_url(uri_liked=script_location)), ]) with rl.DetailItem.error_handling(detail_items): res = ars.bsm.glue_client.get_tags(ResourceArn=self.arn) tags = rl.extract_tags(res) detail_items.extend(rl.DetailItem.from_tags(tags, url)) return detail_items
# fmt: on
[docs]class GlueJobSearcher(rl.BaseSearcher[GlueJob]): pass
glue_job_searcher = GlueJobSearcher( # list resources service="glue", method="get_jobs", is_paginator=True, default_boto_kwargs={"PaginationConfig": {"MaxItems": 9999, "PageSize": 1000}}, result_path=rl.ResultPath("Jobs"), # extract document doc_class=GlueJob, # search resource_type=rl.SearcherEnum.glue_job.value, fields=GlueJob.get_dataset_fields(), cache_expire=rl.config.get_cache_expire(rl.SearcherEnum.glue_job.value), more_cache_key=None, ) glue_job_run_state_icon_mapper = { "STARTING": "🟡", "RUNNING": "🔵", "STOPPING": "🟡", "STOPPED": "⚫", "SUCCEEDED": "🟢", "FAILED": "🔴", "TIMEOUT": "🔴", "ERROR": "🔴", "WAITING'": "🟡", }
[docs]@dataclasses.dataclass class GlueJobRun(rl.ResourceDocument): # fmt: off started_on: datetime = dataclasses.field(metadata={"field": sayt.DatetimeField(name="started_on", sortable=True, ascending=False, stored=True)}) # fmt: on @property def job_name(self) -> str: return self.raw_data["JobName"] @property def state(self) -> str: return self.raw_data["JobRunState"] @property def completed_on(self) -> datetime: return self.raw_data.get("CompletedOn") @property def execution_time(self) -> int: return self.raw_data.get("ExecutionTime")
[docs] @classmethod def from_resource(cls, resource, bsm, boto_kwargs): return cls( raw_data=resource, started_on=resource.get("StartedOn"), id=resource["Id"], name=resource["Id"], )
@property def title(self) -> str: return rl.format_key_value("job_run_id", self.id) @property def subtitle(self) -> str: state_icon = glue_job_run_state_icon_mapper[self.state] return "{} | {} | {} | {}, {}".format( f"{state_icon} {self.state}", rl.format_key_value("start", str(self.started_on)[:19]), rl.format_key_value("end", str(self.completed_on)[:19]), rl.format_key_value("elapsed", f"{self.execution_time} secs"), self.short_subtitle, ) @property def autocomplete(self) -> str: return f"{self.job_name}@{self.id}" @property def arn(self) -> str: raise NotImplementedError("Glue Job Run does not have an ARN")
[docs] def get_console_url(self, console: acu.AWSConsole) -> str: return console.glue.get_glue_job_run( job_name_or_arn=self.job_name, job_run_id=self.id )
# fmt: off
[docs] def get_details(self, ars: "ARS") -> T.List[rl.DetailItem]: from_detail = rl.DetailItem.from_detail url = self.get_console_url(console=ars.aws_console) detail_items = [ from_detail("job_run_id", self.id, url=url), ] with rl.DetailItem.error_handling(detail_items): res = ars.bsm.glue_client.get_job_run( JobName=self.job_name, RunId=self.id, ) dct = res["JobRun"] error_message = dct.get("ErrorMessage", "NA") log_group_name = dct.get("LogGroupName", "NA") detail_items.extend([ from_detail("error_message", error_message, url=url), from_detail("output_logs", log_group_name, url=ars.aws_console.cloudwatch.get_log_stream(stream_name_or_arn=self.id, group_name=f"{log_group_name}/output")), from_detail("error_logs", log_group_name, url=ars.aws_console.cloudwatch.get_log_stream(stream_name_or_arn=self.id, group_name=f"{log_group_name}/error")), ]) args = dct.get("Arguments", {}) detail_items.extend( [ rl.DetailItem.new( title=f"📝 arg: {rl.format_key_value(k, v)}", subtitle=f"🌐 {rl.ShortcutEnum.ENTER} to open url, 📋 {rl.ShortcutEnum.CTRL_A} to copy arg value.", uid=f"arg {k}", copy=v, url=url, ) for k, v in args.items() ] ) if len(args) == 0: detail_items.append( rl.DetailItem.new( title=f"📝 arg: 🔴 No arg found", subtitle=f"no arg found", uid=f"no arg found", url=url, ) ) return detail_items
# fmt: on
[docs]class GlueJobRunSearcher(rl.BaseSearcher[GlueJobRun]): pass
glue_job_run_searcher = GlueJobRunSearcher( # list resources service="glue", method="get_job_runs", is_paginator=True, default_boto_kwargs={"PaginationConfig": {"MaxItems": 9999, "PageSize": 200}}, result_path=rl.ResultPath("JobRuns"), # extract document doc_class=GlueJobRun, # search resource_type=rl.SearcherEnum.glue_job_run.value, fields=GlueJobRun.get_dataset_fields(), cache_expire=rl.config.get_cache_expire(rl.SearcherEnum.glue_job_run.value), more_cache_key=lambda boto_kwargs: [boto_kwargs["JobName"]], ) glue_crawler_state_icon_mapper = { "READY": "🔵", "RUNNING": "🟢", "STOPPING": "🔴", }
[docs]@dataclasses.dataclass class GlueCrawler(rl.ResourceDocument, GlueMixin): # fmt: off crawler_arn: str = dataclasses.field(metadata={"field": sayt.StoredField(name="crawler_arn")}) # fmt: on
[docs] @classmethod def from_resource(cls, resource, bsm, boto_kwargs): return cls( raw_data=resource, id=resource["Name"], name=resource["Name"], crawler_arn=arns.res.GlueCrawler.new( aws_account_id=bsm.aws_account_id, aws_region=bsm.aws_region, name=resource["Name"], ).to_arn(), )
@property def title(self) -> str: return rl.format_key_value("name", self.name) @property def autocomplete(self) -> str: return self.name @property def arn(self) -> str: return self.crawler_arn
[docs] def get_console_url(self, console: acu.AWSConsole) -> str: return console.glue.get_crawler(name_or_arn=self.arn)
[docs] @classmethod def get_list_resources_console_url(cls, console: acu.AWSConsole) -> str: return console.glue.crawlers
[docs] def get_details(self, ars: "ARS") -> T.List[rl.DetailItem]: from_detail = rl.DetailItem.from_detail url = self.get_console_url(console=ars.aws_console) detail_items = rl.DetailItem.get_initial_detail_items(doc=self, ars=ars) with rl.DetailItem.error_handling(detail_items): res = ars.bsm.glue_client.get_crawler(Name=self.name) dct = res["Crawler"] description = dct.get("Description", "NA") state = dct.get("State", "NA") role_arn = dct["Role"] if not role_arn.startswith("arn:"): role_arn = arns.res.IamRole.new( aws_account_id=ars.bsm.aws_account_id, name=role_arn, ).to_arn() state_icon = glue_crawler_state_icon_mapper[state] # fmt: off detail_items.extend([ from_detail("description", description, url=url), from_detail("state", state, f"{state_icon} {state}", url=url), from_detail("role_arn", role_arn, url=ars.aws_console.iam.get_role(role_arn)), ]) # fmt: on with rl.DetailItem.error_handling(detail_items): res = ars.bsm.glue_client.get_tags(ResourceArn=self.arn) tags = rl.extract_tags(res) detail_items.extend(rl.DetailItem.from_tags(tags, url)) return detail_items
[docs]class GlueCrawlerSearcher(rl.BaseSearcher[GlueCrawler]): pass
glue_crawler_searcher = GlueCrawlerSearcher( # list resources service="glue", method="get_crawlers", is_paginator=True, default_boto_kwargs={"PaginationConfig": {"MaxItems": 9999, "PageSize": 1000}}, result_path=rl.ResultPath("Crawlers"), # extract document doc_class=GlueCrawler, # search resource_type=rl.SearcherEnum.glue_crawler.value, fields=GlueCrawler.get_dataset_fields(), cache_expire=rl.config.get_cache_expire(rl.SearcherEnum.glue_crawler.value), more_cache_key=None, )