Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

#! /usr/bin/env python3 

# -*- coding: utf-8 -*- 

 

#       Copyright 2013-2014, 2017, Marten de Vries 

# 

#       This file is part of OpenTeacher. 

# 

#       OpenTeacher is free software: you can redistribute it and/or modify 

#       it under the terms of the GNU General Public License as published by 

#       the Free Software Foundation, either version 3 of the License, or 

#       (at your option) any later version. 

# 

#       OpenTeacher is distributed in the hope that it will be useful, 

#       but WITHOUT ANY WARRANTY; without even the implied warranty of 

#       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 

#       GNU General Public License for more details. 

# 

#       You should have received a copy of the GNU General Public License 

#       along with OpenTeacher.  If not, see <http://www.gnu.org/licenses/>. 

 

import requests 

import json 

import uuid 

import logging 

import os 

 

join = os.path.join 

 

logger = logging.getLogger(__name__) 

 

def _only_access_for(username): 

        return { 

                "admins": { 

                        "names": [], 

                        "roles": [], 

                }, 

                "members": { 

                        "names": [username], 

                        "roles": [], 

                } 

        } 

 

ADMIN_ONLY_VALIDATE_DOC_UPDATE = """ 

(function (newDoc, oldDoc, userCtx) { 

        if (userCtx.roles.indexOf("_admin") === -1) { 

                throw({"unauthorized": "need to be admin to make changes."}); 

        } 

}); 

""" 

 

class WebCouch: 

        def __init__(self, host, username, password, dbSkeletonDir, isSafeHtmlCode, generateWordsHtmlCode): 

                self._host = host 

                self._username = username 

                self._password = password 

 

                self._codeDir = dbSkeletonDir 

                self._validationLibs = { 

                        "is_safe_html": isSafeHtmlCode, 

                        "validation_lib": self._getJs("validationLib.js"), 

                } 

                self._presentationLibs = { 

                        "generate_words_html": generateWordsHtmlCode, 

                        "presentation_lib": self._getJs("presentationLib.js"), 

                } 

 

        def _getJs(self, endpoint): 

                with open(join(self._codeDir, endpoint), encoding='UTF-8') as f: 

                        return f.read() 

 

        def _design_from(self, endpoint, additionalData={}): 

                base = join(self._codeDir, endpoint) 

                parts = os.listdir(base) 

                design_doc = {} 

 

                if "validate_doc_update.js" in parts: 

                        design_doc["validate_doc_update"] = self._getJs(join(endpoint, "validate_doc_update.js")) 

 

                for type in ["updates", "lists", "shows"]: 

                        if type in parts: 

                                design_doc[type] = {} 

 

                                allOfType = os.listdir(join(base, type)) 

                                for oneOfTypeJs in allOfType: 

                                        oneOfType = oneOfTypeJs[:-len(".js")] 

                                        design_doc[type][oneOfType] = self._getJs(join(endpoint, type, oneOfTypeJs)) 

 

                if "views" in parts: 

                        design_doc["views"] = self._gatherViews(join(base, "views"), endpoint) 

 

                design_doc.update(additionalData) 

                return design_doc 

 

        def _gatherViews(self, path, endpoint): 

                viewsObj = {} 

 

                views = os.listdir(path) 

                for view in views: 

                        viewObj = { 

                                "map": self._getJs(join(endpoint, "views", view, "map.js")), 

                        } 

                        reducePath = join(endpoint, "views", view, "reduce.js") 

                        if os.path.exists(join(self._codeDir, reducePath)): 

                                viewObj["reduce"] = self._getJs(reducePath) 

                        viewsObj[view] = viewObj 

                return viewsObj 

 

        def req(self, method, endpoint, data=None, auth=None): 

                if data: 

                        data = json.dumps(data) 

                if not auth: 

                        auth = requests.auth.HTTPBasicAuth(self._username, self._password) 

                headers = {"Content-Type": "application/json"} 

                func = getattr(requests, method) 

                return func(self._host + endpoint, auth=auth, headers=headers, data=data) 

 

        def create_anonymous_user(self): 

                self._create_user("anonymous", "anonymous", anonymous=True) 

 

        def _create_user(self, username, password, anonymous): 

                try: 

                        #user 

                        assert self.req("post", "/_users", { 

                                "_id": "org.couchdb.user:" + username, 

                                "type": "user", 

                                "name": username, 

                                "password": password, 

                                "roles": [], 

                        }).status_code == 201 

 

                        #private 

                        assert self.req("put", "/private_" + username).status_code == 201 

                        assert self.req( 

                                "put", 

                                "/private_" + username + "/_security", 

                                _only_access_for(username) 

                        ).status_code == 200 

                        private_design_doc = self._design_from("private", self._validationLibs) 

                        if anonymous: 

                                private_design_doc["validate_doc_update"] = ADMIN_ONLY_VALIDATE_DOC_UPDATE 

                        assert self.req("put", "/private_" + username + "/_design/private", private_design_doc).status_code == 201 

 

                        lists_design_doc = self._design_from("lists", self._presentationLibs) 

                        assert self.req("put", "/private_" + username + "/_design/lists", lists_design_doc).status_code == 201 

 

                        tests_design_doc = self._design_from("tests") 

                        assert self.req("put", "/private_" + username + "/_design/tests", tests_design_doc).status_code == 201 

 

                        #shared_lists 

                        assert self.req("put", "/shared_lists_" + username).status_code == 201 

                        assert self.req("put", "/shared_lists_" + username + "/_design/shares", self._design_from("shared_lists", self._presentationLibs)).status_code == 201 

 

                        #replicator 

                        assert self.req("put", "/_replicator/private_to_shared_lists_" + username, { 

                                "source": "private_" + username, 

                                "target": "shared_lists_" + username, 

                                "continuous": True, 

                                "user_ctx": { 

                                        "roles": ["_admin"], 

                                }, 

                                #Workaround for https://issues.apache.org/jira/browse/COUCHDB-1415 

                                "random_value": str(uuid.uuid4()), 

                        }).status_code == 201 

 

                        #add one 'default' list 

                        assert self.req("post", "/private_" + username + "/_design/lists/_update/set_last_edited_to_now", { 

                                "type": "list", 

                                "shares": [], 

                                "items": [ 

                                        { 

                                                "id": 0, 

                                                "questions": [["een"]], 

                                                "answers": [["one"]], 

                                        }, 

                                        { 

                                                "id": 1, 

                                                "questions": [["twee"]], 

                                                "answers": [["two"]], 

                                        }, 

                                        { 

                                                "id": 1, 

                                                "questions": [["drie"]], 

                                                "answers": [["three"]], 

                                        } 

                                ], 

                                "title": "Example list", 

                        }).status_code == 201 

 

                except AssertionError as e: 

                        logger.debug(e, exc_info=True) 

                        try: 

                                self.delete_user(username) 

                        except ValueError: 

                                #delete as far as possible, but it'll probably crash since 

                                #create_user didn't succeed. 

                                pass 

                        raise ValueError("username_taken") 

 

        def new_user(self, username, password): 

                if len(password) < 8 or password.isalnum(): 

                        raise ValueError("unsafe_password") 

 

                self._create_user(username, password, anonymous=False) 

 

        def check_auth(self, username, password): 

                session = self.req("get", "/_session", auth=requests.auth.HTTPBasicAuth(username, password)).json() 

                try: 

                        return username == session["userCtx"]["name"] 

                except KeyError: 

                        return False 

 

        def delete_user(self, username): 

                try: 

                        #no assertion, because this could actually fail, because the 

                        #order is different than in new_user. And it has to be this way 

                        #because the DB's that are replicated shouldn't be deleted 

                        #already when the replication is done. 

                        rev = json.loads(self.req("head", "/_replicator/private_to_shared_lists_" + username).headers["Etag"]) 

                        self.req("delete", "/_replicator/private_to_shared_lists_" + username + "?rev=" + rev) 

 

                        rev = json.loads(self.req("head", "/_users/org.couchdb.user:" + username).headers["Etag"]) 

                        assert self.req("delete", "/_users/org.couchdb.user:" + username + "?rev=" + rev).status_code == 200 

 

                        assert self.req("delete", "/private_" + username).status_code == 200 

                        assert self.req("delete", "/shared_lists_" + username).status_code == 200 

                except (AssertionError, KeyError) as e: 

                        logger.debug(e, exc_info=True) 

                        raise ValueError("Error deleting user, aborted.")