Coverage for cookbook/helper/automation_helper.py: 67%

149 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2023-12-29 01:02 +0100

1import re 

2 

3from django.core.cache import caches 

4from django.db.models.functions import Lower 

5 

6from cookbook.models import Automation 

7 

8 

class AutomationEngine:
    """
    Applies the Automation rules of the requesting space to imported recipe text.

    Rules are read from ``Automation`` rows filtered by ``request.space``. When
    ``use_cache`` is true each rule set is loaded once per engine instance and
    shared between instances through the ``default`` Django cache for 30 seconds;
    when it is false every call queries the database directly.
    """

    request = None
    source = None
    use_cache = None
    food_aliases = None
    keyword_aliases = None
    unit_aliases = None
    never_unit = None
    transpose_words = None
    # Declares the automation types that support regex replacement. This dict is
    # only a template: __init__ gives every instance its own copy so that rules
    # loaded for one request/space are never visible to another engine.
    regex_replace = {
        Automation.DESCRIPTION_REPLACE: None,
        Automation.INSTRUCTION_REPLACE: None,
        Automation.FOOD_REPLACE: None,
        Automation.UNIT_REPLACE: None,
        Automation.NAME_REPLACE: None,
    }

    def __init__(self, request, use_cache=True, source=None):
        """
        :param request: request object; ``request.space`` selects the rules to apply
        :param use_cache: load rule sets once and share them through the Django cache
        :param source: name of the import source, matched against param_1 of regex rules
        """
        self.request = request
        self.use_cache = use_cache
        # a placeholder keeps source specific patterns from matching when no source is given
        self.source = source or "default_string_to_avoid_false_regex_match"
        # per-instance copy: mutating the class level dict would leak state between engines
        self.regex_replace = dict.fromkeys(type(self).regex_replace)

    def _load_alias_map(self, attr_name, cache_prefix, automation_type):
        """
        Returns the ``{param_1.lower(): param_2}`` map of one automation type, or None if caching is disabled.

        :param attr_name: instance attribute that memoizes the map for this engine
        :param cache_prefix: cache key prefix, the space pk is appended to it
        :param automation_type: Automation.type to load
        """
        if not self.use_cache:
            setattr(self, attr_name, {})
            return None

        aliases = getattr(self, attr_name)
        if aliases is not None:
            # already loaded by an earlier call on this engine, keep it
            return aliases

        cache = caches['default']
        cache_key = f'{cache_prefix}_{self.request.space.pk}'
        aliases = cache.get(cache_key, None)
        if aliases:
            cache.touch(cache_key, 30)
        else:
            aliases = {}
            rules = Automation.objects.filter(space=self.request.space, disabled=False, type=automation_type) \
                .only('param_1', 'param_2').order_by('order')
            for rule in rules:
                # first rule wins on duplicates, like .order_by('order').first() in the uncached lookups
                aliases.setdefault(rule.param_1.lower(), rule.param_2)
            cache.set(cache_key, aliases, 30)
        setattr(self, attr_name, aliases)
        return aliases

    def _regex_rules_queryset(self, automation_type):
        """Returns the first 512 enabled rules of automation_type for the current space, ordered by 'order'."""
        return Automation.objects.filter(space=self.request.space, disabled=False, type=automation_type) \
            .only('param_1', 'param_2', 'param_3').order_by('order')[:512]

    def _load_regex_rules(self, automation_type):
        """
        Loads the regex rules of automation_type from the Django cache or the database.

        The cache entry keeps its original shape ``{automation_type: {index: [param_1, param_2, param_3]}}``;
        a type that is missing from an existing entry is loaded and merged into it.
        """
        cache = caches['default']
        cache_key = f'automation_regex_replace_{self.request.space.pk}'
        cached = cache.get(cache_key, None)
        if cached and cached.get(automation_type) is not None:
            rules = cached[automation_type]
            cache.touch(cache_key, 30)
        else:
            rules = {i: [a.param_1, a.param_2, a.param_3] for i, a in enumerate(self._regex_rules_queryset(automation_type))}
            payload = dict(cached) if cached else dict.fromkeys(self.regex_replace)
            payload[automation_type] = rules
            cache.set(cache_key, payload, 30)
        self.regex_replace[automation_type] = rules
        return rules

    def apply_keyword_automation(self, keyword):
        """
        Replaces a keyword with its KEYWORD_ALIAS target (case insensitive match on param_1)
        :param keyword: keyword as string
        :return: param_2 of the matching automation or the stripped keyword
        """
        keyword = keyword.strip()
        aliases = self._load_alias_map('keyword_aliases', 'automation_keyword_alias', Automation.KEYWORD_ALIAS)
        if aliases is not None:
            return aliases.get(keyword.lower(), keyword)

        if automation := Automation.objects.filter(space=self.request.space, type=Automation.KEYWORD_ALIAS, param_1__iexact=keyword,
                                                   disabled=False).order_by('order').first():
            return automation.param_2
        return keyword

    def apply_unit_automation(self, unit):
        """
        Replaces a unit with its UNIT_ALIAS target, otherwise applies the UNIT_REPLACE regex automations
        :param unit: unit as string
        :return: unit as string (possibly changed by automation)
        """
        unit = unit.strip()
        aliases = self._load_alias_map('unit_aliases', 'automation_unit_alias', Automation.UNIT_ALIAS)
        if aliases is not None:
            # NOTE(review): with caching an aliased unit still runs through UNIT_REPLACE below while
            # the uncached branch returns the alias directly - kept as found, confirm which is intended
            unit = aliases.get(unit.lower(), unit)
        elif automation := Automation.objects.filter(space=self.request.space, type=Automation.UNIT_ALIAS, param_1__iexact=unit,
                                                     disabled=False).order_by('order').first():
            return automation.param_2
        return self.apply_regex_replace_automation(unit, Automation.UNIT_REPLACE)

    def apply_food_automation(self, food):
        """
        Replaces a food with its FOOD_ALIAS target, otherwise applies the FOOD_REPLACE regex automations
        :param food: food as string
        :return: food as string (possibly changed by automation)
        """
        food = food.strip()
        aliases = self._load_alias_map('food_aliases', 'automation_food_alias', Automation.FOOD_ALIAS)
        if aliases is not None:
            key = food.lower()
            if key in aliases:
                return aliases[key]
            # no alias: fall through so FOOD_REPLACE is applied exactly like in the uncached branch
        elif automation := Automation.objects.filter(space=self.request.space, type=Automation.FOOD_ALIAS, param_1__iexact=food,
                                                     disabled=False).order_by('order').first():
            return automation.param_2
        return self.apply_regex_replace_automation(food, Automation.FOOD_REPLACE)

    def apply_never_unit_automation(self, tokens):
        """
        Moves a string that should never be treated as a unit to next token and optionally replaced with default unit
        e.g. NEVER_UNIT: param1: egg, param2: None would modify ['1', 'egg', 'white'] to ['1', '', 'egg', 'white']
        or NEVER_UNIT: param1: egg, param2: pcs would modify ['1', 'egg', 'yolk'] to ['1', 'pcs', 'egg', 'yolk']
        :param1 string: string that should never be considered a unit, will be moved to token[2]
        :param2 (optional) unit as string: will insert unit string into token[1]
        :param tokens: list of ingredient tokens, modified in place when a rule matches
        :return: the list of tokens (param_2 of the matching rule inserted at index 1)
        """
        if len(tokens) < 2:
            # nothing in the unit position, so there is nothing to move
            return tokens

        never_units = self._load_alias_map('never_unit', 'automation_never_unit', Automation.NEVER_UNIT)

        # a rule may name the unit as written or the unit after alias/regex automations
        candidates = [tokens[1].lower()]
        alt_unit = self.apply_unit_automation(tokens[1])
        if alt_unit is not None:
            candidates.append(alt_unit.lower())

        if never_units is not None:
            for candidate in candidates:
                if candidate in never_units:
                    tokens.insert(1, never_units[candidate])
                    break
        elif rule := Automation.objects.annotate(param_1_lower=Lower('param_1')).filter(
                space=self.request.space, type=Automation.NEVER_UNIT, param_1_lower__in=candidates, disabled=False).order_by('order').first():
            tokens.insert(1, rule.param_2)
        return tokens

    def apply_transpose_automation(self, string):
        """
        If two words (param_1 & param_2) are detected in sequence, swap their position in the ingredient string
        :param 1: first word to detect
        :param 2: second word to detect
        :param string: ingredient string
        return: new ingredient string
        """
        tokens = [x.lower() for x in string.replace(',', ' ').split()]

        if self.use_cache:
            if self.transpose_words is None:
                cache = caches['default']
                cache_key = f'automation_transpose_words_{self.request.space.pk}'
                if cached := cache.get(cache_key, None):
                    self.transpose_words = cached
                    cache.touch(cache_key, 30)
                else:
                    rules = Automation.objects.filter(space=self.request.space, disabled=False, type=Automation.TRANSPOSE_WORDS) \
                        .only('param_1', 'param_2').order_by('order')[:512]
                    self.transpose_words = {i: [a.param_1.lower(), a.param_2.lower()] for i, a in enumerate(rules)}
                    cache.set(cache_key, self.transpose_words, 30)
            word_pairs = list(self.transpose_words.values())
        else:
            self.transpose_words = {}
            rules = Automation.objects.filter(space=self.request.space, type=Automation.TRANSPOSE_WORDS, disabled=False) \
                .annotate(param_1_lower=Lower('param_1'), param_2_lower=Lower('param_2')) \
                .filter(param_1_lower__in=tokens, param_2_lower__in=tokens).order_by('order')[:512]
            # tokens are lowercased, so the rule words have to be lowercased before comparing
            word_pairs = [(rule.param_1.lower(), rule.param_2.lower()) for rule in rules]

        for first_word, second_word in word_pairs:
            if first_word in tokens and second_word in tokens:
                # the words are plain text, escape them so characters like '+' or '(' cannot alter the pattern
                string = re.sub(rf"\b({re.escape(first_word)})\W*({re.escape(second_word)})\b", r"\2 \1", string, flags=re.IGNORECASE)
        return string

    def apply_regex_replace_automation(self, string, automation_type):
        """
        Replaces strings in a recipe field that are from a matched source
        field_type are Automation.type that apply regex replacements
        Automation.DESCRIPTION_REPLACE
        Automation.INSTRUCTION_REPLACE
        Automation.FOOD_REPLACE
        Automation.UNIT_REPLACE
        Automation.NAME_REPLACE

        regex replacement utilizes the following fields from the Automation model
        :param 1: source that should apply the automation in regex format ('.*' for all)
        :param 2: regex pattern to match ()
        :param 3: replacement string (leave blank to delete)
        :param string: text the replacements are applied to
        :param automation_type: one of the Automation.*_REPLACE types listed above
        return: new string
        """
        # TODO add warning - maybe on SPACE page? when a max of 512 automations of a specific type is exceeded (ALIAS types excluded?)
        if self.use_cache:
            rules = self.regex_replace.get(automation_type)
            if rules is None:
                rules = self._load_regex_rules(automation_type)
            rule_list = list(rules.values())
        else:
            self.regex_replace[automation_type] = {}
            rule_list = [[a.param_1, a.param_2, a.param_3] for a in self._regex_rules_queryset(automation_type)]

        source = self.source[:512]
        for source_pattern, pattern, replacement in rule_list:
            if re.match(source_pattern, source):
                # a blank (None) replacement means "delete the match", re.sub needs a string for that
                string = re.sub(pattern, replacement or '', string, flags=re.IGNORECASE)
        return string