#!/usr/bin/env python3 # Anonymine leaderboard web server # Usage: # sudo leaderboard [nproc-ulimit] >logfile # Standard stuff: Python 3 on unix-like OS import grp import os import re import pwd import resource import signal import socket import sys import time import traceback # Non-standard stuff: # https://gitlab.com/oskog97/anonymine.git # May require `python3 symlinks install` after installation import anonymine_engine try: from protodetect import format_request except ModuleNotFoundError: def format_request(bytestring): try: log_line = repr(bytestring) decoded = bytestring.decode('ascii') lines = decoded.replace('\r\n', '\n').split('\n')[:-1] log_line = repr(lines)[1:-1] except Exception: pass finally: return log_line # Server configuation # Starts as root to bind to port 80 # Note: Starting as appropriate unprivileged user is NOT implemented port = 80 # This depends on the Anonymine installation, check with # `make print-destinations` anonymine_cfg_file = "/etc/anonymine/enginecfg" hiscore_file = "/var/games/anonymine" # Static files for trolling bots troll_dir = "/var/troll" # This is displayed on the homepage ssh_command = "ssh play@anonymine-demo.oskog97.com" ssh_password = "play" def drop_priv(user="www", group="www") -> None: ''' Drop priviliges. Only meant to be called if running as root. Raises OSError on failure to change UIDs and GIDs to supplied user and group. ''' uid = pwd.getpwnam(user).pw_uid gid = grp.getgrnam(group).gr_gid os.initgroups(user, gid) os.setresgid(gid, gid, gid) os.setresuid(uid, uid, uid) if os.getresuid() != (uid, uid, uid): raise OSError("Failed to set UID") if os.getresgid() != (gid, gid, gid): raise OSError("Failed to set UID") if os.getgroups() != [gid]: raise OSError("Failed to get rid of groups") def log(str): timestamp = time.strftime("[%Y-%m-%d %H:%M:%S %Z]", time.gmtime()) print(timestamp + ' ' + str, flush=True) # global, set by http_headers and read by client_handler log_response = "[No response]" def http_headers(client: socket.socket, status: str, **kwargs) -> None: ''' Write all the HTTP headers and the blank line. `status` is a string such as "200 OK" or "404 Not Found" `kwargs` contains all the extra headers to set. The keyword argument 'mime' sets the Content-Type with charset UTF-8. Example: http_headers(socket, "200 OK", mime="text/html") ''' client.send(f"HTTP/1.0 {status}\r\n".encode('ascii')) client.send(b"Server: Anonymine leaderboard\r\n") if 'mime' in kwargs: client.send( f"Content-Type: {kwargs['mime']}; charset=UTF-8\r\n".encode('ascii') ) del kwargs['mime'] for kwarg in kwargs: header = kwarg.replace('_', '-') client.send(f"{header}: {kwargs[kwarg]}\r\n".encode('ascii')) client.send(b"\r\n") # Save status after successfully sending for loggin # Can't print here because 400 responses will appear before the request line global log_response log_response = status def escape(s: str) -> str: ''' Return `str` escaped for inclusion in HTML Example: escape('') -> '<script>alert(1)</script>' ''' replace = [ ('&', '&'), # MUST be first ('<', '<'), ('>', '>'), ('"', '"'), ("'", '''), ] for from_str, to_str in replace: s = s.replace(from_str, to_str) return s def begin_page(client: socket.socket, title: str) -> None: ''' Begin writing HTML document Example: http_headers(socket, "200 OK", mime="text/html") begin_page(socket, "Hello world") # Content inside end_page(socket) ''' style=""" /* Color scheme */ body { color: #44ee11; background-color: #080400; } tr:nth-child(odd) { background-color: #201008; } a:link { color: #dddd88; } a:visited { color: #999955; } a { /* text-decoration: none; */ font-style: italic; } /* code */ code { font-family: monospace; color: #cccccc; background-color: #202030; } /* Table */ table { border-collapse: collapse; } td, th { border: 1px solid #505; padding-left: .5em; padding-right: .5em; } /* Big number in main table */ #leaderboard-index td { font-size: 150%; text-align: center; } /* Login button on top right */ .login { float: right; text-align: right; } """ # The login page is a lie, there is nothing to log in to. client.send(f""" {title}

Login

{title}

""".encode('utf-8')) def end_page(client: socket.socket) -> None: ''' Finish writing HTML document Example: http_headers(socket, "200 OK", mime="text/html") begin_page(socket, "Hello world") # Content inside end_page(socket) ''' client.send(b"\n") def main_page(client: socket.socket) -> None: ''' This generates the HTML page for '/' Called by client_handler after setting HTTP headers ''' begin_page(client, "Anonymine leaderboards") client.send(f"""

This is the leaderboards for the public Anonymine demo server

Leaderboards

\n""" .encode('utf-8') ) # What leaderboards exist # Using dictionaries instead of sets to keep count of number of hits categories = {} rows = {} # (mines + '@' + width + 'x' + height, '+losable' or '') lines = filter(None, open(hiscore_file).read().split('\n')) for line in lines: # Add category (table cell) to set category = line.split(':')[0] if category not in categories: categories[category] = 1 else: categories[category] += 1 # Which row is this? # Winners and losers on the same row if category.startswith('lost/'): category = category.split('/')[1] prefix = category.split('-')[0] # Separate rows for +losable if category.endswith('+losable'): row = (prefix, '+losable') else: row = (prefix, '') # Add row to set if row not in rows: rows[row] = 1 else: rows[row] += 1 # Print table body fieldtypes = ['moore', 'hex', 'neumann'] for prefix, suffix in sorted(rows, key=lambda x: rows[x], reverse=True): client.send(f"\n \n".encode('ascii')) for column in range(6): # Turn row (prefix, suffix) into full category using column index fieldtype = fieldtypes[column//2] if column % 2: category = f'lost/{prefix}-{fieldtype}{suffix}' urlish = f'/losers/{prefix}-{fieldtype}{suffix}' else: category = f'{prefix}-{fieldtype}{suffix}' urlish = f'/winners/{prefix}-{fieldtype}{suffix}' # Convert "+losable" to "-losable" and "@" to "_" in URL url = urlish.replace('+', '-').replace("@", "_") # Does it exist? if category in categories: client.send( f' \n' .encode('ascii') ) else: client.send(b' \n') client.send(b"\n") client.send(b"
Mines & area Moore/normal Hex Neumann
WinnersLosers WinnersLosers WinnersLosers
{prefix}{suffix}{categories[category]}
\n") client.send(b'\n') end_page(client) def leaderboard_page(client: socket.socket, uri: str) -> None: ''' This generates the HTML page for individual leaderboards Called by client_handler after setting HTTP headers ''' # Transform uri into category replace = [ ('/winners/', ''), ('/losers/', 'lost/'), ('-losable', '+losable'), ('_', '@'), ] category = uri for from_str, to_str in replace: category = category.replace(from_str, to_str) begin_page(client, f"Highscores for {category}") client.send(b'

Back to main page

\n') client.send( f"

{time.strftime('Timezone is %Z, current time: %H:%M')}

" .encode('ascii') ) # Get data # Use the hiscores class in anonymine_engine to get the rounding, # formatting and sorting exactly the same way as in the game. # load_cfg(path_to_file, filename_for_errmsh, list_of_module) # We want the 'hiscores' part from enginecfg cfg = anonymine_engine.load_cfg(anonymine_cfg_file, '', [])['hiscores'] # hiscores(cfg, category, time), using time=None to just view hs = anonymine_engine.hiscores(cfg, category, None) # (str, [str, ...], [[str, ...], ...]) _ignore, headers, body = hs.display() # Format data client.send(b"\n") for header in headers: client.send(f"".encode('utf-8')) client.send(b"\n") for row in body: client.send(b"") for col in row: client.send(f"".encode('utf-8')) client.send(b"\n") client.send(b"
{escape(header)}
{escape(col)}
\n") end_page(client) def client_handler_inner(client: socket.socket, addr) -> None: ''' This does most of the job of client_handler, but it doesn't catch internal errors and generate 500 error pages, nor does it log the response status. ''' # Get the request and log it, send 400 message if needed log_line = "(No input)" buf = b'' max_recv = 1500 try: buf = client.recv(max_recv) log_line = format_request(buf) #decoded = buf.decode('ascii', errors='surrogateescape') decoded = buf.decode('ascii') lines = decoded.replace('\r\n', '\n').split('\n') request_line = lines[0] method, uri, proto_version = request_line.split(' ') except (ConnectionResetError, OSError): return except Exception as err: # Generate 400 error #log_line += ' -- ' + repr(err) try: http_headers(client, "400 Bad Request") except BrokenPipeError: pass except Exception: log(traceback.format_exc()) return finally: # Log the request if len(buf) == max_recv: log_line += ' (TRUNCATED)' log(f'{addr} {log_line}') # Method checks # Only GET and HEAD is required if method == "OPTIONS": http_headers(client, "204 No Content", Allow="GET, HEAD, OPTIONS") return if method not in ("GET", "HEAD"): http_headers(client, "501 Not Implemented") return # URI checks: # Rewrites # 301/pass Redirects # 200/pass Bot trolling and static plain text # 200/pass [/r]/raw or [/r]/quine # 200/pass main feature # 404 # Regex to substitution URI rewrites = [ # These first so any exploit will "work" (".*etc/passwd.*", "/s/passwd"), (".*etc/group.*", "/s/group"), # Order is important for login regexes: ("^/login\\?.*", "/rickroll"), (".*(login|admin).*", "/s/login"), # /robots.txt ("^/robots\\.txt$", "/s/robots"), # Google verify ("^/google([0-9a-f]{16})\\.html$", "/s/google\\1"), ] for regex, target in rewrites: if re.match(regex, uri): uri = re.sub(regex, target, uri) break # Absolute URI to Location redirects = { "/rickroll": "https://www.youtube.com/watch?v=dQw4w9WgXcQ", # Favicon from main site "/favicon.ico": "https://oskog97.com/favicon.png", # Directories that don't actually exist: "/winners": "/", "/winners/": "/", "/losers": "/", "/losers/": "/", "/r": "/", "/r/": "/", } if uri in redirects: http_headers(client, "301 Moved Permanently", Location=redirects[uri]) return # Bot trolling features / static files if re.match('^/s/[a-z0-9]+$', uri): troll_file = uri.split('/')[2] if '\0' in troll_file or '/' in troll_file or '.' in troll_file: raise AssertionError("Unsafe character found in troll_file") types = [ ('', 'plain'), ('.inc', 'html-content'), ('.html', 'html-whole'), ] content = None for suffix, filetype in types: try: path = os.path.join(troll_dir, troll_file+suffix) content = open(path).read() break except OSError: pass if content is not None: if filetype == 'plain': http_headers(client, "200 OK", mime="text/plain", X_Robots_Tag="none") if 'html' in filetype: http_headers(client, "200 OK", mime="text/html", X_Robots_Tag="none") if filetype == 'html-content': begin_page(client, troll_file) client.send(content.encode('utf-8')) if filetype == 'html-content': end_page(client) return # Raw and quine features if uri in ('/r/raw', '/r/quine', '/raw', '/quine'): # Plain/HTML if uri.startswith('/r/'): mime = "text/plain" else: mime = "text/html" # Select content and set robots variable if uri.endswith('/quine'): content = open(sys.argv[0]).read() robots = "all" if uri.endswith('/raw'): content = open(hiscore_file).read() robots = "noindex" # Headers http_headers(client, "200 OK", mime=mime, X_Robots_Tag=robots) if method == 'HEAD': return # Output body if mime == 'text/html': begin_page(client, uri.split('/')[-1]) client.send(b"

Back to main page

") client.send(f"

-> Plain text

" .encode('utf-8')) client.send(f"
{escape(content)}
".encode('utf-8')) end_page(client) else: client.send(content.encode('utf-8')) return # Leaderboard or main page: regex="^/(winn|los)ers/[0-9]+_[0-9]+x[0-9]+-(moore|neumann|hex)(-losable)?$" if re.match(regex, uri) or uri == '/': http_headers(client, "200 OK", mime="text/html") if method == 'GET': if uri == '/': main_page(client) else: leaderboard_page(client, uri) return # Method OK, URI not OK http_headers(client, "404 Not Found", mime="text/html") begin_page(client, "404 - Not found") end_page(client) def client_handler(client: socket.socket, addr) -> None: ''' Example: client, addr = serversocket.accept() client_handler(client, addr) This handles *response* logging and 500 page generation. All actual work as well as *request* logging happens in `client_handler_inner`. Response logging requires the global variable `log_response` which gets set by `http_headers`. ''' try: client_handler_inner(client, addr) except (BrokenPipeError, ConnectionResetError): pass except Exception: log(traceback.format_exc()) try: http_headers(client, "500 Internal Server Error", mime="text/html") begin_page(client, "500 - Server error") client.send(b"""

Something went wrong. The error has been logged and will be fixed, sometime.

""") end_page(client) except Exception: pass finally: # Log the response log(f'{addr} -> {log_response}') try: client.shutdown(socket.SHUT_RDWR) except Exception: pass client.close() def main() -> None: ''' Webserver for Anonymine leaderboard This function does not return Note: must be started as root ''' server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # SO_REUADDR needed for fast server restarts server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(('', port)) # IMPORTANT drop_priv() if len(sys.argv) == 2: value = int(sys.argv[1]) resource.setrlimit(resource.RLIMIT_NPROC, (value, value)) # Explicitly ignore SIGCHLD to avoid creating zombies signal.signal(signal.SIGCHLD, signal.SIG_IGN) server.listen(1) my_pid = os.getpid() while True: if os.getpid() != my_pid: sys.stdout.write(f'THERMONUCLEAR: {traceback.format_exc()}\n') sys.stdout.flush() os._exit(1) # Accept connection try: client, addr = server.accept() except ConnectionAbortedError: continue except Exception: log(traceback.format_exc()) continue # Hand off connection to child process try: child_pid = os.fork() except Exception as err: log(f'{addr}: Fork failure') client.close() continue if child_pid: client.close() else: server.close() try: client_handler(client, addr) except Exception as e: log(traceback.format_exc()) sys.exit(0) if __name__ == '__main__': main()