#!/usr/bin/env python3 # Anonymine leaderboard web server # Usage: # sudo leaderboard [nproc-ulimit] >logfile # Standard stuff: Python 3 on unix-like OS import grp import os import re import pwd import resource import signal import socket import sys import time import traceback # Non-standard stuff: # https://gitlab.com/oskog97/anonymine.git # May require `python3 symlinks install` after installation import anonymine_engine try: from protodetect import format_request except ModuleNotFoundError: def format_request(bytestring): try: log_line = repr(bytestring) decoded = bytestring.decode('ascii') lines = decoded.replace('\r\n', '\n').split('\n')[:-1] log_line = repr(lines)[1:-1] except Exception: pass finally: return log_line # Server configuation # Starts as root to bind to port 80 # Note: Starting as appropriate unprivileged user is NOT implemented port = 80 # This depends on the Anonymine installation, check with # `make print-destinations` anonymine_cfg_file = "/etc/anonymine/enginecfg" hiscore_file = "/var/games/anonymine" # Static files for trolling bots troll_dir = "/var/troll" # This is displayed on the homepage ssh_command = "ssh play@anonymine-demo.oskog97.com" ssh_password = "play" client_timeout = 60 def drop_priv(user="www-data", group="www-data") -> None: ''' Drop priviliges. Only meant to be called if running as root. Raises OSError on failure to change UIDs and GIDs to supplied user and group. ''' uid = pwd.getpwnam(user).pw_uid gid = grp.getgrnam(group).gr_gid os.initgroups(user, gid) os.setresgid(gid, gid, gid) os.setresuid(uid, uid, uid) if os.getresuid() != (uid, uid, uid): raise OSError("Failed to set UID") if os.getresgid() != (gid, gid, gid): raise OSError("Failed to set UID") if os.getgroups() != [gid]: raise OSError("Failed to get rid of groups") def log(str): timestamp = time.strftime("[%Y-%m-%d %H:%M:%S %Z]", time.gmtime()) print(timestamp + ' ' + str, flush=True) # global, set by http_headers and read by client_handler log_response = "[No response]" def http_headers(client: socket.socket, status: str, **kwargs) -> None: ''' Write all the HTTP headers and the blank line. `status` is a string such as "200 OK" or "404 Not Found" `kwargs` contains all the extra headers to set. The keyword argument 'mime' sets the Content-Type with charset UTF-8. Example: http_headers(socket, "200 OK", mime="text/html") ''' client.send(f"HTTP/1.0 {status}\r\n".encode('ascii')) client.send(b"Server: Anonymine leaderboard\r\n") if 'mime' in kwargs: client.send( f"Content-Type: {kwargs['mime']}; charset=UTF-8\r\n".encode('ascii') ) del kwargs['mime'] for kwarg in kwargs: header = kwarg.replace('_', '-') client.send(f"{header}: {kwargs[kwarg]}\r\n".encode('ascii')) client.send(b"\r\n") # Save status after successfully sending for loggin # Can't print here because 400 responses will appear before the request line global log_response log_response = status def escape(s: str) -> str: ''' Return `str` escaped for inclusion in HTML Example: escape('') -> '<script>alert(1)</script>' ''' replace = [ ('&', '&'), # MUST be first ('<', '<'), ('>', '>'), ('"', '"'), ("'", '''), ] for from_str, to_str in replace: s = s.replace(from_str, to_str) return s def begin_page(client: socket.socket, title: str) -> None: ''' Begin writing HTML document Example: http_headers(socket, "200 OK", mime="text/html") begin_page(socket, "Hello world") # Content inside
end_page(socket) ''' style=""" /* Color scheme */ body { color: #44ee11; background-color: #080400; } tr:nth-child(odd) { background-color: #201008; } a:link { color: #dddd88; } a:visited { color: #999955; } a { /* text-decoration: none; */ font-style: italic; } /* code */ code { font-family: monospace; color: #cccccc; background-color: #202030; } /* Table */ table { border-collapse: collapse; } td, th { border: 1px solid #505; padding-left: .5em; padding-right: .5em; } /* Big number in main table */ #leaderboard-index td { font-size: 150%; text-align: center; } /* Login button on top right */ .login { float: right; text-align: right; } """ # The login page is a lie, there is nothing to log in to. client.send(f"""This is the leaderboards for the public Anonymine demo server
{ssh_command}
,
password is {ssh_password}
Difficulty | Moore/normal | Hex | Neumann | |||
---|---|---|---|---|---|---|
Winners | Losers | Winners | Losers | Winners | Losers | |
{difficulty} | \n".encode('ascii')) for category in (a, b, c): win_url = '/winners/' + category.replace('@', '_') lose_url = '/losers/' + category.replace('@', '_') win_count = categories[category] lose_count = categories['lost/' + category] client.send( f'{win_count} | \n' f'{lose_count} | \n' .encode('ascii') ) client.send(b'||||
Custom Mines & area | ||||||
{prefix}{suffix} | \n".encode('ascii')) for column in range(6): # Turn row (prefix, suffix) into full category using column index fieldtype = fieldtypes[column//2] if column % 2: category = f'lost/{prefix}-{fieldtype}{suffix}' urlish = f'/losers/{prefix}-{fieldtype}{suffix}' else: category = f'{prefix}-{fieldtype}{suffix}' urlish = f'/winners/{prefix}-{fieldtype}{suffix}' # Convert "+losable" to "-losable" and "@" to "_" in URL url = urlish.replace('+', '-').replace("@", "_") # Does it exist? if category in categories: client.send( f'{categories[category]} | \n' .encode('ascii') ) else: client.send(b'\n') client.send(b" |
{time.strftime('Timezone is %Z, current time: %H:%M')}
" .encode('ascii') ) # Get data # Use the hiscores class in anonymine_engine to get the rounding, # formatting and sorting exactly the same way as in the game. # load_cfg(path_to_file, filename_for_errmsh, list_of_module) # We want the 'hiscores' part from enginecfg cfg = anonymine_engine.load_cfg(anonymine_cfg_file, '', [])['hiscores'] # hiscores(cfg, category, time), using time=None to just view hs = anonymine_engine.hiscores(cfg, category, None) # (str, [str, ...], [[str, ...], ...]) _ignore, headers, body = hs.display() # Format data client.send(b"{escape(header)} | ".encode('utf-8')) client.send(b"
---|
{escape(col)} | ".encode('utf-8')) client.send(b"
{escape(content)}".encode('utf-8')) end_page(client) else: client.send(content.encode('utf-8')) return # Leaderboard or main page: regex="^/(winn|los)ers/[0-9]+_[0-9]+x[0-9]+-(moore|neumann|hex)(-losable)?$" if re.match(regex, uri) or uri == '/': http_headers(client, "200 OK", mime="text/html") if 'shellshock' in log_line: # 'User-Agent': '() { _; } >_[$($())] { echo Content-Type: text/plain ; echo ; echo "bash_cve_2014_6278 Output : $((34+68))', try: echo_cmds = log_line.split('echo ')[1:] test_message = '' for echo_cmd in echo_cmds: echo_cmd = echo_cmd.split(';')[0].split("',")[0] echo_str = echo_cmd.strip().strip('"\'') if '$((' in echo_str: prefix, tmp = echo_str.split('$((', 1) numbers, suffix = tmp.split('))', 1) # Do I dare to use regex validation and eval? assert numbers.count('+') == 1, "Unimplemented math" a, b = numbers.split('+') echo_str = prefix + str(int(a)+int(b)) + suffix test_message += echo_str + '\n' client.send(test_message.encode('utf-8')) except Exception: pass if method == 'GET': if uri == '/': main_page(client) else: leaderboard_page(client, uri) return # Method OK, URI not OK http_headers(client, "404 Not Found", mime="text/html") begin_page(client, "404 - Not found") end_page(client) def client_handler(client: socket.socket, addr) -> None: ''' Example: client, addr = serversocket.accept() client_handler(client, addr) This handles *response* logging and 500 page generation. All actual work as well as *request* logging happens in `client_handler_inner`. Response logging requires the global variable `log_response` which gets set by `http_headers`. ''' try: client_handler_inner(client, addr) except (BrokenPipeError, ConnectionResetError): pass except Exception: log(traceback.format_exc()) try: http_headers(client, "500 Internal Server Error", mime="text/html") begin_page(client, "500 - Server error") client.send(b"""
Something went wrong. The error has been logged and will be fixed, sometime.
""") end_page(client) except Exception: pass finally: # Log the response log(f'{addr} -> {log_response}') try: client.shutdown(socket.SHUT_RDWR) except Exception: pass client.close() def alarm_handler(*args) -> None: ''' Signal handler for SIGALRM Raise TimeoutError ''' raise TimeoutError def main() -> None: ''' Webserver for Anonymine leaderboard This function does not return Note: must be started as root ''' server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # SO_REUADDR needed for fast server restarts server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind(('', port)) # IMPORTANT drop_priv() if len(sys.argv) == 2: value = int(sys.argv[1]) resource.setrlimit(resource.RLIMIT_NPROC, (value, value)) # Explicitly ignore SIGCHLD to avoid creating zombies signal.signal(signal.SIGCHLD, signal.SIG_IGN) server.listen(1) my_pid = os.getpid() while True: if os.getpid() != my_pid: sys.stdout.write(f'THERMONUCLEAR: {traceback.format_exc()}\n') sys.stdout.flush() os._exit(1) # Accept connection try: client, addr = server.accept() except ConnectionAbortedError: continue except Exception: log(traceback.format_exc()) continue # Hand off connection to child process try: child_pid = os.fork() except Exception as err: log(f'{addr}: Fork failure') client.close() continue if child_pid: client.close() else: server.close() try: signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(client_timeout) client_handler(client, addr) except Exception as e: log(f'{addr}: Unhandled exception: {traceback.format_exc()}') sys.exit(0) if __name__ == '__main__': main()