pop3 should byte-stuff lines starting with dot #5

Merged
balki merged 4 commits from popdotstuff into main 2024-04-01 17:51:23 -04:00
3 changed files with 49 additions and 6 deletions

View File

@ -29,7 +29,7 @@ from .poputils import (
end, end,
Request, Request,
MailEntry, MailEntry,
get_mail, get_mail_fp,
get_mails_list, get_mails_list,
MailList, MailList,
) )
@ -217,7 +217,12 @@ def trans_command_retr(mails: MailList, req: Request) -> None:
entry = mails.get(req.arg1) entry = mails.get(req.arg1)
if entry: if entry:
write(ok("Contents follow")) write(ok("Contents follow"))
write(get_mail(entry)) with get_mail_fp(entry) as fp:
for line in fp:
if line.startswith(b"."):
write(b".") # prepend dot
write(line)
# write(get_mail(entry)) # no prepend dot
write(end()) write(end())
mails.delete(req.arg1) mails.delete(req.arg1)
else: else:

View File

@ -2,6 +2,7 @@ import os
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum, auto from enum import Enum, auto
from pathlib import Path from pathlib import Path
from contextlib import contextmanager
class ClientError(Exception): class ClientError(Exception):
@ -124,6 +125,12 @@ def set_nid(entries: list[MailEntry]):
entry.nid = i entry.nid = i
@contextmanager
def get_mail_fp(entry: MailEntry):
with open(entry.path, mode="rb") as fp:
yield fp
def get_mail(entry: MailEntry) -> bytes: def get_mail(entry: MailEntry) -> bytes:
with open(entry.path, mode="rb") as fp: with open(entry.path, mode="rb") as fp:
return fp.read() return fp.read()

View File

@ -4,6 +4,7 @@ import logging
import tempfile import tempfile
import time import time
import os import os
import poplib
from mail4one.pop3 import create_pop_server from mail4one.pop3 import create_pop_server
from mail4one.config import User from mail4one.config import User
from pathlib import Path from pathlib import Path
@ -19,7 +20,13 @@ AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
TEST_USER = "foobar" TEST_USER = "foobar"
TEST_MBOX = "foobar_mails" TEST_MBOX = "foobar_mails"
USERS = [User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX)] TEST_USER2 = "foo2"
TEST_MBOX2 = "foo2mails"
USERS = [
User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX),
User(username=TEST_USER2, password_hash=TEST_HASH, mbox=TEST_MBOX2),
]
MAILS_PATH: Path MAILS_PATH: Path
@ -50,13 +57,21 @@ def setUpModule() -> None:
td = tempfile.TemporaryDirectory(prefix="m41.pop.") td = tempfile.TemporaryDirectory(prefix="m41.pop.")
unittest.addModuleCleanup(td.cleanup) unittest.addModuleCleanup(td.cleanup)
MAILS_PATH = Path(td.name) MAILS_PATH = Path(td.name)
os.mkdir(MAILS_PATH / TEST_MBOX) for mbox in (TEST_MBOX, TEST_MBOX2):
for md in ("new", "cur", "tmp"): os.mkdir(MAILS_PATH / mbox)
os.mkdir(MAILS_PATH / TEST_MBOX / md) for md in ("new", "cur", "tmp"):
os.mkdir(MAILS_PATH / mbox / md)
with open(MAILS_PATH / TEST_MBOX / "new/msg1.eml", "wb") as f: with open(MAILS_PATH / TEST_MBOX / "new/msg1.eml", "wb") as f:
f.write(TESTMAIL) f.write(TESTMAIL)
with open(MAILS_PATH / TEST_MBOX / "new/msg2.eml", "wb") as f: with open(MAILS_PATH / TEST_MBOX / "new/msg2.eml", "wb") as f:
f.write(TESTMAIL) f.write(TESTMAIL)
with open(MAILS_PATH / TEST_MBOX2 / "new/msg1.eml", "wb") as f:
f.write(TESTMAIL)
f.write(b"More lines to follow\r\n")
f.write(b".Line starts with a dot\r\n")
f.write(b"some more lines\r\n")
f.write(b".\r\n")
f.write(b"Previous line just has a dot\r\n")
logging.debug(MAILS_PATH) logging.debug(MAILS_PATH)
@ -198,6 +213,22 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
""" """
await self.dialog_checker(dialog) await self.dialog_checker(dialog)
async def test_poplib(self) -> None:
def run_poplib():
pc = poplib.POP3("127.0.0.1", 7995)
try:
self.assertEqual(b"+OK Server Ready", pc.getwelcome())
self.assertEqual(b"+OK Welcome", pc.user("foo2"))
self.assertEqual(b"+OK Login successful", pc.pass_("helloworld"))
_, eml, oc = pc.retr(1)
self.assertIn(b"Previous line just has a dot", eml)
self.assertIn(b".Line starts with a dot", eml)
self.assertIn(b".", eml)
finally:
pc.quit()
await asyncio.to_thread(run_poplib)
async def asyncTearDown(self) -> None: async def asyncTearDown(self) -> None:
logging.debug("at teardown") logging.debug("at teardown")
self.writer.close() self.writer.close()