All checks were successful
Go CI/CD / go-ci (push) Successful in 19m8s
Reviewed-on: #2 Reviewed-by: Cloud Administrator <cloud-admin@noreply.gitstormr.dev> Co-authored-by: Rene Nochebuena <code-raider@noreply.gitstormr.dev> Co-committed-by: Rene Nochebuena <code-raider@noreply.gitstormr.dev>
207 lines
6.2 KiB
Go
207 lines
6.2 KiB
Go
package stonecqrs
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"gitstormr.dev/stone-utils/stoneerror"
|
|
)
|
|
|
|
// errCommandHandlerNotFound indicates a missing command handler error.
|
|
// errQueryHandlerNotFound indicates a missing query handler error.
|
|
// errEventHandlerNotFound indicates a missing event handler error.
|
|
var (
|
|
errCommandHandlerNotFound = stoneerror.New(
|
|
1001, "command handler not found",
|
|
)
|
|
errQueryHandlerNotFound = stoneerror.New(1002, "query handler not found")
|
|
errEventHandlerNotFound = stoneerror.New(1003, "event handler not found")
|
|
)
|
|
|
|
// Command represents an action or operation to be handled by a dispatcher.
|
|
type Command interface{}
|
|
|
|
// Query represents a marker interface used to identify query types.
|
|
type Query interface{}
|
|
|
|
// Event represents a domain event within the application architecture.
|
|
type Event interface{}
|
|
|
|
// CommandHandler defines an interface for handling commands in a CQRS pattern.
|
|
// Implementations should process commands and return resulting events or errors.
|
|
// Handle processes the given command and returns any resulting events or error.
|
|
type CommandHandler interface {
|
|
Handle(ctx context.Context, cmd Command) ([]Event, error)
|
|
}
|
|
|
|
// QueryHandler defines an interface for handling queries within a context.
|
|
type QueryHandler interface {
|
|
Handle(ctx context.Context, query Query) (interface{}, error)
|
|
}
|
|
|
|
// EventHandler is an interface for handling specific types of events.
|
|
// It defines a single method Handle to process an Event with a context.
|
|
// Handle returns an error if the event handling fails.
|
|
type EventHandler interface {
|
|
Handle(ctx context.Context, event Event) error
|
|
}
|
|
|
|
// Dispatcher is responsible for managing and dispatching handlers.
|
|
// It supports command, query, and event handlers concurrently.
|
|
// Commands, queries, and events are identified by their type names.
|
|
// Uses a read-write mutex to ensure thread-safe operations.
|
|
type Dispatcher struct {
|
|
commandHandlers map[string]CommandHandler
|
|
queryHandlers map[string]QueryHandler
|
|
eventHandlers map[string][]EventHandler
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// NewDispatcher creates and returns a new instance of Dispatcher.
|
|
func NewDispatcher() *Dispatcher {
|
|
return &Dispatcher{
|
|
commandHandlers: make(map[string]CommandHandler),
|
|
queryHandlers: make(map[string]QueryHandler),
|
|
eventHandlers: make(map[string][]EventHandler),
|
|
}
|
|
}
|
|
|
|
// RegisterCommandHandler registers a handler for a specific command type.
|
|
func (d *Dispatcher) RegisterCommandHandler(
|
|
cmd Command, handler CommandHandler,
|
|
) {
|
|
d.mutex.Lock()
|
|
defer d.mutex.Unlock()
|
|
d.commandHandlers[typeName(cmd)] = handler
|
|
}
|
|
|
|
// RegisterQueryHandler registers a handler for the specified query type.
|
|
func (d *Dispatcher) RegisterQueryHandler(query Query, handler QueryHandler) {
|
|
d.mutex.Lock()
|
|
defer d.mutex.Unlock()
|
|
d.queryHandlers[typeName(query)] = handler
|
|
}
|
|
|
|
// RegisterEventHandler registers a handler for a specific type of event.
|
|
func (d *Dispatcher) RegisterEventHandler(event Event, handler EventHandler) {
|
|
d.mutex.Lock()
|
|
defer d.mutex.Unlock()
|
|
name := typeName(event)
|
|
d.eventHandlers[name] = append(d.eventHandlers[name], handler)
|
|
}
|
|
|
|
// DispatchCommand processes a Command and dispatches the resulting Events.
|
|
// Returns the generated Events or an error if command handling fails.
|
|
func (d *Dispatcher) DispatchCommand(ctx context.Context, cmd Command) (
|
|
[]Event, error,
|
|
) {
|
|
handler, err := d.getCommandHandler(cmd)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
events, err := handler.Handle(ctx, cmd)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(events) > 0 {
|
|
if err = d.DispatchEvents(ctx, events...); err != nil {
|
|
return events, stoneerror.Wrap(
|
|
err, 1004, "failed to dispatch generated events",
|
|
)
|
|
}
|
|
}
|
|
|
|
return events, nil
|
|
}
|
|
|
|
// DispatchQuery dispatches a query to its corresponding handler.
|
|
// It retrieves the query handler and invokes its Handle method.
|
|
// Returns the result of the query handler or an error if not found.
|
|
func (d *Dispatcher) DispatchQuery(
|
|
ctx context.Context, query Query,
|
|
) (interface{}, error) {
|
|
handler, err := d.getQueryHandler(query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return handler.Handle(ctx, query)
|
|
}
|
|
|
|
// DispatchEvents dispatches multiple events to their registered handlers.
|
|
// It processes each event sequentially and stops on the first error.
|
|
// Returns an error if any event fails to dispatch.
|
|
func (d *Dispatcher) DispatchEvents(
|
|
ctx context.Context, events ...Event,
|
|
) error {
|
|
for _, event := range events {
|
|
if err := d.DispatchEvent(ctx, event); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DispatchEvent dispatches an event to all registered event handlers.
|
|
// It returns an error if any handler fails to process the event.
|
|
func (d *Dispatcher) DispatchEvent(ctx context.Context, event Event) error {
|
|
handlers, err := d.getEventHandlers(event)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, handler := range handlers {
|
|
if err = handler.Handle(ctx, event); err != nil {
|
|
return stoneerror.Wrap(err, 1005, "event handling failed")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// getCommandHandler retrieves the CommandHandler for the given Command.
|
|
// Returns an error if no handler is found.
|
|
func (d *Dispatcher) getCommandHandler(cmd Command) (CommandHandler, error) {
|
|
d.mutex.RLock()
|
|
defer d.mutex.RUnlock()
|
|
|
|
handler, exists := d.commandHandlers[typeName(cmd)]
|
|
if !exists {
|
|
return nil, errCommandHandlerNotFound
|
|
}
|
|
return handler, nil
|
|
}
|
|
|
|
// getQueryHandler retrieves the handler for a specific query type.
|
|
// It returns an error if the handler is not found.
|
|
func (d *Dispatcher) getQueryHandler(query Query) (QueryHandler, error) {
|
|
d.mutex.RLock()
|
|
defer d.mutex.RUnlock()
|
|
|
|
handler, exists := d.queryHandlers[typeName(query)]
|
|
if !exists {
|
|
return nil, errQueryHandlerNotFound
|
|
}
|
|
return handler, nil
|
|
}
|
|
|
|
// getEventHandlers retrieves all handlers for a specific event type.
|
|
// Returns an error if no handlers are registered for the event type.
|
|
func (d *Dispatcher) getEventHandlers(event Event) ([]EventHandler, error) {
|
|
d.mutex.RLock()
|
|
defer d.mutex.RUnlock()
|
|
|
|
handlers, exists := d.eventHandlers[typeName(event)]
|
|
if !exists {
|
|
return nil, errEventHandlerNotFound
|
|
}
|
|
return handlers, nil
|
|
}
|
|
|
|
// typeName returns the name of the type of the given interface value.
|
|
func typeName(v interface{}) string {
|
|
return fmt.Sprintf("%T", v)
|
|
}
|