Compare commits
No commits in common. "4f99f1b9a5f29107b7bb5e96d1364cdf254f36b1" and "121a02b8aea79832c214ab39f37e0fc80392b2ad" have entirely different histories.
4f99f1b9a5
...
121a02b8ae
2
Makefile
2
Makefile
@ -32,7 +32,7 @@ requirements.txt: Pipfile.lock
|
|||||||
pipenv requirements > requirements.txt
|
pipenv requirements > requirements.txt
|
||||||
|
|
||||||
format:
|
format:
|
||||||
black mail4one/*py tests/*py
|
black mail4one/*py
|
||||||
|
|
||||||
build-dev: requirements.txt build
|
build-dev: requirements.txt build
|
||||||
|
|
||||||
|
@ -29,7 +29,7 @@ from .poputils import (
|
|||||||
end,
|
end,
|
||||||
Request,
|
Request,
|
||||||
MailEntry,
|
MailEntry,
|
||||||
get_mail_fp,
|
get_mail,
|
||||||
get_mails_list,
|
get_mails_list,
|
||||||
MailList,
|
MailList,
|
||||||
)
|
)
|
||||||
@ -217,12 +217,7 @@ def trans_command_retr(mails: MailList, req: Request) -> None:
|
|||||||
entry = mails.get(req.arg1)
|
entry = mails.get(req.arg1)
|
||||||
if entry:
|
if entry:
|
||||||
write(ok("Contents follow"))
|
write(ok("Contents follow"))
|
||||||
with get_mail_fp(entry) as fp:
|
write(get_mail(entry))
|
||||||
for line in fp:
|
|
||||||
if line.startswith(b"."):
|
|
||||||
write(b".") # prepend dot
|
|
||||||
write(line)
|
|
||||||
# write(get_mail(entry)) # no prepend dot
|
|
||||||
write(end())
|
write(end())
|
||||||
mails.delete(req.arg1)
|
mails.delete(req.arg1)
|
||||||
else:
|
else:
|
||||||
@ -391,14 +386,13 @@ def debug_main():
|
|||||||
logging.basicConfig(level=logging.DEBUG)
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from .pwhash import gen_pwhash
|
|
||||||
|
|
||||||
_, mails_path, mbox = sys.argv
|
_, mails_path, port, password = sys.argv
|
||||||
|
|
||||||
mails_path = Path(mails_path)
|
mails_path = Path(mails_path)
|
||||||
users = [User(username="dummy", password_hash=gen_pwhash("dummy"), mbox=mbox)]
|
port = int(port)
|
||||||
|
|
||||||
asyncio.run(a_main("127.0.0.1", 1101, mails_path, users=users))
|
asyncio.run(a_main(mails_path, port, password_hash=password_hash))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
@ -2,7 +2,6 @@ import os
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from enum import Enum, auto
|
from enum import Enum, auto
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from contextlib import contextmanager
|
|
||||||
|
|
||||||
|
|
||||||
class ClientError(Exception):
|
class ClientError(Exception):
|
||||||
@ -125,12 +124,6 @@ def set_nid(entries: list[MailEntry]):
|
|||||||
entry.nid = i
|
entry.nid = i
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
|
||||||
def get_mail_fp(entry: MailEntry):
|
|
||||||
with open(entry.path, mode="rb") as fp:
|
|
||||||
yield fp
|
|
||||||
|
|
||||||
|
|
||||||
def get_mail(entry: MailEntry) -> bytes:
|
def get_mail(entry: MailEntry) -> bytes:
|
||||||
with open(entry.path, mode="rb") as fp:
|
with open(entry.path, mode="rb") as fp:
|
||||||
return fp.read()
|
return fp.read()
|
||||||
|
@ -67,11 +67,10 @@ class TestConfig(unittest.TestCase):
|
|||||||
def test_get_mboxes(self) -> None:
|
def test_get_mboxes(self) -> None:
|
||||||
cfg = config.Config(TEST_CONFIG)
|
cfg = config.Config(TEST_CONFIG)
|
||||||
rules = config.parse_checkers(cfg)
|
rules = config.parse_checkers(cfg)
|
||||||
self.assertEqual(config.get_mboxes("foo@bar.com", rules), ["spam"])
|
self.assertEqual(config.get_mboxes("foo@bar.com", rules), ['spam'])
|
||||||
self.assertEqual(config.get_mboxes("foo@mydomain.com", rules), ["all"])
|
self.assertEqual(config.get_mboxes("foo@mydomain.com", rules), ['all'])
|
||||||
self.assertEqual(
|
self.assertEqual(config.get_mboxes("first.last@mydomain.com", rules),
|
||||||
config.get_mboxes("first.last@mydomain.com", rules), ["important", "all"]
|
['important', 'all'])
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
@ -4,29 +4,20 @@ import logging
|
|||||||
import tempfile
|
import tempfile
|
||||||
import time
|
import time
|
||||||
import os
|
import os
|
||||||
import poplib
|
|
||||||
from mail4one.pop3 import create_pop_server
|
from mail4one.pop3 import create_pop_server
|
||||||
from mail4one.config import User
|
from mail4one.config import User
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
TEST_HASH = "".join(
|
TEST_HASH = "".join(c for c in """
|
||||||
"""
|
|
||||||
AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD
|
AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD
|
||||||
ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD
|
ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD
|
||||||
AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
|
AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
|
||||||
""".split()
|
""" if not c.isspace())
|
||||||
)
|
|
||||||
|
|
||||||
TEST_USER = "foobar"
|
TEST_USER = 'foobar'
|
||||||
TEST_MBOX = "foobar_mails"
|
TEST_MBOX = 'foobar_mails'
|
||||||
|
|
||||||
TEST_USER2 = "foo2"
|
USERS = [User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX)]
|
||||||
TEST_MBOX2 = "foo2mails"
|
|
||||||
|
|
||||||
USERS = [
|
|
||||||
User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX),
|
|
||||||
User(username=TEST_USER2, password_hash=TEST_HASH, mbox=TEST_MBOX2),
|
|
||||||
]
|
|
||||||
|
|
||||||
MAILS_PATH: Path
|
MAILS_PATH: Path
|
||||||
|
|
||||||
@ -47,8 +38,7 @@ Hello bro\r
|
|||||||
IlzVOJqu9Zp7twFAtzcV\r
|
IlzVOJqu9Zp7twFAtzcV\r
|
||||||
yQVk36B0mGU2gtWxXLr\r
|
yQVk36B0mGU2gtWxXLr\r
|
||||||
PeF0RtbI0mAuVPLQDHCi\r
|
PeF0RtbI0mAuVPLQDHCi\r
|
||||||
\r
|
\r\n"""
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
def setUpModule() -> None:
|
def setUpModule() -> None:
|
||||||
@ -57,21 +47,13 @@ def setUpModule() -> None:
|
|||||||
td = tempfile.TemporaryDirectory(prefix="m41.pop.")
|
td = tempfile.TemporaryDirectory(prefix="m41.pop.")
|
||||||
unittest.addModuleCleanup(td.cleanup)
|
unittest.addModuleCleanup(td.cleanup)
|
||||||
MAILS_PATH = Path(td.name)
|
MAILS_PATH = Path(td.name)
|
||||||
for mbox in (TEST_MBOX, TEST_MBOX2):
|
os.mkdir(MAILS_PATH / TEST_MBOX)
|
||||||
os.mkdir(MAILS_PATH / mbox)
|
for md in ('new', 'cur', 'tmp'):
|
||||||
for md in ("new", "cur", "tmp"):
|
os.mkdir(MAILS_PATH / TEST_MBOX / md)
|
||||||
os.mkdir(MAILS_PATH / mbox / md)
|
with open(MAILS_PATH / TEST_MBOX/ 'new/msg1.eml', 'wb') as f:
|
||||||
with open(MAILS_PATH / TEST_MBOX / "new/msg1.eml", "wb") as f:
|
|
||||||
f.write(TESTMAIL)
|
f.write(TESTMAIL)
|
||||||
with open(MAILS_PATH / TEST_MBOX / "new/msg2.eml", "wb") as f:
|
with open(MAILS_PATH / TEST_MBOX/ 'new/msg2.eml', 'wb') as f:
|
||||||
f.write(TESTMAIL)
|
f.write(TESTMAIL)
|
||||||
with open(MAILS_PATH / TEST_MBOX2 / "new/msg1.eml", "wb") as f:
|
|
||||||
f.write(TESTMAIL)
|
|
||||||
f.write(b"More lines to follow\r\n")
|
|
||||||
f.write(b".Line starts with a dot\r\n")
|
|
||||||
f.write(b"some more lines\r\n")
|
|
||||||
f.write(b".\r\n")
|
|
||||||
f.write(b"Previous line just has a dot\r\n")
|
|
||||||
logging.debug(MAILS_PATH)
|
logging.debug(MAILS_PATH)
|
||||||
|
|
||||||
|
|
||||||
@ -83,11 +65,13 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
|
|||||||
|
|
||||||
async def asyncSetUp(self) -> None:
|
async def asyncSetUp(self) -> None:
|
||||||
logging.debug("at asyncSetUp")
|
logging.debug("at asyncSetUp")
|
||||||
pop_server = await create_pop_server(
|
pop_server = await create_pop_server(host='127.0.0.1',
|
||||||
host="127.0.0.1", port=7995, mails_path=MAILS_PATH, users=USERS
|
port=7995,
|
||||||
)
|
mails_path=MAILS_PATH,
|
||||||
|
users=USERS)
|
||||||
self.task = asyncio.create_task(pop_server.serve_forever())
|
self.task = asyncio.create_task(pop_server.serve_forever())
|
||||||
self.reader, self.writer = await asyncio.open_connection("127.0.0.1", 7995)
|
self.reader, self.writer = await asyncio.open_connection(
|
||||||
|
'127.0.0.1', 7995)
|
||||||
|
|
||||||
async def test_QUIT(self) -> None:
|
async def test_QUIT(self) -> None:
|
||||||
dialog = """
|
dialog = """
|
||||||
@ -131,8 +115,8 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
|
|||||||
await self.dialog_checker(dialog)
|
await self.dialog_checker(dialog)
|
||||||
|
|
||||||
async def test_dupe_AUTH(self) -> None:
|
async def test_dupe_AUTH(self) -> None:
|
||||||
r1, w1 = await asyncio.open_connection("127.0.0.1", 7995)
|
r1, w1 = await asyncio.open_connection('127.0.0.1', 7995)
|
||||||
r2, w2 = await asyncio.open_connection("127.0.0.1", 7995)
|
r2, w2 = await asyncio.open_connection('127.0.0.1', 7995)
|
||||||
dialog = """
|
dialog = """
|
||||||
S: +OK Server Ready
|
S: +OK Server Ready
|
||||||
C: USER foobar
|
C: USER foobar
|
||||||
@ -213,22 +197,6 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
|
|||||||
"""
|
"""
|
||||||
await self.dialog_checker(dialog)
|
await self.dialog_checker(dialog)
|
||||||
|
|
||||||
async def test_poplib(self) -> None:
|
|
||||||
def run_poplib():
|
|
||||||
pc = poplib.POP3("127.0.0.1", 7995)
|
|
||||||
try:
|
|
||||||
self.assertEqual(b"+OK Server Ready", pc.getwelcome())
|
|
||||||
self.assertEqual(b"+OK Welcome", pc.user("foo2"))
|
|
||||||
self.assertEqual(b"+OK Login successful", pc.pass_("helloworld"))
|
|
||||||
_, eml, oc = pc.retr(1)
|
|
||||||
self.assertIn(b"Previous line just has a dot", eml)
|
|
||||||
self.assertIn(b".Line starts with a dot", eml)
|
|
||||||
self.assertIn(b".", eml)
|
|
||||||
finally:
|
|
||||||
pc.quit()
|
|
||||||
|
|
||||||
await asyncio.to_thread(run_poplib)
|
|
||||||
|
|
||||||
async def asyncTearDown(self) -> None:
|
async def asyncTearDown(self) -> None:
|
||||||
logging.debug("at teardown")
|
logging.debug("at teardown")
|
||||||
self.writer.close()
|
self.writer.close()
|
||||||
@ -238,9 +206,9 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
|
|||||||
async def dialog_checker(self, dialog: str) -> None:
|
async def dialog_checker(self, dialog: str) -> None:
|
||||||
await self.dialog_checker_impl(self.reader, self.writer, dialog)
|
await self.dialog_checker_impl(self.reader, self.writer, dialog)
|
||||||
|
|
||||||
async def dialog_checker_impl(
|
async def dialog_checker_impl(self, reader: asyncio.StreamReader,
|
||||||
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, dialog: str
|
writer: asyncio.StreamWriter,
|
||||||
) -> None:
|
dialog: str) -> None:
|
||||||
for line in dialog.splitlines():
|
for line in dialog.splitlines():
|
||||||
line = line.strip()
|
line = line.strip()
|
||||||
if not line:
|
if not line:
|
||||||
@ -254,5 +222,5 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
|
|||||||
self.assertEqual(data, resp)
|
self.assertEqual(data, resp)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
@ -10,31 +10,27 @@ class TestPWHash(unittest.TestCase):
|
|||||||
pwinfo = parse_hash(pwhash)
|
pwinfo = parse_hash(pwhash)
|
||||||
self.assertEqual(len(pwinfo.salt), SALT_LEN)
|
self.assertEqual(len(pwinfo.salt), SALT_LEN)
|
||||||
self.assertEqual(len(pwinfo.scrypt_hash), KEY_LEN)
|
self.assertEqual(len(pwinfo.scrypt_hash), KEY_LEN)
|
||||||
self.assertTrue(
|
self.assertTrue(check_pass(password, pwinfo),
|
||||||
check_pass(password, pwinfo), "check pass with correct password"
|
"check pass with correct password")
|
||||||
)
|
self.assertFalse(check_pass("foobar", pwinfo),
|
||||||
self.assertFalse(check_pass("foobar", pwinfo), "check pass with wrong password")
|
"check pass with wrong password")
|
||||||
|
|
||||||
def test_hardcoded_hash(self):
|
def test_hardcoded_hash(self):
|
||||||
test_hash = "".join(
|
test_hash = "".join(c for c in """
|
||||||
c
|
|
||||||
for c in """
|
|
||||||
AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD
|
AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD
|
||||||
ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD
|
ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD
|
||||||
AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
|
AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
|
||||||
"""
|
""" if not c.isspace())
|
||||||
if not c.isspace()
|
|
||||||
)
|
|
||||||
pwinfo = parse_hash(test_hash)
|
pwinfo = parse_hash(test_hash)
|
||||||
self.assertTrue(
|
self.assertTrue(check_pass("helloworld", pwinfo),
|
||||||
check_pass("helloworld", pwinfo), "check pass with correct password"
|
"check pass with correct password")
|
||||||
)
|
self.assertFalse(check_pass("foobar", pwinfo),
|
||||||
self.assertFalse(check_pass("foobar", pwinfo), "check pass with wrong password")
|
"check pass with wrong password")
|
||||||
|
|
||||||
def test_invalid_hash(self):
|
def test_invalid_hash(self):
|
||||||
with self.assertRaises(Exception):
|
with self.assertRaises(Exception):
|
||||||
parse_hash("sdlfkjdsklfjdsk")
|
parse_hash("sdlfkjdsklfjdsk")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
@ -10,7 +10,7 @@ from pathlib import Path
|
|||||||
|
|
||||||
from mail4one.smtp import create_smtp_server
|
from mail4one.smtp import create_smtp_server
|
||||||
|
|
||||||
TEST_MBOX = "foobar_mails"
|
TEST_MBOX = 'foobar_mails'
|
||||||
MAILS_PATH: Path
|
MAILS_PATH: Path
|
||||||
|
|
||||||
|
|
||||||
@ -21,7 +21,7 @@ def setUpModule() -> None:
|
|||||||
unittest.addModuleCleanup(td.cleanup)
|
unittest.addModuleCleanup(td.cleanup)
|
||||||
MAILS_PATH = Path(td.name)
|
MAILS_PATH = Path(td.name)
|
||||||
os.mkdir(MAILS_PATH / TEST_MBOX)
|
os.mkdir(MAILS_PATH / TEST_MBOX)
|
||||||
for md in ("new", "cur", "tmp"):
|
for md in ('new', 'cur', 'tmp'):
|
||||||
os.mkdir(MAILS_PATH / TEST_MBOX / md)
|
os.mkdir(MAILS_PATH / TEST_MBOX / md)
|
||||||
|
|
||||||
|
|
||||||
@ -32,8 +32,7 @@ class TestSMTP(unittest.IsolatedAsyncioTestCase):
|
|||||||
host="127.0.0.1",
|
host="127.0.0.1",
|
||||||
port=7996,
|
port=7996,
|
||||||
mails_path=MAILS_PATH,
|
mails_path=MAILS_PATH,
|
||||||
mbox_finder=lambda addr: [TEST_MBOX],
|
mbox_finder=lambda addr: [TEST_MBOX])
|
||||||
)
|
|
||||||
self.task = asyncio.create_task(smtp_server.serve_forever())
|
self.task = asyncio.create_task(smtp_server.serve_forever())
|
||||||
|
|
||||||
async def test_send_mail(self) -> None:
|
async def test_send_mail(self) -> None:
|
||||||
@ -46,9 +45,8 @@ class TestSMTP(unittest.IsolatedAsyncioTestCase):
|
|||||||
msg = b"".join(l.strip() + b"\r\n" for l in msg.splitlines())
|
msg = b"".join(l.strip() + b"\r\n" for l in msg.splitlines())
|
||||||
|
|
||||||
def send_mail():
|
def send_mail():
|
||||||
with contextlib.closing(
|
with contextlib.closing(smtplib.SMTP(host="127.0.0.1",
|
||||||
smtplib.SMTP(host="127.0.0.1", port=7996)
|
port=7996)) as client:
|
||||||
) as client:
|
|
||||||
client.sendmail("foo@sender.com", "foo@bar.com", msg)
|
client.sendmail("foo@sender.com", "foo@bar.com", msg)
|
||||||
_, local_port = client.sock.getsockname()
|
_, local_port = client.sock.getsockname()
|
||||||
return local_port
|
return local_port
|
||||||
@ -64,7 +62,7 @@ class TestSMTP(unittest.IsolatedAsyncioTestCase):
|
|||||||
Byee
|
Byee
|
||||||
"""
|
"""
|
||||||
expected = "".join(l.strip() + "\r\n" for l in expected.splitlines())
|
expected = "".join(l.strip() + "\r\n" for l in expected.splitlines())
|
||||||
mails = list((MAILS_PATH / TEST_MBOX / "new").glob("*"))
|
mails = list((MAILS_PATH / TEST_MBOX / 'new').glob("*"))
|
||||||
self.assertEqual(len(mails), 1)
|
self.assertEqual(len(mails), 1)
|
||||||
self.assertEqual(mails[0].read_bytes(), expected.encode())
|
self.assertEqual(mails[0].read_bytes(), expected.encode())
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user