From fc7ff2a2b60a0d28bc1cdbb3e3df353d43e31a07 Mon Sep 17 00:00:00 2001 From: Balakrishnan Balasubramanian Date: Wed, 7 Jun 2023 22:07:38 -0400 Subject: [PATCH] add return type hinting for pop3 --- mail4one/pop3.py | 48 ++++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/mail4one/pop3.py b/mail4one/pop3.py index 543f52f..b8d3389 100644 --- a/mail4one/pop3.py +++ b/mail4one/pop3.py @@ -14,10 +14,10 @@ from .poputils import InvalidCommand, parse_command, err, Command, ClientQuit, C Request, MailEntry, get_mail, get_mails_list, MailList -async def next_req(): +async def next_req() -> Request: for _ in range(InvalidCommand.RETRIES): line = await state().reader.readline() - logging.debug(f"Client: {line}") + # logging.debug(f"Client: {line}") if not line: continue try: @@ -32,7 +32,7 @@ async def next_req(): raise ClientError(f"Bad command {InvalidCommand.RETRIES} times") -async def expect_cmd(*commands: Command): +async def expect_cmd(*commands: Command) -> Request: req = await next_req() if req.cmd not in commands: logging.error(f"Unexpected command: {req.cmd} is not in {commands}") @@ -40,12 +40,12 @@ async def expect_cmd(*commands: Command): return req -def write(data): +def write(data) -> None: logging.debug(f"Server: {data}") state().writer.write(data) -def validate_password(username, password): +def validate_password(username, password) -> None: try: pwinfo, mbox = config().users[username] except: @@ -57,7 +57,7 @@ def validate_password(username, password): state().mbox = mbox -async def handle_user_pass_auth(user_cmd): +async def handle_user_pass_auth(user_cmd) -> None: username = user_cmd.arg1 if not username: raise AuthError("Invalid USER command. username empty") @@ -68,7 +68,7 @@ async def handle_user_pass_auth(user_cmd): logging.info(f"{username=} has logged in successfully") -async def auth_stage(): +async def auth_stage() -> None: write(ok("Server Ready")) for _ in range(AuthError.RETRIES): try: @@ -78,8 +78,8 @@ async def auth_stage(): write(msg("USER")) write(end()) else: - username = await handle_user_pass_auth(req) - if username in config().loggedin_users: + await handle_user_pass_auth(req) + if (username:=state().username) in config().loggedin_users: logging.warning( f"User: {username} already has an active session") raise AuthError("Already logged in") @@ -96,18 +96,18 @@ async def auth_stage(): raise ClientError("Failed to authenticate") -def trans_command_capa(_, __): +def trans_command_capa(_, __) -> None: write(ok("CAPA follows")) write(msg("UIDL")) write(end()) -def trans_command_stat(mails: MailList, _): +def trans_command_stat(mails: MailList, _) -> None: num, size = mails.compute_stat() write(ok(f"{num} {size}")) -def trans_command_list(mails: MailList, req: Request): +def trans_command_list(mails: MailList, req: Request) -> None: if req.arg1: entry = mails.get(req.arg1) if entry: @@ -121,7 +121,7 @@ def trans_command_list(mails: MailList, req: Request): write(end()) -def trans_command_uidl(mails: MailList, req: Request): +def trans_command_uidl(mails: MailList, req: Request) -> None: if req.arg1: entry = mails.get(req.arg1) if entry: @@ -135,7 +135,7 @@ def trans_command_uidl(mails: MailList, req: Request): write(end()) -def trans_command_retr(mails: MailList, req: Request): +def trans_command_retr(mails: MailList, req: Request) -> None: entry = mails.get(req.arg1) if entry: write(ok("Contents follow")) @@ -146,7 +146,7 @@ def trans_command_retr(mails: MailList, req: Request): write(err("Not found")) -def trans_command_dele(mails: MailList, req: Request): +def trans_command_dele(mails: MailList, req: Request) -> None: entry = mails.get(req.arg1) if entry: mails.delete(req.arg1) @@ -155,11 +155,11 @@ def trans_command_dele(mails: MailList, req: Request): write(err("Not found")) -def trans_command_noop(_, __): +def trans_command_noop(_, __) -> None: write(ok("Hmm")) -async def process_transactions(mails_list: list[MailEntry]): +async def process_transactions(mails_list: list[MailEntry]) -> set[str]: mails = MailList(mails_list) def reset(_, __): @@ -194,19 +194,19 @@ async def process_transactions(mails_list: list[MailEntry]): await state().writer.drain() -def get_deleted_items(deleted_items_path: Path): +def get_deleted_items(deleted_items_path: Path) -> set[str]: if deleted_items_path.exists(): with deleted_items_path.open() as f: return set(f.read().splitlines()) return set() -def save_deleted_items(deleted_items_path: Path, deleted_items: set[str]): +def save_deleted_items(deleted_items_path: Path, deleted_items: set[str]) -> None: with deleted_items_path.open(mode="w") as f: f.writelines(f"{did}\n" for did in deleted_items) -async def transaction_stage(): +async def transaction_stage() -> None: deleted_items_path = config().mails_path / state().mbox / state().username existing_deleted_items: set[str] = get_deleted_items(deleted_items_path) mails_list = [ @@ -215,7 +215,7 @@ async def transaction_stage(): if entry.uid not in existing_deleted_items ] - new_deleted_items: Set = await process_transactions(mails_list) + new_deleted_items: set[str] = await process_transactions(mails_list) logging.info(f"completed transactions. Deleted:{len(new_deleted_items)}") if new_deleted_items: save_deleted_items(deleted_items_path, @@ -224,7 +224,7 @@ async def transaction_stage(): logging.info(f"Saved deleted items") -async def start_session(): +async def start_session() -> None: logging.info("New session started") try: await auth_stage() @@ -243,7 +243,7 @@ async def start_session(): config().loggedin_users.remove(state().username) -def parse_users(users: list[User]): +def parse_users(users: list[User]) -> dict[str, tuple[PWInfo, str]]: def inner(): for user in users: @@ -315,7 +315,7 @@ async def create_pop_server(host: str, ssl=ssl_context) -async def a_main(*args, **kwargs): +async def a_main(*args, **kwargs) -> None: server = await create_pop_server(*args, **kwargs) await server.serve_forever()