Login
quine
Back to main page
-> Plain text
#!/usr/bin/env python3
# Anonymine leaderboard web server
# Usage:
# sudo leaderboard [nproc-ulimit] >logfile
# Standard stuff: Python 3 on unix-like OS
import grp
import os
import re
import pwd
import resource
import signal
import socket
import sys
import time
import traceback
# Non-standard stuff:
# https://gitlab.com/oskog97/anonymine.git
# May require `python3 symlinks install` after installation
import anonymine_engine
try:
from protodetect import format_request
except ModuleNotFoundError:
def format_request(bytestring):
try:
log_line = repr(bytestring)
decoded = bytestring.decode('ascii')
lines = decoded.replace('\r\n', '\n').split('\n')[:-1]
log_line = repr(lines)[1:-1]
except Exception:
pass
finally:
return log_line
# Server configuation
# Starts as root to bind to port 80
# Note: Starting as appropriate unprivileged user is NOT implemented
port = 80
# This depends on the Anonymine installation, check with
# `make print-destinations`
anonymine_cfg_file = "/etc/anonymine/enginecfg"
hiscore_file = "/var/games/anonymine"
# Static files for trolling bots
troll_dir = "/var/troll"
# This is displayed on the homepage
ssh_command = "ssh play@anonymine-demo.oskog97.com"
ssh_password = "play"
client_timeout = 60
def drop_priv(user="www-data", group="www-data") -> None:
'''
Drop priviliges. Only meant to be called if running as root.
Raises OSError on failure to change UIDs and GIDs to supplied
user and group.
'''
uid = pwd.getpwnam(user).pw_uid
gid = grp.getgrnam(group).gr_gid
os.initgroups(user, gid)
os.setresgid(gid, gid, gid)
os.setresuid(uid, uid, uid)
if os.getresuid() != (uid, uid, uid):
raise OSError("Failed to set UID")
if os.getresgid() != (gid, gid, gid):
raise OSError("Failed to set UID")
if os.getgroups() != [gid]:
raise OSError("Failed to get rid of groups")
def log(str):
timestamp = time.strftime("[%Y-%m-%d %H:%M:%S %Z]", time.gmtime())
print(timestamp + ' ' + str, flush=True)
# global, set by http_headers and read by client_handler
log_response = "[No response]"
def http_headers(client: socket.socket, status: str, **kwargs) -> None:
'''
Write all the HTTP headers and the blank line.
`status` is a string such as "200 OK" or "404 Not Found"
`kwargs` contains all the extra headers to set.
The keyword argument 'mime' sets the Content-Type with charset UTF-8.
Example:
http_headers(socket, "200 OK", mime="text/html")
'''
client.send(f"HTTP/1.0 {status}\r\n".encode('ascii'))
client.send(b"Server: Anonymine leaderboard\r\n")
if 'mime' in kwargs:
client.send(
f"Content-Type: {kwargs['mime']}; charset=UTF-8\r\n".encode('ascii')
)
del kwargs['mime']
for kwarg in kwargs:
header = kwarg.replace('_', '-')
client.send(f"{header}: {kwargs[kwarg]}\r\n".encode('ascii'))
client.send(b"\r\n")
# Save status after successfully sending for loggin
# Can't print here because 400 responses will appear before the request line
global log_response
log_response = status
def escape(s: str) -> str:
'''
Return `str` escaped for inclusion in HTML
Example:
escape('<script>alert(1)</script>')
-> '<script>alert(1)</script>'
'''
replace = [
('&', '&'), # MUST be first
('<', '<'),
('>', '>'),
('"', '"'),
("'", '''),
]
for from_str, to_str in replace:
s = s.replace(from_str, to_str)
return s
def begin_page(client: socket.socket, title: str) -> None:
'''
Begin writing HTML document
Example:
http_headers(socket, "200 OK", mime="text/html")
begin_page(socket, "Hello world")
# Content inside <body>
end_page(socket)
'''
style="""
/* Color scheme */
body {
color: #44ee11;
background-color: #080400;
}
tr:nth-child(odd) {
background-color: #201008;
}
a:link {
color: #dddd88;
}
a:visited {
color: #999955;
}
a {
/* text-decoration: none; */
font-style: italic;
}
/* code */
code {
font-family: monospace;
color: #cccccc;
background-color: #202030;
}
/* Table */
table {
border-collapse: collapse;
}
td, th {
border: 1px solid #505;
padding-left: .5em;
padding-right: .5em;
}
/* Big number in main table */
#leaderboard-index td {
font-size: 150%;
text-align: center;
}
/* Login button on top right */
.login {
float: right;
text-align: right;
}
"""
# The login page is a lie, there is nothing to log in to.
client.send(f"""<!DOCTYPE html>
<html><head>
<meta charset="utf-8"/>
<title>{title}</title>
<meta name="viewport" content="width=device-width"/>
<style>{style}</style>
</head><body>
<p class="login"><a href="/login">Login</a></p>
<h1>{title}</h1>
""".encode('utf-8'))
def end_page(client: socket.socket) -> None:
'''
Finish writing HTML document
Example:
http_headers(socket, "200 OK", mime="text/html")
begin_page(socket, "Hello world")
# Content inside <body>
end_page(socket)
'''
client.send(b"</body></html>\n")
def main_page(client: socket.socket) -> None:
'''
This generates the HTML page for '/'
Called by client_handler after setting HTTP headers
'''
begin_page(client, "Anonymine leaderboards")
client.send(f"""
<p>This is the leaderboards for the public Anonymine demo server</p>
<ul>
<li>
To play on public server: <code>{ssh_command}</code>,
password is <code>{ssh_password}</code>
</li>
<li>
<a href="https://oskog97.com/projects/anonymine/"
>-> Info page, and download</a>
</li>
</ul>
<h2>Leaderboards</h2>
<table id="leaderboard-index">
<tr>
<th rowspan="2">Difficulty</th>
<th colspan="2">Moore/normal</th>
<th colspan="2">Hex</th>
<th colspan="2">Neumann</th>
</tr>
<tr>
<th>Winners</th><th>Losers</th>
<th>Winners</th><th>Losers</th>
<th>Winners</th><th>Losers</th>
</tr>\n"""
.encode('utf-8')
)
# What leaderboards exist
# Using dictionaries instead of sets to keep count of number of hits
categories = {}
rows = {} # (mines + '@' + width + 'x' + height, '+losable' or '')
lines = filter(None, open(hiscore_file).read().split('\n'))
for line in lines:
# Add category (table cell) to set
category = line.split(':')[0]
if category not in categories:
categories[category] = 1
else:
categories[category] += 1
# Which row is this?
# Winners and losers on the same row
if category.startswith('lost/'):
category = category.split('/')[1]
prefix = category.split('-')[0]
# Separate rows for +losable
if category.endswith('+losable'):
row = (prefix, '+losable')
else:
row = (prefix, '')
# Add row to set
if row not in rows:
rows[row] = 1
else:
rows[row] += 1
presets = [
('Easy', '31@18x17-moore', '31@18x17-hex', '31@18x17-neumann'),
('Medium', '50@21x16-moore', '50@21x16-hex', '50@21x16-neumann'),
('Default', '80@20x20-moore', '80@20x20-hex', '80@20x20-neumann'),
('Hard', '128@24x18-moore', '128@25x19-hex', '128@27x21-neumann'),
('Ultra', '205@27x19-moore', '205@25x24-hex', '205@38x21-neumann'),
]
for line in presets:
for category in line[1:]:
if not category in categories:
categories[category] = 0
loser = 'lost/' + category
if not loser in categories:
categories[loser] = 0
# Print preset difficulties table body
for line in presets:
difficulty, a, b, c = line
client.send(f"<tr>\n <th>{difficulty}</th>\n".encode('ascii'))
for category in (a, b, c):
win_url = '/winners/' + category.replace('@', '_')
lose_url = '/losers/' + category.replace('@', '_')
win_count = categories[category]
lose_count = categories['lost/' + category]
client.send(
f' <td><a href="{win_url}">{win_count}</a></td>\n'
f' <td><a href="{lose_url}">{lose_count}</a></td>\n'
.encode('ascii')
)
client.send(b'</tr>\n')
client.send(
b'<tr><th>Custom<br/>Mines & area</th><th colspan="6"></th></tr>\n'
)
# Print table body
fieldtypes = ['moore', 'hex', 'neumann']
for prefix, suffix in sorted(rows, key=lambda x: rows[x], reverse=True):
client.send(f"<tr>\n <th>{prefix}{suffix}</th>\n".encode('ascii'))
for column in range(6):
# Turn row (prefix, suffix) into full category using column index
fieldtype = fieldtypes[column//2]
if column % 2:
category = f'lost/{prefix}-{fieldtype}{suffix}'
urlish = f'/losers/{prefix}-{fieldtype}{suffix}'
else:
category = f'{prefix}-{fieldtype}{suffix}'
urlish = f'/winners/{prefix}-{fieldtype}{suffix}'
# Convert "+losable" to "-losable" and "@" to "_" in URL
url = urlish.replace('+', '-').replace("@", "_")
# Does it exist?
if category in categories:
client.send(
f' <td><a href="{url}">{categories[category]}</a></td>\n'
.encode('ascii')
)
else:
client.send(b' <td></td>\n')
client.send(b"</tr>\n")
client.send(b"</table>\n")
client.send(b'<ul>\n')
client.send(b'<li><a href="/quine">Leaderboard source code</a></li>\n')
client.send(b'<li><a href="/raw">Raw highscores file</a></li>\n')
client.send(b'</ul>\n')
end_page(client)
def leaderboard_page(client: socket.socket, uri: str) -> None:
'''
This generates the HTML page for individual leaderboards
Called by client_handler after setting HTTP headers
'''
# Transform uri into category
replace = [
('/winners/', ''),
('/losers/', 'lost/'),
('-losable', '+losable'),
('_', '@'),
]
category = uri
for from_str, to_str in replace:
category = category.replace(from_str, to_str)
begin_page(client, f"Highscores for {category}")
client.send(b'<p><a href="/">Back to main page</a></p>\n')
client.send(
f"<p>{time.strftime('Timezone is %Z, current time: %H:%M')}</p>"
.encode('ascii')
)
# Get data
# Use the hiscores class in anonymine_engine to get the rounding,
# formatting and sorting exactly the same way as in the game.
# load_cfg(path_to_file, filename_for_errmsh, list_of_module)
# We want the 'hiscores' part from enginecfg
cfg = anonymine_engine.load_cfg(anonymine_cfg_file, '', [])['hiscores']
# hiscores(cfg, category, time), using time=None to just view
hs = anonymine_engine.hiscores(cfg, category, None)
# (str, [str, ...], [[str, ...], ...])
_ignore, headers, body = hs.display()
# Format data
client.send(b"<table>\n<tr>")
for header in headers:
client.send(f"<th>{escape(header)}</th>".encode('utf-8'))
client.send(b"</tr>\n")
for row in body:
client.send(b"<tr>")
for col in row:
client.send(f"<td>{escape(col)}</td>".encode('utf-8'))
client.send(b"</tr>\n")
client.send(b"</table>\n")
end_page(client)
def client_handler_inner(client: socket.socket, addr) -> None:
'''
This does most of the job of client_handler, but it doesn't
catch internal errors and generate 500 error pages, nor does it
log the response status.
'''
# Get the request and log it, send 400 message if needed
log_line = "(No input)"
buf = b''
max_recv = 1500
try:
buf = client.recv(max_recv)
log_line = format_request(buf)
#decoded = buf.decode('ascii', errors='surrogateescape')
decoded = buf.decode('ascii')
lines = decoded.replace('\r\n', '\n').split('\n')
request_line = lines[0]
method, uri, proto_version = request_line.split(' ')
except (ConnectionResetError, OSError):
return
except Exception as err:
# Generate 400 error
#log_line += ' -- ' + repr(err)
try:
http_headers(client, "400 Bad Request")
except BrokenPipeError:
pass
except Exception:
log(traceback.format_exc())
return
finally:
# Log the request
if len(buf) == max_recv:
log_line += ' (TRUNCATED)'
log(f'{addr} {log_line}')
# Method checks
# Only GET and HEAD is required
if method == "OPTIONS":
http_headers(client, "204 No Content", Allow="GET, HEAD, OPTIONS")
return
if method not in ("GET", "HEAD"):
http_headers(client, "501 Not Implemented")
return
# URI checks:
# Rewrites
# 301/pass Redirects
# 200/pass Bot trolling and static plain text
# 200/pass [/r]/raw or [/r]/quine
# 200/pass main feature
# 404
# Regex to substitution URI
rewrites = [
# These first so any exploit will "work"
(".*etc/passwd.*", "/s/passwd"),
(".*etc/group.*", "/s/group"),
# Order is important for login regexes:
("^/login\\?.*", "/rickroll"),
(".*(login|admin).*", "/s/login"),
# /robots.txt
("^/robots\\.txt$", "/s/robots"),
# Google verify
("^/google([0-9a-f]{16})\\.html$", "/s/google\\1"),
]
# HACK to reduce the number of hits to etc/passwd or etc/group {
tmp = uri.replace('passwd', '').replace('group', '')
while '../'*5 in tmp:
tmp = tmp.replace('../'*5, '../'*4)
if hash(tmp)%100 > 10:
uri = tmp
# }
for regex, target in rewrites:
if re.match(regex, uri):
uri = re.sub(regex, target, uri)
break
# Absolute URI to Location
redirects = {
"/rickroll": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
# Favicon from main site
"/favicon.ico": "https://oskog97.com/favicon.png",
# Directories that don't actually exist:
"/winners": "/",
"/winners/": "/",
"/losers": "/",
"/losers/": "/",
"/r": "/",
"/r/": "/",
}
if uri in redirects:
http_headers(client, "301 Moved Permanently", Location=redirects[uri])
return
# Bot trolling features / static files
if re.match('^/s/[a-z0-9]+$', uri):
troll_file = uri.split('/')[2]
if '\0' in troll_file or '/' in troll_file or '.' in troll_file:
raise AssertionError("Unsafe character found in troll_file")
types = [
('', 'plain'),
('.inc', 'html-content'),
('.html', 'html-whole'),
]
content = None
for suffix, filetype in types:
try:
path = os.path.join(troll_dir, troll_file+suffix)
content = open(path).read()
break
except OSError:
pass
if content is not None:
if filetype == 'plain':
http_headers(client, "200 OK", mime="text/plain",
X_Robots_Tag="none")
if 'html' in filetype:
http_headers(client, "200 OK", mime="text/html",
X_Robots_Tag="none")
if filetype == 'html-content':
begin_page(client, troll_file)
client.send(content.encode('utf-8'))
if filetype == 'html-content':
end_page(client)
return
if uri == '/exception-test':
raise Exception('Test unhandled exception')
if uri == '/hang-test':
signal.pause()
# Raw and quine features
if uri in ('/r/raw', '/r/quine', '/raw', '/quine'):
# Plain/HTML
if uri.startswith('/r/'):
mime = "text/plain"
else:
mime = "text/html"
# Select content and set robots variable
if uri.endswith('/quine'):
content = open(sys.argv[0]).read()
robots = "all"
if uri.endswith('/raw'):
content = open(hiscore_file).read()
robots = "noindex"
# Headers
http_headers(client, "200 OK", mime=mime, X_Robots_Tag=robots)
if method == 'HEAD':
return
# Output body
if mime == 'text/html':
begin_page(client, uri.split('/')[-1])
client.send(b"<p><a href='/'>Back to main page</a></p>")
client.send(f"<p><a href='/r{uri}'>-> Plain text</a></p>"
.encode('utf-8'))
client.send(f"<pre>{escape(content)}</pre>".encode('utf-8'))
end_page(client)
else:
client.send(content.encode('utf-8'))
return
# Leaderboard or main page:
regex="^/(winn|los)ers/[0-9]+_[0-9]+x[0-9]+-(moore|neumann|hex)(-losable)?$"
if re.match(regex, uri) or uri == '/':
http_headers(client, "200 OK", mime="text/html")
if 'shellshock' in log_line:
# 'User-Agent': '() { _; } >_[$($())] { echo Content-Type: text/plain ; echo ; echo "bash_cve_2014_6278 Output : $((34+68))',
try:
echo_cmds = log_line.split('echo ')[1:]
test_message = ''
for echo_cmd in echo_cmds:
echo_cmd = echo_cmd.split(';')[0].split("',")[0]
echo_str = echo_cmd.strip().strip('"\'')
if '$((' in echo_str:
prefix, tmp = echo_str.split('$((', 1)
numbers, suffix = tmp.split('))', 1)
# Do I dare to use regex validation and eval?
assert numbers.count('+') == 1, "Unimplemented math"
a, b = numbers.split('+')
echo_str = prefix + str(int(a)+int(b)) + suffix
test_message += echo_str + '\n'
client.send(test_message.encode('utf-8'))
except Exception:
pass
if method == 'GET':
if uri == '/':
main_page(client)
else:
leaderboard_page(client, uri)
return
# Method OK, URI not OK
http_headers(client, "404 Not Found", mime="text/html")
begin_page(client, "404 - Not found")
end_page(client)
def client_handler(client: socket.socket, addr) -> None:
'''
Example:
client, addr = serversocket.accept()
client_handler(client, addr)
This handles *response* logging and 500 page generation. All actual
work as well as *request* logging happens in `client_handler_inner`.
Response logging requires the global variable `log_response` which
gets set by `http_headers`.
'''
try:
client_handler_inner(client, addr)
except (BrokenPipeError, ConnectionResetError):
pass
except Exception:
log(traceback.format_exc())
try:
http_headers(client, "500 Internal Server Error", mime="text/html")
begin_page(client, "500 - Server error")
client.send(b"""<p>
Something went wrong. The error has been logged and will be
fixed, sometime.
</p>""")
end_page(client)
except Exception:
pass
finally:
# Log the response
log(f'{addr} -> {log_response}')
try:
client.shutdown(socket.SHUT_RDWR)
except Exception:
pass
client.close()
def alarm_handler(*args) -> None:
'''
Signal handler for SIGALRM
Raise TimeoutError
'''
raise TimeoutError
def main() -> None:
'''
Webserver for Anonymine leaderboard
This function does not return
Note: must be started as root
'''
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# SO_REUADDR needed for fast server restarts
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('', port))
# IMPORTANT
drop_priv()
if len(sys.argv) == 2:
value = int(sys.argv[1])
resource.setrlimit(resource.RLIMIT_NPROC, (value, value))
# Explicitly ignore SIGCHLD to avoid creating zombies
signal.signal(signal.SIGCHLD, signal.SIG_IGN)
server.listen(1)
my_pid = os.getpid()
while True:
if os.getpid() != my_pid:
sys.stdout.write(f'THERMONUCLEAR: {traceback.format_exc()}\n')
sys.stdout.flush()
os._exit(1)
# Accept connection
try:
client, addr = server.accept()
except ConnectionAbortedError:
continue
except Exception:
log(traceback.format_exc())
continue
# Hand off connection to child process
try:
child_pid = os.fork()
except Exception as err:
log(f'{addr}: Fork failure')
client.close()
continue
if child_pid:
client.close()
else:
server.close()
try:
signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(client_timeout)
client_handler(client, addr)
except Exception as e:
log(f'{addr}: Unhandled exception: {traceback.format_exc()}')
sys.exit(0)
if __name__ == '__main__':
main()