Métodos Formais em Engenharia de Software |
---|
Cálculo de Sistemas de Informação |
Programação Ciber-física |
Verificação Formal |
Calendário | Sumários |
Anos anteriores |
Alumni |
Considere o seguinte programa onde duas threads concorrentes incrementam uma variável partilhada.
import threading
x = 0
def inc():
global x
x = x + 1
t0 = threading.Thread(target=inc)
t1 = threading.Thread(target=inc)
t0.start()
t1.start()
t0.join()
t1.join()
assert x == 2
Para simplificar a modelação vamos assumir que todas as instruções de uma thread executam de forma atómica, isto é sem interrupção de outros processos. Por vezes, para tornar a análise mais eficiente (porque o número de traços possíveis diminui significativamente) vamos assumir que várias instruções executam de forma atómica. Para sinalizar isso iremos usar etiquetas no algoritmo para assinalar diferentes pontos de execução e assumir que todas as instruções entre duas etiquetas executam de forma atómica. Há situações em que podemos assumir isso sem mascarar problemas, nomeadamente agrupar sequências de instruções que apenas referem variáveis locais. Noutros casos é preciso ter muito cuidado com essas simplificações. Por exemplo, no algoritmo acima só temos dois possíveis pontos de execução em cada processo (antes ou depois da atribuição), o que pode ser tornado explícito acrescentando as referidas etiquetas.
import threading
x = 0
def inc():
global x
# 0
x = x + 1
# 1
t0 = threading.Thread(target=inc)
t1 = threading.Thread(target=inc)
t0.start()
t1.start()
t0.join()
t1.join()
assert x == 2
Para verificar a correção parcial deste algoritmo expressa na asserção (quando termina a variável partilhada tem o valor 2) e a sua terminação podemos usar a seguinte especificação em TLA+.
----- MODULE Increment2 -----
EXTENDS Naturals
VARIABLES x,pc0,pc1
vars == <<x,pc0,pc1>>
Init == x = 0 /\ pc0 = 0 /\ pc1 = 0
Increment0 == pc0 = 0 /\ x' = x + 1 /\ pc0' = 1 /\ UNCHANGED pc1
Increment1 == pc1 = 0 /\ x' = x + 1 /\ pc1' = 1 /\ UNCHANGED pc0
Next == Increment0 \/ Increment1
Spec == Init /\ [][Next]_vars /\ WF_vars(Next)
TypeOK == [](x \in Nat /\ pc0 \in 0..1 /\ pc1 \in 0..1)
Finish == pc0 = 1 /\ pc1 = 1
PartialCorrectness == [](Finish => x = 2)
Termination == <> Finish
=============================
Como este é um algoritmo que termina obviamente também terá um deadlock, logo devemos indicar ao TLC que não queremos verificar a ausência do mesmo.
SPECIFICATION Spec
PROPERTIES TypeOK PartialCorrectness Termination
CHECK_DEADLOCK FALSE
Este algoritmo pode ser generalizado para um número arbitrário de processos.
import threading
N = 5
x = 0
def inc():
global x
# 0
x = x + 1
# 1
thread = [threading.Thread(target=inc) for i in range(N)]
for t in thread: t.start()
for t in thread: t.join()
assert x == N
Para especificar esta versão em TLA+ devemos declarar uma constante que diz quantos processos temos e assumir que essa constante é estritamente positiva. Para além disso, como temos agora um número arbitrário de processos, para guardar os program counters teremos que usar uma função finita que mapeia cada identificador de processo para a etiqueta que identifica o respetivo ponto de execução.
----- MODULE IncrementN -----
EXTENDS Naturals
CONSTANT N
ASSUME N > 0
VARIABLES x,pc
vars == <<x,pc>>
Init == x = 0 /\ pc = [i \in 1..N |-> 0]
Increment(i) == pc[i] = 0 /\ x' = x + 1 /\ pc' = [pc EXCEPT ![i] = 1]
Next == \E i \in 1..N : Increment(i)
Spec == Init /\ [][Next]_vars /\ WF_vars(Next)
TypeOK == [](x \in Nat /\ pc \in [1..N -> 0..1])
Finish == \A i \in 1..N : pc[i] = 1
PartialCorrectness == [](Finish => x = N)
Termination == <> Finish
=============================
Para verificar este algoritmo, no ficheiro de configuração temos que indicar qual é o valor da constante declarada.
CONSTANT N = 5
SPECIFICATION Spec
PROPERTIES TypeOK PartialCorrectness Termination
CHECK_DEADLOCK FALSE
De facto, os processadores não garantem que a instrução x = x + 1
executa de forma atómica, pois a execução desta instrução implica uma leitura da memória do valor actual de x
e uma posterior escrita de x + 1
, e entre estas duas instruções máquina o processo poderia ser interrompido. Continuando a assumir que as instruções executam de forma atómica, podemos simular este comportamento substituindo a instrução x = x + 1
pelas instruções y = x
e x = y+1
, onde y
é uma variável local de cada processo. Isto corresponderia ao seguinte algoritmo.
import threading
N = 5
x = 0
def inc():
global x
# 0
y = x
# 1
x = y + 1
# 2
thread = [threading.Thread(target=inc) for i in range(N)]
for t in thread: t.start()
for t in thread: t.join()
assert x == N
Exercício: Especifique este algoritmo em TLA+ e verifique se continua a cumprir as propriedades esperadas.
Neste caso, para garantir que a atribuição em cada processo executa de forma atómica podemos usar um semáforo, que terá de ser adquirido por cada processo antes de começar a executar e libertado no final. Em termos abstratos podemos ver um semáforo como uma simples variável booleana partilhada, onde a ação de adquirir o semáforo corresponde à execução atómica de uma espera até que a variável seja True
seguida uma atribuição a False
. O algoritmo com o semáforo seria o seguinte.
import threading
N = 5
x = 0
l = threading.Lock()
def inc():
global x
# 0
l.acquire()
# 1
y = x
# 2
x = y + 1
# 3
l.release()
# 4
thread = [threading.Thread(target=inc) for i in range(N)]
for t in thread: t.start()
for t in thread: t.join()
assert x == N
Exercício: Especifique este algoritmo em TLA+ e verifique se volta a cumprir as propriedades esperadas.
Exercício: Verifique que esta versão do algoritmo é um refinamento da versão inicial onde se assumia que a atribuição executava de forma atómica.
Exercício: Verifique se o seguinte algoritmo de exclusão mútua para um número arbitrário de processos usando um semáforo garante de facto a exclusão mútua e a ausência de starvation (neste caso identifique as condições de justiça mais fracas necessárias para que a propriedade se verifique). Valide a sua especificação verificando se um cenário onde todos os processos entram na região crítica é possível.
import threading
import random
N = 5
s = threading.Lock()
def code():
global x
while True:
# idle
while True:
if random.choice([True, False]): break
# wait
s.acquire()
# critical
s.release()
ts = [threading.Thread(target=code) for i in range(N)]
for t in ts: t.start()
Exercício: Verifique se o seguinte algoritmo de exclusão mútua para 2 processos proposto por Peterson garante de facto a exclusão mútua e a ausência de starvation (neste caso identifique as condições de justiça mais fracas necessárias para que a propriedade se verifique).
import threading
import random
turn = random.choice([0, 1])
want = [False, False]
def t(i):
global want
global turn
while True:
# idle
while True:
if random.choice([True, False]): break
want[i] = True
# want
turn = 1-i
# waiting
while want[1-i] and turn == 1-i: pass
# critical
want[i] = False
thread1 = threading.Thread(target=t, args=(0,))
thread2 = threading.Thread(target=t, args=(1,))
thread1.start()
thread2.start()
Exercício: Verifique a seguinte versão do algoritmo de Peterson para N > 1
processos.
import threading
import random
N = 4
level = [-1] * N
last = [random.choice(range(N)) for i in range(N-1)]
def proc(i):
global level
global last
while True:
# idle
while True:
if random.choice([True, False]): break
j = 0
# want
while j < N-1:
level[i] = j
last[j] = i
# wait
while last[j] == i and any([level[k] >= j and k != i for k in range(N)]): pass
j += 1
# critical
level[i] = -1
thread = [threading.Thread(target=proc, args=(i,)) for i in range(N)]
for t in thread: t.start()
Exercício: Verifique o algoritmo Bakery de exclusão mútua proposto por Leslie Lamport.