Coverage for cookbook/helper/recipe_url_import.py: 64% (331 statements)

coverage.py v7.4.0, created at 2023-12-29 01:02 +0100

import re
import traceback
from html import unescape

from django.utils.dateparse import parse_duration
from django.utils.translation import gettext as _
from isodate import parse_duration as iso_parse_duration
from isodate.isoerror import ISO8601Error
from pytube import YouTube
from recipe_scrapers._utils import get_host_name, get_minutes

from cookbook.helper.automation_helper import AutomationEngine
from cookbook.helper.ingredient_parser import IngredientParser
from cookbook.models import Automation, Keyword, PropertyType

def get_from_scraper(scrape, request):
    # converting the scrape_me object to the existing json format based on ld+json

    recipe_json = {
        'steps': [],
        'internal': True
    }
    keywords = []

    # assign source URL
    source_url = None  # keep defined even if the scraper exposes neither canonical_url() nor url
    try:
        source_url = scrape.canonical_url()
    except Exception:
        try:
            source_url = scrape.url
        except Exception:
            pass
    if source_url:
        recipe_json['source_url'] = source_url
        try:
            keywords.append(source_url.replace('http://', '').replace('https://', '').split('/')[0])
        except Exception:
            recipe_json['source_url'] = ''

    automation_engine = AutomationEngine(request, source=recipe_json.get('source_url'))
    # assign recipe name
    try:
        recipe_json['name'] = parse_name(scrape.title()[:128] or None)
    except Exception:
        recipe_json['name'] = None
    if not recipe_json['name']:
        try:
            recipe_json['name'] = scrape.schema.data.get('name') or ''
        except Exception:
            recipe_json['name'] = ''

    if isinstance(recipe_json['name'], list) and len(recipe_json['name']) > 0:
        recipe_json['name'] = recipe_json['name'][0]

    recipe_json['name'] = automation_engine.apply_regex_replace_automation(recipe_json['name'], Automation.NAME_REPLACE)

    # assign recipe description
    # TODO notify user about limit if reached - >256 description will be truncated
    try:
        description = scrape.description() or None
    except Exception:
        description = None
    if not description:
        try:
            description = scrape.schema.data.get("description") or ''
        except Exception:
            description = ''

    recipe_json['description'] = parse_description(description)
    recipe_json['description'] = automation_engine.apply_regex_replace_automation(recipe_json['description'], Automation.DESCRIPTION_REPLACE)

    # assign servings attributes
    try:
        # don't use scrape.yields() as this will always return "x servings" or "x items", should be improved in scrapers directly
        servings = scrape.schema.data.get('recipeYield') or 1
    except Exception:
        servings = 1

    recipe_json['servings'] = parse_servings(servings)
    recipe_json['servings_text'] = parse_servings_text(servings)

    # assign time attributes
    try:
        recipe_json['working_time'] = get_minutes(scrape.prep_time()) or 0
    except Exception:
        try:
            recipe_json['working_time'] = get_minutes(scrape.schema.data.get("prepTime")) or 0
        except Exception:
            recipe_json['working_time'] = 0
    try:
        recipe_json['waiting_time'] = get_minutes(scrape.cook_time()) or 0
    except Exception:
        try:
            recipe_json['waiting_time'] = get_minutes(scrape.schema.data.get("cookTime")) or 0
        except Exception:
            recipe_json['waiting_time'] = 0

    if recipe_json['working_time'] + recipe_json['waiting_time'] == 0:
        try:
            recipe_json['working_time'] = get_minutes(scrape.total_time()) or 0
        except Exception:
            try:
                recipe_json['working_time'] = get_minutes(scrape.schema.data.get("totalTime")) or 0
            except Exception:
                pass

    # assign image
    try:
        recipe_json['image'] = parse_image(scrape.image()) or None
    except Exception:
        recipe_json['image'] = None
    if not recipe_json['image']:
        try:
            recipe_json['image'] = parse_image(scrape.schema.data.get('image')) or ''
        except Exception:
            recipe_json['image'] = ''

    # assign keywords
    try:
        if scrape.schema.data.get("keywords"):
            keywords += listify_keywords(scrape.schema.data.get("keywords"))
    except Exception:
        pass
    try:
        if scrape.category():
            keywords += listify_keywords(scrape.category())
    except Exception:
        try:
            if scrape.schema.data.get('recipeCategory'):
                keywords += listify_keywords(scrape.schema.data.get("recipeCategory"))
        except Exception:
            pass
    try:
        if scrape.cuisine():
            keywords += listify_keywords(scrape.cuisine())
    except Exception:
        try:
            if scrape.schema.data.get('recipeCuisine'):
                keywords += listify_keywords(scrape.schema.data.get("recipeCuisine"))
        except Exception:
            pass

    try:
        if scrape.author():
            keywords.append(scrape.author())
    except Exception:
        pass

    try:
        recipe_json['keywords'] = parse_keywords(list(set(map(str.casefold, keywords))), request)
    except AttributeError:
        recipe_json['keywords'] = keywords

    ingredient_parser = IngredientParser(request, True)

    # assign steps
    try:
        for i in parse_instructions(scrape.instructions()):
            recipe_json['steps'].append({'instruction': i, 'ingredients': [], 'show_ingredients_table': request.user.userpreference.show_step_ingredients, })
    except Exception:
        pass
    if len(recipe_json['steps']) == 0:
        recipe_json['steps'].append({'instruction': '', 'ingredients': [], })

    recipe_json['description'] = recipe_json['description'][:512]
    if len(recipe_json['description']) > 256:  # split at 256 as long descriptions don't look good on recipe cards
        recipe_json['steps'][0]['instruction'] = f"*{recipe_json['description']}* \n\n" + recipe_json['steps'][0]['instruction']

    try:
        for x in scrape.ingredients():
            if x.strip() != '':
                try:
                    amount, unit, ingredient, note = ingredient_parser.parse(x)
                    ingredient = {
                        'amount': amount,
                        'food': {
                            'name': ingredient,
                        },
                        'unit': None,
                        'note': note,
                        'original_text': x
                    }
                    if unit:
                        ingredient['unit'] = {'name': unit, }
                    recipe_json['steps'][0]['ingredients'].append(ingredient)
                except Exception:
                    recipe_json['steps'][0]['ingredients'].append(
                        {
                            'amount': 0,
                            'unit': None,
                            'food': {
                                'name': x,
                            },
                            'note': '',
                            'original_text': x
                        }
                    )
    except Exception:
        pass

    try:
        recipe_json['properties'] = get_recipe_properties(request.space, scrape.schema.nutrients())
        print(recipe_json['properties'])
    except Exception:
        traceback.print_exc()
        pass

    for s in recipe_json['steps']:
        s['instruction'] = automation_engine.apply_regex_replace_automation(s['instruction'], Automation.INSTRUCTION_REPLACE)
        # re.sub(a.param_2, a.param_3, s['instruction'])

    return recipe_json
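
# Illustrative usage sketch, not part of the module: get_from_scraper() accepts any
# recipe_scrapers scraper object together with the importing view's Django request.
# The URL and the scrape_html() call below are assumptions for the example only.
#
#   from recipe_scrapers import scrape_html
#   html = ...  # page markup fetched by the caller
#   scraper = scrape_html(html, org_url='https://example.org/some-recipe')
#   recipe_json = get_from_scraper(scraper, request)
#   recipe_json['name'], recipe_json['servings'], len(recipe_json['steps'])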

def get_recipe_properties(space, property_data):
    # {'servingSize': '1', 'calories': '302 kcal', 'proteinContent': '7,66g', 'fatContent': '11,56g', 'carbohydrateContent': '41,33g'}
    properties = {
        "property-calories": "calories",
        "property-carbohydrates": "carbohydrateContent",
        "property-proteins": "proteinContent",
        "property-fats": "fatContent",
    }
    recipe_properties = []
    for pt in PropertyType.objects.filter(space=space, open_data_slug__in=list(properties.keys())).all():
        for p in list(properties.keys()):
            if pt.open_data_slug == p:
                if properties[p] in property_data:
                    recipe_properties.append({
                        'property_type': {
                            'id': pt.id,
                            'name': pt.name,
                        },
                        'property_amount': parse_servings(property_data[properties[p]]) / float(property_data['servingSize']),
                    })

    return recipe_properties
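
# Illustrative result shape, assuming a PropertyType with open_data_slug
# 'property-calories' exists in the space (the id and name below are made up) and the
# nutrient dict matches the sample in the comment above:
#
#   get_recipe_properties(space, {'servingSize': '1', 'calories': '302 kcal'})
#   # -> [{'property_type': {'id': 1, 'name': 'Calories'}, 'property_amount': 302.0}]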

def get_from_youtube_scraper(url, request):
    """A YouTube Information Scraper."""
    kw, created = Keyword.objects.get_or_create(name='YouTube', space=request.space)
    default_recipe_json = {
        'name': '',
        'internal': True,
        'description': '',
        'servings': 1,
        'working_time': 0,
        'waiting_time': 0,
        'image': "",
        'keywords': [{'name': kw.name, 'label': kw.name, 'id': kw.pk}],
        'source_url': url,
        'steps': [
            {
                'ingredients': [],
                'instruction': ''
            }
        ]
    }

    try:
        automation_engine = AutomationEngine(request, source=url)
        video = YouTube(url)
        video.streams.first()  # this is required to execute some kind of generator/web request that fetches the description
        default_recipe_json['name'] = automation_engine.apply_regex_replace_automation(video.title, Automation.NAME_REPLACE)
        default_recipe_json['image'] = video.thumbnail_url
        if video.description:
            default_recipe_json['steps'][0]['instruction'] = automation_engine.apply_regex_replace_automation(video.description, Automation.INSTRUCTION_REPLACE)
    except Exception:
        pass

    return default_recipe_json
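
# Sketch with a placeholder URL: even when pytube cannot resolve the video, the
# default skeleton (keyword 'YouTube', source_url set) is returned, so the import
# never fails hard.
#
#   recipe_json = get_from_youtube_scraper('https://www.youtube.com/watch?v=XXXXXXXXXXX', request)
#   recipe_json['keywords'][0]['name']  # -> 'YouTube'
#   recipe_json['source_url']           # -> the URL that was passed in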

def parse_name(name):
    if isinstance(name, list):
        try:
            name = name[0]
        except Exception:
            name = 'ERROR'
    return normalize_string(name)


def parse_description(description):
    return normalize_string(description)

def clean_instruction_string(instruction):
    # handle HTML tags that can be converted to markup
    normalized_string = instruction \
        .replace("<nobr>", "**") \
        .replace("</nobr>", "**") \
        .replace("<strong>", "**") \
        .replace("</strong>", "**")
    normalized_string = normalize_string(normalized_string)
    normalized_string = normalized_string.replace('\n', ' \n')
    normalized_string = normalized_string.replace(' \n \n', '\n\n')

    # handle unsupported, special UTF8 characters in Thermomix-specific instructions
    # that happen in nearly every recipe on Cookidoo, Zaubertopf Club, Rezeptwelt
    # and in Thermomix-specific recipes on many other sites
    return normalized_string \
        .replace("", _('reverse rotation')) \
        .replace("", _('careful rotation')) \
        .replace("", _('knead')) \
        .replace("Andicken ", _('thicken')) \
        .replace("Erwärmen ", _('warm up')) \
        .replace("Fermentieren ", _('ferment')) \
        .replace("Sous-vide ", _("sous-vide"))

def parse_instructions(instructions):
    """
    Convert an arbitrary instructions object from a website import into a flat list of strings
    :param instructions: any instructions object from import
    :return: list of strings (from one to many elements depending on website)
    """
    instruction_list = []

    if isinstance(instructions, list):
        for i in instructions:
            if isinstance(i, str):
                instruction_list.append(clean_instruction_string(i))
            else:
                if 'text' in i:
                    instruction_list.append(clean_instruction_string(i['text']))
                elif 'itemListElement' in i:
                    for ile in i['itemListElement']:
                        if isinstance(ile, str):
                            instruction_list.append(clean_instruction_string(ile))
                        elif 'text' in ile:
                            instruction_list.append(clean_instruction_string(ile['text']))
                else:
                    instruction_list.append(clean_instruction_string(str(i)))
    else:
        instruction_list.append(clean_instruction_string(instructions))

    return instruction_list
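
# Examples covering the common cases (plain string vs. ld+json HowToStep list):
#
#   parse_instructions('Mix. Bake.')
#   # -> ['Mix. Bake.']
#   parse_instructions([{'@type': 'HowToStep', 'text': 'Mix the dry ingredients.'},
#                       {'@type': 'HowToStep', 'text': 'Bake for 30 minutes.'}])
#   # -> ['Mix the dry ingredients.', 'Bake for 30 minutes.']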

def parse_image(image):
    # check if list of images is returned, take first if so
    if not image:
        return None
    if isinstance(image, list):
        for pic in image:
            if (isinstance(pic, str)) and (pic[:4] == 'http'):
                image = pic
            elif 'url' in pic:
                image = pic['url']
    elif isinstance(image, dict):
        if 'url' in image:
            image = image['url']

    # ignore relative image paths
    if image[:4] != 'http':
        image = ''
    return image
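
# Examples (URLs are illustrative):
#
#   parse_image('https://example.org/cake.jpg')                           # -> 'https://example.org/cake.jpg'
#   parse_image([{'url': 'https://example.org/cake.jpg'}, '/thumb.jpg'])  # -> 'https://example.org/cake.jpg'
#   parse_image({'url': '/relative/cake.jpg'})                            # -> ''  (relative paths are dropped)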

def parse_servings(servings):
    if isinstance(servings, str):
        try:
            servings = int(re.search(r'\d+', servings).group())
        except AttributeError:
            servings = 1
    elif isinstance(servings, list):
        try:
            servings = int(re.findall(r'\b\d+\b', servings[0])[0])
        except KeyError:
            servings = 1
    return servings

def parse_servings_text(servings):
    if isinstance(servings, str):
        try:
            servings = re.sub("\\d+", '', servings).strip()
        except Exception:
            servings = ''
    if isinstance(servings, list):
        try:
            servings = parse_servings_text(servings[1])
        except Exception:
            pass
    return str(servings)[:32]
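
# Examples: parse_servings() pulls the first number, parse_servings_text() keeps the
# textual remainder:
#
#   parse_servings('4 servings')       # -> 4
#   parse_servings('serves a crowd')   # -> 1  (no digits found)
#   parse_servings_text('4 servings')  # -> 'servings'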

def parse_time(recipe_time):
    if type(recipe_time) not in [int, float]:
        try:
            recipe_time = float(re.search(r'\d+', recipe_time).group())
        except (ValueError, AttributeError):
            try:
                recipe_time = round(iso_parse_duration(recipe_time).seconds / 60)
            except ISO8601Error:
                try:
                    if (isinstance(recipe_time, list) and len(recipe_time) > 0):
                        recipe_time = recipe_time[0]
                    recipe_time = round(parse_duration(recipe_time).seconds / 60)
                except AttributeError:
                    recipe_time = 0

    return recipe_time
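
# Examples: numbers pass through untouched; for strings the first run of digits wins,
# so an ISO 8601 value such as 'PT1H30M' resolves to 1.0 here (iso_duration_to_minutes()
# further below handles full ISO durations):
#
#   parse_time(20)            # -> 20
#   parse_time('45 minutes')  # -> 45.0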

def parse_keywords(keyword_json, request):
    keywords = []
    automation_engine = AutomationEngine(request)

    # keywords as list
    for kw in keyword_json:
        kw = normalize_string(kw)
        # if alias exists use that instead

        if len(kw) != 0:
            kw = automation_engine.apply_keyword_automation(kw)
            if k := Keyword.objects.filter(name__iexact=kw, space=request.space).first():
                keywords.append({'label': str(k), 'name': k.name, 'id': k.id})
            else:
                keywords.append({'label': kw, 'name': kw})

    return keywords
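
# Illustrative result shape (hits the database): keywords already present in the space
# are matched case-insensitively and reused with their id, unknown ones are passed
# through by name only. The id and capitalisation below are made up.
#
#   parse_keywords(['dessert', 'vegan'], request)
#   # -> [{'label': 'Dessert', 'name': 'Dessert', 'id': 12}, {'label': 'vegan', 'name': 'vegan'}]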

def listify_keywords(keyword_list):
    # keywords as string
    try:
        if isinstance(keyword_list[0], dict):
            return keyword_list
    except (KeyError, IndexError):
        pass
    if isinstance(keyword_list, str):
        keyword_list = keyword_list.split(',')

    # keywords as string in list
    if (isinstance(keyword_list, list) and len(keyword_list) == 1 and ',' in keyword_list[0]):
        keyword_list = keyword_list[0].split(',')
    return [x.strip() for x in keyword_list]
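
# Examples: a comma-separated string, a single-element list containing such a string,
# and an already structured list all come back as a clean list:
#
#   listify_keywords('easy, dinner')      # -> ['easy', 'dinner']
#   listify_keywords(['easy, dinner'])    # -> ['easy', 'dinner']
#   listify_keywords([{'name': 'easy'}])  # -> returned unchanged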

def normalize_string(string):
    # Convert all named and numeric character references (e.g. &gt;, &#62;) to the corresponding Unicode characters
    unescaped_string = unescape(string)
    unescaped_string = re.sub('<[^<]+?>', '', unescaped_string)
    unescaped_string = re.sub(' +', ' ', unescaped_string)
    unescaped_string = re.sub('</p>', '\n', unescaped_string)
    unescaped_string = re.sub(r'\n\s*\n', '\n\n', unescaped_string)
    unescaped_string = unescaped_string.replace("\xa0", " ").replace("\t", " ").strip()
    return unescaped_string
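
# Examples: entities are unescaped, tags stripped and whitespace collapsed
# (note the '</p>' substitution runs after all tags were already removed):
#
#   normalize_string('Stir &amp; serve')    # -> 'Stir & serve'
#   normalize_string('<p>Mix   well</p>')   # -> 'Mix well'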

def iso_duration_to_minutes(string):
    match = re.match(
        r'P((?P<years>\d+)Y)?((?P<months>\d+)M)?((?P<weeks>\d+)W)?((?P<days>\d+)D)?T((?P<hours>\d+)H)?((?P<minutes>\d+)M)?((?P<seconds>\d+)S)?',
        string
    ).groupdict()
    return int(match['days'] or 0) * 24 * 60 + int(match['hours'] or 0) * 60 + int(match['minutes'] or 0)
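
# Examples (only days, hours and minutes contribute to the result):
#
#   iso_duration_to_minutes('PT1H30M')  # -> 90
#   iso_duration_to_minutes('P1DT2H')   # -> 1560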

def get_images_from_soup(soup, url):
    sources = ['src', 'srcset', 'data-src']
    images = []
    img_tags = soup.find_all('img')
    if url:
        site = get_host_name(url)
        prot = url.split(':')[0]

    urls = []
    for img in img_tags:
        for src in sources:
            try:
                urls.append(img[src])
            except KeyError:
                pass

    for u in urls:
        u = u.split('?')[0]
        filename = re.search(r'/([\w_-]+[.](jpg|jpeg|gif|png))$', u)
        if filename:
            if (('http' not in u) and (url)):
                # sometimes an image source can be relative
                # if it is provide the base url
                u = '{}://{}{}'.format(prot, site, u)
            if 'http' in u:
                images.append(u)
    return images
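
# Quick sketch (requires beautifulsoup4; URLs are illustrative). Relative sources are
# prefixed with the page's protocol and host as returned by get_host_name():
#
#   from bs4 import BeautifulSoup
#   soup = BeautifulSoup('<img src="/media/cake.jpg"><img data-src="https://cdn.example.org/pie.png">', 'html.parser')
#   get_images_from_soup(soup, 'https://example.org/recipe')
#   # -> ['https://example.org/media/cake.jpg', 'https://cdn.example.org/pie.png']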

def clean_dict(input_dict, key):
    if isinstance(input_dict, dict):
        for x in list(input_dict):
            if x == key:
                del input_dict[x]
            elif isinstance(input_dict[x], dict):
                input_dict[x] = clean_dict(input_dict[x], key)
            elif isinstance(input_dict[x], list):
                temp_list = []
                for e in input_dict[x]:
                    temp_list.append(clean_dict(e, key))

    return input_dict
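
# Example: removes a key recursively from nested dicts. For list values the cleaned
# copies are collected into temp_list but, as written, not assigned back.
#
#   clean_dict({'@type': 'Recipe', 'name': 'Cake', 'author': {'@type': 'Person', 'name': 'A'}}, '@type')
#   # -> {'name': 'Cake', 'author': {'name': 'A'}}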