Coverage for cookbook/integration/integration.py: 20%
189 statements
« prev ^ index » next coverage.py v7.4.0, created at 2023-12-29 00:47 +0100
« prev ^ index » next coverage.py v7.4.0, created at 2023-12-29 00:47 +0100
1import datetime
2import traceback
3import uuid
4from io import BytesIO
5from zipfile import BadZipFile, ZipFile
7from bs4 import Tag
8from django.core.cache import cache
9from django.core.exceptions import ObjectDoesNotExist
10from django.core.files import File
11from django.db import IntegrityError
12from django.http import HttpResponse
13from django.utils.formats import date_format
14from django.utils.translation import gettext as _
15from django_scopes import scope
16from lxml import etree
18from cookbook.helper.image_processing import handle_image
19from cookbook.models import Keyword, Recipe
20from recipes.settings import DEBUG, EXPORT_FILE_CACHE_DURATION
class Integration:
    """
    Base class for recipe import and export integrations.

    Subclasses provide the format specific parts by overriding get_recipe_from_file,
    split_recipe_file, get_file_from_recipe and get_files_from_recipes. This class
    drives the import / export run and writes progress to the passed log object.
    """
    # request the integration runs for (source of user, space and cookies)
    request = None
    # child keyword of 'Import' that is added to every recipe imported by this instance
    keyword = None
    # files handed to the most recent do_import call
    files = None
    export_type = None
    # class level default only, __init__ rebinds a fresh list on every instance
    ignored_recipes = []

    def __init__(self, request, export_type):
        """
        Integration for importing and exporting recipes
        :param request: request context of import session (used to link user to created objects)
        :param export_type: type of the integration, stored on the instance and mentioned in the keyword description
        """
        self.request = request
        self.export_type = export_type
        self.ignored_recipes = []

        description = f'Imported by {request.user.get_user_display_name()} at {date_format(datetime.datetime.now(), "DATETIME_FORMAT")}. Type: {export_type}'

        # continue the numbering of the most recently created 'Import <n>' keyword of this space
        try:
            last_kw = Keyword.objects.filter(name__regex=r'^(Import [0-9]+)', space=request.space).latest('created_at')
            name = f'Import {int(last_kw.name.replace("Import ", "")) + 1}'
        except (ObjectDoesNotExist, ValueError):
            # no numbered keyword yet, or the part after 'Import ' is not a plain number
            name = 'Import 1'

        parent = Keyword.objects.get_or_create(name='Import', space=request.space)[0]
        try:
            self.keyword = parent.add_child(
                name=name,
                description=description,
                space=request.space
            )
        except (IntegrityError, ValueError):  # in case, for whatever reason, the name does exist append UUID to it. Not nice but works for now.
            self.keyword = parent.add_child(
                name=f'{name} {str(uuid.uuid4())[0:8]}',
                description=description,
                space=request.space
            )

    def do_export(self, recipes, el):
        """
        Exports the given recipes and returns them as a download
        A single resulting file is delivered as is, anything else is packed into one zip archive
        The result is also put into the cache under 'export_file_<pk of el>'
        :param recipes: recipes that should be exported
        :param el: Export Log object to refresh while running
        :return: HttpResponse containing the exported file as attachment
        """
        with scope(space=self.request.space):
            el.total_recipes = len(recipes)
            el.cache_duration = EXPORT_FILE_CACHE_DURATION
            el.save()

            files = self.get_files_from_recipes(recipes, el, self.request.COOKIES)

            if len(files) == 1:
                export_filename, export_file = files[0]
            else:
                # zip the files if there is more then one file
                export_filename = self.get_export_file_name()
                export_stream = BytesIO()
                # the context manager finalizes the archive even if writing one entry fails
                with ZipFile(export_stream, 'w') as export_obj:
                    for filename, file in files:
                        export_obj.writestr(filename, file)
                export_file = export_stream.getvalue()

            cache.set('export_file_' + str(el.pk), {'filename': export_filename, 'file': export_file}, EXPORT_FILE_CACHE_DURATION)
            el.running = False
            el.save()

        response = HttpResponse(export_file, content_type='application/force-download')
        response['Content-Disposition'] = 'attachment; filename="' + export_filename + '"'
        return response

    def import_file_name_filter(self, zip_info_object):
        """
        Since zipfile.namelist() returns all files in all subdirectories this function allows filtering of files
        If false is returned the file will be ignored
        By default all files are included
        :param zip_info_object: ZipInfo object
        :return: Boolean if object should be included
        """
        return True

    def _filtered_zip_entries(self, import_zip):
        """
        Returns the entries of an opened zip archive that pass import_file_name_filter
        :param import_zip: opened ZipFile object
        :return: list of ZipInfo objects in archive order
        """
        return [z for z in import_zip.filelist if self.import_file_name_filter(z)]

    def _import_recipe_data(self, data, il, import_duplicates):
        """
        Converts one piece of provider data into a recipe and records it on the import log
        :param data: whatever get_recipe_from_file of the integration expects
        :param il: Import Log object to refresh
        :param import_duplicates: if true duplicates are imported as well
        """
        recipe = self.get_recipe_from_file(data)
        recipe.keywords.add(self.keyword)
        il.msg += self.get_recipe_processed_msg(recipe)
        self.handle_duplicates(recipe, import_duplicates)
        il.imported_recipes += 1
        il.save()

    def do_import(self, files, il, import_duplicates):
        """
        Imports given files
        How a file is processed is decided by substrings of its name (e.g. 'RecipeKeeper', '.zip', '.json', '.rtk')
        :param import_duplicates: if true duplicates are imported as well
        :param files: List of in memory files, each a mapping with the keys 'name' and 'file'
        :param il: Import Log object to refresh while running
        :return: None, progress and errors are written to the import log
        """
        with scope(space=self.request.space):
            try:
                self.files = files
                for f in files:
                    file_name = f['name']
                    if 'RecipeKeeper' in file_name:
                        # with-block closes the archive even when a recipe raises (no per recipe handler here)
                        with ZipFile(f['file']) as import_zip:
                            file_list = self._filtered_zip_entries(import_zip)
                            il.total_recipes += len(file_list)

                            for z in file_list:
                                data_list = self.split_recipe_file(import_zip.read(z.filename).decode('utf-8'))
                                for d in data_list:
                                    self._import_recipe_data(d, il, import_duplicates)
                    elif any(ext in file_name for ext in ('.zip', '.paprikarecipes', '.mcb')):
                        with ZipFile(f['file']) as import_zip:
                            file_list = self._filtered_zip_entries(import_zip)
                            il.total_recipes += len(file_list)

                            import cookbook
                            if isinstance(self, cookbook.integration.copymethat.CopyMeThat):
                                # all recipes live in a single html file inside the archive
                                file_list = self.split_recipe_file(BytesIO(import_zip.read('recipes.html')))
                                # NOTE(review): this is added on top of the zip entry count from above - confirm that is intended
                                il.total_recipes += len(file_list)

                            if isinstance(self, cookbook.integration.cookmate.Cookmate):
                                # every child of the root element of each xml file is handled as one recipe
                                new_file_list = []
                                for file in file_list:
                                    new_file_list += etree.parse(BytesIO(import_zip.read(file.filename))).getroot().getchildren()
                                il.total_recipes = len(new_file_list)
                                file_list = new_file_list

                            for z in file_list:
                                try:
                                    # entries that are not zip members (e.g. parsed tags) are passed on unchanged
                                    if not hasattr(z, 'filename') or isinstance(z, Tag):
                                        data = z
                                    else:
                                        data = BytesIO(import_zip.read(z.filename))
                                    self._import_recipe_data(data, il, import_duplicates)
                                except Exception as e:
                                    traceback.print_exc()
                                    self.handle_exception(e, log=il, message=f'-------------------- \nERROR \n{e}\n--------------------\n')
                    elif any(ext in file_name for ext in ('.json', '.xml', '.txt', '.mmf', '.rk', '.melarecipe')):
                        data_list = self.split_recipe_file(f['file'])
                        il.total_recipes += len(data_list)
                        for d in data_list:
                            try:
                                self._import_recipe_data(d, il, import_duplicates)
                            except Exception as e:
                                self.handle_exception(e, log=il, message=f'-------------------- \nERROR \n{e}\n--------------------\n')
                    elif '.rtk' in file_name:
                        with ZipFile(f['file']) as import_zip:
                            for z in import_zip.filelist:
                                if not self.import_file_name_filter(z):
                                    continue
                                data_list = self.split_recipe_file(import_zip.read(z.filename).decode('utf-8'))
                                il.total_recipes += len(data_list)

                                for d in data_list:
                                    try:
                                        self._import_recipe_data(d, il, import_duplicates)
                                    except Exception as e:
                                        self.handle_exception(e, log=il, message=f'-------------------- \nERROR \n{e}\n--------------------\n')
                    else:
                        # any other file is handed to the integration as a single recipe, the counters are not touched
                        recipe = self.get_recipe_from_file(f['file'])
                        recipe.keywords.add(self.keyword)
                        il.msg += self.get_recipe_processed_msg(recipe)
                        self.handle_duplicates(recipe, import_duplicates)
            except BadZipFile:
                il.msg += 'ERROR ' + _(
                    'Importer expected a .zip file. Did you choose the correct importer type for your data ?') + '\n'
            except Exception as e:
                msg = 'ERROR ' + _(
                    'An unexpected error occurred during the import. Please make sure you have uploaded a valid file.') + '\n'
                self.handle_exception(e, log=il, message=msg)

            if self.ignored_recipes:
                il.msg += '\n' + _(
                    'The following recipes were ignored because they already existed:') + ' ' + ', '.join(
                    self.ignored_recipes) + '\n\n'

            il.keyword = self.keyword
            il.msg += (_('Imported %s recipes.') % Recipe.objects.filter(keywords=self.keyword).count()) + '\n'
            il.running = False
            il.save()

    def handle_duplicates(self, recipe, import_duplicates):
        """
        Checks if a recipe is already present, if so deletes it
        A recipe counts as present when more than one recipe with its name exists in the space of the request
        The names of deleted recipes are collected in ignored_recipes
        :param recipe: Recipe object
        :param import_duplicates: if duplicates should be imported
        """
        if Recipe.objects.filter(space=self.request.space, name=recipe.name).count() > 1 and not import_duplicates:
            self.ignored_recipes.append(recipe.name)
            recipe.delete()

    def import_recipe_image(self, recipe, image_file, filetype='.jpeg'):
        """
        Adds an image to a recipe naming it correctly
        :param recipe: Recipe object
        :param image_file: ByteIO stream containing the image
        :param filetype: type of file to write bytes to, default to .jpeg if unknown
        """
        processed_image = handle_image(self.request, File(image_file, name='image'), filetype=filetype)
        recipe.image = File(processed_image, name=f'{uuid.uuid4()}_{recipe.pk}{filetype}')
        recipe.save()

    def get_recipe_from_file(self, file):
        """
        Takes any file like object and converts it into a recipe
        :param file: ByteIO or any file like object, depends on provider
        :return: Recipe object
        """
        raise NotImplementedError('Method not implemented in integration')

    def split_recipe_file(self, file):
        """
        Takes a file that contains multiple recipes and splits it into a list of strings of various formats (e.g. json, text, ..)
        :param file: ByteIO or any file like object, depends on provider
        :return: list of strings
        """
        raise NotImplementedError('Method not implemented in integration')

    def get_file_from_recipe(self, recipe):
        """
        Takes a recipe object and converts it to a string (depending on the format)
        returns both the filename of the exported file and the file contents
        :param recipe: Recipe object that should be converted
        :returns:
            - name - file name in export
            - data - string content for file to get created in export zip
        """
        raise NotImplementedError('Method not implemented in integration')

    def get_files_from_recipes(self, recipes, el, cookie):
        """
        Takes a list of recipe object and converts it to a array containing each file.
        Each file is represented as an array [filename, data] where data is a string of the content of the file.
        :param recipes: Recipe objects that should be converted
        :param el: Export Log object of the running export
        :param cookie: cookies of the request (do_export passes request.COOKIES)
        :returns:
            [[filename, data], ...]
        """
        raise NotImplementedError('Method not implemented in integration')

    @staticmethod
    def _exception_text(exception):
        """
        Returns a printable text for an exception
        Uses the msg attribute if the exception has one, the string form of the exception otherwise
        :param exception: exception object
        :return: text describing the exception
        """
        return str(getattr(exception, 'msg', exception))

    @staticmethod
    def handle_exception(exception, log=None, message=''):
        """
        Writes an error to the given log and prints the traceback when DEBUG is enabled
        :param exception: exception that occurred
        :param log: log object with a msg attribute, nothing is written if omitted
        :param message: text to append to the log, if empty a text derived from the exception is used
        """
        if log:
            # most exceptions have no msg attribute, so the fallback must not rely on it
            log.msg += message if message else Integration._exception_text(exception)
        if DEBUG:
            traceback.print_exc()

    def get_export_file_name(self, format='zip'):
        """
        Builds the name of an export file from the current date
        :param format: file extension without leading dot
        :return: file name like export_<YYYY-MM-DD>.<format>
        """
        today = datetime.datetime.now().strftime("%Y-%m-%d")
        return f"export_{today}.{format}"

    def get_recipe_processed_msg(self, recipe):
        """
        Builds the import log line for a processed recipe
        :param recipe: Recipe object
        :return: string made of primary key and name of the recipe, ends with a line break
        """
        return f'{recipe.pk} - {recipe.name} \n'