Фундаментальные алгоритмы и структуры данных в Delphi
Шрифт:
destructor Destroy; override;
procedureAdvanceHead(aConsumerId : integer);
procedure AdvanceTail;
property Head [aInx : integer] : PBuffer read qbGetHead;
property Tail : PBuffer read qbGetTail;
property ConsumerCount : integer read FConsumerCount;
end;
constructor TQueuedBuffers.Create(aBufferCount : integer;
aConsumerCount : integer);
var
i : integer;
begin
inherited Create;
{распределить буферы}
FBuffers := AllocMem(aBufferCount * sizeof(pointer));
for i := 0 to pred(aBufferCount) do
GetMem(FBuffers^[i], sizeof(TBuffer));
FBufCount := aBufferCount;
FConsumerCount := aConsumerCount;
end;
destructor TQueuedBuffers.Destroy;
var
i : integer;
begin
{освободить
if (FBuffers <> nil) then begin
for i := 0 to pred(FBufCount) do
if (FBuffers^[i] <> nil) then
FreeMem(FBuffers^[i], sizeof(TBuffer));
FreeMem(FBuffers, FBufCount * sizeof(pointer));
end;
inherited Destroy;
end;
procedure TQueuedBuffers.AdvanceHead(aConsumerId : integer);
begin
inc(FHead[aConsumerId]);
if (FHead[aConsumerId] = FBufCount) then
FHead[aConsumerId] := 0;
end;
procedure TQueuedBuffers.AdvanceTail;
begin
inc(FTail);
if (FTail = FBufCount) then
FTail := 0;
end;
function TQueuedBuffers.qbGetHead(aInx : integer): PBuffer;
begin
Result := FBuffers^[FHead[aInx]];
end;
function TQueuedBuffers.qbGetTail : PBuffer;
begin
Result := FBuffers^ [FTail];
end;
Следующей мы рассмотрим реализацию классов производителя и потребителя (листинг 12.20). Класс производителя претерпел не слишком много изменений по сравнению с предыдущей реализацией, в то время как класс потребителя теперь содержит идентификационный номер, посредством которого он обращается к объекту буферов для получения нужного указателя начала очереди.
Листинг 12.20. Классы производителя и потребителя
type
TProducer * class(TThread) private
FBuffers : TQueuedBuffers;
FStream : TStream;
FSyncObj : TtdProduceManyConsumeSync;
protected
procedure Execute; override;
public
constructor Create(aStream : TStream;
aSyncObj : TtdProduceManyConsumeSync;
aBuffers : TQueuedBuffers);
end;
constructor TProducer.Create(aStream : TStream;
aSyncObj : TtdProduceManyConsumeSync;
aBuffers : TQueuedBuffers);
begin
inherited Create (true);
FStream := aStream;
FSyncObj := aSyncObj;
FBuffers := aBuffers;
end;
procedure TProducer.Execute;
var
Tail : PBuffer;
begin
{выполнять
repeat
{передать сигнал о готовности к началу генерации данных}
FSyncObj.StartProducing;
{выполнить считывание блока из потока в конечный буфер очереди}
Tail := FBuffers.Tail;
Tail74.bCount := FStream.Read (Tail^.ЬВ1оск, 1024);
{переместить указатель конца очереди}
FBuffers.AdvanceTail;
{передать сигнал о прекращении генерации данных}
FSyncObj.StopProducing;
until (Tail^.bCount = 0);
end;
type
TConsumer = class (TThread) private
FBuffers : TQueuedBuffers;
FID : integer;
FStream : TStream;
FSyncObj : TtdProduceManyConsumeSync;
protected
procedure Execute; override;
public
constructor Create(aStream : TStream;
aSyncObj : TtdProduceManyConsumeSync;
aBuffers : TQueuedBuffers;
alD : integer);
end;
constructor TConsumer.Create(aStream : TStream;
aSyncObj : TtdProduceManyConsumeSync;
aBuffers : TQueuedBuffers;
alD : integer);
begin
inherited Create (true);
FStream := aStream;
FSyncObj := aSyncObj;
FBuffers := aBuffers;
FID := alD;
end;
procedure TConsumer.Execute;
var
Head : PBuffer;
begin
{передать сигнал о готовности к началу потребления данных}
FSyncObj.StartConsuming(FID);
{выполнить считывание начального буфера очереди}
Head := FBuffers.Head[FID];
{до тех пор, пока начальный буфер не пуст...}
while (Head^.bCount <> 0) do
begin
{выполнить запись блока из начального буфера очереди в поток}
FStream.Write(Head^.bBlock, Head^.bCount);
{переместить указатель начала очереди}
FBuffers.AdvanceHead(FID);
{обработка этого буфера завершена}
FSyncObj.StopConsuming(FID);
{передать сигнал о повторной готовности к началу потребления данных}
FSyncObj.StartConsuming(FID);
{выполнить считывание начального буфера очереди}
Head := FBuffers.Head[FID];
end;
{обработка последнего буфера завершена}
FSyncObj.StopConsuming(FID);
end;
И, наконец, рассмотрим подпрограмму копирования потоков, код которой показан в листинге 12.21.