-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgithub_mirror.py
More file actions
executable file
·342 lines (276 loc) · 12.3 KB
/
github_mirror.py
File metadata and controls
executable file
·342 lines (276 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
#!/usr/bin/env python
import argparse
import json
import logging
import subprocess
import sys
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Set, Tuple
try:
import tomllib
except ModuleNotFoundError:
print('Python 3.11+ is required (tomllib is missing).', file=sys.stderr)
sys.exit(2)
@dataclass(frozen=True)
class Target:
    """A single mirror source taken from the config ``sources`` array.

    Instances are immutable (frozen dataclass), so they compare and hash
    by value.
    """

    # Target string as stored by parse_target (whitespace-stripped value).
    raw: str
    # Either 'owner' (mirror every repo of an owner) or 'repo' (one repo).
    kind: str
    # Owner segment of the target (the part before any '/').
    owner: str
    # Repository name for 'repo' targets; None for 'owner' targets.
    repo: Optional[str]
    # Optional access token configured for this source.
    token: Optional[str]
def configure_logging(level: str) -> None:
    """Set up root logging on stderr at the requested verbosity.

    Args:
        level: Level name; surrounding whitespace and letter case are
            ignored. Only INFO, WARN and ERROR are accepted.

    Raises:
        ValueError: If *level* is not one of the accepted names.
    """
    level_by_name = {
        'INFO': logging.INFO,
        'WARN': logging.WARNING,
        'ERROR': logging.ERROR,
    }
    numeric_level = level_by_name.get(level.strip().upper())
    if numeric_level is None:
        raise ValueError(f'Unsupported log level: {level}. Allowed: INFO, WARN, ERROR')

    logging.basicConfig(
        format='%(asctime)s %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        stream=sys.stderr,
        level=numeric_level,
    )
def load_config(path: Path) -> dict:
with path.open('rb') as f:
return tomllib.load(f)
def parse_target(raw: str, token: Optional[str]) -> Target:
    """Turn a config target string into a :class:`Target`.

    Accepted forms (after stripping surrounding whitespace):

    * ``OWNER``      -> kind ``'owner'`` with ``repo=None``
    * ``OWNER/REPO`` -> kind ``'repo'``; both segments must be non-empty

    Args:
        raw: Target string from the config.
        token: Optional access token to attach to the target.

    Raises:
        ValueError: If the string is blank or has any other shape.
    """
    cleaned = raw.strip()
    if not cleaned:
        raise ValueError('Empty target value is not allowed.')

    segments = cleaned.split('/')
    if len(segments) == 1:
        return Target(raw=cleaned, kind='owner', owner=segments[0], repo=None, token=token)
    if len(segments) == 2 and all(segments):
        owner_name, repo_name = segments
        return Target(raw=cleaned, kind='repo', owner=owner_name, repo=repo_name, token=token)

    raise ValueError(
        f'Invalid target format: {cleaned}. Expected OWNER, or OWNER/REPO'
    )
def parse_targets(config: dict) -> List[Target]:
    """Validate ``config['sources']`` and convert each entry to a Target.

    Every entry must be a table with a string ``target`` and, optionally,
    a string ``token``. Entries are processed in order and the first
    invalid one aborts parsing.

    Raises:
        ValueError: If ``sources`` is missing, empty, not a list, or an
            entry is malformed (also propagated from ``parse_target``).
    """
    entries = config.get('sources')
    if not (isinstance(entries, list) and entries):
        raise ValueError('Config must contain non-empty array: sources')

    parsed: List[Target] = []
    for position, entry in enumerate(entries):
        label = f'sources[{position}]'
        if not isinstance(entry, dict):
            raise ValueError(f'{label} must be table/object')
        if 'target' not in entry:
            raise ValueError(f'{label} must contain "target"')

        target_value = entry['target']
        if not isinstance(target_value, str):
            raise ValueError(f'{label}.target must be string')

        token = entry.get('token')
        if not (token is None or isinstance(token, str)):
            raise ValueError(f'{label}.token must be string')

        parsed.append(parse_target(target_value, token))
    return parsed
def github_request(
    url: str, token: Optional[str], timeout: float = 30.0
) -> Tuple[List[dict], Optional[str]]:
    """GET one page of a list-returning GitHub API endpoint.

    Args:
        url: Full API URL to request.
        token: Optional token sent as a ``Bearer`` Authorization header.
        timeout: Socket timeout in seconds. Without one, a stalled
            connection would block the whole mirror run indefinitely.

    Returns:
        A tuple ``(items, next_url)`` where *items* is the decoded JSON
        list and *next_url* is the ``rel="next"`` URL from the ``Link``
        response header, or ``None`` when there is none.

    Raises:
        RuntimeError: On an HTTP error status, a network failure or
            timeout, a body that is not valid JSON, or a JSON payload
            that is not a list.
    """
    req = urllib.request.Request(url)
    req.add_header('Accept', 'application/vnd.github+json')
    req.add_header('X-GitHub-Api-Version', '2022-11-28')
    req.add_header('User-Agent', 'github-mirror-script')
    if token:
        req.add_header('Authorization', f'Bearer {token}')

    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            body = resp.read()
            links = resp.headers.get('Link', '')
    except urllib.error.HTTPError as exc:
        # HTTPError must be handled before OSError: it is a subclass of it.
        payload = exc.read().decode('utf-8', errors='replace')
        raise RuntimeError(f'GitHub API error {exc.code} for {url}: {payload}') from exc
    except OSError as exc:
        # Covers URLError as well as timeouts/resets raised while reading
        # the body, which urllib does not wrap in URLError.
        raise RuntimeError(f'Network error for {url}: {exc}') from exc

    try:
        data = json.loads(body.decode('utf-8'))
    except ValueError as exc:
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        raise RuntimeError(f'Invalid JSON in API response for {url}: {exc}') from exc

    if not isinstance(data, list):
        raise RuntimeError(f'Unexpected API payload for {url}: expected list')
    return data, parse_next_link(links)
def parse_next_link(link_header: str) -> Optional[str]:
    """Extract the ``rel="next"`` URL from an HTTP ``Link`` header value.

    The header is split on commas; the first segment that contains
    ``rel="next"`` and has a ``<`` before its first ``>`` supplies the
    URL found between those two characters.

    Returns:
        The next-page URL, or ``None`` if the header is empty or holds no
        usable ``rel="next"`` segment.
    """
    if not link_header:
        return None

    for segment in (chunk.strip() for chunk in link_header.split(',')):
        if 'rel="next"' not in segment:
            continue
        open_at = segment.find('<')
        close_at = segment.find('>')
        # Both brackets must be present, with '<' strictly before '>'.
        if -1 < open_at < close_at:
            return segment[open_at + 1:close_at]
    return None
def list_owner_repos(
    owner: str, token: Optional[str], ignore_forks: bool
) -> List[Tuple[str, str, str]]:
    """List the repositories of a user or organization.

    The owner type is detected first: organizations are listed through
    ``/orgs/{owner}/repos`` (``type=all``), everything else through
    ``/users/{owner}/repos`` (``type=owner``). Pages are followed until
    no next-page URL is returned.

    Args:
        owner: GitHub user or organization login.
        token: Optional API token passed to every request.
        ignore_forks: When true, entries whose ``fork`` flag is ``True``
            are skipped.

    Returns:
        A list of ``(owner_login, repo_name, clone_url)`` tuples. Entries
        lacking any of the expected string fields are left out.
    """
    is_org = detect_owner_type(owner, token) == 'org'
    quoted_owner = urllib.parse.quote(owner)
    if is_org:
        page_url: Optional[str] = (
            f'https://api.github.com/orgs/{quoted_owner}/repos?per_page=100&type=all'
        )
    else:
        page_url = (
            f'https://api.github.com/users/{quoted_owner}/repos?per_page=100&type=owner'
        )

    collected: List[Tuple[str, str, str]] = []
    while page_url:
        entries, page_url = github_request(page_url, token)
        for entry in entries:
            fields = (
                entry.get('full_name'),
                entry.get('clone_url'),
                entry.get('name'),
                entry.get('owner', {}).get('login'),
            )
            if ignore_forks and entry.get('fork') is True:
                continue
            if not all(isinstance(field, str) for field in fields):
                continue
            _full_name, clone_url, repo_name, owner_name = fields
            collected.append((owner_name, repo_name, clone_url))
    return collected
def detect_owner_type(owner: str, token: Optional[str], timeout: float = 30.0) -> str:
    """Report whether *owner* is an organization or a user account.

    Queries ``https://api.github.com/users/{owner}`` and inspects the
    ``type`` field of the response.

    Args:
        owner: GitHub login to look up.
        token: Optional token sent as a ``Bearer`` Authorization header.
        timeout: Socket timeout in seconds, so a stalled connection
            cannot block the run indefinitely.

    Returns:
        ``'org'`` when ``type`` is ``'Organization'``, otherwise ``'user'``.

    Raises:
        RuntimeError: On an HTTP error status, a network failure or
            timeout, or a response that is not a JSON object.
    """
    url = f'https://api.github.com/users/{urllib.parse.quote(owner)}'
    req = urllib.request.Request(url)
    req.add_header('Accept', 'application/vnd.github+json')
    req.add_header('X-GitHub-Api-Version', '2022-11-28')
    req.add_header('User-Agent', 'github-mirror-script')
    if token:
        req.add_header('Authorization', f'Bearer {token}')

    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            body = resp.read()
    except urllib.error.HTTPError as exc:
        # HTTPError must be handled before OSError: it is a subclass of it.
        payload = exc.read().decode('utf-8', errors='replace')
        raise RuntimeError(f'Failed to detect owner type for {owner}: {exc.code} {payload}') from exc
    except OSError as exc:
        # Covers URLError plus timeouts/resets raised while reading the body.
        raise RuntimeError(f'Network error while reading owner {owner}: {exc}') from exc

    try:
        data = json.loads(body.decode('utf-8'))
    except ValueError as exc:
        raise RuntimeError(f'Invalid JSON while reading owner {owner}: {exc}') from exc
    if not isinstance(data, dict):
        raise RuntimeError(f'Unexpected owner payload for {owner}: expected object')

    return 'org' if data.get('type') == 'Organization' else 'user'
def get_repo(
    owner: str, repo: str, token: Optional[str], timeout: float = 30.0
) -> Tuple[str, str, str, bool]:
    """Fetch metadata for a single repository.

    Queries ``https://api.github.com/repos/{owner}/{repo}``.

    Args:
        owner: Repository owner login.
        repo: Repository name.
        token: Optional token sent as a ``Bearer`` Authorization header.
        timeout: Socket timeout in seconds, so a stalled connection
            cannot block the run indefinitely.

    Returns:
        ``(owner_login, repo_name, clone_url, is_fork)`` as reported by
        the API.

    Raises:
        RuntimeError: On an HTTP error status, a network failure or
            timeout, invalid JSON, or a payload missing the expected
            fields.
    """
    url = f'https://api.github.com/repos/{urllib.parse.quote(owner)}/{urllib.parse.quote(repo)}'
    req = urllib.request.Request(url)
    req.add_header('Accept', 'application/vnd.github+json')
    req.add_header('X-GitHub-Api-Version', '2022-11-28')
    req.add_header('User-Agent', 'github-mirror-script')
    if token:
        req.add_header('Authorization', f'Bearer {token}')

    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            body = resp.read()
    except urllib.error.HTTPError as exc:
        # HTTPError must be handled before OSError: it is a subclass of it.
        payload = exc.read().decode('utf-8', errors='replace')
        raise RuntimeError(f'Failed to read repo {owner}/{repo}: {exc.code} {payload}') from exc
    except OSError as exc:
        # Covers URLError plus timeouts/resets raised while reading the body.
        raise RuntimeError(f'Network error while reading repo {owner}/{repo}: {exc}') from exc

    try:
        data = json.loads(body.decode('utf-8'))
    except ValueError as exc:
        raise RuntimeError(f'Invalid JSON while reading repo {owner}/{repo}: {exc}') from exc
    if not isinstance(data, dict):
        raise RuntimeError(f'Unexpected repo payload for {owner}/{repo}')

    clone_url = data.get('clone_url')
    repo_name = data.get('name')
    is_fork = data.get('fork')
    # A null or non-object "owner" must yield the payload error below,
    # not an AttributeError from calling .get() on it.
    owner_info = data.get('owner')
    owner_name = owner_info.get('login') if isinstance(owner_info, dict) else None

    if not all(isinstance(v, str) for v in (clone_url, owner_name, repo_name)):
        raise RuntimeError(f'Unexpected repo payload for {owner}/{repo}')
    if not isinstance(is_fork, bool):
        raise RuntimeError(f'Unexpected repo payload for {owner}/{repo}: missing fork flag')
    return owner_name, repo_name, clone_url, is_fork
def auth_url(clone_url: str, token: Optional[str]) -> str:
    """Return *clone_url* with token credentials embedded, if a token is given.

    With a truthy *token* the network location is prefixed with
    ``x-access-token:<token>@``; the token is percent-encoded with no
    characters treated as safe. Without a token the URL is returned as is.
    """
    if token:
        parts = urllib.parse.urlparse(clone_url)
        credentials = 'x-access-token:' + urllib.parse.quote(token, safe='')
        return urllib.parse.urlunparse(
            parts._replace(netloc=f'{credentials}@{parts.netloc}')
        )
    return clone_url
def redact_url(url: str) -> str:
    """Mask any credentials embedded in *url* for safe logging.

    URLs without a scheme, without a network location, or without an
    ``@`` in the network location are returned unchanged. Otherwise the
    user-info part is replaced by ``***:***``.

    The host is taken from after the LAST ``@``: the user-info part may
    itself contain ``@`` (e.g. an unencoded password), and splitting at
    the first one would leave the tail of the secret in the output.
    """
    parsed = urllib.parse.urlparse(url)
    if not parsed.scheme or not parsed.netloc or '@' not in parsed.netloc:
        return url
    host = parsed.netloc.rpartition('@')[2]
    safe = parsed._replace(netloc=f'***:***@{host}')
    return urllib.parse.urlunparse(safe)
def sanitize_git_args(args: List[str]) -> List[str]:
    """Return a copy of *args* that is safe to log.

    Arguments beginning with ``http://`` or ``https://`` are passed
    through ``redact_url``; every other argument is copied unchanged.
    """
    url_prefixes = ('http://', 'https://')
    return [
        redact_url(item) if item.startswith(url_prefixes) else item
        for item in args
    ]
def run_git(args: List[str], cwd: Optional[Path] = None) -> None:
    """Run a git command and raise if it exits with a non-zero status.

    Args:
        args: Full argument vector, including the leading ``git``.
        cwd: Optional working directory for the subprocess.

    Raises:
        RuntimeError: If the command exits with a non-zero return code.
            The message contains the sanitized command line only.
    """
    # The argument vector may carry a token-bearing remote URL. Build the
    # redacted form once and use it for BOTH the log line and the error
    # message, since callers log the exception text.
    display = ' '.join(sanitize_git_args(args))
    logging.info('Running: %s', display)
    completed = subprocess.run(args, cwd=str(cwd) if cwd else None, check=False)
    if completed.returncode != 0:
        raise RuntimeError(f'Git command failed ({completed.returncode}): {display}')
def sync_bare_repo(
    root_dir: Path, owner: str, repo: str, clone_url: str, token: Optional[str], prune: bool
) -> None:
    """Create or refresh the bare mirror of one repository.

    The mirror lives at ``<root_dir>/github.com/<owner>/<repo>``. If that
    path does not exist yet the repository is cloned with ``--bare``;
    otherwise ``origin`` is re-pointed at the (possibly token-bearing)
    URL and all refs are fetched.

    Args:
        root_dir: Base directory of the mirror tree.
        owner: Repository owner login.
        repo: Repository name.
        clone_url: Clone URL of the repository.
        token: Optional token embedded into the remote URL.
        prune: When true, ``--prune`` is passed to ``git fetch``.

    Raises:
        RuntimeError: If the path exists but has no ``HEAD`` file, or if a
            git command fails.
    """
    destination = root_dir / 'github.com' / owner / repo
    destination.parent.mkdir(parents=True, exist_ok=True)
    remote_url = auth_url(clone_url, token)
    destination_arg = str(destination)

    if not destination.exists():
        logging.info('Cloning bare repo: %s/%s', owner, repo)
        run_git(['git', 'clone', '--bare', '--origin', 'origin', remote_url, destination_arg])
        return

    # A bare repository keeps HEAD at its top level; use that as the check.
    if not (destination / 'HEAD').exists():
        raise RuntimeError(f'Path exists but is not bare git repo: {destination}')

    logging.info('Fetching bare repo: %s/%s', owner, repo)
    run_git(['git', '-C', destination_arg, 'remote', 'set-url', 'origin', remote_url])

    prune_flag = ['--prune'] if prune else []
    run_git(
        ['git', '-C', destination_arg, 'fetch']
        + prune_flag
        + ['--tags', 'origin', '+refs/*:refs/*']
    )
def collect_repos(
    targets: Iterable[Target], ignore_forks: bool
) -> Dict[Tuple[str, str], Tuple[str, Optional[str]]]:
    """Resolve all targets into a de-duplicated map of repositories.

    ``'repo'`` targets are looked up individually; all other targets are
    expanded into the owner's repository list.

    Args:
        targets: Parsed source targets, processed in order.
        ignore_forks: When true, forked repositories are left out.

    Returns:
        A dict mapping ``(owner, repo)`` to ``(clone_url, token)``.
    """
    repos: Dict[Tuple[str, str], Tuple[str, Optional[str]]] = {}

    def remember(owner: str, repo: str, clone_url: str, token: Optional[str]) -> None:
        # One merge rule for both target kinds: a later source replaces an
        # existing entry only if it brings a token. Previously an explicit
        # repo target overwrote unconditionally and could discard a token
        # recorded by an earlier source.
        key = (owner, repo)
        if key not in repos or token:
            repos[key] = (clone_url, token)

    for target in targets:
        if target.kind == 'repo' and target.repo:
            owner, repo, clone_url, is_fork = get_repo(target.owner, target.repo, target.token)
            if ignore_forks and is_fork:
                continue
            remember(owner, repo, clone_url, target.token)
            continue

        for owner, repo, clone_url in list_owner_repos(target.owner, target.token, ignore_forks):
            remember(owner, repo, clone_url, target.token)
    return repos
def main() -> int:
    """Run the mirror: read the config, resolve repositories, sync each one.

    Config fields read here: ``root_dir`` (required, non-empty string),
    ``log_level`` (string, default ``'INFO'``), ``prune`` (bool, default
    ``True``) and ``ignore_forks`` (bool, default ``True``).

    Returns:
        ``0`` when every repository synced, ``1`` when at least one failed.

    Raises:
        ValueError: If a config field has the wrong type or is missing.
    """
    cli = argparse.ArgumentParser(description='Mirror GitHub repositories as bare git repos.')
    cli.add_argument('config', help='Path to TOML config file')
    options = cli.parse_args()

    config = load_config(Path(options.config).expanduser().resolve())

    root_dir_raw = config.get('root_dir')
    if not (isinstance(root_dir_raw, str) and root_dir_raw.strip()):
        raise ValueError('Config field root_dir must be a non-empty string')

    log_level = config.get('log_level', 'INFO')
    if not isinstance(log_level, str):
        raise ValueError('Config field log_level must be string')
    configure_logging(log_level)

    prune = config.get('prune', True)
    if not isinstance(prune, bool):
        raise ValueError('Config field prune must be boolean')

    ignore_forks = config.get('ignore_forks', True)
    if not isinstance(ignore_forks, bool):
        raise ValueError('Config field ignore_forks must be boolean')

    root_dir = Path(root_dir_raw).expanduser().resolve()
    root_dir.mkdir(parents=True, exist_ok=True)

    targets = parse_targets(config)
    logging.info('Parsed %d source targets', len(targets))

    repos = collect_repos(targets, ignore_forks)
    logging.info('Resolved %d repositories to sync', len(repos))

    failed: Set[str] = set()
    for key in sorted(repos):
        owner, repo = key
        clone_url, token = repos[key]
        try:
            sync_bare_repo(root_dir, owner, repo, clone_url, token, prune)
        except Exception as exc:  # pylint: disable=broad-except
            # One broken repository must not stop the remaining ones.
            failed.add(f'{owner}/{repo}')
            logging.error('Failed to sync %s/%s: %s', owner, repo, exc)

    if not failed:
        logging.info('Sync completed successfully.')
        return 0

    logging.error('Sync completed with failures: %d', len(failed))
    for name in sorted(failed):
        logging.error('Failed repo: %s', name)
    return 1
if __name__ == '__main__':
    # Script entry point: any uncaught exception from main() is logged
    # and turned into exit status 1.
    try:
        exit_code = main()
    except Exception as exc:  # pylint: disable=broad-except
        logging.error('%s', exc)
        exit_code = 1
    sys.exit(exit_code)