首页 > 代码库 > postfix 队列管理

postfix 队列管理

队列管理中真正的main函数式

trigger_server_main(argc, argv, qmgr_trigger_event,

CA_MAIL_SERVER_INT_TABLE(int_table),

CA_MAIL_SERVER_STR_TABLE(str_table),

CA_MAIL_SERVER_BOOL_TABLE(bool_table),

CA_MAIL_SERVER_TIME_TABLE(time_table),

CA_MAIL_SERVER_PRE_INIT(qmgr_pre_init),

CA_MAIL_SERVER_POST_INIT(qmgr_post_init),

CA_MAIL_SERVER_LOOP(qmgr_loop),

CA_MAIL_SERVER_PRE_ACCEPT(pre_accept),

CA_MAIL_SERVER_SOLITARY,

CA_MAIL_SERVER_WATCHDOG(&var_qmgr_daemon_timeout),

0);

qmgr_loop开始循环执行队列调度。它是Event handing loop的一部分,当这个时间管理器发送了一个时间标识或者一个I/O事件,等待了很长一段件 ,这个函数指明了需要等待的时间长度。

使用qmgr_active_drain(),重新分配一个传输进程将active队列排除,将邮件传输出去,并且进程的分配和邮件的传输是异步的。

进入qmgr_active_drain()

transport = qmgr_transport_select()进行MDA的选择,使用轮询调度的方式进行选择。

 

#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))

 

//先判断状态,是否dead || lock || max_pend,在通过todo_list 和 busy_list 列表的值进行比较。选择合适的MDA.

    for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {

if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0

    || (xport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK) != 0

    || xport->pending >= QMGR_TRANSPORT_MAX_PEND)

    continue;

need = xport->pending + 1;

for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {

    if (QMGR_QUEUE_READY(queue) == 0)

continue;

    if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,

  queue->todo_refcount)) <= 0) {

QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers);

if (msg_verbose)

    msg_info("qmgr_transport_select: %s", xport->name);

return (xport);

    }

}

    }

 

MDA 选择成功,再进行内存的分配,先判断状态(sanity check

 if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)

msg_panic("qmgr_transport: dead transport: %s", transport->name);

    if (transport->flags & QMGR_TRANSPORT_STAT_RATE_LOCK)

msg_panic("qmgr_transport: rate-locked transport: %s", transport->name);

    if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)

msg_panic("qmgr_transport: excess allocation: %s", transport->name);

 

MDA rate_limit 状态下不能被选中,将flag上锁。

if (transport->xport_rate_delay > 0)

transport->flags |= QMGR_TRANSPORT_STAT_RATE_LOCK;

 

//使用mailconnect unix域协议方式连接到邮件系统。

 if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,

      NON_BLOCKING)) == 0)

进入函数

int  unix_connect(const char *addr, int block_mode, int timeout)连接socket;

 

alloc->stream 用来读写邮件信息,地址。

 

Time_connect 设置时间限制。

 

qmgr_transport_alloc(transport, qmgr_deliver);

中,qmgr_deliver以回调函数的形式传送。

transport_event中调用。

event_request_timer(qmgr_transport_event, (void *) alloc, 0);设置定时。该函数的第三个参数设置延时时间也可以保护系统不被损坏。

 

 event_enable_read(int fd, EVENT_NOTIFY_RDWR_FN callback, void *context);

进行邮件的传递。

 

 

 

 

 

 

 

 

 /*

     * Let some new blood into the active queue when the queue size is

     * smaller than some configurable limit.

     * 

     * We import one message per interrupt, to optimally tune the input count

     * for the number of delivery agent protocol wait states, as explained in

     * qmgr_transport.c.

     */

    delay = WAIT_FOR_EVENT;

    for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit

 && scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) {

last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT;

if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) {

    delay = DONT_WAIT;

    if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0)

break;

}

}

 

循环遍历,扫描队列,将message移入active队列。

active队列当前邮件数qmgr_message_count小于var_qmgr_active_limit,即参数qmgr_message_active_limit,表示active队列最大文件数,默认值20000时,交替搜索incommingdeferred队列,选择邮件进入active队列。该参数实现了leakybucket策略。

 

具体的计算过程如下:

1)初值scan_idx = 0

2)需要满足循环条件qmgr_message_count < var_qmgr_active_limit

3)需要满足循环条件scan_idx < QMGR_SCAN_IDX_COUNTQMGR_SCAN_IDX_COUNT宏展开为:

#define QMGR_SCAN_IDX_COUNT(sizeof(qmgr_scans) / sizeof(qmgr_scans[0]))

2,两轮。

4last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT;

其中scan_idx为循环变量,取值为01first_scan_idx

static int first_scan_idx =QMGR_SCAN_IDX_INCOMING;

#define QMGR_SCAN_IDX_INCOMING 0

进行取模运算,也就是last_scan_idx在循环中以01交替。

5)最后,针对qmgr_scans[last_scan_idx]队列进行qmgr_active_feed操作,也即交替检索incommingdeferred队列。

 

进入,int  qmgr_active_feed(QMGR_SCAN *scan_info, const char *queue_id)

 

使用mail_queue_rename(queue_id, scan_info->queue, MAIL_QUEUE_ACTIVE

message移入active队列中。

 

message = qmgr_message_alloc(MAIL_QUEUE_ACTIVE, queue_id,

 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?

      scan_info->flags | QMGR_FLUSH_AFTER :

      scan_info->flags,

 (st.st_mode & MAIL_QUEUE_STAT_UNTHROTTLE) ?

  st.st_mode & ~MAIL_QUEUE_STAT_UNTHROTTLE :

      0)) == 0)

进入该函数

该函数为了创建符合要求的消息结构,方便传递。

 message = qmgr_message_create(queue_name, queue_id, qflags);

 

qmgr_message_open(message)打开信息结构文件

 

qmgr_message_read(message)将信息读入。

  

mail_queue_remove(MAIL_QUEUE_DEFER, queue_id) && errno != ENOENT)

    msg_fatal("%s: %s: remove %s %s: %m", myname,

      queue_id, MAIL_QUEUE_DEFER, queue_id)

将推迟的邮件队列移除。调用REMOVE函数,该函数是将邮件信息保存到一个文件中,而不是直接将他删除。

 

qmgr_message_sort(message);

 

使用qsort() 函数进行排序。 qmgr_message_sort - sort message recipient addresses by domain 

 

 

 

 

qmgr_message_resolve(message);

进入该函数:

Postfix文档中有对此描述,postfix可以对收件人域名自动补齐,寻址。

static void qmgr_message_resolve(QMGR_MESSAGE *message)

{

    static ARGV *defer_xport_argv;

    RECIPIENT_LIST list = message->rcpt_list;

    RECIPIENT *recipient;

    QMGR_TRANSPORT *transport = 0;

    QMGR_QUEUE *queue = 0;

    RESOLVE_REPLY reply;

    VSTRING *queue_name;

    char   *at;

    char  **cpp;

    char   *nexthop;

    ssize_t len;

    int     status;

    DSN     dsn;

    MSG_STATS stats;

    DSN    *saved_dsn;

 

#define STREQ(x,y) (strcmp(x,y) == 0)

#define STR vstring_str

#define LEN VSTRING_LEN

qmgr_message_reslove函数也不仅仅解析收件人地址,而是根据收件人地址查找或生成了QMGR_TRANSPORTQMGR_QUEUE结构体。因为一个收件人地址可能会启用新的MDA或引入新的域,所以这两个结构体的构建要在该函数中完成。

    resolve_clnt_init(&reply);

    queue_name = vstring_alloc(1);

    for (recipient = list.info; recipient < list.info + list.len; recipient++) {

 

/*

 * Redirect overrides all else. But only once (per entire message).

 * For consistency with the remainder of Postfix, rewrite the address

 * to canonical form before resolving it.

 */

if (message->redirect_addr) {

    if (recipient > list.info) {

recipient->u.queue = 0;

continue;

    }

    message->rcpt_offset = 0;

    rewrite_clnt_internal(REWRITE_CANON, message->redirect_addr,

  reply.recipient);

    RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));

    if (qmgr_resolve_one(message, recipient,

 recipient->address, &reply) < 0)

continue;

    if (!STREQ(recipient->address, STR(reply.recipient)))

RECIPIENT_UPDATE(recipient->address, STR(reply.recipient));

}

对于REDIRECT类型的地址,由于地址来自手工编写的查询表,可能不够规范,所以首先对其采用local方式用trivial-rewrite模块进行重写,rewrite_clnt_internal,宏REWRITE_CANON即字符串local),然后再用qmgr_resolve_one函数解析。

 

 

 

 

qmgr_message_sort(message);

 

使用qsort函数进行排序。

 

qmgr_message_assign(message);

 qmgr_message_assign函数创建QMGR_JOBQMGR_PEERQMGR_ENTRY结构体。至此,通过qmgr_message_readqmgr_message_resloveqmgr_message_assign三个函数完成了相关结构体的创建。

 

static void qmgr_message_assign(QMGR_MESSAGE *message)

{

    RECIPIENT_LIST list = message->rcpt_list;

    RECIPIENT *recipient;

    QMGR_ENTRY *entry = 0;

    QMGR_QUEUE *queue;

 

    /*

     * Try to bundle as many recipients in a delivery request as we can. When

     * the recipient resolves to the same site and transport as the previous

     * recipient, do not create a new queue entry, just move that recipient

     * to the recipient list of the existing queue entry. All this provided

     * that we do not exceed the transport-specific limit on the number of

     * recipients per transaction. Skip recipients with a dead transport or

     * destination.

     */

#define LIMIT_OK(limit, count) ((limit) == 0 || ((count) < (limit)))

 

    for (recipient = list.info; recipient < list.info + list.len; recipient++) {

if ((queue = recipient->u.queue) != 0) {

    if (message->single_rcpt || entry == 0 || entry->queue != queue

|| !LIMIT_OK(entry->queue->transport->recipient_limit,

     entry->rcpt_list.len)) {

entry = qmgr_entry_create(queue, message);

    }

    recipient_list_add(&entry->rcpt_list, recipient->offset,

       recipient->dsn_orcpt, recipient->dsn_notify,

       recipient->orig_addr, recipient->address);

    qmgr_recipient_count++;

}

    }

循环所有收件人创建entry结构体。

    recipient_list_free(&message->rcpt_list);

    recipient_list_init(&message->rcpt_list, RCPT_LIST_INIT_QUEUE);

}

 

 

创建entry结构体,移入todo_list

 Create the delivery request:

  entry = (QMGR_ENTRY *) mymalloc(sizeof(QMGR_ENTRY));

    entry->stream = 0;

    entry->message = message;

    recipient_list_init(&entry->rcpt_list, RCPT_LIST_INIT_QUEUE);

    message->refcount++;

    entry->peer = peer;

    QMGR_LIST_APPEND(peer->entry_list, entry, peer_peers);

    peer->refcount++;

    entry->queue = queue;

    QMGR_LIST_APPEND(queue->todo, entry, queue_peers);

    queue->todo_refcount++;

    peer->job->read_entries++;

 

 

 

 

qmgr_message_close(message);

关闭结构体;

 

QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)

{

    QMGR_JOB *job;

 

    /*

     * Try finding an existing job, reviving it if it was already retired.

     * Create a new job for this transport/message combination otherwise. In

     * either case, the job ends linked on the job lists.

     */

//查找任务,如果任务

    if ((job = qmgr_job_find(message, transport)) == 0)

job = qmgr_job_create(message, transport);

    if (job->stack_level < 0)

qmgr_job_link(job);

 

    /*

     * Reset the candidate cache because of the new expected recipients. Make

     * sure the job is not marked as a blocker for the same reason. Note that

     * this can result in having a non-blocker followed by more blockers.

     * Consequently, we can‘t just update the current job pointer, we have to

     * reset it. Fortunately qmgr_job_entry_select() will easily deal with

     * this and will lookup the real current job for us.

     */

    RESET_CANDIDATE_CACHE(transport);

    if (IS_BLOCKER(job, transport)) {

job->blocker_tag = 0;

transport->job_current = transport->job_list.next;

    }

    return (job);

}

void    recipient_list_add(RECIPIENT_LIST *list, long offset,

           const char *dsn_orcpt, int dsn_notify,

           const char *orig_rcpt, const char *rcpt)

 

将接收地址加入列表。

 

postfix 队列管理