Фундаментальные алгоритмы и структуры данных в Delphi
Шрифт:
Очередь буферов, используемая в рассматриваемом примере копирования потока, реализована в виде циклической очереди. Очередь создается с заранее выделенными всеми ее буферами. Код реализации этого класса приведен в листинге 12.12.
Обратите внимание, что мы не будем использовать диспетчер кучи во время процесса копирования потока, поскольку критический раздел защищает диспетчер кучи в многопоточной подпрограмме. Если начать вызывать подпрограммы распределения и освобождения памяти из потоков, они слишком легко смогут блокировать одна другую и,
Производитель будет заполнять буфер в начале очереди, а затем перемещать указатель начала очереди. С другой стороны, потребитель будет считывать, данные из буфера в конце очереди, а затем перемещать конец очереди. Процессы заполнения и считывания могут происходить одновременно, поскольку они используют различные буферы.
Листинг 12.12. Класс TQueuedBuffers, предназначенный для выполнения копирования потока
type
PBuffer= ^TBuffer;
TBuffer = packed record
bCount : longint;
bBlock : array [0..pred(BufferSize)] of byte;
end;
PBufferArray = ^TBufferArray;
TBufferArray = array [0..1023] of PBuffer;
type
TQueuedBuffers = class private
FBufCount : integer;
FBuffers : PBufferArray;
FHead : integer;
FTail : integer;
protected
function qbGetHead : PBuffer;
function qbGetTail : PBuffer;
public
constructor Create(aBufferCount : integer);
destructor Destroy; override;
procedure AdvanceHead;
procedure AdvanceTail;
property Head : PBuffer read qbGetHead;
property Tail : PBuffer read qbGetTail;
end;
constructor TQueuedBuffer s.Create(aBufferCount : integer);
var
i : integer;
begin
inherited Create;
{распределить буферы}
FBuffers := AllocMem(aBufferCount * sizeof(pointer));
for i := 0 to pred(aBufferCount) do
GetMem(FBuffers^[i], sizeof(TBuffer));
FBufCount := aBufferCount;
end;
destructor TQueuedBuffers.Destroy;
var
i : integer;
begin
{освободить буферы}
if (FBuffers <> nil) then begin
for i := 0 to pred( FBuf Count) do
if (FBuffers^[i] <> nil) then
FreeMem(FBuffers^[i], sizeof(TBuffer));
FreeMem(FBuffers, FBufCount * sizeof(pointer));
end;
inherited Destroy;
end;
procedure TQueuedBuffers.AdvanceHead;
begin
inc(FHead);
if (FHead = FBufCount) then
FHead := 0;
end;
procedure TQueuedBuffers.AdvanceTail;
begin
inc(FTail);
if (FTail = FBuf Count) then
FTail := 0;
end;
function TQueuedBuffers.qbGetHead : PBuffer;
begin
Result := FBuffers^[FHead];
end;
function TQueuedBuffers.qbGetTail : PBuffer;
begin
Result := FBuffers^[FTail];
end;
Менее
Коды реализации классов производителя и потребителя приведены в листинге 12.13. Эти классы являются производными от класса TThread. Код реализации каждого из перекрытых методов Execute не отличается от ранее описанного. Поток производителя входит в цикл. На каждом шаге цикла он вызывает метод StartProducer объекта синхронизации, а затем считывает блок данных из исходного потока в буфер в конце очереди. После этого он смещает указатель конца очереди. И, в заключение, он вызывает метод StopProducing и повторяет цикл с начала. Выполнение цикла прекращается, как только поток производителя устанавливает буфер в состояние, соответствующее отсутствию в нем каких-либо данных (потребитель воспринимает это состояние в качестве признака "конец потока").
В свою очередь, цикл потока потребителя выполняется следующим образом. Вначале поток вызывает метод StartConsuming объекта синхронизации. Возврат из этого метода свидетельствует об отсутствии данных для считывания в объекте поставленных в очередь буферов. Поток считывает данные из буфера, определяемого указателем начала очереди, и записывает их в поток назначения. Затем он смещает указатель начала очереди. Сразу после считывания всех данных из заполненного буфера он вызывает метод StopConsuming объекта синхронизации и повторяет цикл сначала. Работа потребителя останавливается при получении им пустого буфера.
Листинг 12.13. Классы производителя и потребителя
type
TProducer = class (TThread) private
FBuffers : TQueuedBuffers;
FStream : TStream;
FSyncObj : TtdProduceConsumeSync;
protected
procedure Execute; override;
public
constructor Create(aStream : TStream;
aSyncObj : TtdProduceConsumeSync;
aBuffers : TQueuedBuffers);
end;
constructor TProducer.Create(aStream : TStream;
aSyncObj : TtdProduceConsumeSync;