All checks were successful
Go CI/CD / go-ci (push) Successful in 36s
Added detailed comments for struct `singularity`, its methods, and helper functions to improve code clarity and maintainability. This includes explanations of each method's purpose, input/output, and thread-safety mechanisms where applicable.
226 lines
6.8 KiB
Go
226 lines
6.8 KiB
Go
package cqrs
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"reflect"
|
|
"sync"
|
|
)
|
|
|
|
// singularity is a type that orchestrates command, query, and event handling.
|
|
// It maintains handlers and middlewares for commands, queries, and events.
|
|
// A mutex is used for thread-safe access to the handlers and middlewares.
|
|
type singularity struct {
|
|
commandHandlers map[string]CommandHandler
|
|
queryHandlers map[string]QueryHandler
|
|
eventHandlers map[string][]EventHandler
|
|
commandMiddlewares []CommandMiddleware
|
|
queryMiddlewares []QueryMiddleware
|
|
eventMiddlewares []EventMiddleware
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// NewSingularity initializes and returns a new instance of a Bus.
|
|
func NewSingularity() Bus {
|
|
return &singularity{
|
|
commandHandlers: make(map[string]CommandHandler),
|
|
queryHandlers: make(map[string]QueryHandler),
|
|
eventHandlers: make(map[string][]EventHandler),
|
|
commandMiddlewares: make([]CommandMiddleware, 0),
|
|
queryMiddlewares: make([]QueryMiddleware, 0),
|
|
eventMiddlewares: make([]EventMiddleware, 0),
|
|
}
|
|
}
|
|
|
|
// RegisterCommandHandler registers a handler for the specified command type.
|
|
// It applies any registered CommandMiddleware to the handler.
|
|
func (s *singularity) RegisterCommandHandler(
|
|
cmd Command, handler CommandHandler,
|
|
) {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
handler = applyCommandMiddleware(handler, s.commandMiddlewares)
|
|
|
|
s.commandHandlers[typeName(cmd)] = handler
|
|
}
|
|
|
|
// RegisterQueryHandler registers a handler for the specified query type.
|
|
// It applies any registered QueryMiddleware to the handler.
|
|
func (s *singularity) RegisterQueryHandler(query Query, handler QueryHandler) {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
handler = applyQueryMiddleware(handler, s.queryMiddlewares)
|
|
|
|
s.queryHandlers[typeName(query)] = handler
|
|
}
|
|
|
|
// RegisterEventHandler registers an event handler for a specific event type.
|
|
// It applies any registered EventMiddleware to the handler.
|
|
// Handlers are stored in a list, allowing multiple handlers for the same event.
|
|
func (s *singularity) RegisterEventHandler(event Event, handler EventHandler) {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
handler = applyEventMiddleware(handler, s.eventMiddlewares)
|
|
|
|
name := typeName(event)
|
|
s.eventHandlers[name] = append(s.eventHandlers[name], handler)
|
|
}
|
|
|
|
// UseCommandMiddleware appends a CommandMiddleware to the middleware chain.
|
|
func (s *singularity) UseCommandMiddleware(middleware CommandMiddleware) {
|
|
s.commandMiddlewares = append(s.commandMiddlewares, middleware)
|
|
}
|
|
|
|
// UseQueryMiddleware appends a QueryMiddleware to the middleware chain.
|
|
func (s *singularity) UseQueryMiddleware(middleware QueryMiddleware) {
|
|
s.queryMiddlewares = append(s.queryMiddlewares, middleware)
|
|
}
|
|
|
|
// UseEventMiddleware appends a EventMiddleware to the middleware chain.
|
|
func (s *singularity) UseEventMiddleware(middleware EventMiddleware) {
|
|
s.eventMiddlewares = append(s.eventMiddlewares, middleware)
|
|
}
|
|
|
|
// ExecuteCommand processes a command and emits resulting events if any.
|
|
// It returns the emitted events or an error if the processing fails.
|
|
func (s *singularity) ExecuteCommand(ctx context.Context, cmd Command) (
|
|
[]Event, error,
|
|
) {
|
|
handler, err := s.getCommandHandler(cmd)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
events, err := handler.Handle(ctx, cmd)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(events) > 0 {
|
|
if err = s.emitEvents(ctx, events...); err != nil {
|
|
return events, fmt.Errorf(
|
|
"command succeeded, but dispatching resulting events failed: %v",
|
|
typeName(cmd),
|
|
)
|
|
}
|
|
}
|
|
return events, nil
|
|
}
|
|
|
|
// ExecuteQuery executes the given query by invoking its registered handler.
|
|
// It returns the result of the query or an error if no handler is found.
|
|
func (s *singularity) ExecuteQuery(
|
|
ctx context.Context, query Query,
|
|
) (interface{}, error) {
|
|
handler, err := s.getQueryHandler(query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return handler.Handle(ctx, query)
|
|
}
|
|
|
|
// EmitEvent dispatches an event to all registered handlers for its type.
|
|
// Returns an error if any handler processing the event fails.
|
|
func (s *singularity) EmitEvent(ctx context.Context, event Event) error {
|
|
handlers := s.getEventHandlers(event)
|
|
if len(handlers) == 0 {
|
|
log.Printf("[RuneBringer] No handler listening for event: %T", event)
|
|
return nil
|
|
}
|
|
for _, handler := range handlers {
|
|
if err := handler.Handle(ctx, event); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// emitEvents dispatches multiple events by calling EmitEvent for each one.
|
|
// Returns an error if any event fails to emit.
|
|
func (s *singularity) emitEvents(ctx context.Context, events ...Event) error {
|
|
for _, event := range events {
|
|
if err := s.EmitEvent(ctx, event); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// getCommandHandler retrieves a command handler for a given command type.
|
|
// Returns an error if no handler is registered for the query type.
|
|
func (s *singularity) getCommandHandler(cmd Command) (CommandHandler, error) {
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
handler, exists := s.commandHandlers[typeName(cmd)]
|
|
if !exists {
|
|
return nil, fmt.Errorf(
|
|
"no command handler registered for command type '%v'",
|
|
typeName(cmd),
|
|
)
|
|
}
|
|
return handler, nil
|
|
}
|
|
|
|
// getQueryHandler retrieves a query handler for a given query type.
|
|
// Returns an error if no handler is registered for the query type.
|
|
func (s *singularity) getQueryHandler(query Query) (QueryHandler, error) {
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
handler, exists := s.queryHandlers[typeName(query)]
|
|
if !exists {
|
|
return nil, fmt.Errorf(
|
|
"no query handler registered for query type '%v'", typeName(query),
|
|
)
|
|
}
|
|
return handler, nil
|
|
}
|
|
|
|
// getEventHandlers retrieves all registered event handlers for a given event type.
|
|
func (s *singularity) getEventHandlers(event Event) []EventHandler {
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
return s.eventHandlers[typeName(event)]
|
|
}
|
|
|
|
// typeName returns the name of the type of the given value.
|
|
// If the value is a pointer, it dereferences the type before returning its name.
|
|
func typeName(v interface{}) string {
|
|
t := reflect.TypeOf(v)
|
|
if t.Kind() == reflect.Ptr {
|
|
t = t.Elem()
|
|
}
|
|
return t.Name()
|
|
}
|
|
|
|
// applyCommandMiddleware applies a chain of middlewares to a CommandHandler.
|
|
func applyCommandMiddleware(
|
|
handler CommandHandler, middlewares []CommandMiddleware,
|
|
) CommandHandler {
|
|
for i := len(middlewares) - 1; i >= 0; i-- {
|
|
handler = middlewares[i](handler)
|
|
}
|
|
return handler
|
|
}
|
|
|
|
// applyQueryMiddleware applies a chain of middlewares to a QueryHandler.
|
|
func applyQueryMiddleware(
|
|
handler QueryHandler, middlewares []QueryMiddleware,
|
|
) QueryHandler {
|
|
for i := len(middlewares) - 1; i >= 0; i-- {
|
|
handler = middlewares[i](handler)
|
|
}
|
|
return handler
|
|
}
|
|
|
|
// applyEventMiddleware applies a chain of middleware to an EventHandler.
|
|
func applyEventMiddleware(
|
|
handler EventHandler, middlewares []EventMiddleware,
|
|
) EventHandler {
|
|
for i := len(middlewares) - 1; i >= 0; i-- {
|
|
handler = middlewares[i](handler)
|
|
}
|
|
return handler
|
|
}
|