Coverage for cookbook/helper/open_data_importer.py: 14%

110 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2023-12-29 00:47 +0100

1from cookbook.models import (Food, FoodProperty, Property, PropertyType, Supermarket, 

2 SupermarketCategory, SupermarketCategoryRelation, Unit, UnitConversion) 

3 

4 

5class OpenDataImporter: 

6 request = None 

7 data = {} 

8 slug_id_cache = {} 

9 update_existing = False 

10 use_metric = True 

11 

12 def __init__(self, request, data, update_existing=False, use_metric=True): 

13 self.request = request 

14 self.data = data 

15 self.update_existing = update_existing 

16 self.use_metric = use_metric 

17 

18 def _update_slug_cache(self, object_class, datatype): 

19 self.slug_id_cache[datatype] = dict(object_class.objects.filter(space=self.request.space, open_data_slug__isnull=False).values_list('open_data_slug', 'id', )) 

20 

21 def import_units(self): 

22 datatype = 'unit' 

23 

24 insert_list = [] 

25 for u in list(self.data[datatype].keys()): 

26 insert_list.append(Unit( 

27 name=self.data[datatype][u]['name'], 

28 plural_name=self.data[datatype][u]['plural_name'], 

29 base_unit=self.data[datatype][u]['base_unit'] if self.data[datatype][u]['base_unit'] != '' else None, 

30 open_data_slug=u, 

31 space=self.request.space 

32 )) 

33 

34 if self.update_existing: 

35 return Unit.objects.bulk_create(insert_list, update_conflicts=True, update_fields=( 

36 'name', 'plural_name', 'base_unit', 'open_data_slug'), unique_fields=('space', 'name',)) 

37 else: 

38 return Unit.objects.bulk_create(insert_list, update_conflicts=True, update_fields=('open_data_slug',), unique_fields=('space', 'name',)) 

39 

40 def import_category(self): 

41 datatype = 'category' 

42 

43 insert_list = [] 

44 for k in list(self.data[datatype].keys()): 

45 insert_list.append(SupermarketCategory( 

46 name=self.data[datatype][k]['name'], 

47 open_data_slug=k, 

48 space=self.request.space 

49 )) 

50 

51 return SupermarketCategory.objects.bulk_create(insert_list, update_conflicts=True, update_fields=('open_data_slug',), unique_fields=('space', 'name',)) 

52 

53 def import_property(self): 

54 datatype = 'property' 

55 

56 insert_list = [] 

57 for k in list(self.data[datatype].keys()): 

58 insert_list.append(PropertyType( 

59 name=self.data[datatype][k]['name'], 

60 unit=self.data[datatype][k]['unit'], 

61 open_data_slug=k, 

62 space=self.request.space 

63 )) 

64 

65 return PropertyType.objects.bulk_create(insert_list, update_conflicts=True, update_fields=('open_data_slug',), unique_fields=('space', 'name',)) 

66 

67 def import_supermarket(self): 

68 datatype = 'store' 

69 

70 self._update_slug_cache(SupermarketCategory, 'category') 

71 insert_list = [] 

72 for k in list(self.data[datatype].keys()): 

73 insert_list.append(Supermarket( 

74 name=self.data[datatype][k]['name'], 

75 open_data_slug=k, 

76 space=self.request.space 

77 )) 

78 

79 # always add open data slug if matching supermarket is found, otherwise relation might fail 

80 supermarkets = Supermarket.objects.bulk_create(insert_list, unique_fields=('space', 'name',), update_conflicts=True, update_fields=('open_data_slug',)) 

81 self._update_slug_cache(Supermarket, 'store') 

82 

83 insert_list = [] 

84 for k in list(self.data[datatype].keys()): 

85 relations = [] 

86 order = 0 

87 for c in self.data[datatype][k]['categories']: 

88 relations.append( 

89 SupermarketCategoryRelation( 

90 supermarket_id=self.slug_id_cache[datatype][k], 

91 category_id=self.slug_id_cache['category'][c], 

92 order=order, 

93 ) 

94 ) 

95 order += 1 

96 

97 SupermarketCategoryRelation.objects.bulk_create(relations, ignore_conflicts=True, unique_fields=('supermarket', 'category',)) 

98 

99 return supermarkets 

100 

101 def import_food(self): 

102 identifier_list = [] 

103 datatype = 'food' 

104 for k in list(self.data[datatype].keys()): 

105 identifier_list.append(self.data[datatype][k]['name']) 

106 identifier_list.append(self.data[datatype][k]['plural_name']) 

107 

108 existing_objects_flat = [] 

109 existing_objects = {} 

110 for f in Food.objects.filter(space=self.request.space).filter(name__in=identifier_list).values_list('id', 'name', 'plural_name'): 

111 existing_objects_flat.append(f[1]) 

112 existing_objects_flat.append(f[2]) 

113 existing_objects[f[1]] = f 

114 existing_objects[f[2]] = f 

115 

116 self._update_slug_cache(Unit, 'unit') 

117 self._update_slug_cache(PropertyType, 'property') 

118 

119 insert_list = [] 

120 insert_list_flat = [] 

121 update_list = [] 

122 update_field_list = [] 

123 for k in list(self.data[datatype].keys()): 

124 if not (self.data[datatype][k]['name'] in existing_objects_flat or self.data[datatype][k]['plural_name'] in existing_objects_flat): 

125 if not (self.data[datatype][k]['name'] in insert_list_flat or self.data[datatype][k]['plural_name'] in insert_list_flat): 

126 insert_list.append({'data': { 

127 'name': self.data[datatype][k]['name'], 

128 'plural_name': self.data[datatype][k]['plural_name'] if self.data[datatype][k]['plural_name'] != '' else None, 

129 'supermarket_category_id': self.slug_id_cache['category'][self.data[datatype][k]['store_category']], 

130 'fdc_id': self.data[datatype][k]['fdc_id'] if self.data[datatype][k]['fdc_id'] != '' else None, 

131 'open_data_slug': k, 

132 'space': self.request.space.id, 

133 }}) 

134 # build a fake second flat array to prevent duplicate foods from being inserted. 

135 # trying to insert a duplicate would throw a db error :( 

136 insert_list_flat.append(self.data[datatype][k]['name']) 

137 insert_list_flat.append(self.data[datatype][k]['plural_name']) 

138 else: 

139 if self.data[datatype][k]['name'] in existing_objects: 

140 existing_food_id = existing_objects[self.data[datatype][k]['name']][0] 

141 else: 

142 existing_food_id = existing_objects[self.data[datatype][k]['plural_name']][0] 

143 

144 if self.update_existing: 

145 update_field_list = ['name', 'plural_name', 'preferred_unit_id', 'preferred_shopping_unit_id', 'supermarket_category_id', 'fdc_id', 'open_data_slug', ] 

146 update_list.append(Food( 

147 id=existing_food_id, 

148 name=self.data[datatype][k]['name'], 

149 plural_name=self.data[datatype][k]['plural_name'] if self.data[datatype][k]['plural_name'] != '' else None, 

150 supermarket_category_id=self.slug_id_cache['category'][self.data[datatype][k]['store_category']], 

151 fdc_id=self.data[datatype][k]['fdc_id'] if self.data[datatype][k]['fdc_id'] != '' else None, 

152 open_data_slug=k, 

153 )) 

154 else: 

155 update_field_list = ['open_data_slug', ] 

156 update_list.append(Food(id=existing_food_id, open_data_slug=k, )) 

157 

158 Food.load_bulk(insert_list, None) 

159 if len(update_list) > 0: 

160 Food.objects.bulk_update(update_list, update_field_list) 

161 

162 self._update_slug_cache(Food, 'food') 

163 

164 food_property_list = [] 

165 # alias_list = [] 

166 

167 for k in list(self.data[datatype].keys()): 

168 for fp in self.data[datatype][k]['properties']['type_values']: 

169 # try catch here because somettimes key "k" is not set for he food cache 

170 try: 

171 food_property_list.append(Property( 

172 property_type_id=self.slug_id_cache['property'][fp['property_type']], 

173 property_amount=fp['property_value'], 

174 import_food_id=self.slug_id_cache['food'][k], 

175 space=self.request.space, 

176 )) 

177 except KeyError: 

178 print(str(k) + ' is not in self.slug_id_cache["food"]') 

179 

180 Property.objects.bulk_create(food_property_list, ignore_conflicts=True, unique_fields=('space', 'import_food_id', 'property_type',)) 

181 

182 property_food_relation_list = [] 

183 for p in Property.objects.filter(space=self.request.space, import_food_id__isnull=False).values_list('import_food_id', 'id', ): 

184 property_food_relation_list.append(Food.properties.through(food_id=p[0], property_id=p[1])) 

185 

186 FoodProperty.objects.bulk_create(property_food_relation_list, ignore_conflicts=True, unique_fields=('food_id', 'property_id',)) 

187 

188 return insert_list + update_list 

189 

190 def import_conversion(self): 

191 datatype = 'conversion' 

192 

193 insert_list = [] 

194 for k in list(self.data[datatype].keys()): 

195 # try catch here because sometimes key "k" is not set for he food cache 

196 try: 

197 insert_list.append(UnitConversion( 

198 base_amount=self.data[datatype][k]['base_amount'], 

199 base_unit_id=self.slug_id_cache['unit'][self.data[datatype][k]['base_unit']], 

200 converted_amount=self.data[datatype][k]['converted_amount'], 

201 converted_unit_id=self.slug_id_cache['unit'][self.data[datatype][k]['converted_unit']], 

202 food_id=self.slug_id_cache['food'][self.data[datatype][k]['food']], 

203 open_data_slug=k, 

204 space=self.request.space, 

205 created_by=self.request.user, 

206 )) 

207 except KeyError: 

208 print(str(k) + ' is not in self.slug_id_cache["food"]') 

209 

210 return UnitConversion.objects.bulk_create(insert_list, ignore_conflicts=True, unique_fields=('space', 'base_unit', 'converted_unit', 'food', 'open_data_slug'))