from time import sleep from typing import Optional import requests from internals.data.task import Task from internals.util.config import get_environment class Slack: def __init__(self): # self.webhook_url = "https://hooks.slack.com/services/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" self.webhook_url = "https://hooks.slack.com/services/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" self.error_webhook = "https://hooks.slack.com/services/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" def send_alert(self, task: Task, args: Optional[dict]): if task.get_slack_url(): self.webhook_url = task.get_slack_url() raw = task.get_raw().copy() raw["environment"] = get_environment() raw.pop("queue_name", None) raw.pop("attempt", None) raw.pop("timestamp", None) raw.pop("task_id", None) raw.pop("maskImageUrl", None) raw.pop("aux_imageUrl", None) raw.pop("slack_url", None) if args is not None: raw.update(args.items()) message = "" for key, value in raw.items(): if value: if type(value) == list: if type(value[0]) == float or type(value[0]) == int: value = str(value) message += f"*{key}*: {', '.join(value)}\n" else: message += f"*{key}*: {value}\n" requests.post( self.webhook_url, headers={"Content-Type": "application/json"}, json={"text": message}, ) def error_alert(self, task: Task, e: Exception): requests.post( self.error_webhook, headers={"Content-Type": "application/json"}, json={ "text": "Task failed:\n{} \n error is: \n {}".format(task.get_raw(), e) }, ) def auto_send_alert(self, func): def inner(*args, **kwargs): rargs = func(*args, **kwargs) task = Task({}) for arg in args: if type(arg) is Task: task = arg break self.send_alert(task, rargs) return rargs return inner