diff --git a/.gitignore b/.gitignore index ca3c9ea..957f0bb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ .idea *.swp +__pycache__ +dummy.py diff --git a/mail4one/__init__.py b/mail4one/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mail4one/pop3.py b/mail4one/pop3.py new file mode 100644 index 0000000..1c416d8 --- /dev/null +++ b/mail4one/pop3.py @@ -0,0 +1,172 @@ +import asyncio +import ssl +from _contextvars import ContextVar +from pathlib import Path +import logging + +from .poputils import * + +reader: ContextVar[asyncio.StreamReader] = ContextVar("reader") +writer: ContextVar[asyncio.StreamWriter] = ContextVar("writer") + + +def write(data): + logging.debug(f"Server: {data}") + writer.get().write(data) + + +async def next_req(): + for _ in range(InvalidCommand.RETRIES): + line = await reader.get().readline() + logging.debug(f"Client: {line}") + if not line: + continue + try: + request = parse_command(line) + except InvalidCommand: + write(err("Bad command")) + else: + if request.cmd == Command.QUIT: + raise ClientQuit + return request + else: + raise ClientError(f"Bad command {InvalidCommand.RETRIES} times") + + +async def expect_cmd(*commands: Command): + cmd = await next_req() + if cmd.cmd not in commands: + logging.error(f"{cmd.cmd} is not in {commands}") + raise ClientError + return cmd + + +def validate_user_and_pass(username, password): + if username != password: + raise AuthError("Invalid user pass") + + +async def handle_user_pass_auth(user_cmd): + username = user_cmd.arg1 + if not username: + raise AuthError("Invalid USER command. username empty") + write(ok("Welcome")) + cmd = await expect_cmd(Command.PASS) + password = cmd.arg1 + validate_user_and_pass(username, password) + write(ok("Good")) + return username, password + + +async def auth_stage(): + write(ok("Server Ready")) + for _ in range(AuthError.RETRIES): + try: + req = await expect_cmd(Command.USER, Command.CAPA) + if req.cmd is Command.CAPA: + write(ok("Following are supported")) + write(msg("USER")) + write(end()) + else: + username, password = await handle_user_pass_auth(req) + logging.info(f"User: {username} has logged in successfully") + return username + except AuthError: + write(err("Wrong auth")) + except ClientQuit: + write(ok("Bye")) + logging.info("Client has QUIT") + raise + else: + raise ClientError("Failed to authenticate") + + +MAILS_PATH = "" +WAIT_FOR_PRIVILEGES_TO_DROP = None + + +async def transaction_stage(user: User): + logging.debug(f"Entering transaction stage for {user}") + deleted_message_ids = [] + mailbox = MailStorage(MAILS_PATH) + mails_list = mailbox.get_mails_list() + mails_map = {str(entry.nid): entry for entry in mails_list} + while True: + try: + req = await next_req() + logging.debug(f"Request: {req}") + if req.cmd is Command.CAPA: + write(ok("No CAPA")) + write(end()) + elif req.cmd is Command.STAT: + num, size = mailbox.get_mailbox_size() + write(ok(f"{num} {size}")) + elif req.cmd is Command.LIST: + if req.arg1: + write(ok(f"{req.arg1} {mails_map[req.arg1].size}")) + else: + write(ok("Mails follow")) + for entry in mails_list: + write(msg(f"{entry.nid} {entry.size}")) + write(end()) + elif req.cmd is Command.UIDL: + if req.arg1: + write(ok(f"{req.arg1} {mails_map[req.arg1].uid}")) + else: + write(ok("Mails follow")) + for entry in mails_list: + write(msg(f"{entry.nid} {entry.uid}")) + write(end()) + await writer.get().drain() + elif req.cmd is Command.RETR: + if req.arg1 not in mails_map: + write(err("Not found")) + else: + write(ok("Contents follow")) + write(mailbox.get_mail(mails_map[req.arg1])) + write(end()) + await writer.get().drain() + else: + write(err("Not implemented")) + except ClientQuit: + write(ok("Bye")) + return deleted_message_ids + + +def delete_messages(delete_ids): + logging.info(f"Client deleted these ids {delete_ids}") + + +async def new_session(stream_reader: asyncio.StreamReader, stream_writer: asyncio.StreamWriter): + if WAIT_FOR_PRIVILEGES_TO_DROP: + await WAIT_FOR_PRIVILEGES_TO_DROP + reader.set(stream_reader) + writer.set(stream_writer) + logging.info(f"New session started with {stream_reader} and {stream_writer}") + try: + username: User = await auth_stage() + delete_ids = await transaction_stage(username) + delete_messages(delete_ids) + except ClientQuit: + pass + except ClientError as c: + write(err("Something went wrong")) + logging.error(f"Unexpected client error", c) + except: + logging.error(f"Serious client error") + raise + finally: + stream_writer.close() + + +async def a_main(dirpath: Path, port: int, host="", context: ssl.SSLContext = None, waiter=None): + global MAILS_PATH, WAIT_FOR_PRIVILEGES_TO_DROP + MAILS_PATH = dirpath / 'new' + WAIT_FOR_PRIVILEGES_TO_DROP = waiter + server = await asyncio.start_server(new_session, host=host, port=port, ssl=context) + await server.serve_forever() + + +if __name__ == "__main__": + # noinspection PyTypeChecker + asyncio.run(a_main(Path("/tmp/mails"), 9995)) diff --git a/mail4one/poputils.py b/mail4one/poputils.py new file mode 100644 index 0000000..49a2778 --- /dev/null +++ b/mail4one/poputils.py @@ -0,0 +1,129 @@ +import os +from dataclasses import dataclass +from enum import Enum, auto +from typing import NewType, List + + +class ClientError(Exception): + pass + + +class ClientQuit(ClientError): + pass + + +class InvalidCommand(ClientError): + RETRIES = 3 + """WIll allow NUM_BAD_COMMANDS times""" + pass + + +class AuthError(ClientError): + RETRIES = 3 + pass + + +User = NewType('User', str) + + +class Command(Enum): + USER = auto() + PASS = auto() + CAPA = auto() + QUIT = auto() + LIST = auto() + UIDL = auto() + RETR = auto() + DELE = auto() + STAT = auto() + + +@dataclass +class Request: + cmd: Command + arg1: str = "" + arg2: str = "" + rest: str = "" + + +def ok(arg): + return f"+OK {arg}\r\n".encode() + + +def msg(arg: str): + return f"{arg}\r\n".encode() + + +def end(): + return b".\r\n" + + +def err(arg): + return f"-ERR {arg}\r\n".encode() + + +def parse_command(line: bytes) -> Request: + line = line.decode() + if not line.endswith("\r\n"): + raise ClientError("Invalid line ending") + + parts = line.rstrip().split(maxsplit=3) + if not parts: + raise InvalidCommand("No command found") + + cmd_str, *parts = parts + try: + cmd = Command[cmd_str] + except KeyError: + raise InvalidCommand(cmd_str) + + request = Request(cmd) + if parts: + request.arg1, *parts = parts + if parts: + request.arg2, *parts = parts + if parts: + request.rest, = parts + return request + + +def files_in_path(path): + for _, _, files in os.walk(path): + return [(f, os.path.join(path, f)) for f in files] + + +@dataclass +class MailEntry: + uid: str + size: int + c_time: float + path: str + nid: int = 0 + + def __init__(self, filename, path): + self.uid = filename + stats = os.stat(path) + self.size = stats.st_size + self.c_time = stats.st_ctime + self.path = path + + +class MailStorage: + def __init__(self, dirpath: str): + self.dirpath = dirpath + self.files = files_in_path(self.dirpath) + self.entries = [MailEntry(filename, path) for filename, path in self.files] + self.entries = sorted(self.entries, reverse=True, key=lambda e: e.c_time) + for i, entry in enumerate(self.entries, start=1): + entry.nid = i + + def get_mailbox_size(self) -> (int, int): + return len(self.entries), sum(entry.size for entry in self.entries) + + def get_mails_list(self) -> List[MailEntry]: + return self.entries + + @staticmethod + def get_mail(entry: MailEntry) -> bytes: + with open(entry.path, mode='rb') as fp: + return fp.read() diff --git a/mail4one/server.py b/mail4one/server.py index 914a66c..9f5047f 100644 --- a/mail4one/server.py +++ b/mail4one/server.py @@ -1,4 +1,8 @@ import asyncio +# Though we don't use requests, without the below import, we crash https://stackoverflow.com/a/13057751 +# When running on privilege port after dropping privileges. +# noinspection PyUnresolvedReferences +import encodings.idna import io import logging import mailbox @@ -9,18 +13,12 @@ from argparse import ArgumentParser from functools import partial from pathlib import Path -# Though we don't use requests, without the below import, we crash https://stackoverflow.com/a/13057751 -# When running on privilege port after dropping privileges. -# noinspection PyUnresolvedReferences -import encodings.idna - from aiosmtpd.controller import Controller from aiosmtpd.handlers import Mailbox from aiosmtpd.main import DATA_SIZE_DEFAULT from aiosmtpd.smtp import SMTP - -# from pop3 import a_main +from .pop3 import a_main as pop3_main def create_tls_context(certfile, keyfile): @@ -41,6 +39,9 @@ class STARTTLSController(Controller): async def create_future(self): self.has_privileges_dropped = asyncio.get_event_loop().create_future() + async def wait_for_privileges_to_drop(self): + await self.has_privileges_dropped + def factory(self): if not self.has_privileges_dropped.done(): # Ideally we should await here. But this is callback and not a coroutine @@ -80,7 +81,8 @@ def parse_args(): # Hardcoded args args.host = '0.0.0.0' - args.port = 25 + args.smtp_port = 25 + args.pop_port = 995 args.size = DATA_SIZE_DEFAULT args.classpath = MailboxCRLF args.smtputf8 = True @@ -95,7 +97,7 @@ def setup_logging(args): logging.basicConfig(level=logging.INFO) -def drop_privileges(future): +def drop_privileges(future_cb): try: import pwd except ImportError: @@ -109,7 +111,7 @@ def drop_privileges(future): logging.error("Cannot setuid nobody; run as root") sys.exit(1) logging.info("Dropped privileges") - future.set_result("Go!") + future_cb().set_result("Go!") logging.debug("Signalled! Clients can come in") @@ -121,15 +123,17 @@ def main(): handler = args.classpath(args.mail_dir_path) loop = asyncio.get_event_loop() loop.set_debug(args.debug) - # loop.create_task(a_main(args.mail_dir_path, tls_context)) controller = STARTTLSController( - handler, tls_context=tls_context, smtp_args=smtp_args, hostname=args.host, port=args.port, loop=loop) + handler, tls_context=tls_context, smtp_args=smtp_args, hostname=args.host, port=args.smtp_port, loop=loop) loop.create_task(controller.create_future()) + loop.create_task(pop3_main(args.mail_dir_path, args.pop_port, + host=args.host, context=tls_context, waiter=controller.wait_for_privileges_to_drop)) controller.start() - loop.call_soon_threadsafe(partial(drop_privileges, controller.has_privileges_dropped)) - input('Press enter to stop:') + loop.call_soon_threadsafe(partial(drop_privileges, lambda: controller.has_privileges_dropped)) + logging.info("Server started. Press [ENTER] to stop") + input() controller.stop() # loop.create_task(a_main(controller)) # loop.run_forever() diff --git a/run.py b/run.py new file mode 100644 index 0000000..cd0035a --- /dev/null +++ b/run.py @@ -0,0 +1,3 @@ +from mail4one.server import main + +main()