首页 > 代码库 > 简易高效的Delphi原子队列

简易高效的Delphi原子队列

本文提供一个基于原子操作的Delphi无锁队列,简易高效,适用于多线程、大吞吐量的场景。

可用于Android系统和32,64位Windows系统。

 

感谢歼10和qsl提供了修改建议!

使用时有如下限制:

1.必须事先开辟内存(队列容量在创建时固定,不会自动增长)

2.队列大小必须是2的幂

3.不能压入空指针

 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
unit utAtomFIFO;
 
interface
 
Uses
  SysUtils,
  SyncObjs;
 
Type
  // Fixed-capacity FIFO of untyped pointers whose bookkeeping is done with
  // interlocked (atomic) counter operations instead of a lock.
  // The capacity is chosen at construction time and must be a power of two.
  TAtomFIFO = Class
  Protected
    // Running counter bumped atomically by Push; masked with FHighBound to
    // pick the slot that receives the item.
    FWritePtr: Integer;
    // Running counter bumped atomically by Pop; masked with FHighBound to
    // pick the slot that is read.
    FReadPtr: Integer;
    // Number of stored items: incremented by Push after the slot is written,
    // decremented by Pop before the slot is read.
    FCount:Integer;
    // Capacity - 1; doubles as the bit mask that turns a counter into an index.
    FHighBound:Integer;
    // NOTE(review): not referenced anywhere in this unit's visible code.
    FisEmpty:Integer;
    // Slot storage; a nil entry marks a slot that currently holds no item.
    FData: array of Pointer;
    // Getter for the Size property (FHighBound + 1).
    function GetSize:Integer;
  Public
    // Appends Item to the queue; a nil Item is ignored.
    procedure Push(Item: Pointer);
    // Removes and returns the oldest item, or nil when the queue is empty.
    function Pop: Pointer;
    // Size is the slot count and must be a power of two.
    Constructor Create(Size: Integer); Virtual;
    Destructor Destroy; Override;
    // Resets the queue's read/write counters and item count.
    Procedure Empty;
    // Total number of slots.
    property Size: Integer read GetSize;
    // Plain (non-interlocked) read of the current item count.
    property UsedCount:Integer read FCount;
  End;
 
Implementation
 
{$I InterlockedAPIs.inc}
// Creates the queue.
//   Size - number of slots; must be a positive power of two, because slot
//          indices are computed as (counter and (Size - 1)).
// The buffer is allocated once and never grows, so pick a capacity large
// enough that the queue cannot overflow.
// Raises Exception when Size is not a positive power of two or when the slot
// array cannot be allocated.
Constructor TAtomFIFO.Create(Size: Integer);
Begin
  Inherited Create;

  // Size <= 0 has to be rejected explicitly: 0 and (0 - 1) = 0 would pass a
  // bare power-of-two test and leave FHighBound = -1 with an empty array.
  // Short-circuit evaluation also keeps Size - 1 from being computed for
  // Low(Integer).
  if (Size <= 0) or ((Size and (Size - 1)) <> 0) then
    raise Exception.Create('FIFO长度必须大于0并为2的幂');

  try
    SetLength(FData, Size);
    FHighBound := Size - 1;
  except
    Raise Exception.Create('FIFO申请内存失败');
  end;
End;
 
// Releases the slot array and then runs the inherited destructor.
// Only the array itself is freed; whatever the stored pointers refer to is
// not touched here.
Destructor TAtomFIFO.Destroy;
Begin
  FData := nil;
  Inherited Destroy;
End;
 
// Discards everything in the queue: resets the item count and both position
// counters to zero and clears every slot.
// Each counter is reset unconditionally. Chaining the exchanges with
// short-circuit "and" would stop at the first counter that was already 0
// (e.g. FReadPtr after pushes without pops) and leave the others untouched.
// Slots are cleared as well so that a later Pop can never pick up a pointer
// stored before the reset.
// NOTE(review): the reset as a whole is not atomic; callers should make sure
// no Push/Pop runs concurrently with Empty.
procedure TAtomFIFO.Empty;
var
  I: Integer;
begin
  // Zero the count first so that Pop reports "empty" as early as possible.
  InterlockedExchange(FCount, 0);
  InterlockedExchange(FReadPtr, 0);
  InterlockedExchange(FWritePtr, 0);

  for I := 0 to High(FData) do
    FData[I] := nil;
end;
 
// Getter for the Size property: the slot count, i.e. one more than the
// highest valid index stored in FHighBound.
function TAtomFIFO.GetSize: Integer;
begin
  Result := Succ(FHighBound);
end;
 
// Appends Item to the queue.
//   Item - pointer to store; nil is silently ignored because Pop treats a nil
//          slot as "not written yet".
// No capacity check is made: the target slot is overwritten whatever it
// currently holds.
procedure TAtomFIFO.Push(Item: Pointer);
var
  Slot: Integer;
begin
  if Item <> nil then
  begin
    // Reserve a slot by bumping the write counter, then wrap it into range.
    Slot := InterlockedIncrement(FWritePtr) and FHighBound;
    FData[Slot] := Item;
    // Publish the item only after the slot has been written.
    InterlockedIncrement(FCount);
  end;
end;
 
// Removes the next item from the queue.
//   Returns the stored pointer, or nil when the item count shows that nothing
//   is available.
Function TAtomFIFO.Pop:Pointer;
var
  N:Integer;
begin
  // Claim one item up front by decrementing the count; a negative result
  // means there was nothing to claim.
  if InterlockedDecrement(FCount)<0 then
  begin
    // Undo the speculative decrement so the count is back where it was.
    InterlockedIncrement(FCount);
    Result:=nil;
  end
  else
  begin
    // Reserve the next read position and wrap it into the slot range.
    N:=InterlockedIncrement(FReadPtr) and FHighBound;
    // Push reserves its slot before storing into it, so the reserved slot may
    // still be nil here (presumably when several producers are in flight);
    // yield the time slice until the pointer shows up.
    while FData[N]=nil do Sleep(0);
    Result:=FData[N];

    // Mark the slot as empty again so the wait above works on the next lap.
    FData[N]:=nil;
  end;
end;
 
End.

  

 InterlockedAPIs.inc

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
{*******************************************************}
{                                                       }
{           CodeGear Delphi Runtime Library             }
{                                                       }
{ Copyright(c) 1995-2014 Embarcadero Technologies, Inc. }
{                                                       }
{*******************************************************}
 
{$IFDEF CPUX86}

// 32-bit x86 implementations.
// NOTE(review): the register usage below is consistent with Delphi's default
// "register" convention (1st argument in EAX, 2nd in EDX, 3rd in ECX, result
// in EAX) - confirm against the compiler documentation.

// Atomically adds Increment to Addend and returns the NEW value:
// LOCK XADD leaves the previous value in EAX, then the increment is added.
function InterlockedAdd(var Addend: Integer; Increment: Integer): Integer;
asm
      MOV   ECX,EAX
      MOV   EAX,EDX
 LOCK XADD  [ECX],EAX
      ADD   EAX,EDX
end;

// Atomically stores Exchange into Target if Target equals Comparand.
// The XCHG moves the comparand into EAX (where CMPXCHG expects it) and the
// address into ECX; CMPXCHG leaves the value Target held before the call in
// EAX, which becomes the result.
function InterlockedCompareExchange(var Target: Integer; Exchange: Integer; Comparand: Integer): Integer;
asm
      XCHG    EAX,ECX
 LOCK CMPXCHG [ECX],EDX
end;

// Pointer flavour: jumps straight into the Integer version, which relies on
// pointers and Integers having the same size on this target.
function InterlockedCompareExchangePointer(var Target: Pointer; Exchange: Pointer; Comparand: Pointer): Pointer;
asm
      JMP InterlockedCompareExchange
end;

// Atomically subtracts 1 from Addend by tail-jumping into InterlockedAdd with
// an increment of -1; the result is the NEW value.
function InterlockedDecrement(var Addend: Integer): Integer;
asm
      MOV   EDX,-1
      JMP   InterlockedAdd
end;

// Atomically replaces Target with Value and returns the previous value.
// Implemented as a compare-exchange retry loop: a failed CMPXCHG reloads EAX
// with the current contents, so the loop repeats until the swap succeeds.
function InterlockedExchange(var Target: Integer; Value: Integer): Integer;
asm
      MOV     ECX,EAX
      MOV     EAX,[ECX]
@@loop:
 LOCK CMPXCHG [ECX],EDX
      JNZ     @@loop
end;

// Pointer flavour: jumps straight into the Integer version (same-size
// assumption as InterlockedCompareExchangePointer).
function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer;
asm
      JMP InterlockedExchange
end;

// Atomically adds 1 to Addend by tail-jumping into InterlockedAdd with an
// increment of 1; the result is the NEW value.
function InterlockedIncrement(var Addend: Integer): Integer;
asm
      MOV   EDX,1
      JMP   InterlockedAdd
end;

{$ENDIF CPUX86}
 
{$IFDEF CPUX64}

// 64-bit x86 implementations.
// NOTE(review): the register usage below is consistent with the Win64
// convention (1st argument in RCX, 2nd in RDX/EDX, 3rd in R8, result in
// RAX/EAX) - confirm against the compiler documentation.

// Atomically adds Value to Addend and returns the PREVIOUS value
// (LOCK XADD leaves the old contents in EAX and nothing is added afterwards).
function InterlockedExchangeAdd(var Addend: Integer; Value: Integer): Integer;
asm
      .NOFRAME
      MOV   EAX,EDX
 LOCK XADD  [RCX].Integer,EAX
end;

// Atomically subtracts 1 from Addend and returns the NEW value
// (old value from XADD, then DEC).
function InterlockedDecrement(var Addend: LongInt): LongInt;
asm
      .NOFRAME
      MOV   EAX,-1
 LOCK XADD  [RCX].Integer,EAX
      DEC   EAX
end;

// Atomically adds 1 to Addend and returns the NEW value
// (old value from XADD, then INC).
// NOTE(review): unlike its siblings this routine carries no .NOFRAME
// directive - presumably harmless, verify.
function InterlockedIncrement(var Addend: LongInt): LongInt;
asm
      MOV   EAX,1
 LOCK XADD  [RCX].Integer,EAX
      INC   EAX
end;

// Atomically stores Exchange into Destination if Destination equals
// Comparand (32-bit operands). The result is the value Destination held
// before the call, left in EAX by CMPXCHG.
function InterlockedCompareExchange(var Destination: Integer; Exchange: Integer; Comparand: Integer): Integer;
asm
      .NOFRAME
      MOV     EAX,R8d
 LOCK CMPXCHG [RCX].Integer,EDX
end;

// 64-bit variant of InterlockedCompareExchange operating on full registers.
function InterlockedCompareExchange64(var Destination: Int64; Exchange: Int64; Comparand: Int64): Int64; overload;
asm
      .NOFRAME
      MOV     RAX,R8
 LOCK CMPXCHG [RCX],RDX
end;

// Pointer variant of the compare-exchange; same instruction sequence as the
// 64-bit integer version.
function InterlockedCompareExchangePointer(var Destination: Pointer; Exchange: Pointer; Comparand: Pointer): Pointer;
asm
      .NOFRAME
      MOV     RAX,R8
 LOCK CMPXCHG [RCX],RDX
end;

// Atomically replaces Target with Value and returns the previous pointer:
// XCHG swaps memory with RDX, which is then copied into RAX.
function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer;
asm
       .NOFRAME
  LOCK XCHG [RCX],RDX
       MOV RAX,RDX
end;

// 32-bit variant of the exchange above; returns the previous value.
function InterlockedExchange(var Target: Integer; Value: Integer): Integer;// inline;
asm
       .NOFRAME
  LOCK XCHG [RCX],EDX
       MOV EAX,EDX
end;

{$ENDIF CPUX64}
 
{$IFDEF CPUARM}

// ARM implementations: thin wrappers that forward to the compiler's Atomic*
// intrinsics, so the same Interlocked* names are available on this target.

// Adds Increment to Addend via AtomicIncrement and returns its result.
function InterlockedAdd(var Addend: Integer; Increment: Integer): Integer;
begin
  Result := AtomicIncrement(Addend, Increment);
end;

// Forwards to AtomicCmpExchange: Exchange is stored into Target only when
// Target equals Comparand; the intrinsic's result is returned unchanged.
function InterlockedCompareExchange(var Target: Integer; Exchange: Integer; Comparand: Integer): Integer;
begin
  Result := AtomicCmpExchange(Target, Exchange, Comparand);
end;

// Pointer flavour of InterlockedCompareExchange.
function InterlockedCompareExchangePointer(var Target: Pointer; Exchange: Pointer; Comparand: Pointer): Pointer;
begin
  Result := AtomicCmpExchange(Target, Exchange, Comparand);
end;

// Decrements Addend via AtomicDecrement and returns its result.
function InterlockedDecrement(var Addend: Integer): Integer;
begin
  Result := AtomicDecrement(Addend);
end;

// Replaces Target with Value via AtomicExchange and returns its result.
function InterlockedExchange(var Target: Integer; Value: Integer): Integer;
begin
  Result := AtomicExchange(Target, Value);
end;

// Pointer flavour of InterlockedExchange.
function InterlockedExchangePointer(var Target: Pointer; Value: Pointer): Pointer;
begin
  Result := AtomicExchange(Target, Value);
end;

// Increments Addend via AtomicIncrement and returns its result.
function InterlockedIncrement(var Addend: Integer): Integer;
begin
  Result := AtomicIncrement(Addend);
end;

{$ENDIF CPUARM}

  

?
1
<em id="__mceDel"> </em>