首页 > 代码库 > postfix 队列管理
postfix 队列管理
队列管理中真正的main函数式
trigger_server_main(argc, argv, qmgr_trigger_event,
CA_MAIL_SERVER_INT_TABLE(int_table),
CA_MAIL_SERVER_STR_TABLE(str_table),
CA_MAIL_SERVER_BOOL_TABLE(bool_table),
CA_MAIL_SERVER_TIME_TABLE(time_table),
CA_MAIL_SERVER_PRE_INIT(qmgr_pre_init),
CA_MAIL_SERVER_POST_INIT(qmgr_post_init),
CA_MAIL_SERVER_LOOP(qmgr_loop),
CA_MAIL_SERVER_PRE_ACCEPT(pre_accept),
CA_MAIL_SERVER_SOLITARY,
CA_MAIL_SERVER_WATCHDOG(&var_qmgr_daemon_timeout),
0);
在qmgr_loop开始循环执行队列调度。它是Event handing loop的一部分,当这个时间管理器发送了一个时间标识或者一个I/O事件,等待了很长一段件 ,这个函数指明了需要等待的时间长度。
使用qmgr_active_drain(),重新分配一个传输进程将active队列排除,将邮件传输出去,并且进程的分配和邮件的传输是异步的。
进入qmgr_active_drain()
transport = qmgr_transport_select()进行MDA的选择,使用轮询调度的方式进行选择。
#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
//先判断状态,是否dead || lock || max_pend,在通过todo_list 和 busy_list 列表的值进行比较。选择合适的MDA.
for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
|| (xport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) != 0
|| xport->pending >= QMGR_TRANSPORT_MAX_PEND)
continue;
need = xport->pending + 1;
for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
if (QMGR_QUEUE_READY(queue) == 0)
continue;
if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
queue->todo_refcount)) <= 0) {
QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers);
if (msg_verbose)
msg_info("qmgr_transport_select: %s", xport->name);
return (xport);
}
}
}
MDA 选择成功,再进行内存的分配,先判断状态(sanity check)
if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
msg_panic("qmgr_transport: dead transport: %s", transport->name);
if (transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK)
msg_panic("qmgr_transport: rate-locked transport: %s", transport->name);
if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
msg_panic("qmgr_transport: excess allocation: %s", transport->name);
MDA 在rate_limit 状态下不能被选中,将flag上锁。
if (transport->xport_rate_delay > 0)
transport->flags |= QMGR_TRANSPORT_STAT_RATE_LOCK;
//使用mailconnect unix域协议方式连接到邮件系统。
if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
NON_BLOCKING)) == 0)
进入函数
int unix_connect(const char *addr, int block_mode, int timeout)连接socket;
alloc->stream 用来读写邮件信息,地址。
Time_connect 设置时间限制。
在qmgr_transport_alloc(transport, qmgr_deliver);
中,qmgr_deliver以回调函数的形式传送。
在transport_event中调用。
event_request_timer(qmgr_transport_event, (void *) alloc, 0);设置定时。该函数的第三个参数设置延时时间也可以保护系统不被损坏。
event_enable_read(int fd, EVENT_NOTIFY_RDWR_FN callback, void *context);
进行邮件的传递。
/*
* Let some new blood into the active queue when the queue size is
* smaller than some configurable limit.
*
* We import one message per interrupt, to optimally tune the input count
* for the number of delivery agent protocol wait states, as explained in
* qmgr_transport.c.
*/
delay = WAIT_FOR_EVENT;
for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit
&& scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) {
last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT;
if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) {
delay = DONT_WAIT;
if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0)
break;
}
}
循环遍历,扫描队列,将message移入active队列。
当active队列当前邮件数qmgr_message_count小于var_qmgr_active_limit,即参数qmgr_message_active_limit,表示active队列最大文件数,默认值20000时,交替搜索incomming和deferred队列,选择邮件进入active队列。该参数实现了leakybucket策略。
具体的计算过程如下:
(1)初值scan_idx = 0。
(2)需要满足循环条件qmgr_message_count < var_qmgr_active_limit。
(3)需要满足循环条件scan_idx < QMGR_SCAN_IDX_COUNT,QMGR_SCAN_IDX_COUNT宏展开为:
#define QMGR_SCAN_IDX_COUNT(sizeof(qmgr_scans) / sizeof(qmgr_scans[0]))
即2,两轮。
(4)last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT;
其中scan_idx为循环变量,取值为0或1,first_scan_idx为
static int first_scan_idx =QMGR_SCAN_IDX_INCOMING;
#define QMGR_SCAN_IDX_INCOMING 0
进行取模运算,也就是last_scan_idx在循环中以0、1交替。
(5)最后,针对qmgr_scans[last_scan_idx]队列进行qmgr_active_feed操作,也即交替检索incomming和deferred队列。
进入,int qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id)
使用mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE)
将message移入active队列中。
message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,
(st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?
scan_info->flags | QMGR_FLUSH_AFTER :
scan_info->flags,
(st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?
st.st_mode & ~MAIL_QUEUE_STAT_UNTHROTTLE :
0)) == 0)
进入该函数
该函数为了创建符合要求的消息结构,方便传递。
message = qmgr_message_create(queue_name, queue_id, qflags);
qmgr_message_open(message)打开信息结构文件
qmgr_message_read(message)将信息读入。
mail_queue_remove(MAIL_QUEUE_DEFER, queue_id) && errno != ENOENT)
msg_fatal("%s: %s: remove %s %s: %m", myname,
queue_id, MAIL_QUEUE_DEFER, queue_id)
将推迟的邮件队列移除。调用REMOVE函数,该函数是将邮件信息保存到一个文件中,而不是直接将他删除。
qmgr_message_sort(message);
使用qsort() 函数进行排序。 qmgr_message_sort - sort message recipient addresses by domain 。
qmgr_message_resolve(message);
进入该函数:
Postfix文档中有对此描述,postfix可以对收件人域名自动补齐,寻址。
static void qmgr_message_resolve(QMGR_MESSAGE *message)
{
static ARGV *defer_xport_argv;
RECIPIENT_LIST list = message->rcpt_list;
RECIPIENT *recipient;
QMGR_TRANSPORT *transport = 0;
QMGR_QUEUE *queue = 0;
RESOLVE_REPLY reply;
VSTRING *queue_name;
char *at;
char **cpp;
char *nexthop;
ssize_t len;
int status;
DSN dsn;
MSG_STATS stats;
DSN *saved_dsn;
#define STREQ(x,y) (strcmp(x,y) == 0)
#define STR vstring_str
#define LEN VSTRING_LEN
qmgr_message_reslove函数也不仅仅解析收件人地址,而是根据收件人地址查找或生成了QMGR_TRANSPORT和QMGR_QUEUE结构体。因为一个收件人地址可能会启用新的MDA或引入新的域,所以这两个结构体的构建要在该函数中完成。
resolve_clnt_init(&reply);
queue_name = vstring_alloc(1);
for (recipient = list.info; recipient < list.info + list.len; recipient++) {
/*
* Redirect overrides all else. But only once (per entire message).
* For consistency with the remainder of Postfix, rewrite the address
* to canonical form before resolving it.
*/
if (message->redirect_addr) {
if (recipient > list.info) {
recipient->u.queue = 0;
continue;
}
message->rcpt_offset = 0;
rewrite_clnt_internal(REWRITE_CANON, message->redirect_addr,
reply.recipient);
RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
if (qmgr_resolve_one(message, recipient,
recipient->address, &reply) < 0)
continue;
if (!STREQ(recipient->address, STR(reply.recipient)))
RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));
}
对于REDIRECT类型的地址,由于地址来自手工编写的查询表,可能不够规范,所以首先对其采用local方式用trivial-rewrite模块进行重写,rewrite_clnt_internal,宏REWRITE_CANON即字符串local),然后再用qmgr_resolve_one函数解析。
qmgr_message_sort(message);
使用qsort函数进行排序。
qmgr_message_assign(message);
qmgr_message_assign函数创建QMGR_JOB、QMGR_PEER、QMGR_ENTRY结构体。至此,通过qmgr_message_read、qmgr_message_reslove、qmgr_message_assign三个函数完成了相关结构体的创建。
static void qmgr_message_assign(QMGR_MESSAGE *message)
{
RECIPIENT_LIST list = message->rcpt_list;
RECIPIENT *recipient;
QMGR_ENTRY *entry = 0;
QMGR_QUEUE *queue;
/*
* Try to bundle as many recipients in a delivery request as we can. When
* the recipient resolves to the same site and transport as the previous
* recipient, do not create a new queue entry, just move that recipient
* to the recipient list of the existing queue entry. All this provided
* that we do not exceed the transport-specific limit on the number of
* recipients per transaction. Skip recipients with a dead transport or
* destination.
*/
#define LIMIT_OK(limit, count) ((limit) == 0 || ((count) < (limit)))
for (recipient = list.info; recipient < list.info + list.len; recipient++) {
if ((queue = recipient->u.queue) != 0) {
if (message->single_rcpt || entry == 0 || entry->queue != queue
|| !LIMIT_OK(entry->queue->transport->recipient_limit,
entry->rcpt_list.len)) {
entry = qmgr_entry_create(queue, message);
}
recipient_list_add(&entry->rcpt_list, recipient->offset,
recipient->dsn_orcpt, recipient->dsn_notify,
recipient->orig_addr, recipient->address);
qmgr_recipient_count++;
}
}
循环所有收件人创建entry结构体。
recipient_list_free(&message->rcpt_list);
recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);
}
创建entry结构体,移入todo_list:
Create the delivery request:
entry = (QMGR_ENTRY *) mymalloc(sizeof(QMGR_ENTRY));
entry->stream = 0;
entry->message = message;
recipient_list_init(&entry->rcpt_list, RCPT_LIST_INIT_QUEUE);
message->refcount++;
entry->peer = peer;
QMGR_LIST_APPEND(peer->entry_list, entry, peer_peers);
peer->refcount++;
entry->queue = queue;
QMGR_LIST_APPEND(queue->todo, entry, queue_peers);
queue->todo_refcount++;
peer->job->read_entries++;
qmgr_message_close(message);
关闭结构体;
QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
QMGR_JOB *job;
/*
* Try finding an existing job, reviving it if it was already retired.
* Create a new job for this transport/message combination otherwise. In
* either case, the job ends linked on the job lists.
*/
//查找任务,如果任务
if ((job = qmgr_job_find(message, transport)) == 0)
job = qmgr_job_create(message, transport);
if (job->stack_level < 0)
qmgr_job_link(job);
/*
* Reset the candidate cache because of the new expected recipients. Make
* sure the job is not marked as a blocker for the same reason. Note that
* this can result in having a non-blocker followed by more blockers.
* Consequently, we can‘t just update the current job pointer, we have to
* reset it. Fortunately qmgr_job_entry_select() will easily deal with
* this and will lookup the real current job for us.
*/
RESET_CANDIDATE_CACHE(transport);
if (IS_BLOCKER(job, transport)) {
job->blocker_tag = 0;
transport->job_current = transport->job_list.next;
}
return (job);
}
void recipient_list_add(RECIPIENT_LIST *list, long offset,
const char *dsn_orcpt, int dsn_notify,
const char *orig_rcpt, const char *rcpt)
将接收地址加入列表。
postfix 队列管理