pop3 should byte-stuff lines starting with dot #5
@ -29,7 +29,7 @@ from .poputils import (
|
|||||||
end,
|
end,
|
||||||
Request,
|
Request,
|
||||||
MailEntry,
|
MailEntry,
|
||||||
get_mail,
|
get_mail_fp,
|
||||||
get_mails_list,
|
get_mails_list,
|
||||||
MailList,
|
MailList,
|
||||||
)
|
)
|
||||||
@ -217,7 +217,12 @@ def trans_command_retr(mails: MailList, req: Request) -> None:
|
|||||||
entry = mails.get(req.arg1)
|
entry = mails.get(req.arg1)
|
||||||
if entry:
|
if entry:
|
||||||
write(ok("Contents follow"))
|
write(ok("Contents follow"))
|
||||||
write(get_mail(entry))
|
with get_mail_fp(entry) as fp:
|
||||||
|
for line in fp:
|
||||||
|
if line.startswith(b"."):
|
||||||
|
write(b".") # prepend dot
|
||||||
|
write(line)
|
||||||
|
# write(get_mail(entry)) # no prepend dot
|
||||||
write(end())
|
write(end())
|
||||||
mails.delete(req.arg1)
|
mails.delete(req.arg1)
|
||||||
else:
|
else:
|
||||||
|
@ -2,6 +2,7 @@ import os
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from enum import Enum, auto
|
from enum import Enum, auto
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
|
|
||||||
class ClientError(Exception):
|
class ClientError(Exception):
|
||||||
@ -124,6 +125,12 @@ def set_nid(entries: list[MailEntry]):
|
|||||||
entry.nid = i
|
entry.nid = i
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def get_mail_fp(entry: MailEntry):
|
||||||
|
with open(entry.path, mode="rb") as fp:
|
||||||
|
yield fp
|
||||||
|
|
||||||
|
|
||||||
def get_mail(entry: MailEntry) -> bytes:
|
def get_mail(entry: MailEntry) -> bytes:
|
||||||
with open(entry.path, mode="rb") as fp:
|
with open(entry.path, mode="rb") as fp:
|
||||||
return fp.read()
|
return fp.read()
|
||||||
|
@ -4,6 +4,7 @@ import logging
|
|||||||
import tempfile
|
import tempfile
|
||||||
import time
|
import time
|
||||||
import os
|
import os
|
||||||
|
import poplib
|
||||||
from mail4one.pop3 import create_pop_server
|
from mail4one.pop3 import create_pop_server
|
||||||
from mail4one.config import User
|
from mail4one.config import User
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@ -19,7 +20,13 @@ AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
|
|||||||
TEST_USER = "foobar"
|
TEST_USER = "foobar"
|
||||||
TEST_MBOX = "foobar_mails"
|
TEST_MBOX = "foobar_mails"
|
||||||
|
|
||||||
USERS = [User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX)]
|
TEST_USER2 = "foo2"
|
||||||
|
TEST_MBOX2 = "foo2mails"
|
||||||
|
|
||||||
|
USERS = [
|
||||||
|
User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX),
|
||||||
|
User(username=TEST_USER2, password_hash=TEST_HASH, mbox=TEST_MBOX2),
|
||||||
|
]
|
||||||
|
|
||||||
MAILS_PATH: Path
|
MAILS_PATH: Path
|
||||||
|
|
||||||
@ -50,13 +57,21 @@ def setUpModule() -> None:
|
|||||||
td = tempfile.TemporaryDirectory(prefix="m41.pop.")
|
td = tempfile.TemporaryDirectory(prefix="m41.pop.")
|
||||||
unittest.addModuleCleanup(td.cleanup)
|
unittest.addModuleCleanup(td.cleanup)
|
||||||
MAILS_PATH = Path(td.name)
|
MAILS_PATH = Path(td.name)
|
||||||
os.mkdir(MAILS_PATH / TEST_MBOX)
|
for mbox in (TEST_MBOX, TEST_MBOX2):
|
||||||
|
os.mkdir(MAILS_PATH / mbox)
|
||||||
for md in ("new", "cur", "tmp"):
|
for md in ("new", "cur", "tmp"):
|
||||||
os.mkdir(MAILS_PATH / TEST_MBOX / md)
|
os.mkdir(MAILS_PATH / mbox / md)
|
||||||
with open(MAILS_PATH / TEST_MBOX / "new/msg1.eml", "wb") as f:
|
with open(MAILS_PATH / TEST_MBOX / "new/msg1.eml", "wb") as f:
|
||||||
f.write(TESTMAIL)
|
f.write(TESTMAIL)
|
||||||
with open(MAILS_PATH / TEST_MBOX / "new/msg2.eml", "wb") as f:
|
with open(MAILS_PATH / TEST_MBOX / "new/msg2.eml", "wb") as f:
|
||||||
f.write(TESTMAIL)
|
f.write(TESTMAIL)
|
||||||
|
with open(MAILS_PATH / TEST_MBOX2 / "new/msg1.eml", "wb") as f:
|
||||||
|
f.write(TESTMAIL)
|
||||||
|
f.write(b"More lines to follow\r\n")
|
||||||
|
f.write(b".Line starts with a dot\r\n")
|
||||||
|
f.write(b"some more lines\r\n")
|
||||||
|
f.write(b".\r\n")
|
||||||
|
f.write(b"Previous line just has a dot\r\n")
|
||||||
logging.debug(MAILS_PATH)
|
logging.debug(MAILS_PATH)
|
||||||
|
|
||||||
|
|
||||||
@ -198,6 +213,22 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
|
|||||||
"""
|
"""
|
||||||
await self.dialog_checker(dialog)
|
await self.dialog_checker(dialog)
|
||||||
|
|
||||||
|
async def test_poplib(self) -> None:
|
||||||
|
def run_poplib():
|
||||||
|
pc = poplib.POP3("127.0.0.1", 7995)
|
||||||
|
try:
|
||||||
|
self.assertEqual(b"+OK Server Ready", pc.getwelcome())
|
||||||
|
self.assertEqual(b"+OK Welcome", pc.user("foo2"))
|
||||||
|
self.assertEqual(b"+OK Login successful", pc.pass_("helloworld"))
|
||||||
|
_, eml, oc = pc.retr(1)
|
||||||
|
self.assertIn(b"Previous line just has a dot", eml)
|
||||||
|
self.assertIn(b".Line starts with a dot", eml)
|
||||||
|
self.assertIn(b".", eml)
|
||||||
|
finally:
|
||||||
|
pc.quit()
|
||||||
|
|
||||||
|
await asyncio.to_thread(run_poplib)
|
||||||
|
|
||||||
async def asyncTearDown(self) -> None:
|
async def asyncTearDown(self) -> None:
|
||||||
logging.debug("at teardown")
|
logging.debug("at teardown")
|
||||||
self.writer.close()
|
self.writer.close()
|
||||||
|
Loading…
Reference in New Issue
Block a user