Haskell cola concurrente rápida


El Problema

¡Hola! Estoy escribiendo una biblioteca de registro y me encantaría crear un registrador, que se ejecutaría en subprocesos separados, mientras que todos los subprocesos de aplicaciones solo le enviarían mensajes. Quiero encontrar la solución más eficaz para este problema. Necesito una simple cola de unboud aquí.

Enfoques

He creado algunas pruebas para ver cómo funcionan las soluciones disponibles y obtengo resultados muy extraños aquí. Probé 4 implementaciones (código fuente proporcionado a continuación) basado en:

  1. pipes-concurrency
  2. Control.Concurrente.Chan
  3. Control.Concurrente.Chan.Unagi
  4. Basado en MVar como se describe en el libro "Programación Paralela y Concurrente en Haskell" Tenga en cuenta que esta técnica nos da colas acotadas de capacidad 1 - se utiliza solo para pruebas

Pruebas

Aquí está el código fuente utilizado para las pruebas:

{-# LANGUAGE NoMonomorphismRestriction #-}

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)

import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main

data Event = Msg String | Status | Quit deriving (Show)

----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------

pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg

pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
                                           Pipes.performGC

pipesHandler max = loop 0
  where
    loop mnum = do
        if mnum == max
            then lift $ pure ()
            else do event <- await
                    case event of
                        Msg _  -> loop (mnum + 1)
                        Status -> (lift $ putStrLn (show mnum)) *> loop mnum
                        Quit   -> return ()

----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------

chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max

----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------

uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max

----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------

mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max

----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------

handlerIO f max = loop 0 where
    loop mnum = do
        if mnum == max 
            then pure ()
            else do event <- f
                    case event of
                         Msg _  -> loop (mnum + 1)
                         Status -> putStrLn (show mnum) *> loop mnum
                         Quit   -> return ()

----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------

main = defaultMain [
      bench "pipes" $ nfIO $ do
        (output, input) <- Pipes.spawn Pipes.Unbounded
        replicateM_ prodNum (pipesAddProducer msgNum output)
        runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
    , bench "Chan" $ nfIO $ do
        ch <- newChan
        replicateM_ prodNum (chanAddProducer msgNum ch)
        chanHandler ch totalMsg
    , bench "Unagi-Chan" $ nfIO $ do
        (inCh, outCh) <- U.newChan
        replicateM_ prodNum (uchanAddProducer msgNum inCh)
        uchanHandler outCh totalMsg
    , bench "MVar" $ nfIO $ do
        m <- newEmptyMVar
        replicateM_ prodNum (mvarAddProducer msgNum m)
        mvarHandler m totalMsg
    ]
  where
    prodNum  = 20
    msgNum   = 1000
    totalMsg = msgNum * prodNum

Usted puede compilar con ghc -O2 Main.hs y simplemente ejecutarlo. Las pruebas crean 20 productores de mensajes, cada uno produciendo 1000000 mensajes.

Resultados

benchmarking pipes
time                 46.68 ms   (46.19 ms .. 47.31 ms)
                     0.999 R²   (0.999 R² .. 1.000 R²)
mean                 47.59 ms   (47.20 ms .. 47.95 ms)
std dev              708.3 μs   (558.4 μs .. 906.1 μs)

benchmarking Chan
time                 4.252 ms   (4.171 ms .. 4.351 ms)
                     0.995 R²   (0.991 R² .. 0.998 R²)
mean                 4.233 ms   (4.154 ms .. 4.314 ms)
std dev              244.8 μs   (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)

benchmarking Unagi-Chan
time                 1.209 ms   (1.198 ms .. 1.224 ms)
                     0.996 R²   (0.993 R² .. 0.999 R²)
mean                 1.267 ms   (1.244 ms .. 1.308 ms)
std dev              102.4 μs   (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)

benchmarking MVar
time                 1.746 ms   (1.714 ms .. 1.774 ms)
                     0.997 R²   (0.995 R² .. 0.998 R²)
mean                 1.716 ms   (1.694 ms .. 1.739 ms)
std dev              73.99 μs   (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)

Pregunta

Me encantaría preguntarle por qué la versión de pipes-concurrent funciona tan lenta y por qué es mucho más lenta que incluso la basada en chan. Estoy muy sorprendido, que el MVar uno es el más rápido de todas las versiones - podría alguien decir más, por qué obtenemos estos resultados y si podemos hacerlo mejor en cualquier caso?

Author: Wojciech Danilo, 2015-01-14

2 answers

Así que puedo darles una pequeña visión general de algunos de los análisis de Chan y TQueue (que pipes-concurrency está usando internamente aquí) que motivaron algunas decisiones de diseño que entraron en unagi-chan. No estoy seguro de si va a responder a su pregunta. Recomiendo bifurcar diferentes colas y jugar con variaciones mientras se compara para obtener una buena idea de lo que está sucediendo.

Chan

Chan parece:

data Chan a
 = Chan (MVar (Stream a)) -- pointer to "head", where we read from
        (MVar (Stream a)) -- pointer to "tail", where values written to

type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)

Es una lista enlazada de MVar s. Los dos MVar s en el acto de tipo Chan como punteros a la cabeza y cola actual de la lista, respectivamente. Así es como se ve una escritura:

writeChan :: Chan a -> a -> IO () 
writeChan (Chan _ writeVar) val = do 
    new_hole <- newEmptyMVar   mask_ $ do
    old_hole <- takeMVar writeVar           -- [1]
    putMVar old_hole (ChItem val new_hole)  -- [2]
    putMVar writeVar new_hole               -- [3]

En 1 el escritor toma un bloqueo en el extremo de escritura, en 2 nuestro elemento a se pone a disposición del lector, y en 3 el extremo de escritura se desbloquea para otros escritores.

Esto realmente funciona bastante bien en un escenario de un solo consumidor/un solo productor (ver el gráfico aquí) porque las lecturas y las escrituras no compiten. Pero una vez que tienes varios escritores concurrentes puedes empieza a tener problemas:

  • Un escritor que golpea 1 mientras que otro escritor está en 2 se bloqueará y será descheduled (el más rápido que he sido capaz de medir un cambio de contexto es ~150ns (bastante maldito rápido); probablemente hay situaciones en las que es mucho más lento). Así que cuando tienes muchos escritores contendiendo básicamente estás haciendo un gran viaje de ida y vuelta a través del planificador, en una cola de espera para el MVar y, finalmente, la escritura puede completarse.

  • Cuando un escritor consigue descheduled (porque se agotó el tiempo) mientras que en 2, mantiene un bloqueo y no se permitirá que se completen las escrituras hasta que se pueda reprogramar de nuevo; esto se convierte en un problema cuando estamos sobre-suscritos, es decir, cuando nuestra relación subprocesos/núcleo es alta.

Finalmente, el uso de un MVar-por-elemento requiere cierta sobrecarga en términos de asignación, y lo más importante cuando acumulamos muchos objetos mutables, podemos causar una gran cantidad de presión GC.

TQueue

TQueue es genial porque STM hace que sea súper simple razonar sobre su corrección. Es una cola funcional de estilo dequeue, y un write consiste simplemente en leer la pila de writer, consing nuestro elemento, y escribirlo de nuevo:

data TQueue a = TQueue (TVar [a])
                       (TVar [a])

writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do  
  listend <- readTVar write   -- a transaction with a consistent 
  writeTVar write (a:listend) -- view of memory

Si después de que un writeTQueue escribe su nueva pila de nuevo, otra escritura intercalada hace lo mismo, una de las escrituras se volverá a intentar. A medida que se intercalan más writeTQueue, el efecto de la contención se empeora. Sin embargo, el rendimiento se degrada mucho más lentamente que en Chan porque solo hay una única operación writeTVar que puede anular writeTQueues competidores, y la transacción es muy pequeña (solo una lectura y una (:)).

Una lectura funciona al "desquejar" la pila desde el lado de escritura, invertirla, y almacenar la pila invertida en su propia variable para un fácil "popping" (en conjunto, esto nos da O (1) push y pop amortizados)

readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do writeTVar read xs'
                  return x
    [] -> do ys <- readTVar write
             case ys of
               [] -> retry
               _  -> case reverse ys of
                       [] -> error "readTQueue"
                       (z:zs) -> do writeTVar write []
                                    writeTVar read zs
                                    return z

Los lectores tienen una cuestión de contención moderada simétrica para los escritores. En el caso general, los lectores y los escritores no se sostienen, sin embargo cuando la pila de lectores está agotada los lectores están luchando con otros lectores y escritores. Sospecho que si precargaste un TQueue con suficientes valores y luego lanzaste 4 lectores y 4 escritores, podrías inducir livelock mientras el reverso luchaba por completarlo antes de la siguiente escritura. También es interesante notar que a diferencia de MVar, una escritura a un TVar en la que muchos lectores están esperando los despierta a todos simultáneamente (esto podría ser más o menos eficiente, dependiendo del escenario).

I sospeche que no ve muchas de las debilidades de TQueue en su prueba; principalmente está viendo los efectos moderados de la contención de escritura y la sobrecarga de una gran cantidad de asignación y GC'ing una gran cantidad de objetos mutables.

Unagi-chan

unagi-chan fue diseñado en primer lugar para manejar bien la contención. Es conceptualmente muy simple, pero la implementación tiene algunas complejidades

data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))

data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))

data Cell a = Empty | Written a | Blocking (MVar a)

Los lados de lectura y escritura de la cola comparten el Stream en el que coordinan los valores de paso (desde de escritor a lector) e indicaciones de bloqueo (de lector a escritor), y los lados de lectura y escritura tienen cada uno un contador atómico independiente. Una escritura funciona como:

  1. Un escritor llama al atómico incrCounter en el contador de escritura para recibir su índice único en el que coordinar con su lector (único)

  2. El escritor encuentra su célula y realiza un CAS de Written a

  3. Si tiene éxito, sale, de lo contrario, ve que un lector lo ha vencido y está bloqueando (o procediendo a bloquear), por lo que hace un (\Blocking v)-> putMVar v a) y sale.

Una lectura funciona de una manera similar y obvia.

La primera innovación es hacer que el punto de contención sea una operación atómica que no se degrade bajo contención (como lo haría un bucle CAS/retry o un bloqueo similar a Chan). Basado en un simple benchmarking y experimentación, el primop fetch-and-add, expuesto por la biblioteca atomic-primops funciona mejor.

Entonces en 2 tanto el lector como el escritor necesitan realizar solo uno comparar e intercambiar (el camino rápido para el lector es una simple lectura no atómica) para terminar la coordinación.

Así que para tratar de hacer unagi-chan bueno, {[39]]}

  • Use fetch-and-add para manejar el punto de contención

  • Utilice técnicas lockfree tales que cuando estamos sobresuscritos un hilo siendo descheduled en épocas inoportunas no bloquee progreso para otros hilos (un escritor bloqueado puede bloquear a lo sumo el lector "asignado" a él por el contador; lea las advertencias re. excepciones asincrónicas en los documentos unagi-chan, y tenga en cuenta que Chan tiene una semántica más agradable aquí)

  • Utilice una matriz para almacenar nuestros elementos, que tiene mejor localidad (pero ver más abajo) menor sobrecarga por elemento y pone menos presión sobre el GC

Una nota final re. uso de una matriz: las escrituras concurrentes en una matriz son generalmente una mala idea para escalar porque causa una gran cantidad de tráfico de coherencia de caché, ya que las líneas de caché se invalidan de un lado a otro en los hilos de writer. El el término general es "falso compartir". Pero también hay ventajas en cuanto a caché y desventajas de los diseños alternativos que se me ocurre que escribirían rayas o algo así; he estado experimentando con esto un poco, pero no tengo nada concluyente en este momento.

Un lugar en el que legítimamente estamos preocupados con el intercambio falso es en nuestro contador, que alineamos y rellenamos con 64 bytes; esto efectivamente se mostró en los puntos de referencia, y el único inconveniente es el aumento del uso de memoria.

 17
Author: jberryman,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-01-14 18:00:04

Si tuviera que adivinar por qué pipes-concurrency funciona más mal, es porque cada lectura y escritura está envuelta en una transacción STM, mientras que las otras bibliotecas usan primitivas de concurrencia de bajo nivel más eficientes.

 5
Author: Gabriel Gonzalez,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-01-14 05:09:40