Introduces the 'Singularity' implementation, a CQRS Bus that supports commands, queries, and events, along with middleware extensibility. Includes comprehensive tests, modular files for commands, queries, and events, as well as CI/CD workflows.
194 lines
4.6 KiB
Go
194 lines
4.6 KiB
Go
package cqrs
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"reflect"
|
|
"sync"
|
|
)
|
|
|
|
type singularity struct {
|
|
commandHandlers map[string]CommandHandler
|
|
queryHandlers map[string]QueryHandler
|
|
eventHandlers map[string][]EventHandler
|
|
commandMiddlewares []CommandMiddleware
|
|
queryMiddlewares []QueryMiddleware
|
|
eventMiddlewares []EventMiddleware
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
func NewSingularity() Bus {
|
|
return &singularity{
|
|
commandHandlers: make(map[string]CommandHandler),
|
|
queryHandlers: make(map[string]QueryHandler),
|
|
eventHandlers: make(map[string][]EventHandler),
|
|
commandMiddlewares: make([]CommandMiddleware, 0),
|
|
queryMiddlewares: make([]QueryMiddleware, 0),
|
|
eventMiddlewares: make([]EventMiddleware, 0),
|
|
}
|
|
}
|
|
|
|
func (s *singularity) RegisterCommandHandler(
|
|
cmd Command, handler CommandHandler,
|
|
) {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
handler = applyCommandMiddleware(handler, s.commandMiddlewares)
|
|
|
|
s.commandHandlers[typeName(cmd)] = handler
|
|
}
|
|
|
|
func (s *singularity) RegisterQueryHandler(query Query, handler QueryHandler) {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
handler = applyQueryMiddleware(handler, s.queryMiddlewares)
|
|
|
|
s.queryHandlers[typeName(query)] = handler
|
|
}
|
|
|
|
func (s *singularity) RegisterEventHandler(event Event, handler EventHandler) {
|
|
s.mutex.Lock()
|
|
defer s.mutex.Unlock()
|
|
|
|
handler = applyEventMiddleware(handler, s.eventMiddlewares)
|
|
|
|
name := typeName(event)
|
|
s.eventHandlers[name] = append(s.eventHandlers[name], handler)
|
|
}
|
|
|
|
func (s *singularity) UseCommandMiddleware(middleware CommandMiddleware) {
|
|
s.commandMiddlewares = append(s.commandMiddlewares, middleware)
|
|
}
|
|
|
|
func (s *singularity) UseQueryMiddleware(middleware QueryMiddleware) {
|
|
s.queryMiddlewares = append(s.queryMiddlewares, middleware)
|
|
}
|
|
|
|
func (s *singularity) UseEventMiddleware(middleware EventMiddleware) {
|
|
s.eventMiddlewares = append(s.eventMiddlewares, middleware)
|
|
}
|
|
|
|
func (s *singularity) ExecuteCommand(ctx context.Context, cmd Command) (
|
|
[]Event, error,
|
|
) {
|
|
handler, err := s.getCommandHandler(cmd)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
events, err := handler.Handle(ctx, cmd)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(events) > 0 {
|
|
if err = s.emitEvents(ctx, events...); err != nil {
|
|
return events, fmt.Errorf(
|
|
"command succeeded, but dispatching resulting events failed: %v",
|
|
typeName(cmd),
|
|
)
|
|
}
|
|
}
|
|
return events, nil
|
|
}
|
|
|
|
func (s *singularity) ExecuteQuery(
|
|
ctx context.Context, query Query,
|
|
) (interface{}, error) {
|
|
handler, err := s.getQueryHandler(query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return handler.Handle(ctx, query)
|
|
}
|
|
|
|
func (s *singularity) EmitEvent(ctx context.Context, event Event) error {
|
|
handlers := s.getEventHandlers(event)
|
|
if len(handlers) == 0 {
|
|
log.Printf("[RuneBringer] No handler listening for event: %T", event)
|
|
return nil
|
|
}
|
|
for _, handler := range handlers {
|
|
if err := handler.Handle(ctx, event); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *singularity) emitEvents(ctx context.Context, events ...Event) error {
|
|
for _, event := range events {
|
|
if err := s.EmitEvent(ctx, event); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *singularity) getCommandHandler(cmd Command) (CommandHandler, error) {
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
handler, exists := s.commandHandlers[typeName(cmd)]
|
|
if !exists {
|
|
return nil, fmt.Errorf(
|
|
"no command handler registered for command type '%v'",
|
|
typeName(cmd),
|
|
)
|
|
}
|
|
return handler, nil
|
|
}
|
|
|
|
func (s *singularity) getQueryHandler(query Query) (QueryHandler, error) {
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
handler, exists := s.queryHandlers[typeName(query)]
|
|
if !exists {
|
|
return nil, fmt.Errorf(
|
|
"no query handler registered for query type '%v'", typeName(query),
|
|
)
|
|
}
|
|
return handler, nil
|
|
}
|
|
|
|
func (s *singularity) getEventHandlers(event Event) []EventHandler {
|
|
s.mutex.RLock()
|
|
defer s.mutex.RUnlock()
|
|
return s.eventHandlers[typeName(event)]
|
|
}
|
|
|
|
// typeName returns the registry key for v: the unqualified name of its
// dynamic type, with every level of pointer indirection stripped so that T,
// *T and **T all map to the same key.
//
// A nil interface value has no dynamic type (reflect.TypeOf returns nil), so
// "<nil>" is returned instead of panicking on the nil Type.
//
// The name carries no package path, and unnamed types (e.g. anonymous
// structs) yield "".
func typeName(v interface{}) string {
	t := reflect.TypeOf(v)
	if t == nil {
		return "<nil>"
	}
	for t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t.Name()
}
|
|
|
|
func applyCommandMiddleware(
|
|
handler CommandHandler, middlewares []CommandMiddleware,
|
|
) CommandHandler {
|
|
for i := len(middlewares) - 1; i >= 0; i-- {
|
|
handler = middlewares[i](handler)
|
|
}
|
|
return handler
|
|
}
|
|
|
|
func applyQueryMiddleware(
|
|
handler QueryHandler, middlewares []QueryMiddleware,
|
|
) QueryHandler {
|
|
for i := len(middlewares) - 1; i >= 0; i-- {
|
|
handler = middlewares[i](handler)
|
|
}
|
|
return handler
|
|
}
|
|
|
|
func applyEventMiddleware(
|
|
handler EventHandler, middlewares []EventMiddleware,
|
|
) EventHandler {
|
|
for i := len(middlewares) - 1; i >= 0; i-- {
|
|
handler = middlewares[i](handler)
|
|
}
|
|
return handler
|
|
}
|