migrate pop3

This commit is contained in:
Balakrishnan Balasubramanian 2018-12-18 18:36:13 -05:00
parent 588da38bee
commit 02efb009a8
6 changed files with 324 additions and 14 deletions

2
.gitignore vendored
View File

@ -1,2 +1,4 @@
.idea
*.swp
__pycache__
dummy.py

0
mail4one/__init__.py Normal file
View File

172
mail4one/pop3.py Normal file
View File

@ -0,0 +1,172 @@
import asyncio
import ssl
from _contextvars import ContextVar
from pathlib import Path
import logging
from .poputils import *
reader: ContextVar[asyncio.StreamReader] = ContextVar("reader")
writer: ContextVar[asyncio.StreamWriter] = ContextVar("writer")
def write(data):
logging.debug(f"Server: {data}")
writer.get().write(data)
async def next_req():
for _ in range(InvalidCommand.RETRIES):
line = await reader.get().readline()
logging.debug(f"Client: {line}")
if not line:
continue
try:
request = parse_command(line)
except InvalidCommand:
write(err("Bad command"))
else:
if request.cmd == Command.QUIT:
raise ClientQuit
return request
else:
raise ClientError(f"Bad command {InvalidCommand.RETRIES} times")
async def expect_cmd(*commands: Command):
cmd = await next_req()
if cmd.cmd not in commands:
logging.error(f"{cmd.cmd} is not in {commands}")
raise ClientError
return cmd
def validate_user_and_pass(username, password):
if username != password:
raise AuthError("Invalid user pass")
async def handle_user_pass_auth(user_cmd):
username = user_cmd.arg1
if not username:
raise AuthError("Invalid USER command. username empty")
write(ok("Welcome"))
cmd = await expect_cmd(Command.PASS)
password = cmd.arg1
validate_user_and_pass(username, password)
write(ok("Good"))
return username, password
async def auth_stage():
write(ok("Server Ready"))
for _ in range(AuthError.RETRIES):
try:
req = await expect_cmd(Command.USER, Command.CAPA)
if req.cmd is Command.CAPA:
write(ok("Following are supported"))
write(msg("USER"))
write(end())
else:
username, password = await handle_user_pass_auth(req)
logging.info(f"User: {username} has logged in successfully")
return username
except AuthError:
write(err("Wrong auth"))
except ClientQuit:
write(ok("Bye"))
logging.info("Client has QUIT")
raise
else:
raise ClientError("Failed to authenticate")
MAILS_PATH = ""
WAIT_FOR_PRIVILEGES_TO_DROP = None
async def transaction_stage(user: User):
logging.debug(f"Entering transaction stage for {user}")
deleted_message_ids = []
mailbox = MailStorage(MAILS_PATH)
mails_list = mailbox.get_mails_list()
mails_map = {str(entry.nid): entry for entry in mails_list}
while True:
try:
req = await next_req()
logging.debug(f"Request: {req}")
if req.cmd is Command.CAPA:
write(ok("No CAPA"))
write(end())
elif req.cmd is Command.STAT:
num, size = mailbox.get_mailbox_size()
write(ok(f"{num} {size}"))
elif req.cmd is Command.LIST:
if req.arg1:
write(ok(f"{req.arg1} {mails_map[req.arg1].size}"))
else:
write(ok("Mails follow"))
for entry in mails_list:
write(msg(f"{entry.nid} {entry.size}"))
write(end())
elif req.cmd is Command.UIDL:
if req.arg1:
write(ok(f"{req.arg1} {mails_map[req.arg1].uid}"))
else:
write(ok("Mails follow"))
for entry in mails_list:
write(msg(f"{entry.nid} {entry.uid}"))
write(end())
await writer.get().drain()
elif req.cmd is Command.RETR:
if req.arg1 not in mails_map:
write(err("Not found"))
else:
write(ok("Contents follow"))
write(mailbox.get_mail(mails_map[req.arg1]))
write(end())
await writer.get().drain()
else:
write(err("Not implemented"))
except ClientQuit:
write(ok("Bye"))
return deleted_message_ids
def delete_messages(delete_ids):
logging.info(f"Client deleted these ids {delete_ids}")
async def new_session(stream_reader: asyncio.StreamReader, stream_writer: asyncio.StreamWriter):
if WAIT_FOR_PRIVILEGES_TO_DROP:
await WAIT_FOR_PRIVILEGES_TO_DROP
reader.set(stream_reader)
writer.set(stream_writer)
logging.info(f"New session started with {stream_reader} and {stream_writer}")
try:
username: User = await auth_stage()
delete_ids = await transaction_stage(username)
delete_messages(delete_ids)
except ClientQuit:
pass
except ClientError as c:
write(err("Something went wrong"))
logging.error(f"Unexpected client error", c)
except:
logging.error(f"Serious client error")
raise
finally:
stream_writer.close()
async def a_main(dirpath: Path, port: int, host="", context: ssl.SSLContext = None, waiter=None):
global MAILS_PATH, WAIT_FOR_PRIVILEGES_TO_DROP
MAILS_PATH = dirpath / 'new'
WAIT_FOR_PRIVILEGES_TO_DROP = waiter
server = await asyncio.start_server(new_session, host=host, port=port, ssl=context)
await server.serve_forever()
if __name__ == "__main__":
# noinspection PyTypeChecker
asyncio.run(a_main(Path("/tmp/mails"), 9995))

129
mail4one/poputils.py Normal file
View File

@ -0,0 +1,129 @@
import os
from dataclasses import dataclass
from enum import Enum, auto
from typing import NewType, List
class ClientError(Exception):
pass
class ClientQuit(ClientError):
pass
class InvalidCommand(ClientError):
RETRIES = 3
"""WIll allow NUM_BAD_COMMANDS times"""
pass
class AuthError(ClientError):
RETRIES = 3
pass
User = NewType('User', str)
class Command(Enum):
USER = auto()
PASS = auto()
CAPA = auto()
QUIT = auto()
LIST = auto()
UIDL = auto()
RETR = auto()
DELE = auto()
STAT = auto()
@dataclass
class Request:
cmd: Command
arg1: str = ""
arg2: str = ""
rest: str = ""
def ok(arg):
return f"+OK {arg}\r\n".encode()
def msg(arg: str):
return f"{arg}\r\n".encode()
def end():
return b".\r\n"
def err(arg):
return f"-ERR {arg}\r\n".encode()
def parse_command(line: bytes) -> Request:
line = line.decode()
if not line.endswith("\r\n"):
raise ClientError("Invalid line ending")
parts = line.rstrip().split(maxsplit=3)
if not parts:
raise InvalidCommand("No command found")
cmd_str, *parts = parts
try:
cmd = Command[cmd_str]
except KeyError:
raise InvalidCommand(cmd_str)
request = Request(cmd)
if parts:
request.arg1, *parts = parts
if parts:
request.arg2, *parts = parts
if parts:
request.rest, = parts
return request
def files_in_path(path):
for _, _, files in os.walk(path):
return [(f, os.path.join(path, f)) for f in files]
@dataclass
class MailEntry:
uid: str
size: int
c_time: float
path: str
nid: int = 0
def __init__(self, filename, path):
self.uid = filename
stats = os.stat(path)
self.size = stats.st_size
self.c_time = stats.st_ctime
self.path = path
class MailStorage:
def __init__(self, dirpath: str):
self.dirpath = dirpath
self.files = files_in_path(self.dirpath)
self.entries = [MailEntry(filename, path) for filename, path in self.files]
self.entries = sorted(self.entries, reverse=True, key=lambda e: e.c_time)
for i, entry in enumerate(self.entries, start=1):
entry.nid = i
def get_mailbox_size(self) -> (int, int):
return len(self.entries), sum(entry.size for entry in self.entries)
def get_mails_list(self) -> List[MailEntry]:
return self.entries
@staticmethod
def get_mail(entry: MailEntry) -> bytes:
with open(entry.path, mode='rb') as fp:
return fp.read()

View File

@ -1,4 +1,8 @@
import asyncio
# Though we don't use requests, without the below import, we crash https://stackoverflow.com/a/13057751
# When running on privilege port after dropping privileges.
# noinspection PyUnresolvedReferences
import encodings.idna
import io
import logging
import mailbox
@ -9,18 +13,12 @@ from argparse import ArgumentParser
from functools import partial
from pathlib import Path
# Though we don't use requests, without the below import, we crash https://stackoverflow.com/a/13057751
# When running on privilege port after dropping privileges.
# noinspection PyUnresolvedReferences
import encodings.idna
from aiosmtpd.controller import Controller
from aiosmtpd.handlers import Mailbox
from aiosmtpd.main import DATA_SIZE_DEFAULT
from aiosmtpd.smtp import SMTP
# from pop3 import a_main
from .pop3 import a_main as pop3_main
def create_tls_context(certfile, keyfile):
@ -41,6 +39,9 @@ class STARTTLSController(Controller):
async def create_future(self):
self.has_privileges_dropped = asyncio.get_event_loop().create_future()
async def wait_for_privileges_to_drop(self):
await self.has_privileges_dropped
def factory(self):
if not self.has_privileges_dropped.done():
# Ideally we should await here. But this is callback and not a coroutine
@ -80,7 +81,8 @@ def parse_args():
# Hardcoded args
args.host = '0.0.0.0'
args.port = 25
args.smtp_port = 25
args.pop_port = 995
args.size = DATA_SIZE_DEFAULT
args.classpath = MailboxCRLF
args.smtputf8 = True
@ -95,7 +97,7 @@ def setup_logging(args):
logging.basicConfig(level=logging.INFO)
def drop_privileges(future):
def drop_privileges(future_cb):
try:
import pwd
except ImportError:
@ -109,7 +111,7 @@ def drop_privileges(future):
logging.error("Cannot setuid nobody; run as root")
sys.exit(1)
logging.info("Dropped privileges")
future.set_result("Go!")
future_cb().set_result("Go!")
logging.debug("Signalled! Clients can come in")
@ -121,15 +123,17 @@ def main():
handler = args.classpath(args.mail_dir_path)
loop = asyncio.get_event_loop()
loop.set_debug(args.debug)
# loop.create_task(a_main(args.mail_dir_path, tls_context))
controller = STARTTLSController(
handler, tls_context=tls_context, smtp_args=smtp_args, hostname=args.host, port=args.port, loop=loop)
handler, tls_context=tls_context, smtp_args=smtp_args, hostname=args.host, port=args.smtp_port, loop=loop)
loop.create_task(controller.create_future())
loop.create_task(pop3_main(args.mail_dir_path, args.pop_port,
host=args.host, context=tls_context, waiter=controller.wait_for_privileges_to_drop))
controller.start()
loop.call_soon_threadsafe(partial(drop_privileges, controller.has_privileges_dropped))
input('Press enter to stop:')
loop.call_soon_threadsafe(partial(drop_privileges, lambda: controller.has_privileges_dropped))
logging.info("Server started. Press [ENTER] to stop")
input()
controller.stop()
# loop.create_task(a_main(controller))
# loop.run_forever()

3
run.py Normal file
View File

@ -0,0 +1,3 @@
from mail4one.server import main
main()