Coverage for cookbook/helper/permission_helper.py: 79%

216 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2023-12-29 01:02 +0100

1import inspect 

2 

3from django.conf import settings 

4from django.contrib import messages 

5from django.contrib.auth.decorators import user_passes_test 

6from django.core.cache import cache 

7from django.core.exceptions import ObjectDoesNotExist, ValidationError 

8from django.http import HttpResponseRedirect 

9from django.urls import reverse, reverse_lazy 

10from django.utils.translation import gettext as _ 

11from oauth2_provider.contrib.rest_framework import TokenHasReadWriteScope, TokenHasScope 

12from oauth2_provider.models import AccessToken 

13from rest_framework import permissions 

14from rest_framework.permissions import SAFE_METHODS 

15 

16from cookbook.models import Recipe, ShareLink, UserSpace 

17 

18 

def get_allowed_groups(groups_required):
    """
    Expands the required groups with every group ranked above them
    (guest < user < admin), so requiring guest also admits users and admins.
    Higher groups are appended without de-duplication.
    :param groups_required: list or tuple of groups
    :return: tuple of groups
    """
    expanded = list(groups_required)
    if 'guest' in groups_required:
        expanded.extend(('user', 'admin'))
    if 'user' in groups_required:
        expanded.append('admin')
    return tuple(expanded)

32 

33 

def has_group_permission(user, groups, no_cache=False):
    """
    Tests if a given user is member of a certain group (or any higher group)
    in their single active space.
    Unauthenticated users can't be member of any group thus always return false.
    No special handling for superusers is performed here.
    The result is cached for 10 seconds per user and group combination.
    :param user: django auth user object
    :param groups: list or tuple of groups the user should be checked for
    :param no_cache: (optional) do not return cached results, always check against DB
    :return: True if user is in allowed groups, false otherwise
    """
    if not user.is_authenticated:
        return False
    groups_allowed = get_allowed_groups(groups)

    # Build a deterministic key: the builtin hash() of strings is salted per
    # process, so keys derived from it are not shared between worker processes.
    import hashlib
    key_material = repr(('has_group_permission', user.pk, user.username, user.email, groups_allowed))
    cache_key = 'has_group_permission_' + hashlib.sha256(key_material.encode('utf-8')).hexdigest()

    if not no_cache:
        cached_result = cache.get(cache_key, default=None)
        if cached_result is not None:
            return cached_result

    result = False
    active_user_spaces = list(user.userspace_set.filter(active=True))
    # do not allow any group permission if more than one space is active,
    # needs to be changed when simultaneous multi-space-tenancy is added
    if len(active_user_spaces) == 1:
        result = active_user_spaces[0].groups.filter(name__in=groups_allowed).exists()

    cache.set(cache_key, result, timeout=10)
    return result

64 

65 

def is_object_owner(user, obj):
    """
    Checks whether the given user owns the given object by comparing
    the user with the value returned by the objects get_owner() method.
    Any exception raised during that lookup is treated as "not the owner".
    :param user: django auth user object
    :param obj: any object that should be tested
    :return: true if user is owner of object, false otherwise
    """
    if user.is_authenticated:
        try:
            owner = obj.get_owner()
            return owner == user
        except Exception:
            return False
    return False

81 

82 

def is_space_owner(user, obj):
    """
    Checks whether the given user owns the space the given object belongs to
    (obj.get_space().get_owner() is compared with the user).
    Any exception raised during that lookup is treated as "not the owner".
    :param user: django auth user object
    :param obj: any object that should be tested
    :return: true if user is owner of the objects space, false otherwise
    """
    if user.is_authenticated:
        try:
            space = obj.get_space()
            space_owner = space.get_owner()
            return space_owner == user
        except Exception:
            return False
    return False

96 

97 

def is_object_shared(user, obj):
    """
    Checks whether the given object is shared with the given user,
    i.e. whether the user is contained in obj.get_shared().
    :param user: django auth user object
    :param obj: any object that should be tested
    :return: true if user is shared for object, false otherwise
    """
    # TODO this could be improved/cleaned up by adding
    # share checks for relevant objects
    if user.is_authenticated:
        shared_with = obj.get_shared()
        return user in shared_with
    return False

111 

112 

def share_link_valid(recipe, share):
    """
    Checks that a non abuse-blocked share link exists for the recipe and uuid.
    A successful check increments the links request counter and is cached
    for 3 seconds. If SHARING_LIMIT is greater than zero, links whose request
    count exceeds it are rejected unless the links space has no_sharing_limit set.
    :param recipe: recipe object
    :param share: share uuid
    :return: true if a share link with the given recipe and uuid exists
    """
    try:
        cache_key = f'recipe_share_{recipe.pk}_{share}'
        cached = cache.get(cache_key, False)
        if cached:
            return cached

        link = ShareLink.objects.filter(recipe=recipe, uuid=share, abuse_blocked=False).first()
        if not link:
            return False

        limit_exceeded = 0 < settings.SHARING_LIMIT < link.request_count
        if limit_exceeded and not link.space.no_sharing_limit:
            return False

        link.request_count += 1
        link.save()
        cache.set(cache_key, True, timeout=3)
        return True
    except ValidationError:
        # raised e.g. when the given share value is not a valid uuid
        return False

135 

136 

137# Django Views 

138 

def group_required(*groups_required):
    """
    View decorator requiring the requesting user to be member of at least
    one of the given groups (or a higher level group).
    Users failing the test are sent to the 'view_no_perm' url.
    :param groups_required: names of the required groups
    :return: decorator produced by user_passes_test
    """

    def _member_of_required_group(candidate):
        return has_group_permission(candidate, groups_required)

    decorator = user_passes_test(_member_of_required_group, login_url='view_no_perm')
    return decorator

151 

152 

class GroupRequiredMixin(object):
    """
    View mixin restricting access to members of the given groups.
    groups_required - list of strings, required param
    """

    groups_required = None

    def dispatch(self, request, *args, **kwargs):
        """
        Redirects anonymous users to the login page and users without the
        required group to the index page. If the view provides an object
        that belongs to a space other than the request space the user is
        redirected to the index page as well.
        """
        current_user = request.user
        if not has_group_permission(current_user, self.groups_required):
            if current_user.is_authenticated:
                messages.add_message(request, messages.ERROR,
                                     _('You do not have the required permissions to view this page!'))
                return HttpResponseRedirect(reverse_lazy('index'))
            messages.add_message(request, messages.ERROR,
                                 _('You are not logged in and therefore cannot view this page!'))
            return HttpResponseRedirect(reverse_lazy('account_login') + '?next=' + request.path)

        try:
            target = self.get_object()
            if target.get_space() != request.space:
                messages.add_message(request, messages.ERROR,
                                     _('You do not have the required permissions to view this page!'))
                return HttpResponseRedirect(reverse_lazy('index'))
        except AttributeError:
            # views without get_object() or objects without get_space() skip the space check
            pass

        return super().dispatch(request, *args, **kwargs)

180 

181 

class OwnerRequiredMixin(object):
    """
    View mixin restricting access to the owner of the object the view operates on.
    The user additionally has to be member of the space the object belongs to.
    """

    def dispatch(self, request, *args, **kwargs):
        """
        Redirects anonymous users to the login page, and users that do not own
        the object or are not member of its space to the index page.
        """
        if not request.user.is_authenticated:
            messages.add_message(request, messages.ERROR,
                                 _('You are not logged in and therefore cannot view this page!'))
            return HttpResponseRedirect(reverse_lazy('account_login') + '?next=' + request.path)

        # fetch the object once and reuse it for both checks
        obj = self.get_object()
        if not is_object_owner(request.user, obj):
            messages.add_message(request, messages.ERROR,
                                 _('You cannot interact with this object as it is not owned by you!'))
            return HttpResponseRedirect(reverse('index'))

        try:
            object_space = obj.get_space()
        except AttributeError:
            # objects without a get_space() accessor skip the space check
            pass
        else:
            # the reverse relation is userspace_set; the former "userspace" attribute
            # raised an AttributeError that was swallowed, disabling this check
            if not request.user.userspace_set.filter(space=object_space).exists():
                messages.add_message(request, messages.ERROR,
                                     _('You do not have the required permissions to view this page!'))
                return HttpResponseRedirect(reverse_lazy('index'))

        return super().dispatch(request, *args, **kwargs)

205 

206 

207# Django Rest Framework Permission classes 

208 

class CustomIsOwner(permissions.BasePermission):
    """
    Django rest framework permission class granting access to object owners.
    The view level check requires an authenticated user, the object level
    check delegates to is_object_owner.
    """
    message = _('You cannot interact with this object as it is not owned by you!')

    def has_permission(self, request, view):
        # anonymous requests are rejected before any object is looked at
        requesting_user = request.user
        return requesting_user.is_authenticated

    def has_object_permission(self, request, view, obj):
        requesting_user = request.user
        return is_object_owner(requesting_user, obj)

222 

223 

class CustomIsOwnerReadOnly(CustomIsOwner):
    """
    Variant of CustomIsOwner that only grants access for safe (read only)
    request methods.
    """

    def has_permission(self, request, view):
        return super().has_permission(request, view) and request.method in SAFE_METHODS

    def has_object_permission(self, request, view, obj):
        # obj must be forwarded, the parent signature is (request, view, obj)
        return super().has_object_permission(request, view, obj) and request.method in SAFE_METHODS

230 

231 

class CustomIsSpaceOwner(permissions.BasePermission):
    """
    Django rest framework permission class granting access to space owners.
    View level: the user must be authenticated and be the creator of the request space.
    Object level: the user must own the space the object belongs to.
    """
    message = _('You cannot interact with this object as it is not owned by you!')

    def has_permission(self, request, view):
        requesting_user = request.user
        if not requesting_user.is_authenticated:
            return False
        return request.space.created_by == requesting_user

    def has_object_permission(self, request, view, obj):
        requesting_user = request.user
        return is_space_owner(requesting_user, obj)

244 

245 

246# TODO function duplicate/too similar name 

class CustomIsShared(permissions.BasePermission):
    """
    Django rest framework permission class granting access to users
    the accessed object has been shared with.
    """
    message = _('You cannot interact with this object as it is not owned by you!')  # noqa: E501

    def has_permission(self, request, view):
        # only authenticated users can have objects shared with them
        requesting_user = request.user
        return requesting_user.is_authenticated

    def has_object_permission(self, request, view, obj):
        requesting_user = request.user
        return is_object_shared(requesting_user, obj)

259 

260 

class CustomIsGuest(permissions.BasePermission):
    """
    Django rest framework permission class requiring the user to be
    member of at least the group: guest
    The same group check is applied on view and on object level.
    """
    message = _('You do not have the required permissions to view this page!')

    def has_permission(self, request, view):
        required = ['guest']
        return has_group_permission(request.user, required)

    def has_object_permission(self, request, view, obj):
        required = ['guest']
        return has_group_permission(request.user, required)

273 

274 

class CustomIsUser(permissions.BasePermission):
    """
    Django rest framework permission class requiring the user to be
    member of at least the group: user
    """
    message = _('You do not have the required permissions to view this page!')

    def has_permission(self, request, view):
        required = ['user']
        return has_group_permission(request.user, required)

284 

285 

class CustomIsAdmin(permissions.BasePermission):
    """
    Django rest framework permission class requiring the user to be
    member of the group: admin
    """
    message = _('You do not have the required permissions to view this page!')

    def has_permission(self, request, view):
        required = ['admin']
        return has_group_permission(request.user, required)

295 

296 

class CustomIsShare(permissions.BasePermission):
    """
    Django rest framework permission class granting read access to requests
    that provide a valid share link via the "share" query parameter.
    """
    message = _('You do not have the required permissions to view this page!')

    def has_permission(self, request, view):
        # share links only ever apply to safe requests for a single object
        if request.method not in SAFE_METHODS:
            return False
        return 'pk' in view.kwargs

    def has_object_permission(self, request, view, obj):
        share = request.query_params.get('share', None)
        if not share:
            return False
        return share_link_valid(obj, share)

312 

313 

class CustomRecipePermission(permissions.BasePermission):
    """
    Custom permission class for recipe api endpoint
    Guests may read, users may read and write, and a share link may be
    used to read a single recipe.
    """
    message = _('You do not have the required permissions to view this page!')

    def has_permission(self, request, view):
        """
        The user is either at least a guest (safe requests only), at least a user,
        or a share link is given for a safe request on a single object.
        """
        share = request.query_params.get('share', None)
        is_safe = request.method in SAFE_METHODS
        if has_group_permission(request.user, ['guest']) and is_safe:
            return True
        if has_group_permission(request.user, ['user']):
            return True
        return bool(share and is_safe and 'pk' in view.kwargs)

    def has_object_permission(self, request, view, obj):
        """
        Share links only authorize safe requests; every other request has to
        pass the regular ownership/group checks bound to the request space.
        """
        share = request.query_params.get('share', None)
        is_safe = request.method in SAFE_METHODS
        if share and is_safe:
            return share_link_valid(obj, share)

        in_request_space = obj.space == request.space
        if obj.private:
            # private recipes are limited to their creator and the users they are shared with
            return ((obj.created_by == request.user) or (request.user in obj.shared.all())) and in_request_space
        return ((has_group_permission(request.user, ['guest']) and is_safe)
                or has_group_permission(request.user, ['user'])) and in_request_space

335 

336 

class CustomUserPermission(permissions.BasePermission):
    """
    Custom permission class for user api endpoint
    """
    message = _('You do not have the required permissions to view this page!')

    def has_permission(self, request, view):  # a space filtered user list is visible for everyone
        return has_group_permission(request.user, ['guest'])

    def has_object_permission(self, request, view, obj):  # object write permissions are only available for user
        """
        A user always has access to their own user object. Other users are
        readable (safe requests on a single object) for at least guests when
        the accessed user is member of the request space.
        """
        if request.user == obj:
            return True
        # filter the membership relation by space; testing the space object for
        # containment in the relation compares it against membership rows
        return (request.method in SAFE_METHODS
                and 'pk' in view.kwargs
                and has_group_permission(request.user, ['guest'])
                and obj.userspace_set.filter(space=request.space).exists())

353 

354 

class CustomTokenHasScope(TokenHasScope):
    """
    Custom implementation of Django OAuth Toolkit TokenHasScope class
    The scope check is only performed when the request was authenticated with an
    AccessToken, for every other kind of authentication the user only needs to be authenticated.
    IMPORTANT: do not use this class without any other permission class as it will not check anything besides token scopes
    """

    def has_permission(self, request, view):
        if not isinstance(request.auth, AccessToken):
            return request.user.is_authenticated
        return super().has_permission(request, view)

367 

368 

class CustomTokenHasReadWriteScope(TokenHasReadWriteScope):
    """
    Custom implementation of Django OAuth Toolkit TokenHasReadWriteScope class
    The scope check is only performed when the request was authenticated with an
    AccessToken, for every other kind of authentication the check always passes.
    IMPORTANT: do not use this class without any other permission class as it will not check anything besides token scopes
    """

    def has_permission(self, request, view):
        if not isinstance(request.auth, AccessToken):
            return True
        return super().has_permission(request, view)

381 

382 

def above_space_limit(space):  # TODO add file storage limit
    """
    Checks the space against all known limits (recipes and users) and
    combines the individual results.
    :param space: Space to test for limits
    :return: Tuple (True if any limit check triggered else false, space separated messages)
    """
    recipes_exceeded, recipe_message = above_space_recipe_limit(space)
    users_exceeded, user_message = above_space_user_limit(space)
    combined_message = f'{recipe_message} {user_message}'.strip()
    return recipes_exceeded or users_exceeded, combined_message

392 

393 

def above_space_recipe_limit(space):
    """
    Checks if the number of recipes in a space has reached the spaces max_recipes.
    A max_recipes value of 0 disables the limit.
    :param space: Space to test for limits
    :return: Tuple (True if above or equal limit else false, message)
    """
    if space.max_recipes == 0:
        return False, ''
    recipe_count = Recipe.objects.filter(space=space).count()
    if recipe_count >= space.max_recipes:
        return True, _('You have reached the maximum number of recipes for your space.')
    return False, ''

404 

405 

def above_space_user_limit(space):
    """
    Checks if the number of users in a space exceeds the spaces max_users.
    A max_users value of 0 disables the limit.
    :param space: Space to test for limits
    :return: Tuple (True if strictly above limit else false, message)
    """
    if space.max_users == 0:
        return False, ''
    member_count = UserSpace.objects.filter(space=space).count()
    if member_count > space.max_users:
        return True, _('You have more users than allowed in your space.')
    return False, ''

416 

417 

def switch_user_active_space(user, space):
    """
    Makes the given space the active space of the user. If the users membership
    is not active yet all memberships of the user are deactivated first and
    the requested one is activated afterwards.
    :param user: user to change active space for
    :param space: space to activate user for
    :return: user space object or none if the user is not member of the space
    """
    try:
        membership = UserSpace.objects.get(space=space, user=user)
        if membership.active:
            # already the active space, nothing to change
            return membership
        UserSpace.objects.filter(user=user).update(active=False)
        membership.active = True
        membership.save()
        return membership
    except ObjectDoesNotExist:
        return None

436 

437 

class IsReadOnlyDRF(permissions.BasePermission):
    """
    Django rest framework permission class that only allows safe (read only) request methods.
    """
    message = 'You cannot interact with this object as it is not owned by you!'

    def has_permission(self, request, view):
        method_is_safe = request.method in SAFE_METHODS
        return method_is_safe