Move tests to tests folder
This commit is contained in:
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
77
tests/test_config.py
Normal file
77
tests/test_config.py
Normal file
@ -0,0 +1,77 @@
|
||||
import unittest
|
||||
|
||||
from mail4one import config
|
||||
|
||||
TEST_CONFIG = """
|
||||
{
|
||||
"mails_path": "/var/tmp/mails",
|
||||
"matches": [
|
||||
{
|
||||
"name": "mydomain",
|
||||
"addr_rexs": [
|
||||
".*@mydomain.com",
|
||||
".*@m.mydomain.com"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "personal",
|
||||
"addrs": [
|
||||
"first.last@mydomain.com",
|
||||
"secret.name@mydomain.com"
|
||||
]
|
||||
}
|
||||
],
|
||||
"boxes": [
|
||||
{
|
||||
"name": "spam",
|
||||
"rules": [
|
||||
{
|
||||
"match_name": "mydomain",
|
||||
"negate": true,
|
||||
"stop_check": true
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "important",
|
||||
"rules": [
|
||||
{
|
||||
"match_name": "personal"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "all",
|
||||
"rules": [
|
||||
{
|
||||
"match_name": "default_match_all"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
class TestConfig(unittest.TestCase):
|
||||
|
||||
def test_config(self) -> None:
|
||||
cfg = config.Config(TEST_CONFIG)
|
||||
self.assertEqual(cfg.mails_path, "/var/tmp/mails")
|
||||
|
||||
def test_parse_rules(self) -> None:
|
||||
cfg = config.Config(TEST_CONFIG)
|
||||
op = config.parse_checkers(cfg)
|
||||
self.assertEqual(len(op), 3)
|
||||
|
||||
def test_get_mboxes(self) -> None:
|
||||
cfg = config.Config(TEST_CONFIG)
|
||||
rules = config.parse_checkers(cfg)
|
||||
self.assertEqual(config.get_mboxes("foo@bar.com", rules), ['spam'])
|
||||
self.assertEqual(config.get_mboxes("foo@mydomain.com", rules), ['all'])
|
||||
self.assertEqual(config.get_mboxes("first.last@mydomain.com", rules),
|
||||
['important', 'all'])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
226
tests/test_pop.py
Normal file
226
tests/test_pop.py
Normal file
@ -0,0 +1,226 @@
|
||||
import unittest
|
||||
import asyncio
|
||||
import logging
|
||||
import tempfile
|
||||
import time
|
||||
import os
|
||||
from mail4one.pop3 import create_pop_server
|
||||
from mail4one.config import User
|
||||
from pathlib import Path
|
||||
|
||||
TEST_HASH = "".join(c for c in """
|
||||
AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD
|
||||
ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD
|
||||
AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
|
||||
""" if not c.isspace())
|
||||
|
||||
TEST_USER = 'foobar'
|
||||
TEST_MBOX = 'foobar_mails'
|
||||
|
||||
USERS = [User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX)]
|
||||
|
||||
MAILS_PATH: Path
|
||||
|
||||
TESTMAIL = b"""Message-ID: <N01BwLnh8dGBoD9gVz@msn.com>\r
|
||||
From: from@msn.com\r
|
||||
To: MddK0ftkv@outlook.com\r
|
||||
Subject: hello lorem ipsum foo bar\r
|
||||
Date: Mon, 24 Oct 2002 00:42:02 +0000\r
|
||||
MIME-Version: 1.0\r
|
||||
Content-Type: text/plain;\r
|
||||
charset="windows-1251";\r
|
||||
Content-Transfer-Encoding: 7bit\r
|
||||
X-Peer: ('2.2.1.9', 64593)\r
|
||||
X-MailFrom: from@msn.com\r
|
||||
X-RcptTo: MddK0ftkv@outlook.com\r
|
||||
\r
|
||||
Hello bro\r
|
||||
IlzVOJqu9Zp7twFAtzcV\r
|
||||
yQVk36B0mGU2gtWxXLr\r
|
||||
PeF0RtbI0mAuVPLQDHCi\r
|
||||
\r\n"""
|
||||
|
||||
|
||||
def setUpModule() -> None:
|
||||
global MAILS_PATH
|
||||
logging.basicConfig(level=logging.CRITICAL)
|
||||
td = tempfile.TemporaryDirectory(prefix="m41.pop.")
|
||||
unittest.addModuleCleanup(td.cleanup)
|
||||
MAILS_PATH = Path(td.name)
|
||||
os.mkdir(MAILS_PATH / TEST_MBOX)
|
||||
for md in ('new', 'cur', 'tmp'):
|
||||
os.mkdir(MAILS_PATH / TEST_MBOX / md)
|
||||
with open(MAILS_PATH / TEST_MBOX/ 'new/msg1.eml', 'wb') as f:
|
||||
f.write(TESTMAIL)
|
||||
with open(MAILS_PATH / TEST_MBOX/ 'new/msg2.eml', 'wb') as f:
|
||||
f.write(TESTMAIL)
|
||||
logging.debug(MAILS_PATH)
|
||||
|
||||
|
||||
def tearDownModule():
|
||||
pass
|
||||
|
||||
|
||||
class TestPop3(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
async def asyncSetUp(self) -> None:
|
||||
logging.debug("at asyncSetUp")
|
||||
pop_server = await create_pop_server(host='127.0.0.1',
|
||||
port=7995,
|
||||
mails_path=MAILS_PATH,
|
||||
users=USERS)
|
||||
self.task = asyncio.create_task(pop_server.serve_forever())
|
||||
self.reader, self.writer = await asyncio.open_connection(
|
||||
'127.0.0.1', 7995)
|
||||
|
||||
async def test_QUIT(self) -> None:
|
||||
dialog = """
|
||||
S: +OK Server Ready
|
||||
C: QUIT
|
||||
S: +OK Bye
|
||||
"""
|
||||
await self.dialog_checker(dialog)
|
||||
|
||||
async def test_BAD(self) -> None:
|
||||
dialog = """
|
||||
S: +OK Server Ready
|
||||
C: HELO
|
||||
S: -ERR Bad command
|
||||
C: HEYA
|
||||
S: -ERR Bad command
|
||||
C: LIST
|
||||
S: -ERR Something went wrong
|
||||
C: HELO
|
||||
"""
|
||||
await self.dialog_checker(dialog)
|
||||
# TODO fix
|
||||
# self.assertTrue(reader.at_eof(), "server should close the connection")
|
||||
|
||||
async def do_login(self) -> None:
|
||||
dialog = """
|
||||
S: +OK Server Ready
|
||||
C: USER foobar
|
||||
S: +OK Welcome
|
||||
C: PASS helloworld
|
||||
S: +OK Login successful
|
||||
"""
|
||||
await self.dialog_checker(dialog)
|
||||
|
||||
async def test_AUTH(self) -> None:
|
||||
await self.do_login()
|
||||
dialog = """
|
||||
C: QUIT
|
||||
S: +OK Bye
|
||||
"""
|
||||
await self.dialog_checker(dialog)
|
||||
|
||||
async def test_dupe_AUTH(self) -> None:
|
||||
r1, w1 = await asyncio.open_connection('127.0.0.1', 7995)
|
||||
r2, w2 = await asyncio.open_connection('127.0.0.1', 7995)
|
||||
dialog = """
|
||||
S: +OK Server Ready
|
||||
C: USER foobar
|
||||
S: +OK Welcome
|
||||
C: PASS helloworld
|
||||
"""
|
||||
await self.dialog_checker_impl(r1, w1, dialog)
|
||||
await self.dialog_checker_impl(r2, w2, dialog)
|
||||
d1 = """S: +OK Login successful"""
|
||||
d2 = """S: -ERR Auth Failed: Already logged in"""
|
||||
await self.dialog_checker_impl(r1, w1, d1)
|
||||
await self.dialog_checker_impl(r2, w2, d2)
|
||||
end_dialog = """
|
||||
C: QUIT
|
||||
S: +OK Bye
|
||||
"""
|
||||
await self.dialog_checker_impl(r1, w1, end_dialog)
|
||||
await self.dialog_checker_impl(r2, w2, end_dialog)
|
||||
|
||||
async def test_STAT(self) -> None:
|
||||
await self.do_login()
|
||||
dialog = """
|
||||
C: STAT
|
||||
S: +OK 2 872
|
||||
"""
|
||||
await self.dialog_checker(dialog)
|
||||
|
||||
async def test_NOOP(self) -> None:
|
||||
await self.do_login()
|
||||
dialog = """
|
||||
C: NOOP
|
||||
S: +OK Hmm
|
||||
"""
|
||||
await self.dialog_checker(dialog)
|
||||
|
||||
async def test_LIST(self) -> None:
|
||||
await self.do_login()
|
||||
dialog = """
|
||||
C: LIST
|
||||
S: +OK Mails follow
|
||||
S: 1 436
|
||||
S: 2 436
|
||||
S: .
|
||||
"""
|
||||
await self.dialog_checker(dialog)
|
||||
|
||||
async def test_UIDL(self) -> None:
|
||||
await self.do_login()
|
||||
dialog = """
|
||||
C: UIDL
|
||||
S: +OK Mails follow
|
||||
S: 1 msg2.eml
|
||||
S: 2 msg1.eml
|
||||
S: .
|
||||
"""
|
||||
await self.dialog_checker(dialog)
|
||||
|
||||
async def test_RETR(self) -> None:
|
||||
await self.do_login()
|
||||
dialog = """
|
||||
C: RETR 1
|
||||
S: +OK Contents follow
|
||||
"""
|
||||
for l in TESTMAIL.splitlines():
|
||||
dialog += f"S: {l.decode()}\n"
|
||||
dialog += "S: ."
|
||||
await self.dialog_checker(dialog)
|
||||
|
||||
async def test_CAPA(self) -> None:
|
||||
dialog = """
|
||||
S: +OK Server Ready
|
||||
C: CAPA
|
||||
S: +OK Following are supported
|
||||
S: USER
|
||||
S: .
|
||||
C: QUIT
|
||||
S: +OK Bye
|
||||
"""
|
||||
await self.dialog_checker(dialog)
|
||||
|
||||
async def asyncTearDown(self) -> None:
|
||||
logging.debug("at teardown")
|
||||
self.writer.close()
|
||||
await self.writer.wait_closed()
|
||||
self.task.cancel("test done")
|
||||
|
||||
async def dialog_checker(self, dialog: str) -> None:
|
||||
await self.dialog_checker_impl(self.reader, self.writer, dialog)
|
||||
|
||||
async def dialog_checker_impl(self, reader: asyncio.StreamReader,
|
||||
writer: asyncio.StreamWriter,
|
||||
dialog: str) -> None:
|
||||
for line in dialog.splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
side, data_str = line[:3], line[3:]
|
||||
data = f"{data_str}\r\n".encode()
|
||||
if side == "C: ":
|
||||
writer.write(data)
|
||||
else:
|
||||
resp = await reader.readline()
|
||||
self.assertEqual(data, resp)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
36
tests/test_pwhash.py
Normal file
36
tests/test_pwhash.py
Normal file
@ -0,0 +1,36 @@
|
||||
from mail4one.pwhash import gen_pwhash, parse_hash, check_pass, SALT_LEN, KEY_LEN
|
||||
import unittest
|
||||
|
||||
|
||||
class TestPWHash(unittest.TestCase):
|
||||
|
||||
def test_expected_usage(self):
|
||||
password = "Blah Blah ABCD"
|
||||
pwhash = gen_pwhash(password)
|
||||
pwinfo = parse_hash(pwhash)
|
||||
self.assertEqual(len(pwinfo.salt), SALT_LEN)
|
||||
self.assertEqual(len(pwinfo.scrypt_hash), KEY_LEN)
|
||||
self.assertTrue(check_pass(password, pwinfo),
|
||||
"check pass with correct password")
|
||||
self.assertFalse(check_pass("foobar", pwinfo),
|
||||
"check pass with wrong password")
|
||||
|
||||
def test_hardcoded_hash(self):
|
||||
test_hash = "".join(c for c in """
|
||||
AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD
|
||||
ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD
|
||||
AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
|
||||
""" if not c.isspace())
|
||||
pwinfo = parse_hash(test_hash)
|
||||
self.assertTrue(check_pass("helloworld", pwinfo),
|
||||
"check pass with correct password")
|
||||
self.assertFalse(check_pass("foobar", pwinfo),
|
||||
"check pass with wrong password")
|
||||
|
||||
def test_invalid_hash(self):
|
||||
with self.assertRaises(Exception):
|
||||
parse_hash("sdlfkjdsklfjdsk")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
76
tests/test_smtp.py
Normal file
76
tests/test_smtp.py
Normal file
@ -0,0 +1,76 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import unittest
|
||||
import smtplib
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from mail4one.smtp import create_smtp_server
|
||||
|
||||
TEST_MBOX = 'foobar_mails'
|
||||
MAILS_PATH: Path
|
||||
|
||||
|
||||
def setUpModule() -> None:
|
||||
global MAILS_PATH
|
||||
logging.basicConfig(level=logging.CRITICAL)
|
||||
td = tempfile.TemporaryDirectory(prefix="m41.smtp.")
|
||||
unittest.addModuleCleanup(td.cleanup)
|
||||
MAILS_PATH = Path(td.name)
|
||||
os.mkdir(MAILS_PATH / TEST_MBOX)
|
||||
for md in ('new', 'cur', 'tmp'):
|
||||
os.mkdir(MAILS_PATH / TEST_MBOX / md)
|
||||
|
||||
|
||||
class TestSMTP(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
async def asyncSetUp(self) -> None:
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
smtp_server = await create_smtp_server(
|
||||
host="127.0.0.1",
|
||||
port=7996,
|
||||
mails_path=MAILS_PATH,
|
||||
mbox_finder=lambda addr: [TEST_MBOX])
|
||||
self.task = asyncio.create_task(smtp_server.serve_forever())
|
||||
|
||||
async def test_send_mail(self) -> None:
|
||||
msg = b"""From: foo@sender.com
|
||||
To: "foo@bar.com"
|
||||
|
||||
Hello world
|
||||
Byee
|
||||
"""
|
||||
msg = b"".join(l.strip() + b"\r\n" for l in msg.splitlines())
|
||||
local_port: str
|
||||
|
||||
def send_mail():
|
||||
nonlocal local_port
|
||||
server = smtplib.SMTP(host="127.0.0.1", port=7996)
|
||||
server.sendmail("foo@sender.com", "foo@bar.com", msg)
|
||||
_, local_port = server.sock.getsockname()
|
||||
server.close()
|
||||
|
||||
await asyncio.to_thread(send_mail)
|
||||
expected = f"""From: foo@sender.com
|
||||
To: "foo@bar.com"
|
||||
X-Peer: ('127.0.0.1', {local_port})
|
||||
X-MailFrom: foo@sender.com
|
||||
X-RcptTo: foo@bar.com
|
||||
|
||||
Hello world
|
||||
Byee
|
||||
"""
|
||||
expected = "".join(l.strip() + "\r\n" for l in expected.splitlines())
|
||||
mails = list((MAILS_PATH / TEST_MBOX / 'new').glob("*"))
|
||||
self.assertEqual(len(mails), 1)
|
||||
self.assertEqual(mails[0].read_bytes(), expected.encode())
|
||||
|
||||
async def asyncTearDown(self) -> None:
|
||||
logging.debug("at teardown")
|
||||
self.task.cancel("test done")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
Reference in New Issue
Block a user