# Coverage for cookbook/helper/recipe_url_import.py: 64% of 331 statements (coverage.py v7.4.0, 2023-12-29)

import re
import traceback
from html import unescape

from django.utils.dateparse import parse_duration
from django.utils.translation import gettext as _
from isodate import parse_duration as iso_parse_duration
from isodate.isoerror import ISO8601Error
from pytube import YouTube
from recipe_scrapers._utils import get_host_name, get_minutes

from cookbook.helper.automation_helper import AutomationEngine
from cookbook.helper.ingredient_parser import IngredientParser
from cookbook.models import Automation, Keyword, PropertyType

def get_from_scraper(scrape, request):
    # convert the scrape_me object to the existing json format based on ld+json
    recipe_json = {
        'steps': [],
        'internal': True
    }
    keywords = []

    # assign source URL
    source_url = None  # initialize so the check below cannot raise NameError when both lookups fail
    try:
        source_url = scrape.canonical_url()
    except Exception:
        try:
            source_url = scrape.url
        except Exception:
            pass
    if source_url:
        recipe_json['source_url'] = source_url
        try:
            keywords.append(source_url.replace('http://', '').replace('https://', '').split('/')[0])
        except Exception:
            recipe_json['source_url'] = ''
    automation_engine = AutomationEngine(request, source=recipe_json.get('source_url'))

    # assign recipe name
    try:
        recipe_json['name'] = parse_name(scrape.title()[:128] or None)
    except Exception:
        recipe_json['name'] = None
    if not recipe_json['name']:
        try:
            recipe_json['name'] = scrape.schema.data.get('name') or ''
        except Exception:
            recipe_json['name'] = ''

    if isinstance(recipe_json['name'], list) and len(recipe_json['name']) > 0:
        recipe_json['name'] = recipe_json['name'][0]

    recipe_json['name'] = automation_engine.apply_regex_replace_automation(recipe_json['name'], Automation.NAME_REPLACE)
    # assign recipe description
    # TODO notify the user when the limit is reached - descriptions longer than 256 characters are moved into the first step
    try:
        description = scrape.description() or None
    except Exception:
        description = None
    if not description:
        try:
            description = scrape.schema.data.get("description") or ''
        except Exception:
            description = ''

    recipe_json['description'] = parse_description(description)
    recipe_json['description'] = automation_engine.apply_regex_replace_automation(recipe_json['description'], Automation.DESCRIPTION_REPLACE)

    # assign servings attributes
    try:
        # don't use scrape.yields() because it always returns "x servings" or "x items"; this should be improved in the scrapers directly
        servings = scrape.schema.data.get('recipeYield') or 1
    except Exception:
        servings = 1

    recipe_json['servings'] = parse_servings(servings)
    recipe_json['servings_text'] = parse_servings_text(servings)
    # assign time attributes
    try:
        recipe_json['working_time'] = get_minutes(scrape.prep_time()) or 0
    except Exception:
        try:
            recipe_json['working_time'] = get_minutes(scrape.schema.data.get("prepTime")) or 0
        except Exception:
            recipe_json['working_time'] = 0
    try:
        recipe_json['waiting_time'] = get_minutes(scrape.cook_time()) or 0
    except Exception:
        try:
            recipe_json['waiting_time'] = get_minutes(scrape.schema.data.get("cookTime")) or 0
        except Exception:
            recipe_json['waiting_time'] = 0

    if recipe_json['working_time'] + recipe_json['waiting_time'] == 0:
        try:
            recipe_json['working_time'] = get_minutes(scrape.total_time()) or 0
        except Exception:
            try:
                recipe_json['working_time'] = get_minutes(scrape.schema.data.get("totalTime")) or 0
            except Exception:
                pass

    # assign image
    try:
        recipe_json['image'] = parse_image(scrape.image()) or None
    except Exception:
        recipe_json['image'] = None
    if not recipe_json['image']:
        try:
            recipe_json['image'] = parse_image(scrape.schema.data.get('image')) or ''
        except Exception:
            recipe_json['image'] = ''
    # assign keywords
    try:
        if scrape.schema.data.get("keywords"):
            keywords += listify_keywords(scrape.schema.data.get("keywords"))
    except Exception:
        pass
    try:
        if scrape.category():
            keywords += listify_keywords(scrape.category())
    except Exception:
        try:
            if scrape.schema.data.get('recipeCategory'):
                keywords += listify_keywords(scrape.schema.data.get("recipeCategory"))
        except Exception:
            pass
    try:
        if scrape.cuisine():
            keywords += listify_keywords(scrape.cuisine())
    except Exception:
        try:
            if scrape.schema.data.get('recipeCuisine'):
                keywords += listify_keywords(scrape.schema.data.get("recipeCuisine"))
        except Exception:
            pass

    try:
        if scrape.author():
            keywords.append(scrape.author())
    except Exception:
        pass

    try:
        recipe_json['keywords'] = parse_keywords(list(set(map(str.casefold, keywords))), request)
    except AttributeError:
        recipe_json['keywords'] = keywords
    ingredient_parser = IngredientParser(request, True)

    # assign steps
    try:
        for i in parse_instructions(scrape.instructions()):
            recipe_json['steps'].append({'instruction': i, 'ingredients': [], 'show_ingredients_table': request.user.userpreference.show_step_ingredients, })
    except Exception:
        pass
    if len(recipe_json['steps']) == 0:
        recipe_json['steps'].append({'instruction': '', 'ingredients': [], })

    recipe_json['description'] = recipe_json['description'][:512]
    if len(recipe_json['description']) > 256:  # split at 256 as long descriptions don't look good on recipe cards
        recipe_json['steps'][0]['instruction'] = f"*{recipe_json['description']}* \n\n" + recipe_json['steps'][0]['instruction']
    try:
        for x in scrape.ingredients():
            if x.strip() != '':
                try:
                    amount, unit, ingredient, note = ingredient_parser.parse(x)
                    ingredient = {
                        'amount': amount,
                        'food': {
                            'name': ingredient,
                        },
                        'unit': None,
                        'note': note,
                        'original_text': x
                    }
                    if unit:
                        ingredient['unit'] = {'name': unit, }
                    recipe_json['steps'][0]['ingredients'].append(ingredient)
                except Exception:
                    # parsing failed, store the unparsed line as the food name
                    recipe_json['steps'][0]['ingredients'].append(
                        {
                            'amount': 0,
                            'unit': None,
                            'food': {
                                'name': x,
                            },
                            'note': '',
                            'original_text': x
                        }
                    )
    except Exception:
        pass
    try:
        recipe_json['properties'] = get_recipe_properties(request.space, scrape.schema.nutrients())
    except Exception:
        traceback.print_exc()
    for s in recipe_json['steps']:
        s['instruction'] = automation_engine.apply_regex_replace_automation(s['instruction'], Automation.INSTRUCTION_REPLACE)

    return recipe_json

def get_recipe_properties(space, property_data):
    # example nutrient data from a schema.org scrape:
    # {'servingSize': '1', 'calories': '302 kcal', 'proteinContent': '7,66g', 'fatContent': '11,56g', 'carbohydrateContent': '41,33g'}
    properties = {
        "property-calories": "calories",
        "property-carbohydrates": "carbohydrateContent",
        "property-proteins": "proteinContent",
        "property-fats": "fatContent",
    }
    recipe_properties = []
    for pt in PropertyType.objects.filter(space=space, open_data_slug__in=list(properties.keys())).all():
        for p in list(properties.keys()):
            if pt.open_data_slug == p:
                if properties[p] in property_data:
                    recipe_properties.append({
                        'property_type': {
                            'id': pt.id,
                            'name': pt.name,
                        },
                        'property_amount': parse_servings(property_data[properties[p]]) / float(property_data['servingSize']),
                    })

    return recipe_properties
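
# A minimal sketch of what get_recipe_properties returns; the PropertyType id/name and the
# nutrient values below are hypothetical, since the real lookup hits the database:
#   given property_data = {'servingSize': '2', 'calories': '302 kcal'} and a PropertyType
#   with open_data_slug == 'property-calories', the result would be
#   [{'property_type': {'id': 1, 'name': 'Calories'}, 'property_amount': 151.0}]
#   (parse_servings('302 kcal') extracts 302, which is divided by the serving size of 2).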

def get_from_youtube_scraper(url, request):
    """A YouTube information scraper."""
    kw, created = Keyword.objects.get_or_create(name='YouTube', space=request.space)
    default_recipe_json = {
        'name': '',
        'internal': True,
        'description': '',
        'servings': 1,
        'working_time': 0,
        'waiting_time': 0,
        'image': "",
        'keywords': [{'name': kw.name, 'label': kw.name, 'id': kw.pk}],
        'source_url': url,
        'steps': [
            {
                'ingredients': [],
                'instruction': ''
            }
        ]
    }

    try:
        automation_engine = AutomationEngine(request, source=url)
        video = YouTube(url)
        video.streams.first()  # required to trigger the web request that populates the video description
        default_recipe_json['name'] = automation_engine.apply_regex_replace_automation(video.title, Automation.NAME_REPLACE)
        default_recipe_json['image'] = video.thumbnail_url
        if video.description:
            default_recipe_json['steps'][0]['instruction'] = automation_engine.apply_regex_replace_automation(video.description, Automation.INSTRUCTION_REPLACE)
    except Exception:
        pass

    return default_recipe_json

def parse_name(name):
    if isinstance(name, list):
        try:
            name = name[0]
        except Exception:
            name = 'ERROR'
    return normalize_string(name)

def parse_description(description):
    return normalize_string(description)

def clean_instruction_string(instruction):
    # handle HTML tags that can be converted to markup
    normalized_string = instruction \
        .replace("<nobr>", "**") \
        .replace("</nobr>", "**") \
        .replace("<strong>", "**") \
        .replace("</strong>", "**")
    normalized_string = normalize_string(normalized_string)
    normalized_string = normalized_string.replace('\n', ' \n')
    normalized_string = normalized_string.replace(' \n \n', '\n\n')

    # handle unsupported, special UTF-8 characters in Thermomix-specific instructions
    # that appear in nearly every recipe on Cookidoo, Zaubertopf Club, Rezeptwelt
    # and in Thermomix-specific recipes on many other sites
    # NOTE: the first argument of the next three replace() calls is a Thermomix
    # private-use icon glyph; it may render as empty or invisible in some editors
    return normalized_string \
        .replace("", _('reverse rotation')) \
        .replace("", _('careful rotation')) \
        .replace("", _('knead')) \
        .replace("Andicken ", _('thicken')) \
        .replace("Erwärmen ", _('warm up')) \
        .replace("Fermentieren ", _('ferment')) \
        .replace("Sous-vide ", _("sous-vide"))

def parse_instructions(instructions):
    """
    Convert an arbitrary instructions object from a website import into a flat list of strings
    :param instructions: any instructions object from an import
    :return: list of strings (one to many elements, depending on the website)
    """
    instruction_list = []

    if isinstance(instructions, list):
        for i in instructions:
            if isinstance(i, str):
                instruction_list.append(clean_instruction_string(i))
            else:
                if 'text' in i:
                    instruction_list.append(clean_instruction_string(i['text']))
                elif 'itemListElement' in i:
                    for ile in i['itemListElement']:
                        if isinstance(ile, str):
                            instruction_list.append(clean_instruction_string(ile))
                        elif 'text' in ile:
                            instruction_list.append(clean_instruction_string(ile['text']))
                else:
                    instruction_list.append(clean_instruction_string(str(i)))
    else:
        instruction_list.append(clean_instruction_string(instructions))

    return instruction_list
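
# A minimal usage sketch with typical ld+json shapes; the values are illustrative:
#   >>> parse_instructions([{'@type': 'HowToStep', 'text': 'Mix the batter.'}, 'Bake for 20 minutes.'])
#   ['Mix the batter.', 'Bake for 20 minutes.']
#   >>> parse_instructions('Mix, then bake.')
#   ['Mix, then bake.']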

def parse_image(image):
    # if a list of images is returned, take the first usable one
    if not image:
        return None
    if isinstance(image, list):
        for pic in image:
            if (isinstance(pic, str)) and (pic[:4] == 'http'):
                image = pic
            elif 'url' in pic:
                image = pic['url']
    elif isinstance(image, dict):
        if 'url' in image:
            image = image['url']

    # ignore relative image paths
    if image[:4] != 'http':
        image = ''
    return image
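
# A minimal usage sketch; the URLs are illustrative:
#   >>> parse_image({'url': 'https://example.org/pic.jpg'})
#   'https://example.org/pic.jpg'
#   >>> parse_image('/relative/pic.jpg')  # relative paths are dropped
#   ''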

def parse_servings(servings):
    if isinstance(servings, str):
        try:
            servings = int(re.search(r'\d+', servings).group())
        except AttributeError:
            servings = 1
    elif isinstance(servings, list):
        try:
            servings = int(re.findall(r'\b\d+\b', servings[0])[0])
        except (IndexError, TypeError):  # list indexing raises IndexError/TypeError, not KeyError
            servings = 1
    return servings
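
# A minimal usage sketch; the values are illustrative:
#   >>> parse_servings('Serves 4')
#   4
#   >>> parse_servings(['4 portions'])
#   4
#   >>> parse_servings('a few')  # no digits found
#   1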

def parse_servings_text(servings):
    if isinstance(servings, str):
        try:
            servings = re.sub("\\d+", '', servings).strip()
        except Exception:
            servings = ''
    if isinstance(servings, list):
        try:
            servings = parse_servings_text(servings[1])
        except Exception:
            pass
    return str(servings)[:32]
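
# A minimal usage sketch; the value is illustrative:
#   >>> parse_servings_text('4 servings')
#   'servings'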

def parse_time(recipe_time):
    if type(recipe_time) not in [int, float]:
        try:
            recipe_time = float(re.search(r'\d+', recipe_time).group())
        except (ValueError, AttributeError, TypeError):  # TypeError is raised for non-string input such as lists
            try:
                recipe_time = round(iso_parse_duration(recipe_time).seconds / 60)
            except (ISO8601Error, TypeError):
                try:
                    if (isinstance(recipe_time, list) and len(recipe_time) > 0):
                        recipe_time = recipe_time[0]
                    recipe_time = round(parse_duration(recipe_time).seconds / 60)
                except AttributeError:
                    recipe_time = 0

    return recipe_time
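
# A minimal usage sketch; the values are illustrative (note that strings take the
# regex path first, so the leading digit run is what gets extracted):
#   >>> parse_time('45 min')
#   45.0
#   >>> parse_time(30)
#   30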

def parse_keywords(keyword_json, request):
    keywords = []
    automation_engine = AutomationEngine(request)

    # keywords as a list
    for kw in keyword_json:
        kw = normalize_string(kw)
        # if an alias exists, use it instead
        if len(kw) != 0:
            kw = automation_engine.apply_keyword_automation(kw)
            if k := Keyword.objects.filter(name__iexact=kw, space=request.space).first():
                keywords.append({'label': str(k), 'name': k.name, 'id': k.id})
            else:
                keywords.append({'label': kw, 'name': kw})

    return keywords

def listify_keywords(keyword_list):
    # keywords already structured as a list of dicts
    try:
        if isinstance(keyword_list[0], dict):
            return keyword_list
    except (KeyError, IndexError):
        pass
    if isinstance(keyword_list, str):
        keyword_list = keyword_list.split(',')

    # keywords as a comma-separated string inside a list
    if (isinstance(keyword_list, list) and len(keyword_list) == 1 and ',' in keyword_list[0]):
        keyword_list = keyword_list[0].split(',')
    return [x.strip() for x in keyword_list]
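
# A minimal usage sketch; the values are illustrative:
#   >>> listify_keywords('vegan, quick')
#   ['vegan', 'quick']
#   >>> listify_keywords(['vegan, quick'])
#   ['vegan', 'quick']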

def normalize_string(string):
    # convert all named and numeric character references (e.g. &gt;, &#62;) to the corresponding Unicode characters
    unescaped_string = unescape(string)
    # turn paragraph ends into newlines before the tag stripping below removes them
    unescaped_string = re.sub('</p>', '\n', unescaped_string)
    unescaped_string = re.sub('<[^<]+?>', '', unescaped_string)
    unescaped_string = re.sub(' +', ' ', unescaped_string)
    unescaped_string = re.sub(r'\n\s*\n', '\n\n', unescaped_string)
    unescaped_string = unescaped_string.replace("\xa0", " ").replace("\t", " ").strip()
    return unescaped_string
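
# A minimal usage sketch; the markup is illustrative:
#   >>> normalize_string('<p>1&nbsp;cup &amp; a pinch</p>')
#   '1 cup & a pinch'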

def iso_duration_to_minutes(string):
    match = re.match(
        r'P((?P<years>\d+)Y)?((?P<months>\d+)M)?((?P<weeks>\d+)W)?((?P<days>\d+)D)?T((?P<hours>\d+)H)?((?P<minutes>\d+)M)?((?P<seconds>\d+)S)?',
        string
    ).groupdict()
    return int(match['days'] or 0) * 24 * 60 + int(match['hours'] or 0) * 60 + int(match['minutes'] or 0)
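
# A minimal usage sketch; the durations are illustrative (years, months, weeks and
# seconds are matched by the pattern but ignored in the sum):
#   >>> iso_duration_to_minutes('PT1H30M')
#   90
#   >>> iso_duration_to_minutes('P1DT2H')
#   1560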

def get_images_from_soup(soup, url):
    sources = ['src', 'srcset', 'data-src']
    images = []
    img_tags = soup.find_all('img')
    if url:
        site = get_host_name(url)
        prot = url.split(':')[0]

    urls = []
    for img in img_tags:
        for src in sources:
            try:
                urls.append(img[src])
            except KeyError:
                pass

    for u in urls:
        u = u.split('?')[0]
        filename = re.search(r'/([\w_-]+[.](jpg|jpeg|gif|png))$', u)
        if filename:
            if (('http' not in u) and (url)):
                # image sources can be relative; if so, prepend the base URL
                u = '{}://{}{}'.format(prot, site, u)
            if 'http' in u:
                images.append(u)
    return images
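
# A minimal usage sketch (requires beautifulsoup4; the markup and URL are illustrative):
#   >>> from bs4 import BeautifulSoup
#   >>> soup = BeautifulSoup('<img src="/media/cake.jpg">', 'html.parser')
#   >>> get_images_from_soup(soup, 'https://example.org/recipe')
#   ['https://example.org/media/cake.jpg']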

def clean_dict(input_dict, key):
    if isinstance(input_dict, dict):
        for x in list(input_dict):
            if x == key:
                del input_dict[x]
            elif isinstance(input_dict[x], dict):
                input_dict[x] = clean_dict(input_dict[x], key)
            elif isinstance(input_dict[x], list):
                temp_list = []
                for e in input_dict[x]:
                    temp_list.append(clean_dict(e, key))
                input_dict[x] = temp_list  # previously built but never assigned back

    return input_dict
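
# A minimal usage sketch; the data is illustrative:
#   >>> clean_dict({'@type': 'Recipe', 'name': 'Cake', 'steps': [{'@type': 'HowToStep', 'text': 'Bake'}]}, '@type')
#   {'name': 'Cake', 'steps': [{'text': 'Bake'}]}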