MFP

Logo

Índice

Métodos Formais em Engenharia de Software
Cálculo de Sistemas de Informação
Programação Ciber-física
Verificação Formal
Calendário | Sumários
Anos anteriores
Alumni

Especificação e verificação de algoritmos concorrentes com TLA+

Atomicidade

Considere o seguinte programa onde duas threads concorrentes incrementam uma variável partilhada.

import threading

x = 0

def inc():
    global x
    x = x + 1

t0 = threading.Thread(target=inc)
t1 = threading.Thread(target=inc)
t0.start()
t1.start()
t0.join()
t1.join()

assert x == 2

Para simplificar a modelação vamos assumir que todas as instruções de uma thread executam de forma atómica, isto é sem interrupção de outros processos. Por vezes, para tornar a análise mais eficiente (porque o número de traços possíveis diminui significativamente) vamos assumir que várias instruções executam de forma atómica. Para sinalizar isso iremos usar etiquetas no algoritmo para assinalar diferentes pontos de execução e assumir que todas as instruções entre duas etiquetas executam de forma atómica. Há situações em que podemos assumir isso sem mascarar problemas, nomeadamente agrupar sequências de instruções que apenas referem variáveis locais. Noutros casos é preciso ter muito cuidado com essas simplificações. Por exemplo, no algoritmo acima só temos dois possíveis pontos de execução em cada processo (antes ou depois da atribuição), o que pode ser tornado explícito acrescentando as referidas etiquetas.

import threading

x = 0

def inc():
    global x
    # 0
    x = x + 1
    # 1

t0 = threading.Thread(target=inc)
t1 = threading.Thread(target=inc)
t0.start()
t1.start()
t0.join()
t1.join()

assert x == 2

Para verificar a correção parcial deste algoritmo expressa na asserção (quando termina a variável partilhada tem o valor 2) e a sua terminação podemos usar a seguinte especificação em TLA+.

----- MODULE Increment2 -----

EXTENDS Naturals

VARIABLES x,pc0,pc1

vars == <<x,pc0,pc1>>

Init == x = 0 /\ pc0 = 0 /\ pc1 = 0

Increment0 == pc0 = 0 /\ x' = x + 1 /\ pc0' = 1 /\ UNCHANGED pc1

Increment1 == pc1 = 0 /\ x' = x + 1 /\ pc1' = 1 /\ UNCHANGED pc0

Next == Increment0 \/ Increment1

Spec == Init /\ [][Next]_vars /\ WF_vars(Next)

TypeOK == [](x \in Nat /\ pc0 \in 0..1 /\ pc1 \in 0..1)

Finish == pc0 = 1 /\ pc1 = 1

PartialCorrectness == [](Finish => x = 2)

Termination == <> Finish

=============================

Como este é um algoritmo que termina obviamente também terá um deadlock, logo devemos indicar ao TLC que não queremos verificar a ausência do mesmo.

SPECIFICATION Spec 
PROPERTIES TypeOK PartialCorrectness Termination
CHECK_DEADLOCK FALSE

Este algoritmo pode ser generalizado para um número arbitrário de processos.

import threading

N = 5

x = 0

def inc():
    global x
    # 0
    x = x + 1
    # 1

thread = [threading.Thread(target=inc) for i in range(N)]
for t in thread: t.start()
for t in thread: t.join()

assert x == N

Para especificar esta versão em TLA+ devemos declarar uma constante que diz quantos processos temos e assumir que essa constante é estritamente positiva. Para além disso, como temos agora um número arbitrário de processos, para guardar os program counters teremos que usar uma função finita que mapeia cada identificador de processo para a etiqueta que identifica o respetivo ponto de execução.

----- MODULE IncrementN -----

EXTENDS Naturals

CONSTANT N

ASSUME N > 0

VARIABLES x,pc

vars == <<x,pc>>

Init == x = 0 /\ pc = [i \in 1..N |-> 0]

Increment(i) == pc[i] = 0 /\ x' = x + 1 /\ pc' = [pc EXCEPT ![i] = 1]

Next == \E i \in 1..N : Increment(i)

Spec == Init /\ [][Next]_vars /\ WF_vars(Next)

TypeOK == [](x \in Nat /\ pc \in [1..N -> 0..1])

Finish == \A i \in 1..N : pc[i] = 1

PartialCorrectness == [](Finish => x = N)

Termination == <> Finish

=============================

Para verificar este algoritmo, no ficheiro de configuração temos que indicar qual é o valor da constante declarada.

CONSTANT N = 5
SPECIFICATION Spec
PROPERTIES TypeOK PartialCorrectness Termination
CHECK_DEADLOCK FALSE 

De facto, os processadores não garantem que a instrução x = x + 1 executa de forma atómica, pois a execução desta instrução implica uma leitura da memória do valor actual de x e uma posterior escrita de x + 1, e entre estas duas instruções máquina o processo poderia ser interrompido. Continuando a assumir que as instruções executam de forma atómica, podemos simular este comportamento substituindo a instrução x = x + 1 pelas instruções y = x e x = y+1, onde y é uma variável local de cada processo. Isto corresponderia ao seguinte algoritmo.

import threading

N = 5

x = 0

def inc():
    global x
    # 0
    y = x
    # 1
    x = y + 1
    # 2

thread = [threading.Thread(target=inc) for i in range(N)]
for t in thread: t.start()
for t in thread: t.join()

assert x == N

Exercício: Especifique este algoritmo em TLA+ e verifique se continua a cumprir as propriedades esperadas.

Neste caso, para garantir que a atribuição em cada processo executa de forma atómica podemos usar um semáforo, que terá de ser adquirido por cada processo antes de começar a executar e libertado no final. Em termos abstratos podemos ver um semáforo como uma simples variável booleana partilhada, onde a ação de adquirir o semáforo corresponde à execução atómica de uma espera até que a variável seja True seguida uma atribuição a False. O algoritmo com o semáforo seria o seguinte.

import threading

N = 5

x = 0
l = threading.Lock()

def inc():
    global x
    # 0
    l.acquire()
    # 1
    y = x
    # 2
    x = y + 1
    # 3
    l.release()
    # 4

thread = [threading.Thread(target=inc) for i in range(N)]
for t in thread: t.start()
for t in thread: t.join()

assert x == N

Exercício: Especifique este algoritmo em TLA+ e verifique se volta a cumprir as propriedades esperadas.

Exercício: Verifique que esta versão do algoritmo é um refinamento da versão inicial onde se assumia que a atribuição executava de forma atómica.

Algoritmos de exclusão mútua

Exercício: Verifique se o seguinte algoritmo de exclusão mútua para um número arbitrário de processos usando um semáforo garante de facto a exclusão mútua e a ausência de starvation (neste caso identifique as condições de justiça mais fracas necessárias para que a propriedade se verifique). Valide a sua especificação verificando se um cenário onde todos os processos entram na região crítica é possível.

import threading
import random

N = 5

s = threading.Lock()

def code():
    global x
    while True:
        # idle
        while True: 
            if random.choice([True, False]): break
        # wait
        s.acquire()
        # critical
        s.release()

ts = [threading.Thread(target=code) for i in range(N)]
for t in ts: t.start()

Exercício: Verifique se o seguinte algoritmo de exclusão mútua para 2 processos proposto por Peterson garante de facto a exclusão mútua e a ausência de starvation (neste caso identifique as condições de justiça mais fracas necessárias para que a propriedade se verifique).

import threading
import random

turn = random.choice([0, 1])
want = [False, False]

def t(i):
    global want
    global turn

    while True:
        # idle
        while True: 
            if random.choice([True, False]): break
        want[i] = True
        # want
        turn = 1-i
        # waiting
        while want[1-i] and turn == 1-i: pass
        # critical
        want[i] = False

thread1 = threading.Thread(target=t, args=(0,))
thread2 = threading.Thread(target=t, args=(1,))
thread1.start()
thread2.start()

Exercício: Verifique a seguinte versão do algoritmo de Peterson para N > 1 processos.

import threading
import random

N = 4

level = [-1] * N
last = [random.choice(range(N)) for i in range(N-1)]

def proc(i):
    global level
    global last
    while True:
        # idle
        while True: 
            if random.choice([True, False]): break
        j = 0
        # want
        while j < N-1:
            level[i] = j
            last[j] = i
            # wait
            while last[j] == i and any([level[k] >= j and k != i for k in range(N)]): pass
            j += 1
        # critical 
        level[i] = -1

thread = [threading.Thread(target=proc, args=(i,)) for i in range(N)]
for t in thread: t.start()

Exercício: Verifique o algoritmo Bakery de exclusão mútua proposto por Leslie Lamport.