Фундаментальные алгоритмы и структуры данных в Delphi
Шрифт:
Так как же быть с производителем? Каким образом он узнает, что можно снова заполнять буфер данных? Понятно, что он может это делать только после того, как последний (предположительно самый медленный) потребитель прочитал достаточный объем данных, чтобы появилось место для его заполнения новыми данными (иначе говоря, как только буфер снова освободится). Это, в свою очередь, предполагает, что должен существовать счетчик потребителей для каждого буфера данных. Каждый раз, когда потребитель считывает данные из буфера, он уменьшает значение этого счетчика (число потребителей, которым еще только предстоит выполнить считывание данных из этого буфера). Таким образом, когда последний
Код этого расширенного класса TtdProduceManyConsumeSync, который позволяет нескольким потребителям потреблять данные, сгенерированные единственным производителем, приведен в листинге 12.15. Предполагается, что каждый поток потребителя имеет уникальный, начинающийся с нуля, идентификатор (на практике этого легко добиться, но при необходимости класс можно было бы расширить, чтобы потребители могли регистрироваться и отменять свою регистрацию, и чтобы идентификаторы присваивались им "на лету"). Затем потребитель использует этот идентификатор (числовое значение) при обращении к методам StartConsumer и StopConsumer.
Листинг 12.15. Класс синхронизации одного производителя и нескольких потребителей
В этом классе предполагается, что производитель заполняет буфера, которые затем используются потребителями. Буфера не имеют никакой реальной реализации в самом классе. Их предоставление - задача пользователя класса.
Метод StartProducing, показанный в листинге 12.16, работает во многом аналогично описанному в предыдущем случае: он просто дожидается передачи ему семафора "требуются данные". (Этот семафор содержит значение, равное количеству буферов, что позволяет производителю заполнить все буфера.)
type
TtdProduceManyConsumeSync = class private
FBufferCount : integer;
{счетчик буферов данных}
FBufferInfo : TList;
{циклическая очередь информации о буферах}
FBufferTail : integer;
{конец циклической очереди буферов}
FConsumerCount : integer;
{счетчик потребителей}
FConsumerInfo : TList;
{информация для каждого потребителя}
FNeedsData : THandle;
{семафор}
protected
public
constructor Create(aBufferCount : integer;
aConsumerCount : integer);
destructor Destroy; override;
procedure StartConsuming(aid : integer);
procedure StartProducing;
procedure StopConsuming(aid : integer);
procedure StopProducing;
end;
Метод StopProducing, также показанный в листинге 12.16, на этот раз должен выполнить несколько больший объем работы. Во-первых, счетчик использования потребителей только что заполненного им буфера должен быть установлен равным количеству потребителей. Обратите внимание, что поток производителя должен передавать все семафоры "имеются данные" (по одному для каждого потребителя), тем самым сообщая о наличии еще одного буфера, готового к использованию.
Листинг 12.16. Методы StartProducing и StopProducing
type
PBufferInfo = ^TBufferInfo;
TBufferInfo = packed record
biToUseCount : integer;
{счетчик потребителей, которым еще предстоит использовать буфер}
end;
type
PConsumerInfo = ^TConsumerInfo;
TConsumerInfo = packed record ciHasData : THandle;
{семафор}
ciHead : integer;
{указатель
end;
procedure TtdProduceManyConsumeSync.StartProducing;
begin
{чтобы можно было начать генерацию данных, необходимо передать семафор "требуются данные"}
WaitForSingleObject(FNeedsData, INFINITE);
end;
procedure TtdProduceManyConsumeSync.StopProducing;
var
i : integer;
BufInfo : PBufferInfo;
ConsumerInfo : PConsumerInfo;
begin
{в случае генерации каких-либо дополнительных данных необходимо установить счетчик потребителей буфера в конце очереди, чтобы тем самым обеспечить правильную обработку всех буферов}
BufInfo := PBufferInfo(FBufferInfo[FBufferTail]);
BufInfo^.biToUseCount := FConsumerCount;
inc(FBufferTail);
if (FBufferTail >= FBufferCount) then
FBufferTail := 0;
{теперь всем потребителям необходимо сообщить о наличии дополнительных данных}
for i := 0 to pred(FConsumerCount) do
begin
ConsumerInfo := PConsumerInfo(FConsumerInfo[i]);
ReleaseSemaphore(ConsumerInfo^.ciHasData/ 1, nil);
end;
end;
Чтобы разобраться с работой алгоритма с точки зрения потребителя, взгляните на листинг 12.17. Метод StartConsuming должен дождаться передачи семафора "имеются данные", предназначенного для соответствующего потока потребителя (каждому потоку присвоен идентификатор потребителя). Метод StopConsuming -наиболее сложный во всем классе синхронизации. Вначале он извлекает информационную запись о буфере, соответствующую его собственному указателю на начало очереди. Затем он уменьшает значение счетчика потребителей, которым еще предстоит выполнить считывание (потребить) данный буфер. (подпрограмма InterlockedDecrement - это составная часть интерфейса WIN32 API. Она уменьшает значение своего параметра безопасным для потоков образом и возвращает новое значение параметра.) Затем метод увеличивает указатель на начало очереди для данного потока потребителя и, если теперь число потребителей, которым еще предстоит выполнить считывание этого буфера, равно нулю, передает производителю семафор "требуются данные", чтобы побудить его сгенерировать новые данные.
Листинг 12.17. Методы StartConsuming и StopConsuming
procedure TtdProduceManyConsumeSync.StartConsuming(aId : integer);
var
ConsumerInfo : PConsumerInfo;
begin
{чтобы можно было начать потребление данных, потребителю с данным конкретным идентификатором должен быть передан семафор "имеются данные"}
ConsumerInfo := PConsumerInfo(FConsumerInfo[aId]);
WaitForSingleObject(ConsumerInfo^.ciHasData, INFINITE);
end;
procedure TtdProduceManyConsumeSync.StopConsuming(aId : integer);
var
BufInfo : PBufferInfo;
ConsumerInfo : PConsumerInfo;
NumToRead : integer;
begin
{мы выполнили считывание данных в буфере, на который указывает указатель начала очереди}
ConsumerInfo := PConsumerInfo(FConsumerInfo[aId]);
BufInfo := PBufferInfo(FBufferInfo[ConsumerInfo^.ciHead]);
NumToRead := InterLockedDecrement(BufInfo^.biToUseCount);
{переместить указатель начала очереди}
inc(ConsumerInfo^.ciHead);
if (ConsumerInfo^.ciHead >= FBufferCount) then