Compare commits

..

6 Commits

7 changed files with 102 additions and 50 deletions

View File

@ -32,7 +32,7 @@ requirements.txt: Pipfile.lock
pipenv requirements > requirements.txt pipenv requirements > requirements.txt
format: format:
black mail4one/*py black mail4one/*py tests/*py
build-dev: requirements.txt build build-dev: requirements.txt build

View File

@ -29,7 +29,7 @@ from .poputils import (
end, end,
Request, Request,
MailEntry, MailEntry,
get_mail, get_mail_fp,
get_mails_list, get_mails_list,
MailList, MailList,
) )
@ -217,7 +217,12 @@ def trans_command_retr(mails: MailList, req: Request) -> None:
entry = mails.get(req.arg1) entry = mails.get(req.arg1)
if entry: if entry:
write(ok("Contents follow")) write(ok("Contents follow"))
write(get_mail(entry)) with get_mail_fp(entry) as fp:
for line in fp:
if line.startswith(b"."):
write(b".") # prepend dot
write(line)
# write(get_mail(entry)) # no prepend dot
write(end()) write(end())
mails.delete(req.arg1) mails.delete(req.arg1)
else: else:
@ -386,13 +391,14 @@ def debug_main():
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
import sys import sys
from .pwhash import gen_pwhash
_, mails_path, port, password = sys.argv _, mails_path, mbox = sys.argv
mails_path = Path(mails_path) mails_path = Path(mails_path)
port = int(port) users = [User(username="dummy", password_hash=gen_pwhash("dummy"), mbox=mbox)]
asyncio.run(a_main(mails_path, port, password_hash=password_hash)) asyncio.run(a_main("127.0.0.1", 1101, mails_path, users=users))
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -2,6 +2,7 @@ import os
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum, auto from enum import Enum, auto
from pathlib import Path from pathlib import Path
from contextlib import contextmanager
class ClientError(Exception): class ClientError(Exception):
@ -124,6 +125,12 @@ def set_nid(entries: list[MailEntry]):
entry.nid = i entry.nid = i
@contextmanager
def get_mail_fp(entry: MailEntry):
with open(entry.path, mode="rb") as fp:
yield fp
def get_mail(entry: MailEntry) -> bytes: def get_mail(entry: MailEntry) -> bytes:
with open(entry.path, mode="rb") as fp: with open(entry.path, mode="rb") as fp:
return fp.read() return fp.read()

View File

@ -67,10 +67,11 @@ class TestConfig(unittest.TestCase):
def test_get_mboxes(self) -> None: def test_get_mboxes(self) -> None:
cfg = config.Config(TEST_CONFIG) cfg = config.Config(TEST_CONFIG)
rules = config.parse_checkers(cfg) rules = config.parse_checkers(cfg)
self.assertEqual(config.get_mboxes("foo@bar.com", rules), ['spam']) self.assertEqual(config.get_mboxes("foo@bar.com", rules), ["spam"])
self.assertEqual(config.get_mboxes("foo@mydomain.com", rules), ['all']) self.assertEqual(config.get_mboxes("foo@mydomain.com", rules), ["all"])
self.assertEqual(config.get_mboxes("first.last@mydomain.com", rules), self.assertEqual(
['important', 'all']) config.get_mboxes("first.last@mydomain.com", rules), ["important", "all"]
)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -4,20 +4,29 @@ import logging
import tempfile import tempfile
import time import time
import os import os
import poplib
from mail4one.pop3 import create_pop_server from mail4one.pop3 import create_pop_server
from mail4one.config import User from mail4one.config import User
from pathlib import Path from pathlib import Path
TEST_HASH = "".join(c for c in """ TEST_HASH = "".join(
"""
AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD
ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD
AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
""" if not c.isspace()) """.split()
)
TEST_USER = 'foobar' TEST_USER = "foobar"
TEST_MBOX = 'foobar_mails' TEST_MBOX = "foobar_mails"
USERS = [User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX)] TEST_USER2 = "foo2"
TEST_MBOX2 = "foo2mails"
USERS = [
User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX),
User(username=TEST_USER2, password_hash=TEST_HASH, mbox=TEST_MBOX2),
]
MAILS_PATH: Path MAILS_PATH: Path
@ -38,7 +47,8 @@ Hello bro\r
IlzVOJqu9Zp7twFAtzcV\r IlzVOJqu9Zp7twFAtzcV\r
yQVk36B0mGU2gtWxXLr\r yQVk36B0mGU2gtWxXLr\r
PeF0RtbI0mAuVPLQDHCi\r PeF0RtbI0mAuVPLQDHCi\r
\r\n""" \r
"""
def setUpModule() -> None: def setUpModule() -> None:
@ -47,13 +57,21 @@ def setUpModule() -> None:
td = tempfile.TemporaryDirectory(prefix="m41.pop.") td = tempfile.TemporaryDirectory(prefix="m41.pop.")
unittest.addModuleCleanup(td.cleanup) unittest.addModuleCleanup(td.cleanup)
MAILS_PATH = Path(td.name) MAILS_PATH = Path(td.name)
os.mkdir(MAILS_PATH / TEST_MBOX) for mbox in (TEST_MBOX, TEST_MBOX2):
for md in ('new', 'cur', 'tmp'): os.mkdir(MAILS_PATH / mbox)
os.mkdir(MAILS_PATH / TEST_MBOX / md) for md in ("new", "cur", "tmp"):
with open(MAILS_PATH / TEST_MBOX/ 'new/msg1.eml', 'wb') as f: os.mkdir(MAILS_PATH / mbox / md)
with open(MAILS_PATH / TEST_MBOX / "new/msg1.eml", "wb") as f:
f.write(TESTMAIL) f.write(TESTMAIL)
with open(MAILS_PATH / TEST_MBOX/ 'new/msg2.eml', 'wb') as f: with open(MAILS_PATH / TEST_MBOX / "new/msg2.eml", "wb") as f:
f.write(TESTMAIL) f.write(TESTMAIL)
with open(MAILS_PATH / TEST_MBOX2 / "new/msg1.eml", "wb") as f:
f.write(TESTMAIL)
f.write(b"More lines to follow\r\n")
f.write(b".Line starts with a dot\r\n")
f.write(b"some more lines\r\n")
f.write(b".\r\n")
f.write(b"Previous line just has a dot\r\n")
logging.debug(MAILS_PATH) logging.debug(MAILS_PATH)
@ -65,13 +83,11 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self) -> None: async def asyncSetUp(self) -> None:
logging.debug("at asyncSetUp") logging.debug("at asyncSetUp")
pop_server = await create_pop_server(host='127.0.0.1', pop_server = await create_pop_server(
port=7995, host="127.0.0.1", port=7995, mails_path=MAILS_PATH, users=USERS
mails_path=MAILS_PATH, )
users=USERS)
self.task = asyncio.create_task(pop_server.serve_forever()) self.task = asyncio.create_task(pop_server.serve_forever())
self.reader, self.writer = await asyncio.open_connection( self.reader, self.writer = await asyncio.open_connection("127.0.0.1", 7995)
'127.0.0.1', 7995)
async def test_QUIT(self) -> None: async def test_QUIT(self) -> None:
dialog = """ dialog = """
@ -115,8 +131,8 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
await self.dialog_checker(dialog) await self.dialog_checker(dialog)
async def test_dupe_AUTH(self) -> None: async def test_dupe_AUTH(self) -> None:
r1, w1 = await asyncio.open_connection('127.0.0.1', 7995) r1, w1 = await asyncio.open_connection("127.0.0.1", 7995)
r2, w2 = await asyncio.open_connection('127.0.0.1', 7995) r2, w2 = await asyncio.open_connection("127.0.0.1", 7995)
dialog = """ dialog = """
S: +OK Server Ready S: +OK Server Ready
C: USER foobar C: USER foobar
@ -197,6 +213,22 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
""" """
await self.dialog_checker(dialog) await self.dialog_checker(dialog)
async def test_poplib(self) -> None:
def run_poplib():
pc = poplib.POP3("127.0.0.1", 7995)
try:
self.assertEqual(b"+OK Server Ready", pc.getwelcome())
self.assertEqual(b"+OK Welcome", pc.user("foo2"))
self.assertEqual(b"+OK Login successful", pc.pass_("helloworld"))
_, eml, oc = pc.retr(1)
self.assertIn(b"Previous line just has a dot", eml)
self.assertIn(b".Line starts with a dot", eml)
self.assertIn(b".", eml)
finally:
pc.quit()
await asyncio.to_thread(run_poplib)
async def asyncTearDown(self) -> None: async def asyncTearDown(self) -> None:
logging.debug("at teardown") logging.debug("at teardown")
self.writer.close() self.writer.close()
@ -206,9 +238,9 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
async def dialog_checker(self, dialog: str) -> None: async def dialog_checker(self, dialog: str) -> None:
await self.dialog_checker_impl(self.reader, self.writer, dialog) await self.dialog_checker_impl(self.reader, self.writer, dialog)
async def dialog_checker_impl(self, reader: asyncio.StreamReader, async def dialog_checker_impl(
writer: asyncio.StreamWriter, self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, dialog: str
dialog: str) -> None: ) -> None:
for line in dialog.splitlines(): for line in dialog.splitlines():
line = line.strip() line = line.strip()
if not line: if not line:
@ -222,5 +254,5 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
self.assertEqual(data, resp) self.assertEqual(data, resp)
if __name__ == '__main__': if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -10,27 +10,31 @@ class TestPWHash(unittest.TestCase):
pwinfo = parse_hash(pwhash) pwinfo = parse_hash(pwhash)
self.assertEqual(len(pwinfo.salt), SALT_LEN) self.assertEqual(len(pwinfo.salt), SALT_LEN)
self.assertEqual(len(pwinfo.scrypt_hash), KEY_LEN) self.assertEqual(len(pwinfo.scrypt_hash), KEY_LEN)
self.assertTrue(check_pass(password, pwinfo), self.assertTrue(
"check pass with correct password") check_pass(password, pwinfo), "check pass with correct password"
self.assertFalse(check_pass("foobar", pwinfo), )
"check pass with wrong password") self.assertFalse(check_pass("foobar", pwinfo), "check pass with wrong password")
def test_hardcoded_hash(self): def test_hardcoded_hash(self):
test_hash = "".join(c for c in """ test_hash = "".join(
c
for c in """
AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD
ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD
AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
""" if not c.isspace()) """
if not c.isspace()
)
pwinfo = parse_hash(test_hash) pwinfo = parse_hash(test_hash)
self.assertTrue(check_pass("helloworld", pwinfo), self.assertTrue(
"check pass with correct password") check_pass("helloworld", pwinfo), "check pass with correct password"
self.assertFalse(check_pass("foobar", pwinfo), )
"check pass with wrong password") self.assertFalse(check_pass("foobar", pwinfo), "check pass with wrong password")
def test_invalid_hash(self): def test_invalid_hash(self):
with self.assertRaises(Exception): with self.assertRaises(Exception):
parse_hash("sdlfkjdsklfjdsk") parse_hash("sdlfkjdsklfjdsk")
if __name__ == '__main__': if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -10,7 +10,7 @@ from pathlib import Path
from mail4one.smtp import create_smtp_server from mail4one.smtp import create_smtp_server
TEST_MBOX = 'foobar_mails' TEST_MBOX = "foobar_mails"
MAILS_PATH: Path MAILS_PATH: Path
@ -21,7 +21,7 @@ def setUpModule() -> None:
unittest.addModuleCleanup(td.cleanup) unittest.addModuleCleanup(td.cleanup)
MAILS_PATH = Path(td.name) MAILS_PATH = Path(td.name)
os.mkdir(MAILS_PATH / TEST_MBOX) os.mkdir(MAILS_PATH / TEST_MBOX)
for md in ('new', 'cur', 'tmp'): for md in ("new", "cur", "tmp"):
os.mkdir(MAILS_PATH / TEST_MBOX / md) os.mkdir(MAILS_PATH / TEST_MBOX / md)
@ -32,7 +32,8 @@ class TestSMTP(unittest.IsolatedAsyncioTestCase):
host="127.0.0.1", host="127.0.0.1",
port=7996, port=7996,
mails_path=MAILS_PATH, mails_path=MAILS_PATH,
mbox_finder=lambda addr: [TEST_MBOX]) mbox_finder=lambda addr: [TEST_MBOX],
)
self.task = asyncio.create_task(smtp_server.serve_forever()) self.task = asyncio.create_task(smtp_server.serve_forever())
async def test_send_mail(self) -> None: async def test_send_mail(self) -> None:
@ -45,8 +46,9 @@ class TestSMTP(unittest.IsolatedAsyncioTestCase):
msg = b"".join(l.strip() + b"\r\n" for l in msg.splitlines()) msg = b"".join(l.strip() + b"\r\n" for l in msg.splitlines())
def send_mail(): def send_mail():
with contextlib.closing(smtplib.SMTP(host="127.0.0.1", with contextlib.closing(
port=7996)) as client: smtplib.SMTP(host="127.0.0.1", port=7996)
) as client:
client.sendmail("foo@sender.com", "foo@bar.com", msg) client.sendmail("foo@sender.com", "foo@bar.com", msg)
_, local_port = client.sock.getsockname() _, local_port = client.sock.getsockname()
return local_port return local_port
@ -62,7 +64,7 @@ class TestSMTP(unittest.IsolatedAsyncioTestCase):
Byee Byee
""" """
expected = "".join(l.strip() + "\r\n" for l in expected.splitlines()) expected = "".join(l.strip() + "\r\n" for l in expected.splitlines())
mails = list((MAILS_PATH / TEST_MBOX / 'new').glob("*")) mails = list((MAILS_PATH / TEST_MBOX / "new").glob("*"))
self.assertEqual(len(mails), 1) self.assertEqual(len(mails), 1)
self.assertEqual(mails[0].read_bytes(), expected.encode()) self.assertEqual(mails[0].read_bytes(), expected.encode())