Coverage for cookbook/helper/open_data_importer.py: 14%
110 statements
« prev ^ index » next coverage.py v7.4.0, created at 2023-12-29 00:47 +0100
« prev ^ index » next coverage.py v7.4.0, created at 2023-12-29 00:47 +0100
1from cookbook.models import (Food, FoodProperty, Property, PropertyType, Supermarket,
2 SupermarketCategory, SupermarketCategoryRelation, Unit, UnitConversion)
class OpenDataImporter:
    """Import an open data set into the space of the given request.

    ``data`` is a dict keyed by datatype ('unit', 'category', 'property',
    'store', 'food', 'conversion'); each value maps an open data slug to the
    attributes of one object. Every ``import_*`` method handles one datatype.

    Objects are linked to each other through ``slug_id_cache``, which maps
    ``datatype -> {open_data_slug: database id}`` for the request's space.
    Each method refreshes the cache entries it reads, so the methods do not
    depend on having been called in a particular order on the same instance
    (the referenced objects must of course already exist in the database).
    """
    # class-level defaults; every one of them is replaced per instance in __init__
    request = None
    data = {}
    slug_id_cache = {}
    update_existing = False
    use_metric = True

    def __init__(self, request, data, update_existing=False, use_metric=True):
        """
        :param request: request whose ``space`` (and ``user``) own the imported objects
        :param data: open data dict, keyed by datatype and then by open data slug
        :param update_existing: if True, overwrite the attributes of objects that already exist,
            otherwise only attach the open data slug to them
        :param use_metric: stored on the instance; not read by any method of this class
        """
        self.request = request
        self.data = data
        # Own dict per importer: mutating the class-level dict through self would share
        # (and leak) slug -> id mappings between importers of different requests/spaces.
        self.slug_id_cache = {}
        self.update_existing = update_existing
        self.use_metric = use_metric

    @staticmethod
    def _blank_to_none(value):
        """Return None for the empty string, every other value unchanged."""
        return None if value == '' else value

    def _update_slug_cache(self, object_class, datatype):
        """Reload the ``open_data_slug -> id`` mapping of ``object_class`` for the current space."""
        slug_id_pairs = object_class.objects.filter(
            space=self.request.space, open_data_slug__isnull=False
        ).values_list('open_data_slug', 'id')
        self.slug_id_cache[datatype] = dict(slug_id_pairs)

    def import_units(self):
        """Create the units of the data set, upserting on (space, name).

        With ``update_existing`` all imported attributes of a conflicting unit are
        overwritten, otherwise only its open data slug is set.

        :return: result of ``Unit.objects.bulk_create``
        """
        datatype = 'unit'
        space = self.request.space

        insert_list = [
            Unit(
                name=entry['name'],
                plural_name=entry['plural_name'],
                base_unit=self._blank_to_none(entry['base_unit']),
                open_data_slug=slug,
                space=space,
            )
            for slug, entry in self.data[datatype].items()
        ]

        if self.update_existing:
            update_fields = ('name', 'plural_name', 'base_unit', 'open_data_slug')
        else:
            update_fields = ('open_data_slug',)
        return Unit.objects.bulk_create(insert_list, update_conflicts=True, update_fields=update_fields, unique_fields=('space', 'name',))

    def import_category(self):
        """Create the supermarket categories; a conflicting (space, name) only gets its slug set.

        :return: result of ``SupermarketCategory.objects.bulk_create``
        """
        datatype = 'category'
        space = self.request.space

        insert_list = [
            SupermarketCategory(name=entry['name'], open_data_slug=slug, space=space)
            for slug, entry in self.data[datatype].items()
        ]
        return SupermarketCategory.objects.bulk_create(insert_list, update_conflicts=True, update_fields=('open_data_slug',), unique_fields=('space', 'name',))

    def import_property(self):
        """Create the property types; a conflicting (space, name) only gets its slug set.

        :return: result of ``PropertyType.objects.bulk_create``
        """
        datatype = 'property'
        space = self.request.space

        insert_list = [
            PropertyType(name=entry['name'], unit=entry['unit'], open_data_slug=slug, space=space)
            for slug, entry in self.data[datatype].items()
        ]
        return PropertyType.objects.bulk_create(insert_list, update_conflicts=True, update_fields=('open_data_slug',), unique_fields=('space', 'name',))

    def import_supermarket(self):
        """Create the supermarkets and their ordered category relations.

        :return: result of ``Supermarket.objects.bulk_create``
        :raises KeyError: if a store references a category slug that is not in the space
        """
        datatype = 'store'
        space = self.request.space
        stores = self.data[datatype]

        self._update_slug_cache(SupermarketCategory, 'category')

        insert_list = [
            Supermarket(name=entry['name'], open_data_slug=slug, space=space)
            for slug, entry in stores.items()
        ]
        # always add open data slug if matching supermarket is found, otherwise relation might fail
        supermarkets = Supermarket.objects.bulk_create(insert_list, unique_fields=('space', 'name',), update_conflicts=True, update_fields=('open_data_slug',))
        self._update_slug_cache(Supermarket, 'store')

        store_ids = self.slug_id_cache[datatype]
        category_ids = self.slug_id_cache['category']

        # collect the relations of all stores and write them with a single query;
        # order is the position of the category within the store's category list
        relations = []
        for slug, entry in stores.items():
            for order, category_slug in enumerate(entry['categories']):
                relations.append(
                    SupermarketCategoryRelation(
                        supermarket_id=store_ids[slug],
                        category_id=category_ids[category_slug],
                        order=order,
                    )
                )
        SupermarketCategoryRelation.objects.bulk_create(relations, ignore_conflicts=True, unique_fields=('supermarket', 'category',))

        return supermarkets

    def import_food(self):
        """Create or update the foods of the data set and attach their properties.

        A food counts as existing if its name or its (non-blank) plural name equals the name
        or plural name of a food in the space whose name is one of the imported names.
        Existing foods get the open data slug and, with ``update_existing``, the imported
        attributes. All other foods are inserted through ``Food.load_bulk``.

        :return: list of the inserted food dicts followed by the ``Food`` instances used for updating
        :raises KeyError: if a food to insert/update references an unknown store category slug
        """
        datatype = 'food'
        space = self.request.space
        foods = self.data[datatype]

        # blank plural names carry no information and must never be used for matching
        identifier_list = []
        for entry in foods.values():
            identifier_list.append(entry['name'])
            if entry['plural_name']:
                identifier_list.append(entry['plural_name'])

        # name / plural name -> (id, name, plural_name) of the foods already in the space
        existing_objects = {}
        for row in Food.objects.filter(space=space).filter(name__in=identifier_list).values_list('id', 'name', 'plural_name'):
            existing_objects[row[1]] = row
            if row[2]:
                existing_objects[row[2]] = row

        self._update_slug_cache(SupermarketCategory, 'category')
        self._update_slug_cache(Unit, 'unit')
        self._update_slug_cache(PropertyType, 'property')
        category_ids = self.slug_id_cache['category']

        if self.update_existing:
            # Only fields that are set on the instances below may be listed here:
            # bulk_update writes the instance value of every listed field, so an unset
            # field (e.g. a preferred unit) would be overwritten with NULL.
            update_field_list = ['name', 'plural_name', 'supermarket_category_id', 'fdc_id', 'open_data_slug', ]
        else:
            update_field_list = ['open_data_slug', ]

        insert_list = []
        update_list = []
        # names already queued for insertion; inserting a duplicate would throw a db error
        queued_names = set()
        for slug, entry in foods.items():
            name = entry['name']
            plural_name = entry['plural_name'] or None

            existing = existing_objects.get(name)
            if existing is None and plural_name is not None:
                existing = existing_objects.get(plural_name)

            if existing is None:
                if name in queued_names or plural_name in queued_names:
                    continue
                insert_list.append({'data': {
                    'name': name,
                    'plural_name': plural_name,
                    'supermarket_category_id': category_ids[entry['store_category']],
                    'fdc_id': self._blank_to_none(entry['fdc_id']),
                    'open_data_slug': slug,
                    'space': space.id,
                }})
                queued_names.add(name)
                if plural_name is not None:
                    queued_names.add(plural_name)
            elif self.update_existing:
                update_list.append(Food(
                    id=existing[0],
                    name=name,
                    plural_name=plural_name,
                    supermarket_category_id=category_ids[entry['store_category']],
                    fdc_id=self._blank_to_none(entry['fdc_id']),
                    open_data_slug=slug,
                ))
            else:
                update_list.append(Food(id=existing[0], open_data_slug=slug, ))

        Food.load_bulk(insert_list, None)
        if update_list:
            Food.objects.bulk_update(update_list, update_field_list)

        self._update_slug_cache(Food, 'food')
        property_type_ids = self.slug_id_cache['property']
        food_ids = self.slug_id_cache['food']

        food_property_list = []
        for slug, entry in foods.items():
            for fp in entry['properties']['type_values']:
                # try/except here because sometimes the slug is not set in the food cache
                # (e.g. the food was skipped as a duplicate above)
                try:
                    food_property_list.append(Property(
                        property_type_id=property_type_ids[fp['property_type']],
                        property_amount=fp['property_value'],
                        import_food_id=food_ids[slug],
                        space=space,
                    ))
                except KeyError:
                    print(str(slug) + ' is not in self.slug_id_cache["food"]')

        Property.objects.bulk_create(food_property_list, ignore_conflicts=True, unique_fields=('space', 'import_food_id', 'property_type',))

        # link every imported property of the space to the food it was imported for
        property_food_relation_list = [
            Food.properties.through(food_id=food_id, property_id=property_id)
            for food_id, property_id in Property.objects.filter(space=space, import_food_id__isnull=False).values_list('import_food_id', 'id', )
        ]
        FoodProperty.objects.bulk_create(property_food_relation_list, ignore_conflicts=True, unique_fields=('food_id', 'property_id',))

        return insert_list + update_list

    def import_conversion(self):
        """Create the unit conversions of the data set, ignoring conflicting rows.

        Conversions whose units or food cannot be resolved to an id are skipped with a
        message on stdout.

        :return: result of ``UnitConversion.objects.bulk_create``
        """
        datatype = 'conversion'
        space = self.request.space

        # resolve slugs against the current database state instead of relying on
        # whatever an earlier import step left in the cache
        self._update_slug_cache(Unit, 'unit')
        self._update_slug_cache(Food, 'food')
        unit_ids = self.slug_id_cache['unit']
        food_ids = self.slug_id_cache['food']

        insert_list = []
        for slug, entry in self.data[datatype].items():
            # try/except here because sometimes a referenced slug is not set in the cache
            try:
                insert_list.append(UnitConversion(
                    base_amount=entry['base_amount'],
                    base_unit_id=unit_ids[entry['base_unit']],
                    converted_amount=entry['converted_amount'],
                    converted_unit_id=unit_ids[entry['converted_unit']],
                    food_id=food_ids[entry['food']],
                    open_data_slug=slug,
                    space=space,
                    created_by=self.request.user,
                ))
            except KeyError:
                print(str(slug) + ' is not in self.slug_id_cache["food"]')

        return UnitConversion.objects.bulk_create(insert_list, ignore_conflicts=True, unique_fields=('space', 'base_unit', 'converted_unit', 'food', 'open_data_slug'))