mail4one/tests/test_pop.py

228 lines
6.0 KiB
Python
Raw Normal View History

2023-06-06 23:20:27 -04:00
import unittest
import asyncio
import logging
2023-06-10 23:28:45 -04:00
import tempfile
import time
import os
2023-06-13 21:12:46 -04:00
from mail4one.pop3 import create_pop_server
from mail4one.config import User
2023-06-10 22:20:36 -04:00
from pathlib import Path
2023-06-06 23:20:27 -04:00
2024-04-01 11:14:20 -04:00
# Password hash stored for the test account.  The value is wrapped over
# several lines purely for readability; ``split()`` + ``join`` glue the pieces
# back into one token.  Presumably this is the hash of the password sent by
# ``do_login`` ("helloworld") -- verify against mail4one's hashing helper.
TEST_HASH = "".join(
    """
AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD
ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD
AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
""".split()
)
2023-06-06 23:20:27 -04:00
2024-04-01 11:14:20 -04:00
# Login name of the single account configured for these tests.
TEST_USER = "foobar"
# Name of that account's mailbox directory (created under MAILS_PATH).
TEST_MBOX = "foobar_mails"
2023-06-10 23:28:45 -04:00
# Accounts handed to ``create_pop_server`` when each test starts its server.
USERS = [User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX)]
# Root directory that holds the test mailboxes; assigned in ``setUpModule``.
MAILS_PATH: Path
# One complete message in wire format (CRLF line endings); it is written
# verbatim into the test maildir.  The LIST/STAT expectations in this file
# report 436 octets per message, which requires one byte of folding
# whitespace in front of ``charset=``.  A tab is assumed here -- TODO confirm
# against the repository (a single space would satisfy the tests equally).
TESTMAIL = b"""Message-ID: <N01BwLnh8dGBoD9gVz@msn.com>\r
From: from@msn.com\r
To: MddK0ftkv@outlook.com\r
Subject: hello lorem ipsum foo bar\r
Date: Mon, 24 Oct 2002 00:42:02 +0000\r
MIME-Version: 1.0\r
Content-Type: text/plain;\r
\tcharset="windows-1251";\r
Content-Transfer-Encoding: 7bit\r
X-Peer: ('2.2.1.9', 64593)\r
X-MailFrom: from@msn.com\r
X-RcptTo: MddK0ftkv@outlook.com\r
\r
Hello bro\r
IlzVOJqu9Zp7twFAtzcV\r
yQVk36B0mGU2gtWxXLr\r
PeF0RtbI0mAuVPLQDHCi\r
\r
"""
2023-06-10 23:28:45 -04:00
def setUpModule() -> None:
    """Build a temporary maildir tree containing two copies of ``TESTMAIL``.

    The tree lives in a ``TemporaryDirectory`` whose cleanup is registered
    with ``unittest.addModuleCleanup``, so it is removed once the module's
    tests have finished.  The directory path is published through the
    module-level ``MAILS_PATH``.
    """
    global MAILS_PATH
    # Only CRITICAL records are emitted, keeping the test output quiet.
    logging.basicConfig(level=logging.CRITICAL)
    td = tempfile.TemporaryDirectory(prefix="m41.pop.")
    unittest.addModuleCleanup(td.cleanup)
    MAILS_PATH = Path(td.name)

    mbox_dir = MAILS_PATH / TEST_MBOX
    mbox_dir.mkdir()
    for md in ("new", "cur", "tmp"):
        (mbox_dir / md).mkdir()

    # Written in this order on purpose: msg1 first, then msg2.
    for name in ("msg1.eml", "msg2.eml"):
        (mbox_dir / "new" / name).write_bytes(TESTMAIL)
    logging.debug(MAILS_PATH)
2023-06-06 23:20:27 -04:00
2023-06-10 23:28:45 -04:00
def tearDownModule() -> None:
    """Do nothing; kept as an explicit module-teardown hook.

    The temporary mail directory is removed by the cleanup callback that
    ``setUpModule`` registers via ``unittest.addModuleCleanup``.
    """
class TestPop3(unittest.IsolatedAsyncioTestCase):
    """Drive the POP3 server over a real TCP connection and check its replies.

    ``asyncSetUp`` starts a server on ``HOST:PORT`` and opens one client
    connection; ``asyncTearDown`` closes that connection and cancels the
    serving task.  Each test describes the expected conversation as a
    transcript: lines starting with ``"C: "`` are sent to the server, every
    other non-blank line (conventionally ``"S: "``) is the exact reply line
    expected back.
    """

    # Address the server under test listens on; shared by every connection.
    HOST = "127.0.0.1"
    PORT = 7995

    async def asyncSetUp(self) -> None:
        """Start the POP3 server and open the default client connection."""
        logging.debug("at asyncSetUp")
        pop_server = await create_pop_server(
            host=self.HOST, port=self.PORT, mails_path=MAILS_PATH, users=USERS
        )
        self.task = asyncio.create_task(pop_server.serve_forever())
        self.reader, self.writer = await asyncio.open_connection(
            self.HOST, self.PORT
        )

    async def test_QUIT(self) -> None:
        """QUIT right after the greeting is acknowledged."""
        dialog = """
        S: +OK Server Ready
        C: QUIT
        S: +OK Bye
        """
        await self.dialog_checker(dialog)

    async def test_BAD(self) -> None:
        """Unknown or out-of-state commands are answered with -ERR."""
        dialog = """
        S: +OK Server Ready
        C: HELO
        S: -ERR Bad command
        C: HEYA
        S: -ERR Bad command
        C: LIST
        S: -ERR Something went wrong
        C: HELO
        """
        await self.dialog_checker(dialog)
        # TODO fix
        # self.assertTrue(reader.at_eof(), "server should close the connection")

    async def do_login(self) -> None:
        """Authenticate the default connection as the test user."""
        dialog = """
        S: +OK Server Ready
        C: USER foobar
        S: +OK Welcome
        C: PASS helloworld
        S: +OK Login successful
        """
        await self.dialog_checker(dialog)

    async def test_AUTH(self) -> None:
        """A successful login can be followed by QUIT."""
        await self.do_login()
        dialog = """
        C: QUIT
        S: +OK Bye
        """
        await self.dialog_checker(dialog)

    async def test_dupe_AUTH(self) -> None:
        """A second concurrent login for the same user is rejected."""
        r1, w1 = await asyncio.open_connection(self.HOST, self.PORT)
        r2, w2 = await asyncio.open_connection(self.HOST, self.PORT)
        try:
            dialog = """
            S: +OK Server Ready
            C: USER foobar
            S: +OK Welcome
            C: PASS helloworld
            """
            # Both clients send PASS before either reads the verdict.
            await self.dialog_checker_impl(r1, w1, dialog)
            await self.dialog_checker_impl(r2, w2, dialog)
            d1 = """S: +OK Login successful"""
            d2 = """S: -ERR Auth Failed: Already logged in"""
            await self.dialog_checker_impl(r1, w1, d1)
            await self.dialog_checker_impl(r2, w2, d2)
            end_dialog = """
            C: QUIT
            S: +OK Bye
            """
            await self.dialog_checker_impl(r1, w1, end_dialog)
            await self.dialog_checker_impl(r2, w2, end_dialog)
        finally:
            # These extra connections are not covered by asyncTearDown, so
            # close them here even when an assertion above fails.
            for extra_writer in (w1, w2):
                extra_writer.close()
                await extra_writer.wait_closed()

    async def test_STAT(self) -> None:
        """STAT reports the message count and the total size."""
        await self.do_login()
        dialog = """
        C: STAT
        S: +OK 2 872
        """
        await self.dialog_checker(dialog)

    async def test_NOOP(self) -> None:
        """NOOP is acknowledged."""
        await self.do_login()
        dialog = """
        C: NOOP
        S: +OK Hmm
        """
        await self.dialog_checker(dialog)

    async def test_LIST(self) -> None:
        """LIST reports the size of each message."""
        await self.do_login()
        dialog = """
        C: LIST
        S: +OK Mails follow
        S: 1 436
        S: 2 436
        S: .
        """
        await self.dialog_checker(dialog)

    async def test_UIDL(self) -> None:
        """UIDL reports each message's file name as its unique id."""
        await self.do_login()
        dialog = """
        C: UIDL
        S: +OK Mails follow
        S: 1 msg2.eml
        S: 2 msg1.eml
        S: .
        """
        await self.dialog_checker(dialog)

    async def test_RETR(self) -> None:
        """RETR returns the stored message line by line, then a lone dot."""
        await self.do_login()
        dialog = """
        C: RETR 1
        S: +OK Contents follow
        """
        # One expected server line per line of the stored message.
        dialog += "".join(
            f"S: {mail_line.decode()}\n" for mail_line in TESTMAIL.splitlines()
        )
        dialog += "S: ."
        await self.dialog_checker(dialog)

    async def test_CAPA(self) -> None:
        """CAPA lists the supported capabilities before login."""
        dialog = """
        S: +OK Server Ready
        C: CAPA
        S: +OK Following are supported
        S: USER
        S: .
        C: QUIT
        S: +OK Bye
        """
        await self.dialog_checker(dialog)

    async def asyncTearDown(self) -> None:
        """Close the default client connection and cancel the serving task."""
        logging.debug("at teardown")
        self.writer.close()
        await self.writer.wait_closed()
        self.task.cancel("test done")

    async def dialog_checker(self, dialog: str) -> None:
        """Run *dialog* against the default connection opened in asyncSetUp."""
        await self.dialog_checker_impl(self.reader, self.writer, dialog)

    async def dialog_checker_impl(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, dialog: str
    ) -> None:
        """Play a transcript over the given streams, asserting each reply.

        Blank lines are skipped and surrounding whitespace is ignored.  The
        first three characters of a line select the side: ``"C: "`` sends the
        rest of the line (plus CRLF) to the server; anything else reads one
        line from the server and requires it to equal the rest of the line
        plus CRLF.
        """
        for line in dialog.splitlines():
            line = line.strip()
            if not line:
                continue
            side, data_str = line[:3], line[3:]
            data = f"{data_str}\r\n".encode()
            if side == "C: ":
                writer.write(data)
            else:
                resp = await reader.readline()
                self.assertEqual(data, resp)
2024-04-01 11:14:20 -04:00
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()