9 Commits
py38 ... v1.1

16 changed files with 183 additions and 113 deletions

View File

@ -1,8 +1,10 @@
# Needs python3 >= 3.9, sed, git for build # Needs python3 >= 3.9, sed, git for build, docker for tests
build: clean build: clean
python3 -m pip install -r requirements.txt --no-compile --target build python3 -m pip install -r requirements.txt --no-compile --target build
cp -r mail4one/ build/ cp -r mail4one/ build/
sed -i "s/DEVELOMENT/$(shell scripts/get_version.sh)/" build/mail4one/version.py sed -i "s/DEVELOMENT/$(shell scripts/get_version.sh)/" build/mail4one/version.py
find build -name "*.pyi" -o -name "py.typed" | xargs -I typefile rm typefile
rm -rf build/bin
rm -rf build/mail4one/__pycache__ rm -rf build/mail4one/__pycache__
rm -rf build/*.dist-info rm -rf build/*.dist-info
python3 -m zipapp \ python3 -m zipapp \
@ -18,17 +20,19 @@ clean:
docker-tests: docker-tests:
docker run --pull=always -v `pwd`:/app -w /app --rm python:3.11-alpine sh scripts/runtests.sh docker run --pull=always -v `pwd`:/app -w /app --rm python:3.11-alpine sh scripts/runtests.sh
docker run --pull=always -v `pwd`:/app -w /app --rm python:3.10-alpine sh scripts/runtests.sh docker run --pull=always -v `pwd`:/app -w /app --rm python:3.10-alpine sh scripts/runtests.sh
docker run --pull=always -v `pwd`:/app -w /app --rm python:3.12 sh scripts/runtests.sh
docker run --pull=always -v `pwd`:/app -w /app --rm python:3.11 sh scripts/runtests.sh docker run --pull=always -v `pwd`:/app -w /app --rm python:3.11 sh scripts/runtests.sh
docker run --pull=always -v `pwd`:/app -w /app --rm python:3.10 sh scripts/runtests.sh docker run --pull=always -v `pwd`:/app -w /app --rm python:3.10 sh scripts/runtests.sh
docker run --pull=always -v `pwd`:/app -w /app --rm python:3.9 sh scripts/runtests.sh docker run --pull=always -v `pwd`:/app -w /app --rm python:3.9 sh scripts/runtests.sh
# ============================================================================ # ============================================================================
# Below targets for devs. Need pipenv, black installed
requirements.txt: Pipfile.lock requirements.txt: Pipfile.lock
pipenv requirements > requirements.txt pipenv requirements > requirements.txt
format: format:
black mail4one/*py black mail4one/*py tests/*py
build-dev: requirements.txt build build-dev: requirements.txt build
@ -38,6 +42,11 @@ setup:
cleanup: cleanup:
pipenv --rm pipenv --rm
update:
rm requirements.txt Pipfile.lock
pipenv update
pipenv requirements > requirements.txt
shell: shell:
MYPYPATH=`pipenv --venv`/lib/python3.11/site-packages pipenv shell MYPYPATH=`pipenv --venv`/lib/python3.11/site-packages pipenv shell

20
Pipfile.lock generated
View File

@ -18,27 +18,28 @@
"default": { "default": {
"aiosmtpd": { "aiosmtpd": {
"hashes": [ "hashes": [
"sha256:f821fe424b703b2ea391dc2df11d89d2afd728af27393e13cf1a3530f19fdc5e", "sha256:78d7b14f859ad0e6de252b47f9cf1ca6f1c82a8b0f10a9e39bec7e915a6aa5fe",
"sha256:f9243b7dfe00aaf567da8728d891752426b51392174a34d2cf5c18053b63dcbc" "sha256:a196922f1903e54c4d37c53415b7613056d39e2b1e8249f324b9ee7a439be0f1"
], ],
"index": "pypi", "index": "pypi",
"version": "==1.4.4.post2" "markers": "python_version >= '3.8'",
"version": "==1.4.5"
}, },
"atpublic": { "atpublic": {
"hashes": [ "hashes": [
"sha256:0f40433219e124edf115c6c363808ca6f0e1cfa7d160d86b2fb94793086d1294", "sha256:d1c8cd931af7461f6d18bc6063383e8654d9e9ef19d58ee6dc01e8515bbf55df",
"sha256:80057c55641253b86dcb68b524f82328172371b6547d4c7462a9127fbfbbabfc" "sha256:df90de1162b1a941ee486f484691dc7c33123ee638ea5d6ca604061306e0fdde"
], ],
"markers": "python_version >= '3.8'", "markers": "python_version >= '3.8'",
"version": "==4.0" "version": "==4.1.0"
}, },
"attrs": { "attrs": {
"hashes": [ "hashes": [
"sha256:1f28b4522cdc2fb4256ac1a020c78acf9cba2c6b461ccd2c126f3aa8e8335d04", "sha256:935dc3b529c262f6cf76e50877d35a4bd3c1de194fd41f47a2b7ae8f19971f30",
"sha256:6279836d581513a26f1bf235f9acd333bc9115683f14f7e8fae46c98fc50e015" "sha256:99b87a485a5820b23b879f04c2305b44b951b502fd64be915879d77a7e8fc6f1"
], ],
"markers": "python_version >= '3.7'", "markers": "python_version >= '3.7'",
"version": "==23.1.0" "version": "==23.2.0"
}, },
"python-jata": { "python-jata": {
"hashes": [ "hashes": [
@ -46,6 +47,7 @@
"sha256:ff4cd7ca75c9a8306b69ef6e878c296a5602f3279c6f9a82b6105b8eba764760" "sha256:ff4cd7ca75c9a8306b69ef6e878c296a5602f3279c6f9a82b6105b8eba764760"
], ],
"index": "pypi", "index": "pypi",
"markers": "python_version >= '3.8'",
"version": "==1.2" "version": "==1.2"
} }
}, },

View File

@ -1,6 +1,6 @@
# Mail4one # Mail4one
Personal mail server for a single user or a small family. Written in pure python with minimal dependencies. Personal mail server for a single user or a small family. Written in pure python with [minimal dependencies](Pipfile).
Designed for dynamic alias based workflow where a different alias is used for each purpose. Designed for dynamic alias based workflow where a different alias is used for each purpose.
# Getting started # Getting started
@ -23,7 +23,7 @@ Mail4one only takes care of receiving and serving email. For sending email, use
Most of them have generous free tier which is more than enough for personal use. Most of them have generous free tier which is more than enough for personal use.
Sending email is tricky. Even if everything is correctly setup (DMARC, DKIM, SPF), popular email vendors like google, microsoft may mark emails sent from your IP as spam for no reason. Sending email is tricky. Even if everything is correctly setup (DMARC, DKIM, SPF), popular email vendors like google, microsoft may mark emails sent from your IP as spam for no reason. Hence using a dedicated service is the only reliable way to send emails.
# Community # Community
@ -62,7 +62,7 @@ This should generate `mail4one.pyz` in current folder. This is a [executable pyt
* Write dedicated documentation * Write dedicated documentation
* Test with more email clients ([Thunderbird](https://www.thunderbird.net/) and [k9mail](https://k9mail.app/) are tested now) * Test with more email clients ([Thunderbird](https://www.thunderbird.net/) and [k9mail](https://k9mail.app/) are tested now)
* IMAP support * IMAP support
* Web UI for editing config * Web UI for editing config ([WIP](https://github.com/mail4one/mail4one/tree/webform))
* Support email submission from client to forward to other senders or direct delivery * Support email submission from client to forward to other senders or direct delivery
* Optional SPAM filtering * Optional SPAM filtering
* Optional DMARC,SPF,DKIM verification * Optional DMARC,SPF,DKIM verification

View File

@ -81,15 +81,16 @@ systemctl status mail4one
Above command should fail as the TLS certificates don't exist yet. Above command should fail as the TLS certificates don't exist yet.
## Setup TLS certificates ## Setup TLS certificates
Install [certbot](https://certbot.eff.org/) and run below command. Follow instructions to create TLS certificates. Usually you want certificate for domain name like `mail.example.com` Install [certbot](https://certbot.eff.org/) and run below command. Follow instructions to create TLS certificates. Usually you want certificate for domain name like `mail.mydomain.com`
```sh ```sh
sudo certbot certonly sudo certbot certonly
sudo cp /etc/letsencrypt/live/mail.example.com/{fullchain,privkey}.pem /var/lib/mail4one/certs/
sudo chown mail4one:mail4one /var/lib/mail4one/certs/{fullchain,privkey}.pem
# Edit mail4one_cert_copy.sh to update your domain name # **Edit** mail4one_cert_copy.sh to update your domain name
sudo cp mail4one_cert_copy.sh /etc/letsencrypt/renewal-hooks/deploy/ sudo cp mail4one_cert_copy.sh /etc/letsencrypt/renewal-hooks/deploy/
sudo chmod +x /etc/letsencrypt/renewal-hooks/deploy/mail4one_cert_copy.sh sudo chmod +x /etc/letsencrypt/renewal-hooks/deploy/mail4one_cert_copy.sh
# This will create and copy the certificates to the right path with correct permissions and ownership
sudo certbot certonly -d mail.mydomain.com --run-deploy-hooks --dry-run
``` ```
## Restart service and check logs ## Restart service and check logs
```sh ```sh
@ -109,6 +110,6 @@ python3 -m http.server 25
In local machine or a browser In local machine or a browser
You should see file listing a, b, c. Repeat for port 465, 995 to make sure firewall rules and dns is working You should see file listing a, b, c. Repeat for port 465, 995 to make sure firewall rules and dns is working
```sh ```sh
curl http://mail.example.com:25 curl http://mail.mydomain.com:25
``` ```
If not working, refer to VPS settings and OS firewall settings. If not working, refer to VPS settings and OS firewall settings.

View File

@ -2,19 +2,26 @@
# certbot deploy hook to copy certificates to mail4one when renewed. # certbot deploy hook to copy certificates to mail4one when renewed.
# Initial setup, Install certbot(https://certbot.eff.org/) and run `certbot certonly` as root # Initial setup, Install certbot(https://certbot.eff.org/) and run `certbot certonly` as root
# Doc: https://eff-certbot.readthedocs.io/en/latest/using.html#renewing-certificates
# #
# This file is supposed to be copied to /etc/letsencrypt/renewal-hooks/deploy/ # This file is supposed to be copied to /etc/letsencrypt/renewal-hooks/deploy/
# Change the mail domain to the one on MX record # Change the mail domain to the one on MX record
set -x set -eu
if [ "$RENEWED_DOMAINS" = "mail.mydomain.com" ] if [ "$RENEWED_DOMAINS" = "mail.mydomain.com" ]
then then
mkdir -p /var/lib/mail4one/certs app=mail4one
chmod 750 /var/lib/mail4one/certs appuser=$app
chown mail4one:mail4one /var/lib/mail4one/certs certpath="/var/lib/$app/certs"
cp "$RENEWED_LINEAGE/fullchain.pem" /var/lib/mail4one/certs/
cp "$RENEWED_LINEAGE/privkey.pem" /var/lib/mail4one/certs/ mkdir -p "$certpath"
systemctl restart mail4one.service chmod 750 "$certpath"
chown $appuser:$appuser "$certpath"
install -o "$appuser" -g "$appuser" -m 444 "$RENEWED_LINEAGE/fullchain.pem" -t "$certpath"
install -o "$appuser" -g "$appuser" -m 400 "$RENEWED_LINEAGE/privkey.pem" -t "$certpath"
systemctl restart $app.service
echo "$(date) Renewed and deployed certificates for $app" >> /var/log/cert-renew.log
fi fi

View File

@ -1,14 +1,14 @@
import json import json
import re import re
import logging import logging
from typing import Callable, Union, Optional, List, Tuple from typing import Callable, Union, Optional
from jata import Jata, MutableDefault from jata import Jata, MutableDefault
class Match(Jata): class Match(Jata):
name: str name: str
addrs: List[str] = MutableDefault(lambda: []) # type: ignore addrs: list[str] = MutableDefault(lambda: []) # type: ignore
addr_rexs: List[str] = MutableDefault(lambda: []) # type: ignore addr_rexs: list[str] = MutableDefault(lambda: []) # type: ignore
DEFAULT_MATCH_ALL = "default_match_all" DEFAULT_MATCH_ALL = "default_match_all"
@ -23,7 +23,7 @@ class Rule(Jata):
class Mbox(Jata): class Mbox(Jata):
name: str name: str
rules: List[Rule] rules: list[Rule]
DEFAULT_NULL_MBOX = "default_null_mbox" DEFAULT_NULL_MBOX = "default_null_mbox"
@ -77,18 +77,18 @@ class Config(Jata):
logging: Optional[LogCfg] = None logging: Optional[LogCfg] = None
mails_path: str mails_path: str
matches: List[Match] matches: list[Match]
boxes: List[Mbox] boxes: list[Mbox]
users: List[User] users: list[User]
servers: List[ServerCfg] servers: list[ServerCfg]
CheckerFn = Callable[[str], bool] CheckerFn = Callable[[str], bool]
Checker = Tuple[str, CheckerFn, bool] Checker = tuple[str, CheckerFn, bool]
def parse_checkers(cfg: Config) -> List[Checker]: def parse_checkers(cfg: Config) -> list[Checker]:
def make_match_fn(m: Match): def make_match_fn(m: Match):
if m.addrs and m.addr_rexs: if m.addrs and m.addr_rexs:
raise Exception("Both addrs and addr_rexs is set") raise Exception("Both addrs and addr_rexs is set")
@ -118,7 +118,7 @@ def parse_checkers(cfg: Config) -> List[Checker]:
] ]
def get_mboxes(addr: str, checks: List[Checker]) -> List[str]: def get_mboxes(addr: str, checks: list[Checker]) -> list[str]:
def inner(): def inner():
for mbox, match_fn, stop_check in checks: for mbox, match_fn, stop_check in checks:
if match_fn(addr): if match_fn(addr):
@ -130,7 +130,7 @@ def get_mboxes(addr: str, checks: List[Checker]) -> List[str]:
return list(inner()) return list(inner())
def gen_addr_to_mboxes(cfg: Config) -> Callable[[str], List[str]]: def gen_addr_to_mboxes(cfg: Config) -> Callable[[str], list[str]]:
checks = parse_checkers(cfg) checks = parse_checkers(cfg)
logging.info(f"Parsed checkers from config, {len(checks)=}") logging.info(f"Parsed checkers from config, {len(checks)=}")
return lambda addr: get_mboxes(addr, checks) return lambda addr: get_mboxes(addr, checks)

View File

@ -13,7 +13,7 @@ from .pwhash import parse_hash, check_pass, PWInfo
from asyncio import StreamReader, StreamWriter from asyncio import StreamReader, StreamWriter
import random import random
from typing import Optional, List, Tuple, Dict from typing import Optional
from .poputils import ( from .poputils import (
InvalidCommand, InvalidCommand,
@ -29,7 +29,7 @@ from .poputils import (
end, end,
Request, Request,
MailEntry, MailEntry,
get_mail, get_mail_fp,
get_mails_list, get_mails_list,
MailList, MailList,
) )
@ -46,7 +46,7 @@ class State:
class SharedState: class SharedState:
def __init__(self, mails_path: Path, users: dict[str, Tuple[PWInfo, str]]): def __init__(self, mails_path: Path, users: dict[str, tuple[PWInfo, str]]):
self.mails_path = mails_path self.mails_path = mails_path
self.users = users self.users = users
self.loggedin_users: set[str] = set() self.loggedin_users: set[str] = set()
@ -217,7 +217,12 @@ def trans_command_retr(mails: MailList, req: Request) -> None:
entry = mails.get(req.arg1) entry = mails.get(req.arg1)
if entry: if entry:
write(ok("Contents follow")) write(ok("Contents follow"))
write(get_mail(entry)) with get_mail_fp(entry) as fp:
for line in fp:
if line.startswith(b"."):
write(b".") # prepend dot
write(line)
# write(get_mail(entry)) # no prepend dot
write(end()) write(end())
mails.delete(req.arg1) mails.delete(req.arg1)
else: else:
@ -237,7 +242,7 @@ def trans_command_noop(_, __) -> None:
write(ok("Hmm")) write(ok("Hmm"))
async def process_transactions(mails_list: List[MailEntry]) -> set[str]: async def process_transactions(mails_list: list[MailEntry]) -> set[str]:
mails = MailList(mails_list) mails = MailList(mails_list)
def reset(_, __): def reset(_, __):
@ -328,7 +333,7 @@ async def start_session() -> None:
scfg().loggedin_users.remove(state().username) scfg().loggedin_users.remove(state().username)
def parse_users(users: List[User]) -> Dict[str, Tuple[PWInfo, str]]: def parse_users(users: list[User]) -> dict[str, tuple[PWInfo, str]]:
def inner(): def inner():
for user in users: for user in users:
user = User(user) user = User(user)
@ -338,7 +343,7 @@ def parse_users(users: List[User]) -> Dict[str, Tuple[PWInfo, str]]:
return dict(inner()) return dict(inner())
def make_pop_server_callback(mails_path: Path, users: List[User], timeout_seconds: int): def make_pop_server_callback(mails_path: Path, users: list[User], timeout_seconds: int):
scfg = SharedState(mails_path=mails_path, users=parse_users(users)) scfg = SharedState(mails_path=mails_path, users=parse_users(users))
async def session_cb(reader: StreamReader, writer: StreamWriter): async def session_cb(reader: StreamReader, writer: StreamWriter):
@ -362,7 +367,7 @@ async def create_pop_server(
host: str, host: str,
port: int, port: int,
mails_path: Path, mails_path: Path,
users: List[User], users: list[User],
ssl_context: Optional[ssl.SSLContext] = None, ssl_context: Optional[ssl.SSLContext] = None,
timeout_seconds: int = 60, timeout_seconds: int = 60,
) -> asyncio.Server: ) -> asyncio.Server:
@ -386,13 +391,14 @@ def debug_main():
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
import sys import sys
from .pwhash import gen_pwhash
_, mails_path, port, password = sys.argv _, mails_path, mbox = sys.argv
mails_path = Path(mails_path) mails_path = Path(mails_path)
port = int(port) users = [User(username="dummy", password_hash=gen_pwhash("dummy"), mbox=mbox)]
asyncio.run(a_main(mails_path, port, password_hash=password_hash)) asyncio.run(a_main("127.0.0.1", 1101, mails_path, users=users))
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -2,7 +2,7 @@ import os
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum, auto from enum import Enum, auto
from pathlib import Path from pathlib import Path
from typing import List from contextlib import contextmanager
class ClientError(Exception): class ClientError(Exception):
@ -113,25 +113,31 @@ def files_in_path(path):
return [] return []
def get_mails_list(dirpath: Path) -> List[MailEntry]: def get_mails_list(dirpath: Path) -> list[MailEntry]:
files = files_in_path(dirpath) files = files_in_path(dirpath)
entries = [MailEntry(filename, path) for filename, path in files] entries = [MailEntry(filename, path) for filename, path in files]
return entries return entries
def set_nid(entries: List[MailEntry]): def set_nid(entries: list[MailEntry]):
entries.sort(reverse=True, key=lambda e: (e.c_time, e.uid)) entries.sort(reverse=True, key=lambda e: (e.c_time, e.uid))
for i, entry in enumerate(entries, start=1): for i, entry in enumerate(entries, start=1):
entry.nid = i entry.nid = i
@contextmanager
def get_mail_fp(entry: MailEntry):
with open(entry.path, mode="rb") as fp:
yield fp
def get_mail(entry: MailEntry) -> bytes: def get_mail(entry: MailEntry) -> bytes:
with open(entry.path, mode="rb") as fp: with open(entry.path, mode="rb") as fp:
return fp.read() return fp.read()
class MailList: class MailList:
def __init__(self, entries: List[MailEntry]): def __init__(self, entries: list[MailEntry]):
self.entries = entries self.entries = entries
set_nid(self.entries) set_nid(self.entries)
self.mails_map = {str(e.nid): e for e in entries} self.mails_map = {str(e.nid): e for e in entries}

View File

@ -57,7 +57,7 @@ async def a_main(cfg: config.Config) -> None:
return host return host
mbox_finder = config.gen_addr_to_mboxes(cfg) mbox_finder = config.gen_addr_to_mboxes(cfg)
servers: List[asyncio.Server] = [] servers: list[asyncio.Server] = []
if not cfg.servers: if not cfg.servers:
logging.warning("Nothing to do!") logging.warning("Nothing to do!")

View File

@ -7,7 +7,7 @@ import uuid
import shutil import shutil
from functools import partial from functools import partial
from pathlib import Path from pathlib import Path
from typing import Callable, Optional, List from typing import Callable, Optional
from . import config from . import config
from email.message import Message from email.message import Message
import email.policy import email.policy
@ -25,7 +25,7 @@ logger = logging.getLogger("smtp")
class MyHandler(AsyncMessage): class MyHandler(AsyncMessage):
def __init__(self, mails_path: Path, mbox_finder: Callable[[str], List[str]]): def __init__(self, mails_path: Path, mbox_finder: Callable[[str], list[str]]):
super().__init__() super().__init__()
self.mails_path = mails_path self.mails_path = mails_path
self.mbox_finder = mbox_finder self.mbox_finder = mbox_finder
@ -63,7 +63,7 @@ class MyHandler(AsyncMessage):
def protocol_factory_starttls( def protocol_factory_starttls(
mails_path: Path, mbox_finder: Callable[[str], List[str]], context: ssl.SSLContext mails_path: Path, mbox_finder: Callable[[str], list[str]], context: ssl.SSLContext
): ):
logger.info("Got smtp client cb starttls") logger.info("Got smtp client cb starttls")
try: try:
@ -80,7 +80,7 @@ def protocol_factory_starttls(
return smtp return smtp
def protocol_factory(mails_path: Path, mbox_finder: Callable[[str], List[str]]): def protocol_factory(mails_path: Path, mbox_finder: Callable[[str], list[str]]):
logger.info("Got smtp client cb") logger.info("Got smtp client cb")
try: try:
handler = MyHandler(mails_path, mbox_finder) handler = MyHandler(mails_path, mbox_finder)
@ -95,9 +95,9 @@ async def create_smtp_server_starttls(
host: str, host: str,
port: int, port: int,
mails_path: Path, mails_path: Path,
mbox_finder: Callable[[str], List[str]], mbox_finder: Callable[[str], list[str]],
ssl_context: ssl.SSLContext, ssl_context: ssl.SSLContext,
): ) -> asyncio.Server:
logging.info( logging.info(
f"Starting SMTP STARTTLS server {host=}, {port=}, {mails_path=!s}, {ssl_context != None=}" f"Starting SMTP STARTTLS server {host=}, {port=}, {mails_path=!s}, {ssl_context != None=}"
) )
@ -114,9 +114,9 @@ async def create_smtp_server(
host: str, host: str,
port: int, port: int,
mails_path: Path, mails_path: Path,
mbox_finder: Callable[[str], List[str]], mbox_finder: Callable[[str], list[str]],
ssl_context: Optional[ssl.SSLContext] = None, ssl_context: Optional[ssl.SSLContext] = None,
): ) -> asyncio.Server:
logging.info( logging.info(
f"Starting SMTP server {host=}, {port=}, {mails_path=!s}, {ssl_context != None=}" f"Starting SMTP server {host=}, {port=}, {mails_path=!s}, {ssl_context != None=}"
) )

View File

@ -1,5 +1,5 @@
-i https://pypi.org/simple -i https://pypi.org/simple
aiosmtpd==1.4.4.post2 aiosmtpd==1.4.5; python_version >= '3.8'
atpublic==4.0 ; python_version >= '3.8' atpublic==4.1.0; python_version >= '3.8'
attrs==23.1.0 ; python_version >= '3.7' attrs==23.2.0; python_version >= '3.7'
python-jata==1.2 python-jata==1.2; python_version >= '3.8'

View File

@ -8,7 +8,7 @@ then
tag_val=$(git describe --dirty=DIRTY --exact-match) tag_val=$(git describe --dirty=DIRTY --exact-match)
case "$tag_val" in case "$tag_val" in
*DIRTY) *DIRTY)
echo "git=$commit-changes" echo "git-$commit-changes"
;; ;;
v*) # Only consider tags starting with v v*) # Only consider tags starting with v
echo "$tag_val" echo "$tag_val"

View File

@ -67,10 +67,11 @@ class TestConfig(unittest.TestCase):
def test_get_mboxes(self) -> None: def test_get_mboxes(self) -> None:
cfg = config.Config(TEST_CONFIG) cfg = config.Config(TEST_CONFIG)
rules = config.parse_checkers(cfg) rules = config.parse_checkers(cfg)
self.assertEqual(config.get_mboxes("foo@bar.com", rules), ['spam']) self.assertEqual(config.get_mboxes("foo@bar.com", rules), ["spam"])
self.assertEqual(config.get_mboxes("foo@mydomain.com", rules), ['all']) self.assertEqual(config.get_mboxes("foo@mydomain.com", rules), ["all"])
self.assertEqual(config.get_mboxes("first.last@mydomain.com", rules), self.assertEqual(
['important', 'all']) config.get_mboxes("first.last@mydomain.com", rules), ["important", "all"]
)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -4,20 +4,29 @@ import logging
import tempfile import tempfile
import time import time
import os import os
import poplib
from mail4one.pop3 import create_pop_server from mail4one.pop3 import create_pop_server
from mail4one.config import User from mail4one.config import User
from pathlib import Path from pathlib import Path
TEST_HASH = "".join(c for c in """ TEST_HASH = "".join(
"""
AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD
ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD
AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
""" if not c.isspace()) """.split()
)
TEST_USER = 'foobar' TEST_USER = "foobar"
TEST_MBOX = 'foobar_mails' TEST_MBOX = "foobar_mails"
USERS = [User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX)] TEST_USER2 = "foo2"
TEST_MBOX2 = "foo2mails"
USERS = [
User(username=TEST_USER, password_hash=TEST_HASH, mbox=TEST_MBOX),
User(username=TEST_USER2, password_hash=TEST_HASH, mbox=TEST_MBOX2),
]
MAILS_PATH: Path MAILS_PATH: Path
@ -38,7 +47,8 @@ Hello bro\r
IlzVOJqu9Zp7twFAtzcV\r IlzVOJqu9Zp7twFAtzcV\r
yQVk36B0mGU2gtWxXLr\r yQVk36B0mGU2gtWxXLr\r
PeF0RtbI0mAuVPLQDHCi\r PeF0RtbI0mAuVPLQDHCi\r
\r\n""" \r
"""
def setUpModule() -> None: def setUpModule() -> None:
@ -47,13 +57,21 @@ def setUpModule() -> None:
td = tempfile.TemporaryDirectory(prefix="m41.pop.") td = tempfile.TemporaryDirectory(prefix="m41.pop.")
unittest.addModuleCleanup(td.cleanup) unittest.addModuleCleanup(td.cleanup)
MAILS_PATH = Path(td.name) MAILS_PATH = Path(td.name)
os.mkdir(MAILS_PATH / TEST_MBOX) for mbox in (TEST_MBOX, TEST_MBOX2):
for md in ('new', 'cur', 'tmp'): os.mkdir(MAILS_PATH / mbox)
os.mkdir(MAILS_PATH / TEST_MBOX / md) for md in ("new", "cur", "tmp"):
with open(MAILS_PATH / TEST_MBOX/ 'new/msg1.eml', 'wb') as f: os.mkdir(MAILS_PATH / mbox / md)
with open(MAILS_PATH / TEST_MBOX / "new/msg1.eml", "wb") as f:
f.write(TESTMAIL) f.write(TESTMAIL)
with open(MAILS_PATH / TEST_MBOX/ 'new/msg2.eml', 'wb') as f: with open(MAILS_PATH / TEST_MBOX / "new/msg2.eml", "wb") as f:
f.write(TESTMAIL) f.write(TESTMAIL)
with open(MAILS_PATH / TEST_MBOX2 / "new/msg1.eml", "wb") as f:
f.write(TESTMAIL)
f.write(b"More lines to follow\r\n")
f.write(b".Line starts with a dot\r\n")
f.write(b"some more lines\r\n")
f.write(b".\r\n")
f.write(b"Previous line just has a dot\r\n")
logging.debug(MAILS_PATH) logging.debug(MAILS_PATH)
@ -65,13 +83,11 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self) -> None: async def asyncSetUp(self) -> None:
logging.debug("at asyncSetUp") logging.debug("at asyncSetUp")
pop_server = await create_pop_server(host='127.0.0.1', pop_server = await create_pop_server(
port=7995, host="127.0.0.1", port=7995, mails_path=MAILS_PATH, users=USERS
mails_path=MAILS_PATH, )
users=USERS)
self.task = asyncio.create_task(pop_server.serve_forever()) self.task = asyncio.create_task(pop_server.serve_forever())
self.reader, self.writer = await asyncio.open_connection( self.reader, self.writer = await asyncio.open_connection("127.0.0.1", 7995)
'127.0.0.1', 7995)
async def test_QUIT(self) -> None: async def test_QUIT(self) -> None:
dialog = """ dialog = """
@ -115,8 +131,8 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
await self.dialog_checker(dialog) await self.dialog_checker(dialog)
async def test_dupe_AUTH(self) -> None: async def test_dupe_AUTH(self) -> None:
r1, w1 = await asyncio.open_connection('127.0.0.1', 7995) r1, w1 = await asyncio.open_connection("127.0.0.1", 7995)
r2, w2 = await asyncio.open_connection('127.0.0.1', 7995) r2, w2 = await asyncio.open_connection("127.0.0.1", 7995)
dialog = """ dialog = """
S: +OK Server Ready S: +OK Server Ready
C: USER foobar C: USER foobar
@ -197,6 +213,22 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
""" """
await self.dialog_checker(dialog) await self.dialog_checker(dialog)
async def test_poplib(self) -> None:
def run_poplib():
pc = poplib.POP3("127.0.0.1", 7995)
try:
self.assertEqual(b"+OK Server Ready", pc.getwelcome())
self.assertEqual(b"+OK Welcome", pc.user("foo2"))
self.assertEqual(b"+OK Login successful", pc.pass_("helloworld"))
_, eml, oc = pc.retr(1)
self.assertIn(b"Previous line just has a dot", eml)
self.assertIn(b".Line starts with a dot", eml)
self.assertIn(b".", eml)
finally:
pc.quit()
await asyncio.to_thread(run_poplib)
async def asyncTearDown(self) -> None: async def asyncTearDown(self) -> None:
logging.debug("at teardown") logging.debug("at teardown")
self.writer.close() self.writer.close()
@ -206,9 +238,9 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
async def dialog_checker(self, dialog: str) -> None: async def dialog_checker(self, dialog: str) -> None:
await self.dialog_checker_impl(self.reader, self.writer, dialog) await self.dialog_checker_impl(self.reader, self.writer, dialog)
async def dialog_checker_impl(self, reader: asyncio.StreamReader, async def dialog_checker_impl(
writer: asyncio.StreamWriter, self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, dialog: str
dialog: str) -> None: ) -> None:
for line in dialog.splitlines(): for line in dialog.splitlines():
line = line.strip() line = line.strip()
if not line: if not line:
@ -222,5 +254,5 @@ class TestPop3(unittest.IsolatedAsyncioTestCase):
self.assertEqual(data, resp) self.assertEqual(data, resp)
if __name__ == '__main__': if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -10,27 +10,31 @@ class TestPWHash(unittest.TestCase):
pwinfo = parse_hash(pwhash) pwinfo = parse_hash(pwhash)
self.assertEqual(len(pwinfo.salt), SALT_LEN) self.assertEqual(len(pwinfo.salt), SALT_LEN)
self.assertEqual(len(pwinfo.scrypt_hash), KEY_LEN) self.assertEqual(len(pwinfo.scrypt_hash), KEY_LEN)
self.assertTrue(check_pass(password, pwinfo), self.assertTrue(
"check pass with correct password") check_pass(password, pwinfo), "check pass with correct password"
self.assertFalse(check_pass("foobar", pwinfo), )
"check pass with wrong password") self.assertFalse(check_pass("foobar", pwinfo), "check pass with wrong password")
def test_hardcoded_hash(self): def test_hardcoded_hash(self):
test_hash = "".join(c for c in """ test_hash = "".join(
c
for c in """
AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD AFTY5EVN7AX47ZL7UMH3BETYWFBTAV3XHR73CEFAJBPN2NIHPWD
ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD ZHV2UQSMSPHSQQ2A2BFQBNC77VL7F2UKATQNJZGYLCSU6C43UQD
AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF AQXWXSWNGAEPGIMG2F3QDKBXL3MRHY6K2BPID64ZR6LABLPVSF
""" if not c.isspace()) """
if not c.isspace()
)
pwinfo = parse_hash(test_hash) pwinfo = parse_hash(test_hash)
self.assertTrue(check_pass("helloworld", pwinfo), self.assertTrue(
"check pass with correct password") check_pass("helloworld", pwinfo), "check pass with correct password"
self.assertFalse(check_pass("foobar", pwinfo), )
"check pass with wrong password") self.assertFalse(check_pass("foobar", pwinfo), "check pass with wrong password")
def test_invalid_hash(self): def test_invalid_hash(self):
with self.assertRaises(Exception): with self.assertRaises(Exception):
parse_hash("sdlfkjdsklfjdsk") parse_hash("sdlfkjdsklfjdsk")
if __name__ == '__main__': if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -10,7 +10,7 @@ from pathlib import Path
from mail4one.smtp import create_smtp_server from mail4one.smtp import create_smtp_server
TEST_MBOX = 'foobar_mails' TEST_MBOX = "foobar_mails"
MAILS_PATH: Path MAILS_PATH: Path
@ -21,7 +21,7 @@ def setUpModule() -> None:
unittest.addModuleCleanup(td.cleanup) unittest.addModuleCleanup(td.cleanup)
MAILS_PATH = Path(td.name) MAILS_PATH = Path(td.name)
os.mkdir(MAILS_PATH / TEST_MBOX) os.mkdir(MAILS_PATH / TEST_MBOX)
for md in ('new', 'cur', 'tmp'): for md in ("new", "cur", "tmp"):
os.mkdir(MAILS_PATH / TEST_MBOX / md) os.mkdir(MAILS_PATH / TEST_MBOX / md)
@ -32,7 +32,8 @@ class TestSMTP(unittest.IsolatedAsyncioTestCase):
host="127.0.0.1", host="127.0.0.1",
port=7996, port=7996,
mails_path=MAILS_PATH, mails_path=MAILS_PATH,
mbox_finder=lambda addr: [TEST_MBOX]) mbox_finder=lambda addr: [TEST_MBOX],
)
self.task = asyncio.create_task(smtp_server.serve_forever()) self.task = asyncio.create_task(smtp_server.serve_forever())
async def test_send_mail(self) -> None: async def test_send_mail(self) -> None:
@ -45,8 +46,9 @@ class TestSMTP(unittest.IsolatedAsyncioTestCase):
msg = b"".join(l.strip() + b"\r\n" for l in msg.splitlines()) msg = b"".join(l.strip() + b"\r\n" for l in msg.splitlines())
def send_mail(): def send_mail():
with contextlib.closing(smtplib.SMTP(host="127.0.0.1", with contextlib.closing(
port=7996)) as client: smtplib.SMTP(host="127.0.0.1", port=7996)
) as client:
client.sendmail("foo@sender.com", "foo@bar.com", msg) client.sendmail("foo@sender.com", "foo@bar.com", msg)
_, local_port = client.sock.getsockname() _, local_port = client.sock.getsockname()
return local_port return local_port
@ -62,7 +64,7 @@ class TestSMTP(unittest.IsolatedAsyncioTestCase):
Byee Byee
""" """
expected = "".join(l.strip() + "\r\n" for l in expected.splitlines()) expected = "".join(l.strip() + "\r\n" for l in expected.splitlines())
mails = list((MAILS_PATH / TEST_MBOX / 'new').glob("*")) mails = list((MAILS_PATH / TEST_MBOX / "new").glob("*"))
self.assertEqual(len(mails), 1) self.assertEqual(len(mails), 1)
self.assertEqual(mails[0].read_bytes(), expected.encode()) self.assertEqual(mails[0].read_bytes(), expected.encode())