-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwrite_encrypted.py
More file actions
156 lines (124 loc) · 4.82 KB
/
write_encrypted.py
File metadata and controls
156 lines (124 loc) · 4.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import os
import argparse
from getpass import getpass
from urllib.parse import urlparse
from Cryptodome.Random import get_random_bytes
from base64 import (
b64decode,
b64encode,
)
from rdflib import (
Graph,
URIRef,
)
from solidpod_helper import (
gen_master_key,
gen_verify_key,
encrypt,
parse_ttl,
apps_terms,
path_pred,
iv_pred,
verify_key_pred,
indi_key_pred,
enc_data_pred,
server_path,
)
def upload_file(file_url, file_content, master_key):
# Encrypt file content
print('Encrypt content ...')
indi_key = get_random_bytes(32)
data_iv = get_random_bytes(16)
enc_data_b64 = encrypt(file_content, indi_key, data_iv)
# Write encrypted content to file in POD
print('Write encrypted content ...')
r = urlparse(file_url)
items = r.path.split('/')[1:] # r.path: '/pod_name/app_name/data/data_file_path'
assert len(items) >= 4
assert items[2] == 'data'
relative_file_path = '/'.join(items[1:])
g = Graph()
data_iv_b64 = b64encode(data_iv).decode('ascii')
query = f'INSERT DATA {{<{file_url}> <{apps_terms}{path_pred}> "{relative_file_path}"; ' + \
f'<{apps_terms}{iv_pred}> "{data_iv_b64}"; ' + \
f'<{apps_terms}{enc_data_pred}> "{enc_data_b64}".}};'
g.update(query)
ttl_str = g.serialize(format='turtle')
abs_path = f'{server_path}{"/".join(items)}'
with open(abs_path, 'w') as f:
f.write(ttl_str)
# Add session key to ind-keys.ttl
print('Add encrypted session key to ind-keys.ttl ...')
add_indi_key(file_url, indi_key, master_key)
def add_indi_key(file_url, indi_key, master_key):
# Add encrypted individual key to ind-keys.ttl
# Encrypt the individual key
indi_key_iv = get_random_bytes(16)
indi_key_b64 = b64encode(indi_key).decode('ascii')
enc_indi_key_b64 = encrypt(indi_key_b64, master_key, indi_key_iv)
# Parse the ind-keys.ttl file
r = urlparse(file_url)
items = r.path.split('/')[1:] # r.path: '/pod_name/app_name/data/data_file_path'
assert len(items) >= 4
pod_name = items[0]
app_name = items[1]
assert items[2] == 'data'
server_url = f'{r.scheme}://{r.netloc}'
relative_file_path = '/'.join(items[1:])
ind_key_path = f'{server_path}{pod_name}/{app_name}/encryption/ind-keys.ttl'
g = Graph()
g.parse(ind_key_path)
# Replace file:///POD_DIR prefix with server URL
for s, p, o in g:
prefix = f'file://{server_path}'
if str(s).startswith(prefix):
new_s = str(s).replace(prefix, server_url)
g.add((URIRef(new_s), p, o))
g.remove((s, p, o))
# Remove triple if the subject already exists
if str(s) == file_url:
g.remove((s, p, o))
# Add encrypted individual key to ind-keys.ttl
indi_key_iv_b64 = b64encode(indi_key_iv).decode('ascii')
query = f'INSERT DATA {{<{file_url}> <{apps_terms}{path_pred}> "{relative_file_path}"; ' + \
f'<{apps_terms}{iv_pred}> "{indi_key_iv_b64}"; ' + \
f'<{apps_terms}{indi_key_pred}> "{enc_indi_key_b64}".}};'
g.update(query)
# Write back ind-keys.ttl
ttl_str = g.serialize(format='turtle', base=server_url)
with open(ind_key_path, 'w') as f:
f.write(ttl_str)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Process secret and path arguments')
parser.add_argument('srcpath', help='Path of the file to be uploaded')
parser.add_argument('destpath', help='Path of the destination file within data folder')
args = parser.parse_args()
# Destination file path format: server_path/pod_name/app_name/data/data_file
dest_path = os.path.abspath(args.destpath)
assert dest_path.startswith(server_path)
items = dest_path.replace(server_path, '').split('/')
assert len(items) >= 4
pod_name = items[0]
app_name = items[1]
assert items[2] == 'data'
app_path = f'{server_path}{pod_name}/{app_name}'
# Generate master encryption key from the user-provided security key
security_key_str = getpass(prompt='Security Key: ')
master_key = gen_master_key(security_key_str)
verify_key = gen_verify_key(security_key_str)
# Verify security key
_map = parse_ttl(f'{app_path}/encryption/enc-keys.ttl')
enc_key_url, enc_key_map = list(_map.items())[0]
verify_key_stored = enc_key_map[verify_key_pred]
if verify_key.decode('utf-8') != verify_key_stored:
print('ERROR: Incorrect security key (verification failed).')
sys.exit(0)
# Read content of source file
src_path = args.srcpath
assert os.path.exists(src_path)
with open(src_path, 'r') as f:
file_content = f.read()
# Write encrypted content to POD
r = urlparse(enc_key_url)
file_url = '/'.join([f'{r.scheme}://{r.netloc}'] + items)
upload_file(file_url, file_content, master_key)