Compare commits

..

No commits in common. "f3e80c43ae1a7270817dbe19cd7a72e3380e9d48" and "121a02b8aea79832c214ab39f37e0fc80392b2ad" have entirely different histories.

6 changed files with 45 additions and 54 deletions

View File

@ -32,7 +32,7 @@ requirements.txt: Pipfile.lock
pipenv requirements > requirements.txt pipenv requirements > requirements.txt
format: format:
black mail4one/*py tests/*py black mail4one/*py
build-dev: requirements.txt build build-dev: requirements.txt build

View File

@ -386,14 +386,13 @@ def debug_main():
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
import sys import sys
from .pwhash import gen_pwhash
_, mails_path, mbox = sys.argv _, mails_path, port, password = sys.argv
mails_path = Path(mails_path) mails_path = Path(mails_path)
users = [User(username="dummy", password_hash=gen_pwhash("dummy"), mbox=mbox)] port = int(port)
asyncio.run(a_main("127.0.0.1", 1101, mails_path, users=users)) asyncio.run(a_main(mails_path, port, password_hash=password_hash))
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -67,11 +67,10 @@ class TestConfig(unittest.TestCase):
def test_get_mboxes(self) -> None: def test_get_mboxes(self) -> None:
cfg = config.Config(TEST_CONFIG) cfg = config.Config(TEST_CONFIG)
rules = config.parse_checkers(cfg) rules = config.parse_checkers(cfg)
self.assertEqual(config.get_mboxes("foo@bar.com", rules), ["spam"]) self.assertEqual(config.get_mboxes("foo@bar.com", rules), ['spam'])
self.assertEqual(config.get_mboxes("foo@mydomain.com", rules), ["all"]) self.assertEqual(config.get_mboxes("foo@mydomain.com", rules), ['all'])
self.assertEqual( self.assertEqual(config.get_mboxes("first.last@mydomain.com", rules),
config.get_mboxes("first.last@mydomain.com", rules), ["important", "all"] ['important', 'all'])
)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -8,16 +8,14 @@ from mail4one.pop3 import create_pop_server
from mail4one.config import User from mail4one.config import User
from pathlib import Path from pathlib import Path
TEST_HASH = "".join( TEST_HASH = "".join(c for c in """
"""
AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD
ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD
AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
""".split() """ if not c.isspace())
)
TEST_USER = "foobar" TEST_USER = 'foobar'
TEST_MBOX = "foobar_mails" TEST_MBOX = 'foobar_mails'
USERS = [User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX)] USERS = [User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX)]
@ -40,8 +38,7 @@ Hello bro\r
IlzVOJqu9Zp7twFAtzcV\r IlzVOJqu9Zp7twFAtzcV\r
yQVk36B0mGU2gtWxXLr\r yQVk36B0mGU2gtWxXLr\r
PeF0RtbI0mAuVPLQDHCi\r PeF0RtbI0mAuVPLQDHCi\r
\r \r\n"""
"""
def setUpModule() -> None: def setUpModule() -> None:
@ -51,11 +48,11 @@ def setUpModule() -> None:
unittest.addModuleCleanup(td.cleanup) unittest.addModuleCleanup(td.cleanup)
MAILS_PATH = Path(td.name) MAILS_PATH = Path(td.name)
os.mkdir(MAILS_PATH / TEST_MBOX) os.mkdir(MAILS_PATH / TEST_MBOX)
for md in ("new", "cur", "tmp"): for md in ('new', 'cur', 'tmp'):
os.mkdir(MAILS_PATH / TEST_MBOX / md) os.mkdir(MAILS_PATH / TEST_MBOX / md)
with open(MAILS_PATH / TEST_MBOX / "new/msg1.eml", "wb") as f: with open(MAILS_PATH / TEST_MBOX/ 'new/msg1.eml', 'wb') as f:
f.write(TESTMAIL) f.write(TESTMAIL)
with open(MAILS_PATH / TEST_MBOX / "new/msg2.eml", "wb") as f: with open(MAILS_PATH / TEST_MBOX/ 'new/msg2.eml', 'wb') as f:
f.write(TESTMAIL) f.write(TESTMAIL)
logging.debug(MAILS_PATH) logging.debug(MAILS_PATH)
@ -68,11 +65,13 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self) -> None: async def asyncSetUp(self) -> None:
logging.debug("at asyncSetUp") logging.debug("at asyncSetUp")
pop_server = await create_pop_server( pop_server = await create_pop_server(host='127.0.0.1',
host="127.0.0.1", port=7995, mails_path=MAILS_PATH, users=USERS port=7995,
) mails_path=MAILS_PATH,
users=USERS)
self.task = asyncio.create_task(pop_server.serve_forever()) self.task = asyncio.create_task(pop_server.serve_forever())
self.reader, self.writer = await asyncio.open_connection("127.0.0.1", 7995) self.reader, self.writer = await asyncio.open_connection(
'127.0.0.1', 7995)
async def test_QUIT(self) -> None: async def test_QUIT(self) -> None:
dialog = """ dialog = """
@ -116,8 +115,8 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
await self.dialog_checker(dialog) await self.dialog_checker(dialog)
async def test_dupe_AUTH(self) -> None: async def test_dupe_AUTH(self) -> None:
r1, w1 = await asyncio.open_connection("127.0.0.1", 7995) r1, w1 = await asyncio.open_connection('127.0.0.1', 7995)
r2, w2 = await asyncio.open_connection("127.0.0.1", 7995) r2, w2 = await asyncio.open_connection('127.0.0.1', 7995)
dialog = """ dialog = """
S: +OK Server Ready S: +OK Server Ready
C: USER foobar C: USER foobar
@ -207,9 +206,9 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
async def dialog_checker(self, dialog: str) -> None: async def dialog_checker(self, dialog: str) -> None:
await self.dialog_checker_impl(self.reader, self.writer, dialog) await self.dialog_checker_impl(self.reader, self.writer, dialog)
async def dialog_checker_impl( async def dialog_checker_impl(self, reader: asyncio.StreamReader,
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, dialog: str writer: asyncio.StreamWriter,
) -> None: dialog: str) -> None:
for line in dialog.splitlines(): for line in dialog.splitlines():
line = line.strip() line = line.strip()
if not line: if not line:
@ -223,5 +222,5 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
self.assertEqual(data, resp) self.assertEqual(data, resp)
if __name__ == "__main__": if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -10,31 +10,27 @@ class TestPWHash(unittest.TestCase):
pwinfo = parse_hash(pwhash) pwinfo = parse_hash(pwhash)
self.assertEqual(len(pwinfo.salt), SALT_LEN) self.assertEqual(len(pwinfo.salt), SALT_LEN)
self.assertEqual(len(pwinfo.scrypt_hash), KEY_LEN) self.assertEqual(len(pwinfo.scrypt_hash), KEY_LEN)
self.assertTrue( self.assertTrue(check_pass(password, pwinfo),
check_pass(password, pwinfo), "check pass with correct password" "check pass with correct password")
) self.assertFalse(check_pass("foobar", pwinfo),
self.assertFalse(check_pass("foobar", pwinfo), "check pass with wrong password") "check pass with wrong password")
def test_hardcoded_hash(self): def test_hardcoded_hash(self):
test_hash = "".join( test_hash = "".join(c for c in """
c
for c in """
AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD
ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD
AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
""" """ if not c.isspace())
if not c.isspace()
)
pwinfo = parse_hash(test_hash) pwinfo = parse_hash(test_hash)
self.assertTrue( self.assertTrue(check_pass("helloworld", pwinfo),
check_pass("helloworld", pwinfo), "check pass with correct password" "check pass with correct password")
) self.assertFalse(check_pass("foobar", pwinfo),
self.assertFalse(check_pass("foobar", pwinfo), "check pass with wrong password") "check pass with wrong password")
def test_invalid_hash(self): def test_invalid_hash(self):
with self.assertRaises(Exception): with self.assertRaises(Exception):
parse_hash("sdlfkjdsklfjdsk") parse_hash("sdlfkjdsklfjdsk")
if __name__ == "__main__": if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -10,7 +10,7 @@ from pathlib import Path
from mail4one.smtp import create_smtp_server from mail4one.smtp import create_smtp_server
TEST_MBOX = "foobar_mails" TEST_MBOX = 'foobar_mails'
MAILS_PATH: Path MAILS_PATH: Path
@ -21,7 +21,7 @@ def setUpModule() -> None:
unittest.addModuleCleanup(td.cleanup) unittest.addModuleCleanup(td.cleanup)
MAILS_PATH = Path(td.name) MAILS_PATH = Path(td.name)
os.mkdir(MAILS_PATH / TEST_MBOX) os.mkdir(MAILS_PATH / TEST_MBOX)
for md in ("new", "cur", "tmp"): for md in ('new', 'cur', 'tmp'):
os.mkdir(MAILS_PATH / TEST_MBOX / md) os.mkdir(MAILS_PATH / TEST_MBOX / md)
@ -32,8 +32,7 @@ class TestSMTP(unittest.IsolatedAsyncioTestCase):
host="127.0.0.1", host="127.0.0.1",
port=7996, port=7996,
mails_path=MAILS_PATH, mails_path=MAILS_PATH,
mbox_finder=lambda addr: [TEST_MBOX], mbox_finder=lambda addr: [TEST_MBOX])
)
self.task = asyncio.create_task(smtp_server.serve_forever()) self.task = asyncio.create_task(smtp_server.serve_forever())
async def test_send_mail(self) -> None: async def test_send_mail(self) -> None:
@ -46,9 +45,8 @@ class TestSMTP(unittest.IsolatedAsyncioTestCase):
msg = b"".join(l.strip() + b"\r\n" for l in msg.splitlines()) msg = b"".join(l.strip() + b"\r\n" for l in msg.splitlines())
def send_mail(): def send_mail():
with contextlib.closing( with contextlib.closing(smtplib.SMTP(host="127.0.0.1",
smtplib.SMTP(host="127.0.0.1", port=7996) port=7996)) as client:
) as client:
client.sendmail("foo@sender.com", "foo@bar.com", msg) client.sendmail("foo@sender.com", "foo@bar.com", msg)
_, local_port = client.sock.getsockname() _, local_port = client.sock.getsockname()
return local_port return local_port
@ -64,7 +62,7 @@ class TestSMTP(unittest.IsolatedAsyncioTestCase):
Byee Byee
""" """
expected = "".join(l.strip() + "\r\n" for l in expected.splitlines()) expected = "".join(l.strip() + "\r\n" for l in expected.splitlines())
mails = list((MAILS_PATH / TEST_MBOX / "new").glob("*")) mails = list((MAILS_PATH / TEST_MBOX / 'new').glob("*"))
self.assertEqual(len(mails), 1) self.assertEqual(len(mails), 1)
self.assertEqual(mails[0].read_bytes(), expected.encode()) self.assertEqual(mails[0].read_bytes(), expected.encode())