Coverage for cookbook/integration/integration.py: 20%

189 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2023-12-29 00:47 +0100

1import datetime 

2import traceback 

3import uuid 

4from io import BytesIO 

5from zipfile import BadZipFile, ZipFile 

6 

7from bs4 import Tag 

8from django.core.cache import cache 

9from django.core.exceptions import ObjectDoesNotExist 

10from django.core.files import File 

11from django.db import IntegrityError 

12from django.http import HttpResponse 

13from django.utils.formats import date_format 

14from django.utils.translation import gettext as _ 

15from django_scopes import scope 

16from lxml import etree 

17 

18from cookbook.helper.image_processing import handle_image 

19from cookbook.models import Keyword, Recipe 

20from recipes.settings import DEBUG, EXPORT_FILE_CACHE_DURATION 

21 

22 

class Integration:
    """
    Base class for recipe import/export integrations.

    This class drives the generic import loop (do_import) and export flow (do_export).
    Format specific work is delegated to get_recipe_from_file, split_recipe_file,
    get_file_from_recipe and get_files_from_recipes, which raise NotImplementedError here.
    """
    request = None  # request passed to __init__
    keyword = None  # keyword created in __init__, added to every imported recipe
    files = None  # files handed to the most recent do_import call
    export_type = None  # type label passed to __init__
    ignored_recipes = []  # class-level default only; __init__ binds a fresh list per instance

    def __init__(self, request, export_type):
        """
        Integration for importing and exporting recipes
        :param request: request context of import session (used to link user to created objects)
        :param export_type: type label, stored on the instance and written into the keyword description
        """
        self.request = request
        self.export_type = export_type
        self.ignored_recipes = []

        description = f'Imported by {request.user.get_user_display_name()} at {date_format(datetime.datetime.now(), "DATETIME_FORMAT")}. Type: {export_type}'

        try:
            last_kw = Keyword.objects.filter(name__regex=r'^(Import [0-9]+)', space=request.space).latest('created_at')
            name = f'Import {int(last_kw.name.replace("Import ", "")) + 1}'
        except (ObjectDoesNotExist, ValueError):
            # no earlier import keyword exists, or the rest of its name is not a plain integer
            name = 'Import 1'

        parent, _created = Keyword.objects.get_or_create(name='Import', space=request.space)
        try:
            self.keyword = parent.add_child(name=name, description=description, space=request.space)
        except (IntegrityError, ValueError):
            # in case, for whatever reason, the name does exist append UUID to it. Not nice but works for now.
            self.keyword = parent.add_child(
                name=f'{name} {str(uuid.uuid4())[0:8]}',
                description=description,
                space=request.space,
            )

    def do_export(self, recipes, el):
        """
        Exports the given recipes and returns them as a download
        A single produced file is returned as is, several files are bundled into one zip archive.
        The result is also stored in the cache under 'export_file_<el.pk>'.
        :param recipes: recipes to export, passed on to get_files_from_recipes
        :param el: export log object, updated and saved while running
        :return: HttpResponse with the export as attachment
        """
        with scope(space=self.request.space):
            el.total_recipes = len(recipes)
            el.cache_duration = EXPORT_FILE_CACHE_DURATION
            el.save()

            files = self.get_files_from_recipes(recipes, el, self.request.COOKIES)

            if len(files) == 1:
                export_filename, export_file = files[0]
            else:
                # zip the files if there is more than one file
                export_filename = self.get_export_file_name()
                export_stream = BytesIO()
                # the context manager finalises the archive even if writing an entry fails
                with ZipFile(export_stream, 'w') as export_obj:
                    for filename, file in files:
                        export_obj.writestr(filename, file)
                export_file = export_stream.getvalue()

            cache.set('export_file_' + str(el.pk), {'filename': export_filename, 'file': export_file}, EXPORT_FILE_CACHE_DURATION)
            el.running = False
            el.save()

        response = HttpResponse(export_file, content_type='application/force-download')
        response['Content-Disposition'] = 'attachment; filename="' + export_filename + '"'
        return response

    def import_file_name_filter(self, zip_info_object):
        """
        Since zipfile.namelist() returns all files in all subdirectories this function allows filtering of files
        If false is returned the file will be ignored
        By default all files are included
        :param zip_info_object: ZipInfo object
        :return: Boolean if object should be included
        """
        return True

    def do_import(self, files, il, import_duplicates):
        """
        Imports given files
        The handling of each file is chosen by substring checks on its name, see the branches below.
        Errors are written to the import log instead of being raised.
        :param import_duplicates: if true duplicates are imported as well
        :param files: List of in memory files, each a mapping with the keys 'name' and 'file'
        :param il: Import Log object to refresh while running
        """
        with scope(space=self.request.space):

            try:
                self.files = files
                for f in files:
                    file_name = f['name']
                    if 'RecipeKeeper' in file_name:
                        self._import_recipe_keeper_archive(f, il, import_duplicates)
                    elif any(ext in file_name for ext in ('.zip', '.paprikarecipes', '.mcb')):
                        self._import_archive(f, il, import_duplicates)
                    elif any(ext in file_name for ext in ('.json', '.xml', '.txt', '.mmf', '.rk', '.melarecipe')):
                        self._import_multi_recipe_file(f, il, import_duplicates)
                    elif '.rtk' in file_name:
                        self._import_rtk_archive(f, il, import_duplicates)
                    else:
                        # fallback: the file is one recipe; this path leaves imported_recipes untouched
                        recipe = self.get_recipe_from_file(f['file'])
                        self._finish_recipe_import(recipe, il, import_duplicates, count=False)
            except BadZipFile:
                il.msg += 'ERROR ' + _(
                    'Importer expected a .zip file. Did you choose the correct importer type for your data ?') + '\n'
            except Exception as e:
                msg = 'ERROR ' + _(
                    'An unexpected error occurred during the import. Please make sure you have uploaded a valid file.') + '\n'
                self.handle_exception(e, log=il, message=msg)

            if len(self.ignored_recipes) > 0:
                il.msg += '\n' + _(
                    'The following recipes were ignored because they already existed:') + ' ' + ', '.join(
                    self.ignored_recipes) + '\n\n'

            il.keyword = self.keyword
            il.msg += (_('Imported %s recipes.') % Recipe.objects.filter(keywords=self.keyword).count()) + '\n'
            il.running = False
            il.save()

    def _filtered_zip_entries(self, import_zip):
        """Returns the entries of the archive that import_file_name_filter accepts."""
        return [z for z in import_zip.filelist if self.import_file_name_filter(z)]

    def _finish_recipe_import(self, recipe, il, import_duplicates, count=True):
        """Tags the recipe with the import keyword, logs it and handles duplicates; counts and saves the log if count is set."""
        recipe.keywords.add(self.keyword)
        il.msg += self.get_recipe_processed_msg(recipe)
        self.handle_duplicates(recipe, import_duplicates)
        if count:
            il.imported_recipes += 1
            il.save()

    def _log_recipe_error(self, exception, il):
        """Writes the error banner for one failed recipe to the import log."""
        self.handle_exception(exception, log=il, message=f'-------------------- \nERROR \n{exception}\n--------------------\n')

    def _import_recipe_keeper_archive(self, f, il, import_duplicates):
        """Imports a zip whose name contains 'RecipeKeeper'; a failing recipe aborts the whole import."""
        with ZipFile(f['file']) as import_zip:
            file_list = self._filtered_zip_entries(import_zip)
            il.total_recipes += len(file_list)

            for z in file_list:
                data_list = self.split_recipe_file(import_zip.read(z.filename).decode('utf-8'))
                for d in data_list:
                    # no per recipe error handling here, errors reach the handlers in do_import
                    recipe = self.get_recipe_from_file(d)
                    self._finish_recipe_import(recipe, il, import_duplicates)

    def _import_archive(self, f, il, import_duplicates):
        """Imports a .zip / .paprikarecipes / .mcb archive; a failing recipe is logged and skipped."""
        with ZipFile(f['file']) as import_zip:
            file_list = self._filtered_zip_entries(import_zip)
            il.total_recipes += len(file_list)

            import cookbook
            if isinstance(self, cookbook.integration.copymethat.CopyMeThat):
                file_list = self.split_recipe_file(BytesIO(import_zip.read('recipes.html')))
                # NOTE(review): the entry count was already added above, so both counts end up in total_recipes - confirm this is intended
                il.total_recipes += len(file_list)

            if isinstance(self, cookbook.integration.cookmate.Cookmate):
                new_file_list = []
                for entry in file_list:
                    new_file_list += etree.parse(BytesIO(import_zip.read(entry.filename))).getroot().getchildren()
                il.total_recipes = len(new_file_list)
                file_list = new_file_list

            for z in file_list:
                try:
                    if not hasattr(z, 'filename') or isinstance(z, Tag):
                        # already split recipe data, not a zip entry
                        recipe = self.get_recipe_from_file(z)
                    else:
                        recipe = self.get_recipe_from_file(BytesIO(import_zip.read(z.filename)))
                    self._finish_recipe_import(recipe, il, import_duplicates)
                except Exception as e:
                    traceback.print_exc()
                    self._log_recipe_error(e, il)

    def _import_multi_recipe_file(self, f, il, import_duplicates):
        """Imports an uncompressed file that split_recipe_file turns into several recipes."""
        data_list = self.split_recipe_file(f['file'])
        il.total_recipes += len(data_list)
        for d in data_list:
            try:
                recipe = self.get_recipe_from_file(d)
                self._finish_recipe_import(recipe, il, import_duplicates)
            except Exception as e:
                self._log_recipe_error(e, il)

    def _import_rtk_archive(self, f, il, import_duplicates):
        """Imports a .rtk archive, every accepted entry is decoded as utf-8 and split into recipes."""
        with ZipFile(f['file']) as import_zip:
            for z in import_zip.filelist:
                if not self.import_file_name_filter(z):
                    continue
                data_list = self.split_recipe_file(import_zip.read(z.filename).decode('utf-8'))
                il.total_recipes += len(data_list)

                for d in data_list:
                    try:
                        recipe = self.get_recipe_from_file(d)
                        self._finish_recipe_import(recipe, il, import_duplicates)
                    except Exception as e:
                        self._log_recipe_error(e, il)

    def handle_duplicates(self, recipe, import_duplicates):
        """
        Checks if a recipe is already present, if so deletes it
        The recipe counts as duplicate if more than one recipe with its name exists in the space.
        Deleted recipes are remembered by name in ignored_recipes.
        :param recipe: Recipe object
        :param import_duplicates: if duplicates should be imported
        """
        if Recipe.objects.filter(space=self.request.space, name=recipe.name).count() > 1 and not import_duplicates:
            self.ignored_recipes.append(recipe.name)
            recipe.delete()

    def import_recipe_image(self, recipe, image_file, filetype='.jpeg'):
        """
        Adds an image to a recipe naming it correctly
        :param recipe: Recipe object
        :param image_file: ByteIO stream containing the image
        :param filetype: type of file to write bytes to, default to .jpeg if unknown
        """
        recipe.image = File(handle_image(self.request, File(image_file, name='image'), filetype=filetype), name=f'{uuid.uuid4()}_{recipe.pk}{filetype}')
        recipe.save()

    def get_recipe_from_file(self, file):
        """
        Takes any file like object and converts it into a recipe
        :param file: ByteIO or any file like object, depends on provider
        :return: Recipe object
        """
        raise NotImplementedError('Method not implemented in integration')

    def split_recipe_file(self, file):
        """
        Takes a file that contains multiple recipes and splits it into a list of strings of various formats (e.g. json, text, ..)
        :param file: ByteIO or any file like object, depends on provider
        :return: list of strings
        """
        raise NotImplementedError('Method not implemented in integration')

    def get_file_from_recipe(self, recipe):
        """
        Takes a recipe object and converts it to a string (depending on the format)
        returns both the filename of the exported file and the file contents
        :param recipe: Recipe object that should be converted
        :returns:
            - name - file name in export
            - data - string content for file to get created in export zip
        """
        raise NotImplementedError('Method not implemented in integration')

    def get_files_from_recipes(self, recipes, el, cookie):
        """
        Takes a list of recipe object and converts it to a array containing each file.
        Each file is represented as an array [filename, data] where data is a string of the content of the file.
        :param recipes: Recipe objects that should be converted
        :param el: export log object handed over by do_export
        :param cookie: cookies of the request, do_export passes self.request.COOKIES
        :returns:
            [[filename, data], ...]
        """
        raise NotImplementedError('Method not implemented in integration')

    @staticmethod
    def handle_exception(exception, log=None, message=''):
        """
        Writes an error to the given log and prints the traceback if DEBUG is set
        :param exception: exception that occurred
        :param log: log object with a msg attribute, nothing is written if omitted
        :param message: text to append to the log, falls back to the text of the exception if empty
        """
        if log:
            if message:
                log.msg += message
            else:
                # most exceptions have no msg attribute, fall back to their string form
                log.msg += str(getattr(exception, 'msg', exception))
        if DEBUG:
            traceback.print_exc()

    def get_export_file_name(self, format='zip'):
        """
        Builds the name of the export file from the current date
        :param format: file extension without the leading dot
        :return: file name like export_<YYYY-MM-DD>.<format>
        """
        return "export_{}.{}".format(datetime.datetime.now().strftime("%Y-%m-%d"), format)

    def get_recipe_processed_msg(self, recipe):
        """
        Builds the log line written for every processed recipe
        :param recipe: Recipe object
        :return: string with primary key and name of the recipe
        """
        return f'{recipe.pk} - {recipe.name} \n'