Coverage for cookbook/helper/ 67%
149 statements
« prev ^ index » next v7.4.0, created at 2023-12-29 01:02 +0100
« prev ^ index » next v7.4.0, created at 2023-12-29 01:02 +0100
import re

from django.core.cache import caches
from django.db.models.functions import Lower

from cookbook.models import Automation
class AutomationEngine:
    """
    Applies ``Automation`` rules (aliases, never-unit, word transposition and
    regex replacement) to strings produced while importing or parsing a recipe.

    With ``use_cache`` enabled, the rules of each type are loaded once per engine
    instance and shared between instances through the ``default`` Django cache.
    With it disabled, every call queries the database directly.

    NOTE(review): rules are scoped with ``self.request.space``. That expression was
    blank in the listing this class was restored from - confirm against the repository.
    """

    # Lifetime (seconds) of entries in the shared ``default`` cache; renewed on every hit.
    _CACHE_TIMEOUT = 30
    # Upper bound on the number of transpose / regex rules evaluated per call.
    _MAX_RULES = 512

    request = None
    source = None
    use_cache = None
    food_aliases = None
    keyword_aliases = None
    unit_aliases = None
    never_unit = None
    transpose_words = None
    # Template only: __init__ gives every instance its own copy so that loaded rules
    # are never shared between engines through this class-level dict.
    regex_replace = {
        Automation.DESCRIPTION_REPLACE: None,
        Automation.INSTRUCTION_REPLACE: None,
        Automation.FOOD_REPLACE: None,
        Automation.UNIT_REPLACE: None,
        Automation.NAME_REPLACE: None,
    }

    def __init__(self, request, use_cache=True, source=None):
        """
        :param request: current request; used to scope rules (see class NOTE)
        :param use_cache: load rules once and share them through the ``default`` cache
        :param source: string the regex-replace rules match their ``param_1`` against;
                       a placeholder is used when empty so that only catch-all patterns match
        """
        self.request = request
        self.use_cache = use_cache
        self.source = source or "default_string_to_avoid_false_regex_match"
        # Per-instance state: mutating the class-level dict would leak one engine's
        # rules into every other engine living in the same process.
        self.regex_replace = dict.fromkeys(type(self).regex_replace)

    # ------------------------------------------------------------------ helpers

    def _enabled_rules(self, automation_type, *fields):
        """Queryset of enabled automations of ``automation_type``, ordered by ``order``, loading only ``fields``."""
        return (
            Automation.objects
            .filter(space=self.request.space, disabled=False, type=automation_type)
            .only(*fields)
            .order_by('order')
            .all()
        )

    def _first_alias_match(self, automation_type, value):
        """First enabled automation of ``automation_type`` whose ``param_1`` equals ``value`` ignoring case, or None."""
        return (
            Automation.objects
            .filter(space=self.request.space, type=automation_type, param_1__iexact=value, disabled=False)
            .order_by('order')
            .first()
        )

    def _cached_or_built(self, cache_prefix, build):
        """
        Return the shared-cache entry ``<cache_prefix><space pk>``, renewing its timeout.
        When the entry is missing or empty, call ``build()``, store its result and return it.
        """
        cache = caches['default']
        cache_key = f'{cache_prefix}{self.request.space.pk}'
        if cached := cache.get(cache_key, None):
            cache.touch(cache_key, self._CACHE_TIMEOUT)
            return cached
        value = build()
        cache.set(cache_key, value, self._CACHE_TIMEOUT)
        return value

    def _load_alias_map(self, cache_prefix, automation_type):
        """Return ``{param_1.lower(): param_2}`` for the enabled automations of ``automation_type``."""
        return self._cached_or_built(
            cache_prefix,
            lambda: {a.param_1.lower(): a.param_2 for a in self._enabled_rules(automation_type, 'param_1', 'param_2')},
        )

    def _load_regex_rules(self, automation_type):
        """
        Return ``{index: [param_1, param_2, param_3]}`` for ``automation_type``.

        All regex types share one cache entry keyed by type. A type that is absent
        (or None) in that entry is loaded from the database and merged into it, so
        that it is served from the cache afterwards as well.
        """
        cache = caches['default']
        cache_key = f'automation_regex_replace_{self.request.space.pk}'
        cached = cache.get(cache_key, None) or {}
        rules = cached.get(automation_type)
        if rules is not None:
            cache.touch(cache_key, self._CACHE_TIMEOUT)
            return rules

        limited = self._enabled_rules(automation_type, 'param_1', 'param_2', 'param_3')[:self._MAX_RULES]
        rules = {i: [a.param_1, a.param_2, a.param_3] for i, a in enumerate(limited)}
        # Keep every known type as a key so readers indexing the entry directly still work.
        merged = {**dict.fromkeys(self.regex_replace), **cached, automation_type: rules}
        cache.set(cache_key, merged, self._CACHE_TIMEOUT)
        return rules

    # --------------------------------------------------------------- automations

    def apply_keyword_automation(self, keyword):
        """
        Replace a keyword by its KEYWORD_ALIAS target.
        :param keyword: keyword as string
        :return: ``param_2`` of the matching alias, otherwise the stripped keyword
        """
        keyword = keyword.strip()
        if not self.use_cache:
            self.keyword_aliases = {}
        elif self.keyword_aliases is None:
            # load once per engine; later calls must keep the loaded aliases
            self.keyword_aliases = self._load_alias_map('automation_keyword_alias_', Automation.KEYWORD_ALIAS)

        if self.keyword_aliases:
            return self.keyword_aliases.get(keyword.lower(), keyword)
        if automation := self._first_alias_match(Automation.KEYWORD_ALIAS, keyword):
            return automation.param_2
        return keyword

    def apply_unit_automation(self, unit):
        """
        Replace a unit by its UNIT_ALIAS target and apply the UNIT_REPLACE regex rules.
        :param unit: unit as string
        :return: unit as string (possibly changed by automation)
        """
        unit = unit.strip()
        if not self.use_cache:
            self.unit_aliases = {}
        elif self.unit_aliases is None:
            self.unit_aliases = self._load_alias_map('automation_unit_alias_', Automation.UNIT_ALIAS)

        if self.unit_aliases:
            unit = self.unit_aliases.get(unit.lower(), unit)
        elif automation := self._first_alias_match(Automation.UNIT_ALIAS, unit):
            return automation.param_2
        return self.apply_regex_replace_automation(unit, Automation.UNIT_REPLACE)

    def apply_food_automation(self, food):
        """
        Replace a food by its FOOD_ALIAS target; foods without an alias are passed
        through the FOOD_REPLACE regex rules.
        :param food: food as string
        :return: food as string (possibly changed by automation)
        """
        food = food.strip()
        if not self.use_cache:
            self.food_aliases = {}
        elif self.food_aliases is None:
            self.food_aliases = self._load_alias_map('automation_food_alias_', Automation.FOOD_ALIAS)

        if self.food_aliases:
            try:
                return self.food_aliases[food.lower()]
            except KeyError:
                # no alias: still apply the regex rules, exactly like the uncached path
                pass
        elif automation := self._first_alias_match(Automation.FOOD_ALIAS, food):
            return automation.param_2
        return self.apply_regex_replace_automation(food, Automation.FOOD_REPLACE)

    def apply_never_unit_automation(self, tokens):
        """
        Moves a string that should never be treated as a unit to next token and optionally replaced with default unit
        e.g. NEVER_UNIT: param1: egg, param2: None would modify ['1', 'egg', 'white'] to ['1', <param2>, 'egg', 'white']
        or NEVER_UNIT: param1: egg, param2: pcs would modify ['1', 'egg', 'yolk'] to ['1', 'pcs', 'egg', 'yolk']
        :param1 string: string that should never be considered a unit, will be moved to token[2]
        :param2 (optional) unit as string: will insert unit string into token[1]
        :param tokens: list of ingredient tokens; modified in place when a rule matches
        :return: the token list (with the replacement unit inserted at index 1 on a match)
        """
        if len(tokens) < 2:
            # nothing sits in the unit position, so no rule can apply
            return tokens

        if not self.use_cache:
            self.never_unit = {}
        elif self.never_unit is None:
            self.never_unit = self._load_alias_map('automation_never_unit_', Automation.NEVER_UNIT)

        if self.never_unit:
            try:
                new_unit = self.never_unit[tokens[1].lower()]
            except KeyError:
                return tokens
        else:
            # the aliased unit is only needed for the database lookup, so compute it here
            alt_unit = self.apply_unit_automation(tokens[1])
            rule = (
                Automation.objects
                .annotate(param_1_lower=Lower('param_1'))
                .filter(
                    space=self.request.space,
                    type=Automation.NEVER_UNIT,
                    param_1_lower__in=[tokens[1].lower(), alt_unit.lower()],
                    disabled=False,
                )
                .order_by('order')
                .first()
            )
            if not rule:
                return tokens
            new_unit = rule.param_2

        tokens.insert(1, new_unit)
        return tokens

    def apply_transpose_automation(self, string):
        """
        If two words (param_1 & param_2) are detected in sequence, swap their position in the ingredient string
        :param 1: first word to detect
        :param 2: second word to detect
        return: new ingredient string
        """
        if not self.use_cache:
            self.transpose_words = {}
        elif self.transpose_words is None:
            self.transpose_words = self._cached_or_built(
                'automation_transpose_words_',
                lambda: {
                    i: [a.param_1.lower(), a.param_2.lower()]
                    for i, a in enumerate(
                        self._enabled_rules(Automation.TRANSPOSE_WORDS, 'param_1', 'param_2')[:self._MAX_RULES]
                    )
                },
            )

        tokens = [x.lower() for x in string.replace(',', ' ').split()]
        if self.transpose_words:
            word_pairs = self.transpose_words.values()
        else:
            rules = (
                Automation.objects
                .filter(space=self.request.space, type=Automation.TRANSPOSE_WORDS, disabled=False)
                .annotate(param_1_lower=Lower('param_1'), param_2_lower=Lower('param_2'))
                .filter(param_1_lower__in=tokens, param_2_lower__in=tokens)
                .order_by('order')[:self._MAX_RULES]
            )
            word_pairs = [(rule.param_1, rule.param_2) for rule in rules]

        for first, second in word_pairs:
            # tokens are lowercased, so compare lowercased words (rules may be stored in any case)
            if first.lower() in tokens and second.lower() in tokens:
                # A word only gets here when it equals a whole token, i.e. it is literal text:
                # escape it so characters such as '(' or '+' cannot alter the pattern or its groups.
                pattern = rf"\b({re.escape(first)})\W*({re.escape(second)})\b"
                string = re.sub(pattern, r"\2 \1", string, flags=re.IGNORECASE)
        return string

    def apply_regex_replace_automation(self, string, automation_type):
        # TODO add warning - maybe on SPACE page? when a max of 512 automations of a specific type is exceeded (ALIAS types excluded?)
        """
        Replaces strings in a recipe field that are from a matched source
        field_type are Automation.type that apply regex replacements
        Automation.DESCRIPTION_REPLACE
        Automation.INSTRUCTION_REPLACE
        Automation.FOOD_REPLACE
        Automation.UNIT_REPLACE
        Automation.NAME_REPLACE

        regex replacement utilizes the following fields from the Automation model
        :param 1: source that should apply the automation in regex format ('.*' for all)
        :param 2: regex pattern to match ()
        :param 3: replacement string (leave blank to delete)
        return: new string
        """
        if not self.use_cache:
            self.regex_replace[automation_type] = {}
        elif self.regex_replace.get(automation_type) is None:
            self.regex_replace[automation_type] = self._load_regex_rules(automation_type)

        loaded = self.regex_replace[automation_type]
        if loaded:
            rules = loaded.values()
        else:
            limited = self._enabled_rules(automation_type, 'param_1', 'param_2', 'param_3')[:self._MAX_RULES]
            rules = [[rule.param_1, rule.param_2, rule.param_3] for rule in limited]

        source = self.source[:512]
        for source_pattern, pattern, replacement in rules:
            if re.match(source_pattern, source):
                # a blank replacement means "delete the match"; re.sub cannot take None
                string = re.sub(pattern, replacement or '', string, flags=re.IGNORECASE)
        return string