Relación entre los manejadores de comandos, agregados, el repositorio y el almacén de eventos en CQRS


Me gustaría entender algunos detalles de las relaciones entre los manejadores de comandos, agregados, el repositorio y el almacén de eventos en sistemas basados en CQRS.

Lo que he entendido hasta ahora:

  • Los manejadores de comandos reciben comandos del bus. Son responsables de cargar el agregado apropiado desde el repositorio y llamar a la lógica de dominio en el agregado. Una vez terminado, quitan el comando del bus.
  • Un agregado proporciona un comportamiento y un estado. El estado nunca es público. La única manera de cambiar de estado es usando el comportamiento. Los métodos que modelan este comportamiento crean eventos a partir de las propiedades del comando y aplican estos eventos al agregado, que a su vez llama a un controlador de eventos que establece el estado interno en consecuencia.
  • El repositorio simplemente permite cargar agregados en un ID dado, y agregar nuevos agregados. Básicamente, el repositorio conecta el dominio al almacén de eventos.
  • El almacén de eventos, el último pero no least, es responsable de almacenar eventos en una base de datos (o cualquier almacenamiento que se utilice), y recargar estos eventos como un llamado flujo de eventos.

Hasta ahora, todo bien. Ahora hay algunos problemas que aún no he conseguido:

  • Si un manejador de comandos debe llamar a behavior en un agregado aún existente, todo es bastante fácil. El controlador de comandos obtiene una referencia al repositorio, llama a su método loadById y se devuelve el agregado. Pero, ¿qué hace el controlador de comandos cuando no hay agregado todavía, pero uno debe ser creado? Desde mi punto de vista, el agregado debería reconstruirse más tarde utilizando los eventos. Esto significa que la creación del agregado se realiza en respuesta a un evento fooCreated. Pero para poder almacenar cualquier evento (incluido el creado), necesito un agregado. Así que esto me parece un problema de gallina y huevo: no puedo crear el agregado sin el evento, pero el único componente que debería crear eventos es el agregado. Así que básicamente viene hasta: ¿ Cómo puedo crear nuevos agregados, quién hace qué?
  • Cuando un agregado desencadena un evento, un manejador de eventos interno responde a él (normalmente al ser llamado a través de un método apply) y cambia el estado del agregado. ¿Cómo se entrega este evento al repositorio? ¿Quién origina la acción" por favor envíe los nuevos eventos al repositorio / tienda de eventos"? ¿El agregado en sí? El repositorio viendo el agregado? ¿Alguien más que esté suscrito a los eventos internos? ...?
  • Por último, pero no menos importante, tengo un problema para entender correctamente el concepto de un flujo de eventos: En mi imaginación, es simplemente algo así como una lista ordenada de eventos. Lo importante es que está "ordenado". ¿Es esto correcto?
Author: Dennis Traub, 2012-09-11

3 answers

Lo siguiente se basa en mi propia experiencia y mis experimentos con varios marcos como Lokad.CQRS, NCQRS, etc. Estoy seguro de que hay múltiples maneras de manejar esto. Voy a publicar lo que tiene más sentido para mí.

1. Creación de agregados:

Cada vez que un manejador de comandos necesita un agregado, utiliza un repositorio. El repositorio recupera la lista respectiva de eventos del almacén de eventos y llama a un constructor sobrecargado, inyectando los eventos

var stream = eventStore.LoadStream(id)
var User = new User(stream)

Si el aggregate no existía antes, la secuencia estará vacía y el objeto recién creado estará en su estado original. Es posible que desee asegurarse de que en este estado solo se permiten unos pocos comandos para dar vida al agregado, por ejemplo, User.Create().

2. Almacenamiento de nuevos Eventos

El manejo de comandos ocurre dentro de una Unidad de Trabajo . Durante la ejecución del comando, cada evento resultante se agregará a una lista dentro del agregado (User.Changes). Una vez finalizada la ejecución, los cambios se añadirán a la tienda de eventos. En el siguiente ejemplo esto sucede en la siguiente línea:

store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

3. Orden de los acontecimientos

Imagínese lo que sucedería, si dos eventos posteriores CustomerMoved se repiten en el orden equivocado.

Un Ejemplo

Trataré de ilustrar el con una pieza de pseudo-código (dejé deliberadamente las preocupaciones del repositorio dentro del controlador de comandos para mostrar lo que sucedería detrás del escenas):

Servicio De Aplicación:

UserCommandHandler
    Handle(CreateUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Create(cmd.UserName, ...)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

    Handle(BlockUser cmd)
        stream = store.LoadStream(cmd.UserId)
        user = new User(stream.Events)
        user.Block(string reason)
        store.AppendToStream(cmd.UserId, stream.Version, user.Changes)

Agregado:

User
    created = false
    blocked = false

    Changes = new List<Event>

    ctor(eventStream)
        foreach (event in eventStream)
            this.Apply(event)

    Create(userName, ...)
        if (this.created) throw "User already exists"
        this.Apply(new UserCreated(...))

    Block(reason)
        if (!this.created) throw "No such user"
        if (this.blocked) throw "User is already blocked"
        this.Apply(new UserBlocked(...))

    Apply(userCreatedEvent)
        this.created = true
        this.Changes.Add(userCreatedEvent)

    Apply(userBlockedEvent)
        this.blocked = true
        this.Changes.Add(userBlockedEvent)

Actualización:

Como nota al margen: La respuesta de Yves me recordó un interesante artículo de Udi Dahan de hace un par de años:

 32
Author: Dennis Traub,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2013-07-16 10:04:14

Una pequeña variación de Dennis excelente respuesta:

  • Cuando se trata de casos de uso "creacionales" (es decir, que deberían derivar nuevos agregados), trate de encontrar otro agregado o fábrica a la que pueda mover esa responsabilidad. Esto no entra en conflicto con tener un ctor que toma eventos para hidratar (o cualquier otro mecanismo para rehidratar para el caso). A veces la fábrica es solo un método estático (bueno para la captura de "contexto" / "intención"), a veces es un método de instancia de otro aggregate (buen lugar para la herencia de" datos"), a veces es un objeto de fábrica explícito (buen lugar para la lógica de creación" compleja").
  • Me gusta proporcionar un método GetChanges() explícito en mi agregado que devuelve la lista interna como una matriz. Si mi aggregate debe permanecer en la memoria más allá de una ejecución, también agrego un método AcceptChanges() para indicar que la lista interna debe borrarse (normalmente se llama después de que las cosas se descargan al almacén de eventos). Usted puede utilizar ya sea un tirón (GetChanges/Changes) o push (think. net event o IObservable) modelo basado aquí. Mucho depende de la semántica transaccional, tecnología, necesidades, etc ...
  • Tu eventstream es una lista vinculada. Cada revisión (evento/conjunto de cambios) apunta a la anterior (también conocida como el padre). Tu eventstream es una secuencia de eventos / cambios que ocurrieron en un agregado específico. La orden solo debe garantizarse dentro del límite agregado.
 10
Author: Yves Reynhout,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2012-09-11 08:26:17

Yo casi estoy de acuerdo con yves-reynhout y dennis-traub pero quiero mostrarles cómo hago esto. Quiero despojar a mis agregados de la responsabilidad de aplicar los eventos en sí mismos o rehidratarse; de lo contrario, hay mucha duplicación de código: cada constructor de agregados se verá igual:

UserAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


OrderAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)


ProfileAggregate:
    ctor(eventStream)
         foreach (event in eventStream)
            this.Apply(event)

Esas responsabilidades podrían dejarse al despachador de órdenes. El comando es manejado directamente por el agregado.

Command dispatcher class

    dispatchCommand(command) method:
        newEvents = ConcurentProofFunctionCaller.executeFunctionUntilSucceeds(tryToDispatchCommand)
        EventDispatcher.dispatchEvents(newEvents)

    tryToDispatchCommand(command) method:
        aggregateClass = CommandSubscriber.getAggregateClassForCommand(command)
        aggregate = AggregateRepository.loadAggregate(aggregateClass, command.getAggregateId())
        newEvents = CommandApplier.applyCommandOnAggregate(aggregate, command)
        AggregateRepository.saveAggregate(command.getAggregateId(), aggregate, newEvents)

ConcurentProofFunctionCaller class

    executeFunctionUntilSucceeds(pureFunction) method:
        do this n times
            try
                call result=pureFunction()
                return result
            catch(ConcurentWriteException)
                continue
        throw TooManyRetries    

AggregateRepository class

     loadAggregate(aggregateClass, aggregateId) method:
         aggregate = new aggregateClass
         priorEvents = EventStore.loadEvents()
         this.applyEventsOnAggregate(aggregate, priorEvents)

     saveAggregate(aggregateId, aggregate, newEvents)
        this.applyEventsOnAggregate(aggregate, newEvents)
        EventStore.saveEventsForAggregate(aggregateId, newEvents, priorEvents.version)

SomeAggregate class
    handleCommand1(command1) method:
        return new SomeEvent or throw someException BUT don't change state!
    applySomeEvent(SomeEvent) method:
        changeStateSomehow() and not throw any exception and don't return anything!

Tenga en cuenta que esto es pseudo código proyectado desde una aplicación PHP; el código real debe tener cosas inyectadas y otras responsabilidades refactorizadas en otras clases. La idea es mantener los agregados lo más limpios posible y evitar la duplicación de código.

Algunos aspectos importantes sobre los agregados:

  1. los manejadores de comandos no deben cambiar de estado; producen eventos o excepciones de lanzamiento
  2. el evento que se aplica no debe lanzar ninguna excepción y no debe devolver nada; solo cambian estado interno

Una implementación PHP de código abierto de esto se puede encontrar aquí.

 0
Author: Constantin Galbenu,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-03-17 06:19:39