pop3 should byte-stuff lines starting with dot #5
@ -29,7 +29,7 @@ from .poputils import (
|
||||
end,
|
||||
Request,
|
||||
MailEntry,
|
||||
get_mail,
|
||||
get_mail_fp,
|
||||
get_mails_list,
|
||||
MailList,
|
||||
)
|
||||
@ -217,7 +217,12 @@ def trans_command_retr(mails: MailList, req: Request) -> None:
|
||||
entry = mails.get(req.arg1)
|
||||
if entry:
|
||||
write(ok("Contents follow"))
|
||||
write(get_mail(entry))
|
||||
with get_mail_fp(entry) as fp:
|
||||
for line in fp:
|
||||
if line.startswith(b"."):
|
||||
write(b".") # prepend dot
|
||||
write(line)
|
||||
# write(get_mail(entry)) # no prepend dot
|
||||
write(end())
|
||||
mails.delete(req.arg1)
|
||||
else:
|
||||
|
@ -2,6 +2,7 @@ import os
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum, auto
|
||||
from pathlib import Path
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
class ClientError(Exception):
|
||||
@ -124,6 +125,12 @@ def set_nid(entries: list[MailEntry]):
|
||||
entry.nid = i
|
||||
|
||||
|
||||
@contextmanager
|
||||
def get_mail_fp(entry: MailEntry):
|
||||
with open(entry.path, mode="rb") as fp:
|
||||
yield fp
|
||||
|
||||
|
||||
def get_mail(entry: MailEntry) -> bytes:
|
||||
with open(entry.path, mode="rb") as fp:
|
||||
return fp.read()
|
||||
|
@ -4,6 +4,7 @@ import logging
|
||||
import tempfile
|
||||
import time
|
||||
import os
|
||||
import poplib
|
||||
from mail4one.pop3 import create_pop_server
|
||||
from mail4one.config import User
|
||||
from pathlib import Path
|
||||
@ -19,7 +20,13 @@ AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
|
||||
TEST_USER = "foobar"
|
||||
TEST_MBOX = "foobar_mails"
|
||||
|
||||
USERS = [User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX)]
|
||||
TEST_USER2 = "foo2"
|
||||
TEST_MBOX2 = "foo2mails"
|
||||
|
||||
USERS = [
|
||||
User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX),
|
||||
User(username=TEST_USER2, password_hash=TEST_HASH, mbox=TEST_MBOX2),
|
||||
]
|
||||
|
||||
MAILS_PATH: Path
|
||||
|
||||
@ -50,13 +57,21 @@ def setUpModule() -> None:
|
||||
td = tempfile.TemporaryDirectory(prefix="m41.pop.")
|
||||
unittest.addModuleCleanup(td.cleanup)
|
||||
MAILS_PATH = Path(td.name)
|
||||
os.mkdir(MAILS_PATH / TEST_MBOX)
|
||||
for mbox in (TEST_MBOX, TEST_MBOX2):
|
||||
os.mkdir(MAILS_PATH / mbox)
|
||||
for md in ("new", "cur", "tmp"):
|
||||
os.mkdir(MAILS_PATH / TEST_MBOX / md)
|
||||
os.mkdir(MAILS_PATH / mbox / md)
|
||||
with open(MAILS_PATH / TEST_MBOX / "new/msg1.eml", "wb") as f:
|
||||
f.write(TESTMAIL)
|
||||
with open(MAILS_PATH / TEST_MBOX / "new/msg2.eml", "wb") as f:
|
||||
f.write(TESTMAIL)
|
||||
with open(MAILS_PATH / TEST_MBOX2 / "new/msg1.eml", "wb") as f:
|
||||
f.write(TESTMAIL)
|
||||
f.write(b"More lines to follow\r\n")
|
||||
f.write(b".Line starts with a dot\r\n")
|
||||
f.write(b"some more lines\r\n")
|
||||
f.write(b".\r\n")
|
||||
f.write(b"Previous line just has a dot\r\n")
|
||||
logging.debug(MAILS_PATH)
|
||||
|
||||
|
||||
@ -198,6 +213,22 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
|
||||
"""
|
||||
await self.dialog_checker(dialog)
|
||||
|
||||
async def test_poplib(self) -> None:
|
||||
def run_poplib():
|
||||
pc = poplib.POP3("127.0.0.1", 7995)
|
||||
try:
|
||||
self.assertEqual(b"+OK Server Ready", pc.getwelcome())
|
||||
self.assertEqual(b"+OK Welcome", pc.user("foo2"))
|
||||
self.assertEqual(b"+OK Login successful", pc.pass_("helloworld"))
|
||||
_, eml, oc = pc.retr(1)
|
||||
self.assertIn(b"Previous line just has a dot", eml)
|
||||
self.assertIn(b".Line starts with a dot", eml)
|
||||
self.assertIn(b".", eml)
|
||||
finally:
|
||||
pc.quit()
|
||||
|
||||
await asyncio.to_thread(run_poplib)
|
||||
|
||||
async def asyncTearDown(self) -> None:
|
||||
logging.debug("at teardown")
|
||||
self.writer.close()
|
||||
|
Loading…
Reference in New Issue
Block a user