package stonecqrs import ( "context" "fmt" "sync" "gitstormr.dev/stone-utils/stoneerror" ) // errCommandHandlerNotFound indicates a missing command handler error. // errQueryHandlerNotFound indicates a missing query handler error. // errEventHandlerNotFound indicates a missing event handler error. var ( errCommandHandlerNotFound = stoneerror.New( 1001, "command handler not found", ) errQueryHandlerNotFound = stoneerror.New(1002, "query handler not found") errEventHandlerNotFound = stoneerror.New(1003, "event handler not found") ) // Command represents an action or operation to be handled by a dispatcher. type Command interface{} // Query represents a marker interface used to identify query types. type Query interface{} // Event represents a domain event within the application architecture. type Event interface{} // CommandHandler defines an interface for handling commands in a CQRS pattern. // Implementations should process commands and return resulting events or errors. // Handle processes the given command and returns any resulting events or error. type CommandHandler interface { Handle(ctx context.Context, cmd Command) ([]Event, error) } // QueryHandler defines an interface for handling queries within a context. type QueryHandler interface { Handle(ctx context.Context, query Query) (interface{}, error) } // EventHandler is an interface for handling specific types of events. // It defines a single method Handle to process an Event with a context. // Handle returns an error if the event handling fails. type EventHandler interface { Handle(ctx context.Context, event Event) error } // Dispatcher is responsible for managing and dispatching handlers. // It supports command, query, and event handlers concurrently. // Commands, queries, and events are identified by their type names. // Uses a read-write mutex to ensure thread-safe operations. type Dispatcher struct { commandHandlers map[string]CommandHandler queryHandlers map[string]QueryHandler eventHandlers map[string][]EventHandler mutex sync.RWMutex } // NewDispatcher creates and returns a new instance of Dispatcher. func NewDispatcher() *Dispatcher { return &Dispatcher{ commandHandlers: make(map[string]CommandHandler), queryHandlers: make(map[string]QueryHandler), eventHandlers: make(map[string][]EventHandler), } } // RegisterCommandHandler registers a handler for a specific command type. func (d *Dispatcher) RegisterCommandHandler( cmd Command, handler CommandHandler, ) { d.mutex.Lock() defer d.mutex.Unlock() d.commandHandlers[typeName(cmd)] = handler } // RegisterQueryHandler registers a handler for the specified query type. func (d *Dispatcher) RegisterQueryHandler(query Query, handler QueryHandler) { d.mutex.Lock() defer d.mutex.Unlock() d.queryHandlers[typeName(query)] = handler } // RegisterEventHandler registers a handler for a specific type of event. func (d *Dispatcher) RegisterEventHandler(event Event, handler EventHandler) { d.mutex.Lock() defer d.mutex.Unlock() name := typeName(event) d.eventHandlers[name] = append(d.eventHandlers[name], handler) } // DispatchCommand processes a Command and dispatches the resulting Events. // Returns the generated Events or an error if command handling fails. func (d *Dispatcher) DispatchCommand(ctx context.Context, cmd Command) ( []Event, error, ) { handler, err := d.getCommandHandler(cmd) if err != nil { return nil, err } events, err := handler.Handle(ctx, cmd) if err != nil { return nil, err } if len(events) > 0 { if err = d.DispatchEvents(ctx, events...); err != nil { return events, stoneerror.Wrap( err, 1004, "failed to dispatch generated events", ) } } return events, nil } // DispatchQuery dispatches a query to its corresponding handler. // It retrieves the query handler and invokes its Handle method. // Returns the result of the query handler or an error if not found. func (d *Dispatcher) DispatchQuery( ctx context.Context, query Query, ) (interface{}, error) { handler, err := d.getQueryHandler(query) if err != nil { return nil, err } return handler.Handle(ctx, query) } // DispatchEvents dispatches multiple events to their registered handlers. // It processes each event sequentially and stops on the first error. // Returns an error if any event fails to dispatch. func (d *Dispatcher) DispatchEvents( ctx context.Context, events ...Event, ) error { for _, event := range events { if err := d.DispatchEvent(ctx, event); err != nil { return err } } return nil } // DispatchEvent dispatches an event to all registered event handlers. // It returns an error if any handler fails to process the event. func (d *Dispatcher) DispatchEvent(ctx context.Context, event Event) error { handlers, err := d.getEventHandlers(event) if err != nil { return err } for _, handler := range handlers { if err = handler.Handle(ctx, event); err != nil { return stoneerror.Wrap(err, 1005, "event handling failed") } } return nil } // getCommandHandler retrieves the CommandHandler for the given Command. // Returns an error if no handler is found. func (d *Dispatcher) getCommandHandler(cmd Command) (CommandHandler, error) { d.mutex.RLock() defer d.mutex.RUnlock() handler, exists := d.commandHandlers[typeName(cmd)] if !exists { return nil, errCommandHandlerNotFound } return handler, nil } // getQueryHandler retrieves the handler for a specific query type. // It returns an error if the handler is not found. func (d *Dispatcher) getQueryHandler(query Query) (QueryHandler, error) { d.mutex.RLock() defer d.mutex.RUnlock() handler, exists := d.queryHandlers[typeName(query)] if !exists { return nil, errQueryHandlerNotFound } return handler, nil } // getEventHandlers retrieves all handlers for a specific event type. // Returns an error if no handlers are registered for the event type. func (d *Dispatcher) getEventHandlers(event Event) ([]EventHandler, error) { d.mutex.RLock() defer d.mutex.RUnlock() handlers, exists := d.eventHandlers[typeName(event)] if !exists { return nil, errEventHandlerNotFound } return handlers, nil } // typeName returns the name of the type of the given interface value. func typeName(v interface{}) string { return fmt.Sprintf("%T", v) }