/*
* Copyright (c) 2006-2017 Hypertriton, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "mailprocd.h"
#include "pathnames.h"
/* Queue manager state shared by the routines in this file. */
int qmgrMaxIdle; /* Inactivity timeout */
static int qmgrMsgCount = 0; /* Messages delivered */
QMGR_WorkerQ qmgrWorkers; /* MAIN: Active workers */
Uint qmgrWorkerCount=0; /* MAIN: Bumped each time QMGR_WakeWorker() adds a worker */
static volatile int SigDIE = 0; /* Incremented by SigTERM(); tested and reset by QMGR_CheckSignals() */
char *pathQueue; /* Queue directory, read from "qmgr.path" in QMGR_Init() */
char pathQueueTmpDir[FILENAME_MAX]; /* pathQueue with "tmp/" appended (built in QMGR_Init()) */
/*
 * Read exactly len bytes from fd into data, looping over short reads.
 *
 * EINTR and EAGAIN are retried immediately; if QMGR_CheckSignals() reports
 * a pending termination request during such a retry, the process exits
 * with status 1.
 *
 * Returns 0 once len bytes have been read. Returns -1 with the error
 * message set on any other read error, or when end-of-file is reached
 * before len bytes were read.
 */
int
QMGR_Read(int fd, void *data, size_t len)
{
	char *buf = data;	/* byte cursor: arithmetic on void * is not ISO C */
	size_t nread = 0;

	while (nread < len) {
		ssize_t rv;

		rv = read(fd, buf + nread, len - nread);
		if (rv == -1) {
			if (errno == EINTR || errno == EAGAIN) {
				if (QMGR_CheckSignals()) {
					exit(1);
				}
				continue;
			}
			MPD_SetError("QMGR_Read: %s", strerror(errno));
			return (-1);
		}
		if (rv == 0) {
			MPD_SetErrorS("EOF");
			return (-1);
		}
		nread += (size_t)rv;
	}
	return (0);
}
/*
 * Write exactly len bytes from data to fd, looping over short writes.
 *
 * EINTR and EAGAIN are retried immediately; if QMGR_CheckSignals() reports
 * a pending termination request during such a retry, the process exits
 * with status 1.
 *
 * Returns 0 once len bytes have been written. Returns -1 with the error
 * message set on any other write error, or when write() returns 0
 * (reported as "EOF").
 */
int
QMGR_Write(int fd, const void *data, size_t len)
{
	const char *buf = data;	/* byte cursor: arithmetic on void * is not ISO C */
	size_t nwrote = 0;

	while (nwrote < len) {
		ssize_t rv;

		rv = write(fd, buf + nwrote, len - nwrote);
		if (rv == -1) {
			if (errno == EINTR || errno == EAGAIN) {
				if (QMGR_CheckSignals()) {
					exit(1);
				}
				continue;
			}
			MPD_SetError("QMGR_Write: %s", strerror(errno));
			return (-1);
		}
		if (rv == 0) {
			MPD_SetErrorS("EOF");
			return (-1);
		}
		nwrote += (size_t)rv;
	}
	return (0);
}
/*
 * Initialize the queue manager from the configuration file cf.
 *
 * Reads "qmgr.path" into pathQueue and "qmgr.max-idle" into qmgrMaxIdle
 * (_PATH_QUEUE and 30 are passed as the last arguments, presumably the
 * defaults), derives pathQueueTmpDir by appending "tmp/" to pathQueue, and
 * creates either directory with mode 0700 if stat() on it fails.
 *
 * The paths are concatenated as-is, so pathQueue is expected to end
 * with '/'.
 *
 * Returns 0 on success, or -1 with the error message set if a directory
 * could not be created.
 */
int
QMGR_Init(CFG_File *cf)
{
	struct stat sb;

	CFG_GetStr(cf, "qmgr.path", &pathQueue, _PATH_QUEUE);
	CFG_GetInt(cf, "qmgr.max-idle", &qmgrMaxIdle, 30);

	Strlcpy(pathQueueTmpDir, pathQueue, sizeof(pathQueueTmpDir));
	Strlcat(pathQueueTmpDir, "tmp/", sizeof(pathQueueTmpDir));

	if (stat(pathQueue, &sb) != 0 &&
	    mkdir(pathQueue, 0700) == -1) {
		MPD_SetError("%s: %s", pathQueue, strerror(errno));
		return (-1);
	}
	if (stat(pathQueueTmpDir, &sb) != 0 &&
	    mkdir(pathQueueTmpDir, 0700) == -1) {
		/* Name the directory that actually failed, not its parent. */
		MPD_SetError("%s: %s", pathQueueTmpDir, strerror(errno));
		return (-1);
	}
	return (0);
}
/*
 * Count the entries found in the queue directory (pathQueue), discounting
 * two of them (presumably the "." and ".." entries). If the directory
 * cannot be opened, the failure is logged and 0 is returned.
 */
int
QMGR_Todo(void)
{
	DIR *qdir;
	int nentries;

	qdir = opendir(pathQueue);
	if (qdir == NULL) {
		syslog(LOG_ERR, "opendir %s: %m", pathQueue);
		return (0);
	}
	for (nentries = 0; readdir(qdir) != NULL; nentries++)
		;
	closedir(qdir);

	if (nentries <= 2)
		return (0);
	return (nentries - 2);
}
/*
 * Load a message from the queue file at path.
 *
 * If meta is non-NULL, the sender and recipient are taken from it (the
 * ip field of the message is not filled in by this routine in that case).
 * If meta is NULL, the metadata block at the start of the file is read
 * and used instead.
 *
 * The message text is everything following the metadata block; it is
 * stored NUL-terminated in msg->text with its length in msg->text_len.
 *
 * Returns the new message, or NULL with the error message set. NULL is
 * also returned if open() is interrupted and QMGR_CheckSignals() reports
 * a pending termination request.
 */
MPD_Message *
QMGR_LoadMessage(const char *path, const QMGR_MetaData *meta)
{
	char rcpt_to[ADDRESS_MAX];
	QMGR_MetaData rmeta;
	MPD_Recipient *rcpt;
	MPD_Message *msg;
	off_t msgEnd;
	int fd;

try_open:
	if ((fd = open(path, O_RDONLY)) == -1) {
		if (errno == EINTR) {
			if (QMGR_CheckSignals()) {
				return (NULL);
			}
			goto try_open;
		}
		MPD_SetErrorS(strerror(errno));
		return (NULL);
	}
	if ((msg = MPD_MessageNew()) == NULL) {
		close(fd);		/* Do not leak the descriptor */
		return (NULL);
	}

	if (meta != NULL) {
		Strlcpy(msg->mail_from, meta->from, sizeof(msg->mail_from));
		Strlcpy(rcpt_to, meta->rcpt, sizeof(rcpt_to));
	} else {
		if (QMGR_Read(fd, &rmeta, sizeof(QMGR_MetaData)) == -1) {
			MPD_SetError("Read meta: %s", MPD_GetError());
			goto fail;
		}
		Strlcpy(msg->mail_from, rmeta.from, sizeof(msg->mail_from));
		Strlcpy(msg->ip, rmeta.ip, sizeof(msg->ip));
		Strlcpy(rcpt_to, rmeta.rcpt, sizeof(rcpt_to));
	}
	if ((rcpt = MPD_MessageAddRecipient(msg, rcpt_to)) == NULL ||
	    MPD_ParseRecipientParts(rcpt) == -1)
		goto fail;

	/* Read the message body */
	if ((msgEnd = lseek(fd, 0, SEEK_END)) == -1) {
		MPD_SetError("SEEK_END: %s", strerror(errno));
		goto fail;
	}
	/*
	 * A file shorter than the metadata block would make the length
	 * computation below wrap around to a huge size_t.
	 */
	if (msgEnd < (off_t)sizeof(QMGR_MetaData)) {
		MPD_SetError("%s: Truncated queue file", path);
		goto fail;
	}
	if (lseek(fd, sizeof(QMGR_MetaData), SEEK_SET) == -1) {
		MPD_SetError("SEEK_SET: %s", strerror(errno));
		goto fail;
	}
	msg->text_len = (size_t)msgEnd - sizeof(QMGR_MetaData);
	if ((msg->text = malloc(msg->text_len+1)) == NULL) {
		MPD_SetErrorS("Out of memory");
		goto fail;
	}
	if (QMGR_Read(fd, msg->text, msg->text_len) == -1) {
		MPD_SetError("LoadMessage: %s", MPD_GetError());
		goto fail;
	}
	msg->text[msg->text_len] = '\0';

	close(fd);
	return (msg);
fail:
	MPD_MessageFree(msg);
	close(fd);
	return (NULL);
}
/*
 * Enter a message onto the queue. On success, write metadata back to
 * the master process.
 *
 * The message is first written to a temporary file under pathQueueTmpDir,
 * chown'ed to the uid/gid returned by LOCAL_GetDefaultRecipientUID() for
 * the recipient, synced to disk, and then renamed into the per-uid
 * directory "<pathQueue><uid>/" (created with mode 0700 if missing). The
 * temporary file's basename serves as the queue ID.
 *
 * Returns the result of writing the metadata block to masterPipe on
 * success, or -1 with the error message set. On any failure after the
 * temporary file was created, the temporary file is removed.
 *
 * CONTEXT: SMTP Worker subprocess.
 */
int
QMGR_Queue(MPD_Message *msg, MPD_Recipient *rcpt, int masterPipe)
{
	char pathDst[FILENAME_MAX], pathDstBase[FILENAME_MAX];
	char pathTmp[FILENAME_MAX];
	struct iovec vec[2];
	QMGR_MetaData meta;
	struct stat sb;
	char *queueID;
	int fd;

	bzero(&meta, sizeof(meta));
	if (LOCAL_GetDefaultRecipientUID(rcpt->addr, &meta.uid, &meta.gid) == -1)
		return (-1);

	Strlcpy(pathTmp, pathQueueTmpDir, sizeof(pathTmp));
	Strlcat(pathTmp, "XXXXXXXXXX", sizeof(pathTmp));
	if ((fd = mkstemp(pathTmp)) == -1) {
		MPD_SetError("mkstemp: %s", strerror(errno));
		return (-1);
	}
	if ((queueID = strrchr(pathTmp, '/')) == NULL || queueID[1] == '\0') {
		MPD_SetErrorS("mkstemp");
		goto fail_rm;
	}
	queueID++;

	snprintf(pathDstBase, sizeof(pathDstBase), "%s%u/", pathQueue, meta.uid);
	if (stat(pathDstBase, &sb) != 0) {
		Debug("Creating: %s", pathDstBase);
		/*
		 * Another process may create the directory between the
		 * stat() and the mkdir(); that is not an error.
		 */
		if (mkdir(pathDstBase, 0700) == -1 && errno != EEXIST) {
			MPD_SetError("%s: %s", pathDstBase, strerror(errno));
			goto fail_rm;
		}
	}
	Strlcpy(pathDst, pathDstBase, sizeof(pathDst));
	Strlcat(pathDst, queueID, sizeof(pathDst));

	if (fchown(fd, meta.uid, meta.gid) == -1) {
		MPD_SetError("%s: chown: %s", pathTmp, strerror(errno));
		goto fail_rm;
	}

	Strlcpy(meta.qid, queueID, sizeof(meta.qid));
	Strlcpy(meta.from, msg->mail_from, sizeof(meta.from));
	Strlcpy(meta.rcpt, rcpt->addr, sizeof(meta.rcpt));
	Strlcpy(meta.ip, msg->ip, sizeof(meta.ip));

	/*
	 * Save the metadata followed by the message body. The metadata is
	 * only actually used in the event of a crash or server restart.
	 */
	vec[0].iov_base = &meta;
	vec[0].iov_len = sizeof(meta);
	vec[1].iov_base = msg->text;
	vec[1].iov_len = msg->text_len;
	if (QMGR_Writev(fd, vec, 2) == -1) {
		goto fail_rm;
	}
	/* The message is not safely queued unless it reached the disk. */
	if (fsync(fd) == -1) {
		MPD_SetError("%s: fsync: %s", pathTmp, strerror(errno));
		goto fail_rm;
	}
	close(fd);

	if (rename(pathTmp, pathDst) == -1) {
		MPD_SetError("rename(%s->%s): %s", pathTmp, pathDst,
		    strerror(errno));
		unlink(pathTmp);
		return (-1);
	}

	/*
	 * Now that the message is safely written to the queue, notify
	 * the master process by sending it the metadata block.
	 */
	return QMGR_Write(masterPipe, &meta, sizeof(meta));
fail_rm:
	unlink(pathTmp);
	close(fd);
	return (-1);
}
/* Process and deliver a message. If successful, dequeue it as well. */
static int
QMGR_ProcessMessage(const char *path, const QMGR_MetaData *meta)
{
MPD_Message *msg;
MPD_Recipient *rcpt;
int rv;
if ((msg = QMGR_LoadMessage(path, meta)) == NULL)
return (-1);
rcpt = TAILQ_FIRST(&msg->rcpts); /* Only 1 rcpt possible */
rcpt->ml = ML_GetMailListReq(rcpt);
if (rcpt->ml != NULL) { /* Mailing list */
rv = ML_MessageProcess(msg, rcpt);
Debug("[ML] From <%s> to List <%s>, %luK",
msg->mail_from, rcpt->ml->name,
(Ulong)msg->text_len/1024);
} else { /* Regular address */
rv = MPD_MessageProcess(msg, rcpt);
Debug("[MSG] From <%s> to <%s>, %luK",
msg->mail_from, rcpt->addr,
(Ulong)msg->text_len/1024);
}
MPD_MessageFree(msg);
if (rv == 0) {
Unlink(path);
} else if (rv == -1) {
syslog(LOG_ERR, "%s: PERM: %s", path, MPD_GetError());
unlink(path);
} else if (rv == 1) {
syslog(LOG_ERR, "%s: SOFTFAIL: %s", path, MPD_GetError());
}
if (getuid() != 0 || geteuid() != 0 || /* Sanity check */
getgid() != 0 || getegid() != 0) {
syslog(LOG_ERR, "QMGR bad uid %d:%d", (int)getuid(), (int)geteuid());
exit(1);
}
return (0);
}
/*
 * Walk the per-uid queue directory "<pathQueue><uid>/" and process every
 * entry whose name does not start with '.' and whose file owner is uid.
 * Entries that fail stat() or belong to another owner are logged and
 * skipped; entries that fail processing are logged and not counted.
 *
 * Adds the number of processed messages to qmgrMsgCount and returns it,
 * or returns -1 with the error message set if the directory cannot be
 * opened.
 */
static int
QMGR_ProcessQueue(uid_t uid)
{
	char dirPath[FILENAME_MAX];
	char filePath[FILENAME_MAX];
	struct dirent *ent;
	struct stat st;
	DIR *qdir;
	int ndone = 0;

	snprintf(dirPath, sizeof(dirPath), "%s%u/", pathQueue, uid);
	qdir = opendir(dirPath);
	if (qdir == NULL) {
		MPD_SetError("%s: %s", dirPath, strerror(errno));
		return (-1);
	}

	for (ent = readdir(qdir); ent != NULL; ent = readdir(qdir)) {
		if (ent->d_name[0] == '.')
			continue;

		Strlcpy(filePath, dirPath, sizeof(filePath));
		Strlcat(filePath, ent->d_name, sizeof(filePath));

		if (stat(filePath, &st) == -1 || st.st_uid != uid) {
			syslog(LOG_WARNING, "Bad qfile: %s", filePath);
			continue;
		}
		if (QMGR_ProcessMessage(filePath, NULL) != -1) {
			ndone++;
		} else {
			syslog(LOG_ERR, "Queue(%s): %s", filePath, MPD_GetError());
		}
	}
	closedir(qdir);

	qmgrMsgCount += ndone;
	return (ndone);
}
/*
 * Signal handler (installed for SIGTERM and SIGUSR1 by QMGR_WorkerMain):
 * record that a termination request is pending. The flag is picked up
 * later by QMGR_CheckSignals().
 */
static void
SigTERM(int sigraised)
{
	(void)sigraised;
	SigDIE++;
}
/*
 * Test whether the signal handler has flagged a termination request.
 * If so, clear the flag, set the error message to "Exiting (signal)"
 * and return 1. Otherwise return 0.
 */
int
QMGR_CheckSignals(void)
{
	if (!SigDIE)
		return (0);

	SigDIE = 0;
	MPD_SetErrorS("Exiting (signal)");
	return (1);
}
/*
* Spawn a new worker process if necessary.
* CONTEXT: Main.
*/
int
QMGR_WakeWorker(const QMGR_MetaData *meta)
{
QMGR_Worker *worker;
TAILQ_FOREACH(worker, &qmgrWorkers, workers) {
if (worker->uid == meta->uid &&
worker->gid == meta->gid)
break;
}
if (worker == NULL) {
int pp[2];
pid_t pid;
if (qmgrWorkerCount+1 > mpdMaxWorkers) {
MPD_SetErrorS("Too many workers");
return (-1);
}
Debug("Creating new worker for %d:%d", meta->uid, meta->gid);
if (pipe(pp) == -1) {
MPD_SetError("pipe: %s", strerror(errno));
return (-1);
}
if ((pid = fork()) == -1) {
MPD_SetError("fork: %s", strerror(errno));
close(pp[0]);
close(pp[1]);
return (-1);
} else if (pid == 0) { /* Child */
dup2(pp[0], STDIN_FILENO);
close(pp[1]);
MPD_EnterServerProc("worker");
if (QMGR_WorkerMain(meta->uid) == -1) {
syslog(LOG_ERR, "WORKER: %s", MPD_GetError());
}
MPD_ExitServerProc(0);
} else {
worker = Malloc(sizeof(QMGR_Worker));
worker->pid = pid;
worker->uid = meta->uid;
worker->gid = meta->gid;
worker->pipe = pp[1];
close(pp[0]);
TAILQ_INSERT_TAIL(&qmgrWorkers, worker, workers);
qmgrWorkerCount++;
}
}
return (0);
}
/*
 * Main worker process routine.
 *
 * Installs SigTERM() as the handler for SIGTERM and SIGUSR1, validates
 * uid (it must be at least QMGR_UIDMIN and known to getpwuid()), and
 * processes the per-uid queue once. It then watches the per-uid queue
 * directory "<pathQueue><uid>/" through kqueue and reprocesses the queue
 * whenever a vnode event is reported.
 *
 * Returns 0 when no event arrives within qmgrMaxIdle seconds or when a
 * termination signal is noticed after kevent() was interrupted. Returns
 * -1 with the error message set on failure.
 */
int
QMGR_WorkerMain(uid_t uid)
{
	char pathBase[FILENAME_MAX];
	struct sigaction sa;
	int fdDir = -1, kq;	/* fdDir stays -1 until the directory is open */
	struct kevent change, event;
	struct timespec timeo;

	sa.sa_handler = SigTERM;
	sigfillset(&sa.sa_mask);
	sa.sa_flags = 0;
	sigaction(SIGTERM, &sa, NULL);
	sigaction(SIGUSR1, &sa, NULL);

	if (uid < QMGR_UIDMIN || (getpwuid(uid) == NULL)) {
		MPD_SetError("Bad UID (%d)", (int)uid);
		return (-1);
	}

	Setproctitle("worker %u (...)", uid);

	/* Process the initial message that caused our invocation. */
	if (QMGR_ProcessQueue(uid) == -1)
		return (-1);

	Setproctitle("worker %u (1 msg)", uid);

	if ((kq = kqueue()) == -1) {
		MPD_SetError("kqueue: %s", strerror(errno));
		return (-1);
	}

	/* Monitor the queue directory for more messages, or timeout */
	snprintf(pathBase, sizeof(pathBase), "%s%u/", pathQueue, uid);
	if ((fdDir = open(pathBase, O_RDONLY|O_DIRECTORY)) == -1) {
		MPD_SetError("%s: %s", pathBase, strerror(errno));
		goto fail;
	}
	/*
	 * The filter is one-shot, but the same change entry is passed to
	 * every kevent() call below, so it is re-armed on each iteration.
	 */
	EV_SET(&change, fdDir, EVFILT_VNODE,
	    EV_ADD | EV_ENABLE | EV_ONESHOT,
	    NOTE_DELETE | NOTE_EXTEND | NOTE_WRITE | NOTE_ATTRIB | NOTE_LINK,
	    0, 0);
	timeo.tv_sec = qmgrMaxIdle;
	timeo.tv_nsec = 0;

	for (;;) {
		int nev;

		if ((nev = kevent(kq, &change, 1, &event, 1, &timeo)) == -1) {
			if (errno == EINTR) {
				if (QMGR_CheckSignals()) {
					break;
				}
				continue;
			} else {
				MPD_SetErrorS("kevent");
				goto fail;
			}
		}
		if (nev > 0) {
			if (QMGR_ProcessQueue(uid) == -1)
				goto fail;
		} else if (nev == 0) {
			break;			/* Idle timeout */
		}
	}

	close(fdDir);
	close(kq);
	return (0);
fail:
	/* Release the directory descriptor too if it was opened. */
	if (fdDir != -1)
		close(fdDir);
	close(kq);
	return (-1);
}