Фундаментальные алгоритмы и структуры данных в Delphi
Шрифт:
aBuffers : TQueuedBuffers);
begin
inherited Create (true);
FStream := aStream;
FSyncObj :=,aSyncObj;
FBuffers aBuffers;
end;
procedure TProducer.Execute;
var
Tail : PBuffer;
begin
{выполнять до момента опустошения потока...}
repeat
{сигнализировать о готовности к началу генерирования данных}
FSyncObj.StartProducing;
{считать блок из потока в конечный буфер}
Tail FBuffers.Tail;
Tail^.bCount := FStream.Read(Tail^.bBlock, BufferSize);
{переместить
FBuffers.AdvanceTail;
{поскольку выполняется запись нового буфера, необходимо сигнализировать о созданных данных}
FSyncObj.StopProducing;
until (Tail^.bCount ? 0);
end;
type
TConsumer = class(TThread) private
FBuffers : TQueuedBuffers;
FStream : TStream;
FSyncObj : TtdProduceConsumeSync;
protected
procedure Execute; override;
public
constructor Create(aStream : TStream;
aSyncObj : TtdProduceConsumeSync;
aBuffers : TQueuedBuffers);
end;
constructor TConsumer.Create(aStream : TStream;
aSyncObj : TtdProduceConsumeSync;
aBuffers : TQueuedBuffers);
begin
inherited Create (true);
FStream := aStream;
FSyncObj := aSyncObj;
FBuffers := aBuffers;
end;
procedure TConsumer.Execute;
var
Head : PBuffer;
begin
{сигнализировать о готовности к началу потребления данных}
FSyncObj.StartConsuming;
{извлечь начальный буфер}
Head := FBuffers.Head;
{до тех пор, пока начальный буфер не опустошен...}
while (Head^.bCount <> 0) do
begin
{выполнить запись блока из начального буфера в поток}
FStream.Write(Head^.bBlock, Head^.bCount);
{переместить указатель начала очереди}
FBuffers.AdvanceHead;
{поскольку было выполнено считывание и обработка буфера, необходимо сообщить о том, что данные были использованы}
FSyncObj.StopConsuming;
{сигнализировать о готовности снова приступить к потреблению данных}
FSyncObj.StartConsuming;
{извлечь начальный буфер}
Head := FBuffers.Head;
end;
end;
И, наконец, мы можем рассмотреть подпрограмму копирования потока, приведенную в листинге 12.14. Она принимает два параметра: входной поток и выходной поток. Подпрограмма создает специальный объект типа TQueuedBuffers. Этот объект содержит все ресурсы и методы, необходимые для реализации организованного в виде очереди набора буферов. Он создает также экземпляр класса TtdProducerConsumerSync, который будет действовать в качестве объекта синхронизации, обеспечивающего согласованную работу производителя и потребителя.
Листинг 12.14. Многопоточное копирование
procedure ThreadedCopyStream(aSrcStream, aDestStream : TStream);
var
SyncObj : TtdProduceConsumeSync;
Buffers : TQueuedBuffers;
Producer : TProducer;
Consumer : TConsumer;
WaitArray : array [ 0..1] of THandle;
begin
SyncObj := nil;
Buffers := nil;
Producer :=nil;
Consumer :=nil;
try
{создать
SyncObj := TtdProduceConsumeSync.Create(20);
Buffers := TQueuedBuffers.Create(20);
Producer := TProducer.Create(aSrcStream, SyncObj, Buffers);
Consumer := TConsumer.Create(aDestStream, SyncObj, Buffers);
{сохранить дескрипторы потоков, что обеспечивает возможность ожидания их передачи}
WaitArray[0] := Producer.Handle;
WaitArray[1] := Consumer.Handle;
{запустить потоки}
Consumer.Resume;
Producer.Resume;
{ожидать окончания потоков}
WaitForMultipleObjects(2, @WaitArray, true, INFINITE);
finally
Producer.Free;
Consumer.Free;
Buffers.Free;
SyncObj.Free;
end;
end;
Затем подпрограмма копирования создает два потока, между которыми будет выполняться копирование, и возобновляет их выполнение (потоки создаются в приостановленном состоянии). Далее подпрограмма дожидается завершения обоих потоков и выполняет очистку. Полный код подпрограммы можно найти в файлах TstCopy.dpr и TstCopyu.pas на web-сайте издательства, в разделе материалов.
Модель с одним производителем и несколькими потребителями
Реализовать рассмотренное приложение, в котором используется модель "производитель-потребитель", было достаточно просто. Теперь рассмотрим модель с одним производителем и несколькими потребителями. В этом случае имеется поток, который создает данные. Предположим, что существует несколько потоков, которым требуется считывать созданные данные. В упомянутом ранее примере использовались два потребителя, которые сжимали данные с применением разных алгоритмов. Еще одним примером мог бы служить браузер. Будем считать, что производитель выгружает web-страницу из удаленного сайта, а один потребитель считывает HTML-код, чтобы выполнить его сохранение на диске, второй считывает код для его отображения на экране, а третий - с целью отображения индикатора выполнения. Создание этих процессов как отдельных потребителей упрощает написание кода, поскольку каждый процесс должен выполнять только одну задачу.
Итак, что же требуется, чтобы объект синхронизации поддерживал согласованную работу производителя и потребителей? Во-первых, производитель должен сообщать всем потребителям о наличии данных для считывания. Предположительно скорости работы потребителей будут различными, и поэтому они будут обрабатывать данные с различной частотой. Это предполагает существование по одному семафору "имеются данные" на каждый потребитель. Будем считать, что существует список буферов, которые производитель должен пополнять данными. И более того, этот список организован в виде циклической очереди. Следовательно, нам нужен единственный указатель конца очереди (управляемый исключительно производителем) и по одному указателю начала очереди для каждого потребителя, поскольку, по всей вероятности, каждый потребитель будет считывать буфера с различной частотой.