Login

quine

Back to main page

-> Plain text

#!/usr/bin/env python3

# Anonymine leaderboard web server
# Usage:
#   sudo leaderboard [nproc-ulimit] >logfile

# Standard stuff: Python 3 on unix-like OS
import grp
import os
import re
import pwd
import resource
import signal
import socket
import sys
import time
import traceback


# Non-standard stuff:
# https://gitlab.com/oskog97/anonymine.git
# May require `python3 symlinks install` after installation
import anonymine_engine

try:
    from protodetect import format_request
except ModuleNotFoundError:
    def format_request(bytestring):
        try:
            log_line = repr(bytestring)
            decoded = bytestring.decode('ascii')
            lines = decoded.replace('\r\n', '\n').split('\n')[:-1]
            log_line = repr(lines)[1:-1]
        except Exception:
            pass
        finally:
            return log_line


# Server configuation
# Starts as root to bind to port 80
# Note: Starting as appropriate unprivileged user is NOT implemented
port = 80

# This depends on the Anonymine installation, check with
# `make print-destinations`
anonymine_cfg_file = "/etc/anonymine/enginecfg"
hiscore_file = "/var/games/anonymine"

# Static files for trolling bots
troll_dir = "/var/troll"

# This is displayed on the homepage
ssh_command = "ssh play@anonymine-demo.oskog97.com"
ssh_password = "play"

client_timeout = 60


def drop_priv(user="www", group="www") -> None:
    '''
    Drop priviliges.  Only meant to be called if running as root.

    Raises OSError on failure to change UIDs and GIDs to supplied
    user and group.
    '''
    uid = pwd.getpwnam(user).pw_uid
    gid = grp.getgrnam(group).gr_gid
    
    os.initgroups(user, gid)
    os.setresgid(gid, gid, gid)
    os.setresuid(uid, uid, uid)
    
    if os.getresuid() != (uid, uid, uid):
        raise OSError("Failed to set UID")
    if os.getresgid() != (gid, gid, gid):
        raise OSError("Failed to set UID")
    if os.getgroups() != [gid]:
        raise OSError("Failed to get rid of groups")


def log(str):
    timestamp = time.strftime("[%Y-%m-%d %H:%M:%S %Z]", time.gmtime())
    print(timestamp + ' ' + str, flush=True)


# global, set by http_headers and read by client_handler
log_response = "[No response]"

def http_headers(client: socket.socket, status: str, **kwargs) -> None:
    '''
    Write all the HTTP headers and the blank line.
    `status` is a string such as "200 OK" or "404 Not Found"

    `kwargs` contains all the extra headers to set.
    The keyword argument 'mime' sets the Content-Type with charset UTF-8.

    Example:
        http_headers(socket, "200 OK", mime="text/html")
    '''
    client.send(f"HTTP/1.0 {status}\r\n".encode('ascii'))
    client.send(b"Server: Anonymine leaderboard\r\n")
    if 'mime' in kwargs:
        client.send(
            f"Content-Type: {kwargs['mime']}; charset=UTF-8\r\n".encode('ascii')
        )
        del kwargs['mime']
    for kwarg in kwargs:
        header = kwarg.replace('_', '-')
        client.send(f"{header}: {kwargs[kwarg]}\r\n".encode('ascii'))
    client.send(b"\r\n")
    # Save status after successfully sending for loggin
    # Can't print here because 400 responses will appear before the request line
    global log_response
    log_response = status


def escape(s: str) -> str:
    '''
    Return `str` escaped for inclusion in HTML

    Example:
        escape('<script>alert(1)</script>')
        -> '&lt;script&gt;alert(1)&lt;/script&gt;'
    '''
    replace = [
        ('&', '&amp;'),     # MUST be first
        ('<', '&lt;'),
        ('>', '&gt;'),
        ('"', '&quot;'),
        ("'", '&apos;'),
    ]
    for from_str, to_str in replace:
        s = s.replace(from_str, to_str)
    return s


def begin_page(client: socket.socket, title: str) -> None:
    '''
    Begin writing HTML document

    Example:
        http_headers(socket, "200 OK", mime="text/html")
        begin_page(socket, "Hello world")
        # Content inside <body>
        end_page(socket)
    '''
    style="""
        /* Color scheme */
        body {
            color: #44ee11;
            background-color: #080400;
        }
        tr:nth-child(odd) {
            background-color: #201008;
        }
        a:link {
            color: #dddd88;
        }
        a:visited {
            color: #999955;
        }
        a {
            /* text-decoration: none; */
            font-style: italic;
        }
        /* code */
        code {
            font-family: monospace;
            color: #cccccc;
            background-color: #202030;
        }
        /* Table */
        table {
            border-collapse: collapse;
        }
        td, th {
            border: 1px solid #505;
            padding-left: .5em;
            padding-right: .5em;
        }
        /* Big number in main table */
        #leaderboard-index td {
            font-size: 150%;
            text-align: center;
        }
        /* Login button on top right */
        .login {
            float: right;
            text-align: right;
        }
    """
    # The login page is a lie, there is nothing to log in to.
    client.send(f"""<!DOCTYPE html>
<html><head>
    <meta charset="utf-8"/>
    <title>{title}</title>
    <meta name="viewport" content="width=device-width"/>
    <style>{style}</style>
</head><body>
    <p class="login"><a href="/login">Login</a></p>
    <h1>{title}</h1>
""".encode('utf-8'))


def end_page(client: socket.socket) -> None:
    '''
    Finish writing HTML document

    Example:
        http_headers(socket, "200 OK", mime="text/html")
        begin_page(socket, "Hello world")
        # Content inside <body>
        end_page(socket)
    '''
    client.send(b"</body></html>\n")


def main_page(client: socket.socket) -> None:
    '''
    This generates the HTML page for '/'
    Called by client_handler after setting HTTP headers
    '''
    begin_page(client, "Anonymine leaderboards")
    client.send(f"""
        <p>This is the leaderboards for the public Anonymine demo server</p>
        <ul>
            <li>
                To play on public server: <code>{ssh_command}</code>,
                password is <code>{ssh_password}</code>
            </li>
            <li>
                <a href="https://oskog97.com/projects/anonymine/"
                >-&gt; Info page, and download</a>
            </li>
        </ul>
        <h2>Leaderboards</h2>
        <table id="leaderboard-index">
            <tr>
                <th rowspan="2">Mines &amp; area</th>
                <th colspan="2">Moore/normal</th>
                <th colspan="2">Hex</th>
                <th colspan="2">Neumann</th>
            </tr>
            <tr>
                <th>Winners</th><th>Losers</th>
                <th>Winners</th><th>Losers</th>
                <th>Winners</th><th>Losers</th>
            </tr>\n"""
        .encode('utf-8')
    )
    
    # What leaderboards exist
    # Using dictionaries instead of sets to keep count of number of hits
    categories = {}
    rows = {}    # (mines + '@' + width + 'x' + height, '+losable' or '')
    lines = filter(None, open(hiscore_file).read().split('\n'))
    for line in lines:
        # Add category (table cell) to set
        category = line.split(':')[0]
        if category not in categories:
            categories[category] = 1
        else:
            categories[category] += 1

        # Which row is this?
        # Winners and losers on the same row
        if category.startswith('lost/'):
            category = category.split('/')[1]
        prefix = category.split('-')[0]
        # Separate rows for +losable
        if category.endswith('+losable'):
            row = (prefix, '+losable')
        else:
            row = (prefix, '')
        # Add row to set
        if row not in rows:
            rows[row] = 1
        else:
            rows[row] += 1
    
    # Print table body
    fieldtypes = ['moore', 'hex', 'neumann']
    for prefix, suffix in sorted(rows, key=lambda x: rows[x], reverse=True):
        client.send(f"<tr>\n  <th>{prefix}{suffix}</th>\n".encode('ascii'))
        for column in range(6):
            # Turn row (prefix, suffix) into full category using column index
            fieldtype = fieldtypes[column//2]
            if column % 2:
                category = f'lost/{prefix}-{fieldtype}{suffix}'
                urlish = f'/losers/{prefix}-{fieldtype}{suffix}'
            else:
                category = f'{prefix}-{fieldtype}{suffix}'
                urlish = f'/winners/{prefix}-{fieldtype}{suffix}'
            # Convert "+losable" to "-losable" and "@" to "_" in URL
            url = urlish.replace('+', '-').replace("@", "_")

            # Does it exist?
            if category in categories:
                client.send(
                    f'  <td><a href="{url}">{categories[category]}</a></td>\n'
                    .encode('ascii')
                )
            else:
                client.send(b'  <td></td>\n')
        client.send(b"</tr>\n")
    client.send(b"</table>\n")

    client.send(b'<ul>\n')
    client.send(b'<li><a href="/quine">Leaderboard source code</a></li>\n')
    client.send(b'<li><a href="/raw">Raw highscores file</a></li>\n')
    client.send(b'</ul>\n')
    end_page(client)


def leaderboard_page(client: socket.socket, uri: str) -> None:
    '''
    This generates the HTML page for individual leaderboards
    Called by client_handler after setting HTTP headers
    '''
    # Transform uri into category
    replace = [
        ('/winners/', ''),
        ('/losers/',  'lost/'),
        ('-losable',  '+losable'),
        ('_',         '@'),
    ]
    category = uri
    for from_str, to_str in replace:
        category = category.replace(from_str, to_str)
    begin_page(client, f"Highscores for {category}")
    client.send(b'<p><a href="/">Back to main page</a></p>\n')
    client.send(
        f"<p>{time.strftime('Timezone is %Z, current time: %H:%M')}</p>"
        .encode('ascii')
    )

    # Get data
    # Use the hiscores class in anonymine_engine to get the rounding,
    # formatting and sorting exactly the same way as in the game.

    # load_cfg(path_to_file, filename_for_errmsh, list_of_module)
    # We want the 'hiscores' part from enginecfg
    cfg = anonymine_engine.load_cfg(anonymine_cfg_file, '', [])['hiscores']
    
    # hiscores(cfg, category, time), using time=None to just view
    hs = anonymine_engine.hiscores(cfg, category, None)
    # (str, [str, ...], [[str, ...], ...])
    _ignore, headers, body = hs.display()

    # Format data
    client.send(b"<table>\n<tr>")
    for header in headers:
        client.send(f"<th>{escape(header)}</th>".encode('utf-8'))
    client.send(b"</tr>\n")
    for row in body:
        client.send(b"<tr>")
        for col in row:
            client.send(f"<td>{escape(col)}</td>".encode('utf-8'))
        client.send(b"</tr>\n")
    client.send(b"</table>\n")
    end_page(client)


def client_handler_inner(client: socket.socket, addr) -> None:
    '''
    This does most of the job of client_handler, but it doesn't
    catch internal errors and generate 500 error pages, nor does it
    log the response status.
    '''
    # Get the request and log it, send 400 message if needed
    log_line = "(No input)"
    buf = b''
    max_recv = 1500
    try:
        buf = client.recv(max_recv)
        log_line = format_request(buf)
        
        #decoded = buf.decode('ascii', errors='surrogateescape')
        decoded = buf.decode('ascii')
        lines = decoded.replace('\r\n', '\n').split('\n')
        request_line = lines[0]

        method, uri, proto_version = request_line.split(' ')
    except (ConnectionResetError, OSError):
        return
    except Exception as err:
        # Generate 400 error
        #log_line += ' -- ' + repr(err)
        try:
            http_headers(client, "400 Bad Request")
        except BrokenPipeError:
            pass
        except Exception:
            log(traceback.format_exc())
        return
    finally:
        # Log the request
        if len(buf) == max_recv:
            log_line += ' (TRUNCATED)'
        log(f'{addr} {log_line}')

    # Method checks
    # Only GET and HEAD is required
    if method == "OPTIONS":
        http_headers(client, "204 No Content", Allow="GET, HEAD, OPTIONS")
        return
    if method not in ("GET", "HEAD"):
        http_headers(client, "501 Not Implemented")
        return

    # URI checks:
    #   Rewrites
    #   301/pass Redirects
    #   200/pass Bot trolling and static plain text
    #   200/pass [/r]/raw or [/r]/quine
    #   200/pass main feature
    #   404

    # Regex to substitution URI
    rewrites = [
        # These first so any exploit will "work"
        (".*etc/passwd.*",                  "/s/passwd"),
        (".*etc/group.*",                   "/s/group"),
        # Order is important for login regexes:
        ("^/login\\?.*",                    "/rickroll"),
        (".*(login|admin).*",               "/s/login"),
        # /robots.txt
        ("^/robots\\.txt$",                 "/s/robots"),
        # Google verify
        ("^/google([0-9a-f]{16})\\.html$",  "/s/google\\1"),
    ]
    for regex, target in rewrites:
        if re.match(regex, uri):
            uri = re.sub(regex, target, uri)
            break

    # Absolute URI to Location
    redirects = {
        "/rickroll":    "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
        # Favicon from main site
        "/favicon.ico": "https://oskog97.com/favicon.png",
        # Directories that don't actually exist:
        "/winners":     "/",
        "/winners/":    "/",
        "/losers":      "/",
        "/losers/":     "/",
        "/r":           "/",
        "/r/":          "/",
    }
    if uri in redirects:
        http_headers(client, "301 Moved Permanently", Location=redirects[uri])
        return

    # Bot trolling features / static files
    if re.match('^/s/[a-z0-9]+$', uri):
        troll_file = uri.split('/')[2]
        if '\0' in troll_file or '/' in troll_file or '.' in troll_file:
            raise AssertionError("Unsafe character found in troll_file")
        types = [
            ('',        'plain'),
            ('.inc',    'html-content'),
            ('.html',   'html-whole'),
        ]
        content = None
        for suffix, filetype in types:
            try:
                path = os.path.join(troll_dir, troll_file+suffix)
                content = open(path).read()
                break
            except OSError:
                pass
        if content is not None:
            if filetype == 'plain':
                http_headers(client, "200 OK", mime="text/plain",
                             X_Robots_Tag="none")
            if 'html' in filetype:
                http_headers(client, "200 OK", mime="text/html",
                             X_Robots_Tag="none")
                if filetype == 'html-content':
                    begin_page(client, troll_file)
            client.send(content.encode('utf-8'))
            if filetype == 'html-content':
                end_page(client)
            return
        
    if uri == '/exception-test':
        raise Exception('Test unhandled exception')
    if uri == '/hang-test':
        signal.pause()
    
    # Raw and quine features
    if uri in ('/r/raw', '/r/quine', '/raw', '/quine'):
        # Plain/HTML
        if uri.startswith('/r/'):
            mime = "text/plain"
        else:
            mime = "text/html"
        # Select content and set robots variable
        if uri.endswith('/quine'):
            content = open(sys.argv[0]).read()
            robots = "all"
        if uri.endswith('/raw'):
            content = open(hiscore_file).read()
            robots = "noindex"
        # Headers
        http_headers(client, "200 OK", mime=mime, X_Robots_Tag=robots)
        if method == 'HEAD':
            return
        # Output body
        if mime == 'text/html':
            begin_page(client, uri.split('/')[-1])
            client.send(b"<p><a href='/'>Back to main page</a></p>")
            client.send(f"<p><a href='/r{uri}'>-&gt; Plain text</a></p>"
                        .encode('utf-8'))
            client.send(f"<pre>{escape(content)}</pre>".encode('utf-8'))
            end_page(client)
        else:
            client.send(content.encode('utf-8'))
        return

    # Leaderboard or main page:
    regex="^/(winn|los)ers/[0-9]+_[0-9]+x[0-9]+-(moore|neumann|hex)(-losable)?$"
    if re.match(regex, uri) or uri == '/':
        http_headers(client, "200 OK", mime="text/html")
        if method == 'GET':    
            if uri == '/':
                main_page(client)
            else:
                leaderboard_page(client, uri)
        return

    # Method OK, URI not OK
    http_headers(client, "404 Not Found", mime="text/html")
    begin_page(client, "404 - Not found")
    end_page(client)


def client_handler(client: socket.socket, addr) -> None:
    '''
    Example:
        client, addr = serversocket.accept()
        client_handler(client, addr)

    This handles *response* logging and 500 page generation.  All actual
    work as well as *request* logging happens in `client_handler_inner`.

    Response logging requires the global variable `log_response` which
    gets set by `http_headers`.
    '''
    try:
        client_handler_inner(client, addr)
    except (BrokenPipeError, ConnectionResetError):
        pass
    except Exception:
        log(traceback.format_exc())
        try:
            http_headers(client, "500 Internal Server Error", mime="text/html")
            begin_page(client, "500 - Server error")
            client.send(b"""<p>
                Something went wrong.  The error has been logged and will be
                fixed, sometime.
            </p>""")
            end_page(client)
        except Exception:
            pass
    finally:
        # Log the response
        log(f'{addr} -> {log_response}')
        try:
            client.shutdown(socket.SHUT_RDWR)
        except Exception:
            pass
        client.close()


def alarm_handler(*args) -> None:
    '''
    Signal handler for SIGALRM
    Raise TimeoutError
    '''
    raise TimeoutError


def main() -> None:
    '''
    Webserver for Anonymine leaderboard
    This function does not return
    
    Note: must be started as root
    '''
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # SO_REUADDR needed for fast server restarts
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('', port))

    # IMPORTANT
    drop_priv()

    if len(sys.argv) == 2:
        value = int(sys.argv[1])
        resource.setrlimit(resource.RLIMIT_NPROC, (value, value))

    # Explicitly ignore SIGCHLD to avoid creating zombies
    signal.signal(signal.SIGCHLD, signal.SIG_IGN)
    
    server.listen(1)
    my_pid = os.getpid()
    while True:
        if os.getpid() != my_pid:
            sys.stdout.write(f'THERMONUCLEAR: {traceback.format_exc()}\n')
            sys.stdout.flush()
            os._exit(1)
        # Accept connection
        try:
            client, addr = server.accept()
        except ConnectionAbortedError:
            continue
        except Exception:
            log(traceback.format_exc())
            continue
        # Hand off connection to child process
        try:
            child_pid = os.fork()
        except Exception as err:
            log(f'{addr}: Fork failure')
            client.close()
            continue
        if child_pid:
            client.close()
        else:
            server.close()
            try:
                signal.signal(signal.SIGALRM, alarm_handler)
                signal.alarm(client_timeout)
                client_handler(client, addr)
            except Exception as e:
                log(f'{addr}: Unhandled exception: {traceback.format_exc()}')
            sys.exit(0)


if __name__ == '__main__':
    main()