CazC commited on
Commit
491765d
·
verified ·
1 Parent(s): 3aa18cd

Update worker.py

Browse files
Files changed (1) hide show
  1. worker.py +108 -19
worker.py CHANGED
@@ -15,18 +15,93 @@ url: str = os.environ.get("SUPABASE_URL")
15
  key: str = os.environ.get("SUPABASE_KEY")
16
  supabase: Client = create_client(url, key)
17
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
  def check_queue():
19
- try:
20
- tasks = supabase.table("Tasks").select("*").eq("status", "pending").execute()
21
- assert len(tasks.data) > 0
22
- if len(tasks.data) > 0:
23
- return tasks.data[0]
24
- else:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  return None
26
- except Exception as e:
27
- print(f"Error checking queue: {e}")
28
- return None
29
-
30
 
31
  def generate_image(text):
32
  try:
@@ -65,17 +140,31 @@ def worker():
65
  while True:
66
  task = check_queue()
67
  if task:
68
- supabase.table("Tasks").update({"status": "processing"}).eq("id", task['id']).execute()
69
- print(f"Processing task {task['id']}")
70
- img = generate_image(task["text"])
 
 
 
 
 
 
 
 
71
  if img:
72
- print(f"Image generated for task {task['id']}")
73
- create_obj_file(img,task["id"])
74
- send_back_to_supabase(task["id"])
75
- print(f"Task {task['id']} completed")
 
 
 
76
  else:
77
- print(f"Error generating image for task {task['id']}")
78
- supabase.table("Tasks").update({"status": "error"}).eq("id", task['id']).execute()
 
 
 
79
 
80
  else:
81
  print("No pending tasks in the queue")
 
15
  key: str = os.environ.get("SUPABASE_KEY")
16
  supabase: Client = create_client(url, key)
17
 
18
+ fire_url = os.environ.get("FIRE_URL")
19
+ fire_bucket = os.environ.get("FIRE_BUCKET")
20
+ cred = credentials.Certificate('firekey.json')
21
+ firebase_admin.initialize_app(cred, {'storageBucket': fire_bucket})
22
+ ref = db.reference('/Tasks',url=fire_url)
23
+ bucket = storage.bucket()
24
+
25
+ IsSuperbase = False
26
+ if time.localtime().tm_mday > 15:
27
+ IsSuperbase = True
28
+
29
+
30
+ class Task:
31
+ def __init__(self,tasks):
32
+ """Initialize the task with the first task in the list of tasks"""
33
+ self.key = tasks[0][0]
34
+ self.text = tasks[0][1]['text']
35
+ self.status = tasks[0][1]['status']
36
+ self.result = tasks[0][1]['result']
37
+
38
+ def __str__(self):
39
+ return f"Task: {self.text}, Status: {self.status}, Result: {self.result}, Key: {self.key}"
40
+
41
+ def processing(self):
42
+ ref.child(self.key).set({
43
+ "text": self.text,
44
+ "status": 'processing',
45
+ 'result': ''
46
+ })
47
+ print('Processing')
48
+
49
+ def complete(self, result):
50
+ ref.child(self.key).set({
51
+ "text": self.text,
52
+ "status": 'complete',
53
+ 'result': result
54
+ })
55
+ print('Complete')
56
+
57
+ def error(self):
58
+ ref.child(self.key).set({
59
+ "text": self.text,
60
+ "status": 'error',
61
+ 'result': ''
62
+ })
63
+ print('Error')
64
+
65
+ def save_to_storage(self, binary_file_path, key):
66
+ try:
67
+ blob = bucket.blob(key+'.obj')
68
+ #upload but get download url as well
69
+ blob.upload_from_filename(binary_file_path)
70
+ blob.make_public()
71
+ self.complete(blob.public_url)
72
+ os.remove('task_'+str(self.key)+'.obj')
73
+ except Exception as e:
74
+ print(f"Error saving file to storage: {e}")
75
+ self.error()
76
+ return None
77
+
78
  def check_queue():
79
+ if IsSuperbase:
80
+ try:
81
+ tasks = supabase.table("Tasks").select("*").eq("status", "pending").execute()
82
+ assert len(tasks.data) > 0
83
+ if len(tasks.data) > 0:
84
+ return tasks.data[0]
85
+ else:
86
+ return None
87
+ except Exception as e:
88
+ print(f"Error checking queue: {e}")
89
+ return None
90
+
91
+ else:
92
+ try:
93
+ response = ref.order_by_child('status').equal_to('pending').limit_to_first(1).get()
94
+ tasks = list(response.items()) # Convert tasks to a list
95
+ print(tasks)
96
+ if len(tasks) > 0:
97
+ task = Task(tasks)
98
+ print(task)
99
+ return task
100
+ else:
101
+ return None
102
+ except Exception as e:
103
+ print(f"Error checking queue: {e}")
104
  return None
 
 
 
 
105
 
106
  def generate_image(text):
107
  try:
 
140
  while True:
141
  task = check_queue()
142
  if task:
143
+ if IsSuperbase:
144
+ task_id = task['id']
145
+ supabase.table("Tasks").update({"status": "processing"}).eq("id", task_id).execute()
146
+ print(f"Processing task {task_id}")
147
+ img = generate_image(task["text"])
148
+
149
+ else:
150
+ task_id = task.key
151
+ task.processing()
152
+ img = generate_image(task.text)
153
+
154
  if img:
155
+ print(f"Image generated for task {task_id}")
156
+ create_obj_file(img,task_id)
157
+ if IsSuperbase:
158
+ send_back_to_supabase(task_id)
159
+ else:
160
+ task.save_to_storage('task_'+str(task_id)+'.obj', task_id)
161
+ print(f"Task {task_id} completed")
162
  else:
163
+ print(f"Error generating image for task {task_id}")
164
+ if IsSuperbase:
165
+ supabase.table("Tasks").update({"status": "error"}).eq("id", task_id).execute()
166
+ else:
167
+ task.error()
168
 
169
  else:
170
  print("No pending tasks in the queue")