首页 > 代码库 > Queue and Message

Queue and Message

#ifndef __QUEUE_H__#define __QUEUE_H__#include <stdint.h>#include <stdlib.h>#include <string.h>/* * Queues can have more than one producer but only one consumer. * This means that more than one task or interrupt handler is allowed to store * new data in the queue but only one task is allowed toget data from the queue. * * Queues accept messages of various size. When putting a message into a queue, * the message size is passed as a parameter. * * Retrieving a message from the queue does not copy the message, but returns * a pointer to the message and its size. Thisenhances performance because the * data is copied only once, when the message is written into the queue. * * The retrieving function has to delete every message after processing it. * A new message can only be retrieved from the queue when the previous message * was deleted from the queue. * * |---------------------- size -------------------------| * |        |------------ msgCnt ---------------|        | * [ .... ] [ size : message ] [ size : message ] [ .... ] * |        |                                     | * |pData   |offsetFirst                          |offsetLast * */typedef struct TAG_QUEUE{  uint8_t * Memory;  uint32_t Capacity;  uint32_t MessageCount;  uint32_t ReadIndex;  uint32_t WriteIndex;  uint32_t IsUsing;  uint32_t InProgressCount;} QUEUE;// Creates and initializes a message queue.QUEUE * Q_Create( uint32_t Capacity );// Deletes a specific queue.void Q_Delete( QUEUE * Queue );// Initializes a message queue.void Q_Init( QUEUE * Queue, uint8_t * Memory, uint32_t Capacity );// Deletes the last retrieved message in a queue.void Q_Purge( QUEUE * Queue );// Deletes all message in a queue.void Q_Clear( QUEUE * Queue );// Returns the number of messages currently in a queueuint32_t Q_GetCount( QUEUE * Queue );// Returns the first message sizeuint32_t Q_GetSize( QUEUE * Queue );// Delivers information whether the queue is actually in use.// A queue must not be cleared or deleted when it is in use.uint32_t Q_IsUsing( QUEUE * Queue );// Stores a new message of given size in a queue.uint32_t Q_Wirte( QUEUE * Queue, void * Message, uint32_t Size );// Retrieves a message from a queueuint32_t Q_Read( QUEUE * Queue, void ** Message );#endif /* __QUEUE_H__ */

 

#include "queue.h"#include "cmsis_os.h"#include "macro_misc.h"// Creates and initializes a message Queue.QUEUE * Q_Create( uint32_t Capacity ){  uint32_t Size = ALIGN_UP( sizeof(QUEUE), 4 ) + ALIGN_UP( Capacity, 4 );  QUEUE *Queue = (QUEUE *) osMalloc( Size, osWaitForever );  if ( Queue == NULL )    return NULL;  uint8_t * Memory = //    (uint8_t *) ( ( (uint32_t) ( Queue ) ) + ALIGN_UP( sizeof(QUEUE), 4 ) );  Q_Init( Queue, Memory, ALIGN_UP( Capacity, 4 ) );  return Queue;}// Deletes a specific Queue.// A Queue must not be cleared or deleted when it is in use.void Q_Delete( QUEUE * Queue ){  if ( Queue->IsUsing == 0 )    osFree( Queue );}// Deletes all messages in a Queue.// A Queue must not be cleared or deleted when it is in use.void Q_Clear( QUEUE * Queue ){  if ( Queue->IsUsing == 0 )    Queue->MessageCount = 0;}// Initializes a message Queue.void Q_Init( QUEUE * Queue, uint8_t * Memory, uint32_t Capacity ){  int32_t Delta = (uint32_t) Memory & 3;  if ( Delta )  {    Delta -= 4;    Capacity += Delta;    Memory -= Delta;  }  memset( Queue, 0, sizeof(QUEUE) );  Queue->Capacity = Capacity;  Queue->Memory = Memory;}// Returns the number of messages currently in a Queueuint32_t Q_GetCount( QUEUE * Queue ){  return Queue->MessageCount - Queue->InProgressCount;}// Returns the first message sizeuint32_t Q_GetSize( QUEUE * Queue ){  uint32_t MessageSize = 0;  if ( Queue->MessageCount )    MessageSize = *(uint32_t *) ( &Queue->Memory[ Queue->ReadIndex ] );  return MessageSize;}// Delivers information whether the Queue is actually in use.// A Queue must not be cleared or deleted when it is in use.uint32_t Q_IsUsing( QUEUE * Queue ){  return Queue->IsUsing;}// Stores a new message of given size in a Queue.// 0 : Queue could not be stored (Queue is full).// 1 : Success; message stored.uint32_t Q_Write( QUEUE * Queue, void * Message, uint32_t Size ){  uint32_t ReadIndexVal;  uint32_t WriteIndexPending;  uint32_t WriteIndexVal;  uint32_t MessageSize = 4 + ALIGN_UP( Size, 4 );  int32_t * Memory = (int32_t *) Queue->Memory;  uint32_t Value = osDisableInterrupt( );  if ( Queue->MessageCount == 0 )  {    // read next message from head of memory    Queue->ReadIndex = 0;    // Queue could not be stored (memory is full).    WriteIndexVal = -1;    if ( Queue->Capacity >= MessageSize )      WriteIndexVal = 0;  }  else  {    Memory = (int32_t *) Queue->Memory;    WriteIndexPending = Queue->WriteIndex;    int32_t SizePending = Memory[ WriteIndexPending ];    if ( SizePending < 0 )    {      // other task is writting ... but it is preemptived by our task      // WriteIndexPending has been updated      // [ Last Queue ] [ --- Other Queue --- ] [ Our Mesage ]      //                  | WriteIndexPending      SizePending = -SizePending;    }    else    {      // [ Last Queue ] [ Our Mesage ]      //                  | WriteIndexPending    }    // where our task will write ...    WriteIndexVal = WriteIndexPending + 4 + ALIGN_UP( SizePending, 4 );    ReadIndexVal = Queue->ReadIndex;    if ( ReadIndexVal >= WriteIndexVal )    {      // [ Our Mesage ] [ Last Queue ]      // |<------------>|ReadIndexVal      // |WriteIndexVal      if ( ReadIndexVal - WriteIndexVal < MessageSize )        WriteIndexVal = -1;    }    else    {      // [ Our Mesage ] [ Available Space ]      // |WriteIndexVal                   |Capacity      // |<------------------------------>|      uint32_t sizeAvailableTail = Queue->Capacity - WriteIndexVal;      if ( sizeAvailableTail < MessageSize )      {        // try to write to head of memory        // [ Our Mesage ] [ Last Queue ]        // |<------------>|ReadIndexVal        // |0        if ( ReadIndexVal < MessageSize )          WriteIndexVal = -1;        else if ( sizeAvailableTail > 4 )        {          // can not read message from tail of memory          // Marker for Q_Purge()          Memory[ WriteIndexVal ] = 0;          // write to head of memory          WriteIndexVal = 0;        }      }    }  }  // store message to memory  if ( WriteIndexVal != -1 )  {    // WriteIndexPending for other task if our task be preemptived    Queue->WriteIndex = WriteIndexVal;    Queue->MessageCount++;    Memory[ WriteIndexVal ] = -Size; // SizePending for other task    Queue->InProgressCount++;    osRestoreInterrupt( Value );    //    memcpy( &Memory[ WriteIndexVal + 4 ], Message, Size );    //    Value =http://www.mamicode.com/ osDisableInterrupt( );    Memory[ WriteIndexVal ] = Size; // Size for this message    Queue->InProgressCount--;  }  osRestoreInterrupt( Value );  return ( WriteIndexVal != -1 );}// Retrieves a message from a Queue// not allowed while the queue is in use.uint32_t Q_Read( QUEUE * Queue, void ** Message ){  uint32_t MessageSize = 0;  uint32_t * Memory = (uint32_t *) Queue->Memory;  uint32_t Value = osDisableInterrupt( );  if ( ( Queue->IsUsing == 0 ) && ( Queue->MessageCount ) )  {    MessageSize = Memory[ Queue->ReadIndex ];    *Message = (void *) ( (uint32_t) ( &Memory[ Queue->ReadIndex ] ) + 4 );    Queue->IsUsing = 1;  }  osRestoreInterrupt( Value );  return MessageSize;}// Deletes the last retrieved message in a Queue.void Q_Purge( QUEUE * Queue ){  uint32_t Value = osDisableInterrupt( );  if ( Queue->IsUsing )  {    uint32_t * Memory = (uint32_t *) Queue->Memory;    uint32_t MessageSize = 4 + ALIGN_UP( Memory[ Queue->ReadIndex ], 4 );    Queue->MessageCount--;    uint32_t NextReadIndexVal = Queue->ReadIndex + MessageSize;    Queue->ReadIndex = NextReadIndexVal;    if ( Queue->Capacity - NextReadIndexVal < 5 )      Queue->ReadIndex = 0;    else if ( Queue->MessageCount )    {      // Marked by Q_Write(), Next readable message at head of memory      if ( Memory[ NextReadIndexVal ] == 0 )        Queue->ReadIndex = 0;    }    Queue->IsUsing = 0;  }  osRestoreInterrupt( Value );}