Добавил:
Upload Опубликованный материал нарушает ваши авторские права? Сообщите нам.
Вуз: Предмет: Файл:
Параллельные потоки.doc
Скачиваний:
2
Добавлен:
03.09.2019
Размер:
188.93 Кб
Скачать

Асинхронное взаимодействие

Это очень универсальный и весьма эффективный способ взаимодействия параллельных потоков. Суть его состоит во введении посредника-буфера между параллельными потоками. Иногда этот способ называют взаимодействием с помощью обмена сообщениями. Поток-писатель, желающий послать другому потоку некоторые данные, записывает их в очередь. Поток-читатель ожидает появления данных в очереди, получает их, обрабатывает и, если нужно, посылает результат в очередь потока-писателя. Принципиальный недостаток асинхронного взаимодействия - необходимость буфера неопределенного размера. Этот недостаток можно сгладить, если ввести ограничение на размер очереди - в этом случае поток-писатель должен приостанавливаться, если очередь достигла своего максимального размера.

Важная проблема эффективности многопоточной обработки возникает в том случае, если поток должен обрабатывать несколько очередей входящих сообщений или несколько событий. Приведем цитату из [1]: "пассажиру, ожидающему автобуса номер 127 в общем случае придется ждать дольше, чем тому, кто может воспользоваться как 19-м, так и 127-м маршрутом автобуса, в зависимости от того, который из них раньше придет к остановке. В предположении что автобусы приходят в случайном порядке, у пассажира, имеющего выбор, время ожидания оказывается вдвое короче - парадоксально, но получается, что он ждет вдвое "быстрее"! Единственный способ достигнуть этого - ожидать именно первого из многих возможных событий; приобретение же более быстрого компьютера здесь не поможет".

Учитывая важность выбора первого из многих возможных событий, введем понятие альтернативного ожидания и дадим его практическое воплощение.

TGsvSelectMethod = procedure of object;

TGsvSelect = class

private

FEvents: array[0..MAXIMUM_WAIT_OBJECTS - 1] of THandle;

FMethods: array[0..MAXIMUM_WAIT_OBJECTS - 1] of TGsvSelectMethod;

FCount: Cardinal;

public

constructor Create(aThread: TGsvThread);

procedure Init;

procedure Add(aEvent: THandle; aMethod: TGsvSelectMethod);

function Wait(aTimeout: Cardinal = INFINITE): Boolean;

end;

Альтернатива могут быть добавлена к списку возможных альтернатив с помощью метода Add, который принимает 2 аргумента - дескриптор события и метод-обработчик. Альтернативы могут добавляться статически, на этапе создания объекта, а могут динамически, причем список альтернатив может быть различным в зависимости от различных условий. В языке Ada это делает оператор отбора. В Delphi мы можем воспользоваться оператором if или case. Список альтернатив инициализируется вызовом Init, а альтернативное ожидание - вызовом Wait. Если в процессе ожидания возникло событие, то будет вызван метод, связанный с данным событием и Wait вернет True, в противном случае Wait вернет False. Обработчик события - это обычный метод класса параллельного потока. Функция Wait принимает в качестве аргумента значение таймаута (в миллисекундах), если за время таймаута никаких событий не произошло, то Wait вернет False. Отметим важную деталь - в список ожидаемых событий неявно вносится событие FTerminationEvent, которое делает возможным форсированное завершение ожидания при завершении потока - в этом случае Wait также вернет False. Приведем пример кода, в котором параллельный поток создает набор альтернатив и выполняет ожидание. Предполагается, что в конструкторе объекта потока создан объект FSelect класса TGsvSelect:

FSelect.Init;

if условие1 then

FSelect.Add(событие1, метод1);

if условие2 then

FSelect.Add(событие2, метод2);

FSelect.Add(безусловное_событие3, метод3);

if not FSelect.Wait(100) then begin

if Terminated then

// обработка завершения потока

else

// обработка таймаута

end;

Теперь рассмотрим более подробно реализацию класса TGsvSelect:

constructor TGsvSelect.Create(aThread: TGsvThread);

begin

inherited Create;

FEvents[0] := aThread.FTerminationEvent;

FMethods[0] := nil;

FCount := 1;

end;

procedure TGsvSelect.Init;

begin

FCount := 1;

end;

procedure TGsvSelect.Add(aEvent: THandle; aMethod: TGsvSelectMethod);

begin

Assert(FCount <= High(FEvents));

FEvents[FCount] := aEvent;

FMethods[FCount] := aMethod;

Inc(FCount);

end;

function TGsvSelect.Wait(aTimeout: Cardinal): Boolean;

var

res, i: Cardinal;

begin

Result := False;

res := WaitForMultipleObjects(FCount, @FEvents[0], False, aTimeout);

if res < (WAIT_OBJECT_0 + FCount) then begin

Result := res > WAIT_OBJECT_0;

if Result then begin

i := res - WAIT_OBJECT_0;

if Assigned(FMethods[i]) then

FMethods[i]();

end;

end;

end;

При создании объекта первым событием в списке становится завершающее событие. Процедура Add регистрирует событие - добавляет его в список (в массив) событий и добавляет в список методов связанный с этим событием метод. Функция Wait выполняет альтернативное ожидание по всем зарегистрированным событиям, включая завершающее событие. Следует отметить, что функция Wait может вернуть False не только при таймауте или при завершающем событии. Это может случиться также в том случае, когда в качестве события используется мьютекс, от которого отказался создавший его процесс. Но поскольку мы ограничились случаем взаимодействия потоков в рамках одного приложения (одного процесса операционной системы), то это событие у нас не возникнет никогда.

Теперь пора уточнить термин "событие", который мы использовали до этого момента без особого разъяснения. Под событием мы будем понимать изменение состояния объекта ядра операционной системы. В качестве событий можно использовать:

  1. Event - объект ядра, который так и называется - событие. Ввиду его важности мы подробно рассмотрим этот вид события немного ниже.

  2. Mutex (мьютекс) - напоминает критическую секцию, то есть, только один из конкурирующий потоков может захвать мьютекс, а остальные потоки будут приостановлены. Мьютекс имеет перед критической секцией ряд преимуществ, в частности, время ожидания мьютекса можно ограничивать таймаутом, мьютекс можно использовать для синхронизации параллельных потоков из разных приложений. См. в MSDN функции CreateMutex, OpenMutex, ReleaseMutex.

  3. Semaphore (семафор) - напоминает мьютекс, но разрешает захват ресурса не одним, а несколькими потоками, причем число потоков ограничено доступным количеством ресурса. См. функции CreateSemaphore, OpenSemaphore, ReleaseSemaphore.

  4. Change notification (уведомление файловой системы) - событие возникает при различных изменениях в файловой системе. См. FindFirstChangeNotification.

  5. Process - событие возбуждается при завершении процесса. См. CreateProcess.

  6. Thread - событие возбуждается при завершении потока. Ожидание этого события позволяет родительскому потоку дождаться завершения дочерних потоков. В классе TGsvThread дескриптор этого события называется SysHandle.

Кроме перечисленных событий операционная система Windows предоставляет еще несколько событий, но они доступны только в Windows NT. Особо можно еще отметить сокетные события, которые создаются функцией WSACreateEvent - эти события также можно использовать при альтернативных ожиданиях на сокетах.

Рассмотрим более подробно событие "Event":

TGsvEvent = class

private

FHandle: THandle;

procedure SetState(aState: Boolean);

public

constructor Create;

destructor Destroy; override;

function Wait(aThread: TGsvThread; aTimeout: Cardinal = INFINITE): Boolean;

property Handle: THandle read FHandle;

property State: Boolean write SetState;

end;

Событие может находиться в двух состояниях - активном (сигнализирущее состояние) и сброшенном (несигнализирующее состояние). В активное состояние событие переводится установкой свойства State в True, а сбрасывается событие установкой State в False. Эти редкий тип свойства, которое доступно только для установки, но не доступно для получения. Событие автоматически сбрасывается при каком-либо успешном ожидании, после чего его нужно явно перевести в активное состояние. Событие может быть внутренним объектом потока, а может не принадлежать никакому потоку конкретно. Поток может ожидать только данного события с помощью метода Wait, либо использовать его дескриптор Handle для альтернативного ожидания. Приведем реализацию метода Wait:

function TGsvEvent.Wait(aThread: TGsvThread; aTimeout: Cardinal): Boolean;

var

objs: array[0..1] of THandle;

cnt: Integer;

begin

objs[0] := FHandle;

cnt := 1;

if Assigned(aThread) then begin

objs[1] := aThread.FTerminationEvent;

cnt := 2;

end;

Result := WaitForMultipleObjects(cnt, @objs[0], False, aTimeout) = WAIT_OBJECT_0;

end;

Если событие ожидает поток aThread, то кроме собственно события также будет ожидаться завершающее событие потока, которое необходимо для форсированного прекращения ожидания. Если поток не задан (aThread = nil), то метод Wait будет ожидать только своего события - это может пригодиться основному VCL-потоку.

Чтобы суммировать материал этого раздела, приведем реализацию очереди ограниченной длины, которую можно использовать для взаимодействия обменом сообщениями. Будем предполагать, что все сообщения являются объектами. Сообщения добавляются в конец очереди, а извлекаются из начала очереди. Исключение из правила - высокоприоритетное сообщение, которое помещается в начало очереди без учета ограничения на длину очереди.

Класс очереди:

TGsvQueue = class

private

FGetEvent: TGsvEvent;

FPutEvent: TGsvEvent;

FLatch: TGsvLatch;

FList: TList;

FMaxCount: Integer;

function GetCount: Integer;

procedure SetEvents;

public

constructor Create(aMaxCount: Integer);

destructor Destroy; override;

function Get(aThread: TGsvThread; aTimeout: Cardinal = INFINITE): TObject;

function Put(aThread: TGsvThread; aMessage: TObject;

aTimeout: Cardinal = INFINITE): Boolean;

procedure PutOutOfTurn(aMessage: TObject);

property GetEvent: TGsvEvent read FGetEvent;

property PutEvent: TGsvEvent read FPutEvent;

property Count: Integer read GetCount;

property MaxCount: Integer read FMaxCount;

end;

Рассмотрим подробнее метод Get:

function TGsvQueue.Get(aThread: TGsvThread; aTimeout: Cardinal): TObject;

begin

Result := nil;

if not FGetEvent.Wait(aThread, aTimeout) then

Exit;

FLatch.Lock;

try

if FList.Count <> 0 then begin

Result := TObject(FList.Items[0]);

FList.Delete(0);

SetEvents;

end;

finally

FLatch.Unlock;

end;

end;

Поток, вызвавший метод Get, сразу переходит в состояние ожидания непустой очереди. Если очередь содержит хоть одно сообщение, то поток продолжит свое выполнение без ожидания. Первое сообщение извлекается из списка внутри критической секции, а затем производится переустановка событий:

procedure TGsvQueue.SetEvents;

begin

FGetEvent.State := FList.Count <> 0;

FPutEvent.State := FList.Count < FMaxCount;

end;

Обычно только один поток пишет в очередь и только один поток читает из нее. Если при асинхронном взаимодействии требуется обмен сообщениями, то в структуре данных записываемого сообщения указывается дескриптор потока-отправителя. Кроме взаимодействия один-к-одному, очередь поддерживает достаточно важный случай взаимодействия многие-к-одному, при котором многие потоки посылают данные одному потоку. Можно себе представить также ситуацию множественного чтения-записи, когда многие потоки посылают сообщения в одну очередь, и несколько потоков выбирают сообщения из очереди. Наиболее вероятна такая ситуация при наличии массива идентичных потоков-получателей.