gravity-wells/cqrs/singularity.go
Rene Nochebuena 457ff24dcc
All checks were successful
Go CI/CD / go-ci (push) Successful in 36s
Add comprehensive documentation for Singularity struct methods
Added detailed comments for struct `singularity`, its methods, and helper functions to improve code clarity and maintainability. This includes explanations of each method's purpose, input/output, and thread-safety mechanisms where applicable.
2025-04-26 22:33:20 -06:00

226 lines
6.8 KiB
Go

package cqrs
import (
"context"
"fmt"
"log"
"reflect"
"sync"
)
// singularity is a type that orchestrates command, query, and event handling.
// It maintains handlers and middlewares for commands, queries, and events.
// A mutex is used for thread-safe access to the handlers and middlewares.
type singularity struct {
commandHandlers map[string]CommandHandler
queryHandlers map[string]QueryHandler
eventHandlers map[string][]EventHandler
commandMiddlewares []CommandMiddleware
queryMiddlewares []QueryMiddleware
eventMiddlewares []EventMiddleware
mutex sync.RWMutex
}
// NewSingularity initializes and returns a new instance of a Bus.
func NewSingularity() Bus {
return &singularity{
commandHandlers: make(map[string]CommandHandler),
queryHandlers: make(map[string]QueryHandler),
eventHandlers: make(map[string][]EventHandler),
commandMiddlewares: make([]CommandMiddleware, 0),
queryMiddlewares: make([]QueryMiddleware, 0),
eventMiddlewares: make([]EventMiddleware, 0),
}
}
// RegisterCommandHandler registers a handler for the specified command type.
// It applies any registered CommandMiddleware to the handler.
func (s *singularity) RegisterCommandHandler(
cmd Command, handler CommandHandler,
) {
s.mutex.Lock()
defer s.mutex.Unlock()
handler = applyCommandMiddleware(handler, s.commandMiddlewares)
s.commandHandlers[typeName(cmd)] = handler
}
// RegisterQueryHandler registers a handler for the specified query type.
// It applies any registered QueryMiddleware to the handler.
func (s *singularity) RegisterQueryHandler(query Query, handler QueryHandler) {
s.mutex.Lock()
defer s.mutex.Unlock()
handler = applyQueryMiddleware(handler, s.queryMiddlewares)
s.queryHandlers[typeName(query)] = handler
}
// RegisterEventHandler registers an event handler for a specific event type.
// It applies any registered EventMiddleware to the handler.
// Handlers are stored in a list, allowing multiple handlers for the same event.
func (s *singularity) RegisterEventHandler(event Event, handler EventHandler) {
s.mutex.Lock()
defer s.mutex.Unlock()
handler = applyEventMiddleware(handler, s.eventMiddlewares)
name := typeName(event)
s.eventHandlers[name] = append(s.eventHandlers[name], handler)
}
// UseCommandMiddleware appends a CommandMiddleware to the middleware chain.
func (s *singularity) UseCommandMiddleware(middleware CommandMiddleware) {
s.commandMiddlewares = append(s.commandMiddlewares, middleware)
}
// UseQueryMiddleware appends a QueryMiddleware to the middleware chain.
func (s *singularity) UseQueryMiddleware(middleware QueryMiddleware) {
s.queryMiddlewares = append(s.queryMiddlewares, middleware)
}
// UseEventMiddleware appends a EventMiddleware to the middleware chain.
func (s *singularity) UseEventMiddleware(middleware EventMiddleware) {
s.eventMiddlewares = append(s.eventMiddlewares, middleware)
}
// ExecuteCommand processes a command and emits resulting events if any.
// It returns the emitted events or an error if the processing fails.
func (s *singularity) ExecuteCommand(ctx context.Context, cmd Command) (
[]Event, error,
) {
handler, err := s.getCommandHandler(cmd)
if err != nil {
return nil, err
}
events, err := handler.Handle(ctx, cmd)
if err != nil {
return nil, err
}
if len(events) > 0 {
if err = s.emitEvents(ctx, events...); err != nil {
return events, fmt.Errorf(
"command succeeded, but dispatching resulting events failed: %v",
typeName(cmd),
)
}
}
return events, nil
}
// ExecuteQuery executes the given query by invoking its registered handler.
// It returns the result of the query or an error if no handler is found.
func (s *singularity) ExecuteQuery(
ctx context.Context, query Query,
) (interface{}, error) {
handler, err := s.getQueryHandler(query)
if err != nil {
return nil, err
}
return handler.Handle(ctx, query)
}
// EmitEvent dispatches an event to all registered handlers for its type.
// Returns an error if any handler processing the event fails.
func (s *singularity) EmitEvent(ctx context.Context, event Event) error {
handlers := s.getEventHandlers(event)
if len(handlers) == 0 {
log.Printf("[RuneBringer] No handler listening for event: %T", event)
return nil
}
for _, handler := range handlers {
if err := handler.Handle(ctx, event); err != nil {
return err
}
}
return nil
}
// emitEvents dispatches multiple events by calling EmitEvent for each one.
// Returns an error if any event fails to emit.
func (s *singularity) emitEvents(ctx context.Context, events ...Event) error {
for _, event := range events {
if err := s.EmitEvent(ctx, event); err != nil {
return err
}
}
return nil
}
// getCommandHandler retrieves a command handler for a given command type.
// Returns an error if no handler is registered for the query type.
func (s *singularity) getCommandHandler(cmd Command) (CommandHandler, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()
handler, exists := s.commandHandlers[typeName(cmd)]
if !exists {
return nil, fmt.Errorf(
"no command handler registered for command type '%v'",
typeName(cmd),
)
}
return handler, nil
}
// getQueryHandler retrieves a query handler for a given query type.
// Returns an error if no handler is registered for the query type.
func (s *singularity) getQueryHandler(query Query) (QueryHandler, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()
handler, exists := s.queryHandlers[typeName(query)]
if !exists {
return nil, fmt.Errorf(
"no query handler registered for query type '%v'", typeName(query),
)
}
return handler, nil
}
// getEventHandlers retrieves all registered event handlers for a given event type.
func (s *singularity) getEventHandlers(event Event) []EventHandler {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.eventHandlers[typeName(event)]
}
// typeName returns the name of the type of the given value.
// If the value is a pointer, it dereferences the type before returning its name.
func typeName(v interface{}) string {
t := reflect.TypeOf(v)
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t.Name()
}
// applyCommandMiddleware applies a chain of middlewares to a CommandHandler.
func applyCommandMiddleware(
handler CommandHandler, middlewares []CommandMiddleware,
) CommandHandler {
for i := len(middlewares) - 1; i >= 0; i-- {
handler = middlewares[i](handler)
}
return handler
}
// applyQueryMiddleware applies a chain of middlewares to a QueryHandler.
func applyQueryMiddleware(
handler QueryHandler, middlewares []QueryMiddleware,
) QueryHandler {
for i := len(middlewares) - 1; i >= 0; i-- {
handler = middlewares[i](handler)
}
return handler
}
// applyEventMiddleware applies a chain of middleware to an EventHandler.
func applyEventMiddleware(
handler EventHandler, middlewares []EventMiddleware,
) EventHandler {
for i := len(middlewares) - 1; i >= 0; i-- {
handler = middlewares[i](handler)
}
return handler
}