package cqrs import ( "context" "fmt" "log" "reflect" "sync" ) // singularity is a type that orchestrates command, query, and event handling. // It maintains handlers and middlewares for commands, queries, and events. // A mutex is used for thread-safe access to the handlers and middlewares. type singularity struct { commandHandlers map[string]CommandHandler queryHandlers map[string]QueryHandler eventHandlers map[string][]EventHandler commandMiddlewares []CommandMiddleware queryMiddlewares []QueryMiddleware eventMiddlewares []EventMiddleware mutex sync.RWMutex } // NewSingularity initializes and returns a new instance of a Bus. func NewSingularity() Bus { return &singularity{ commandHandlers: make(map[string]CommandHandler), queryHandlers: make(map[string]QueryHandler), eventHandlers: make(map[string][]EventHandler), commandMiddlewares: make([]CommandMiddleware, 0), queryMiddlewares: make([]QueryMiddleware, 0), eventMiddlewares: make([]EventMiddleware, 0), } } // RegisterCommandHandler registers a handler for the specified command type. // It applies any registered CommandMiddleware to the handler. func (s *singularity) RegisterCommandHandler( cmd Command, handler CommandHandler, ) { s.mutex.Lock() defer s.mutex.Unlock() handler = applyCommandMiddleware(handler, s.commandMiddlewares) s.commandHandlers[typeName(cmd)] = handler } // RegisterQueryHandler registers a handler for the specified query type. // It applies any registered QueryMiddleware to the handler. func (s *singularity) RegisterQueryHandler(query Query, handler QueryHandler) { s.mutex.Lock() defer s.mutex.Unlock() handler = applyQueryMiddleware(handler, s.queryMiddlewares) s.queryHandlers[typeName(query)] = handler } // RegisterEventHandler registers an event handler for a specific event type. // It applies any registered EventMiddleware to the handler. // Handlers are stored in a list, allowing multiple handlers for the same event. func (s *singularity) RegisterEventHandler(event Event, handler EventHandler) { s.mutex.Lock() defer s.mutex.Unlock() handler = applyEventMiddleware(handler, s.eventMiddlewares) name := typeName(event) s.eventHandlers[name] = append(s.eventHandlers[name], handler) } // UseCommandMiddleware appends a CommandMiddleware to the middleware chain. func (s *singularity) UseCommandMiddleware(middleware CommandMiddleware) { s.commandMiddlewares = append(s.commandMiddlewares, middleware) } // UseQueryMiddleware appends a QueryMiddleware to the middleware chain. func (s *singularity) UseQueryMiddleware(middleware QueryMiddleware) { s.queryMiddlewares = append(s.queryMiddlewares, middleware) } // UseEventMiddleware appends a EventMiddleware to the middleware chain. func (s *singularity) UseEventMiddleware(middleware EventMiddleware) { s.eventMiddlewares = append(s.eventMiddlewares, middleware) } // ExecuteCommand processes a command and emits resulting events if any. // It returns the emitted events or an error if the processing fails. func (s *singularity) ExecuteCommand(ctx context.Context, cmd Command) ( []Event, error, ) { handler, err := s.getCommandHandler(cmd) if err != nil { return nil, err } events, err := handler.Handle(ctx, cmd) if err != nil { return nil, err } if len(events) > 0 { if err = s.emitEvents(ctx, events...); err != nil { return events, fmt.Errorf( "command succeeded, but dispatching resulting events failed: %v", typeName(cmd), ) } } return events, nil } // ExecuteQuery executes the given query by invoking its registered handler. // It returns the result of the query or an error if no handler is found. func (s *singularity) ExecuteQuery( ctx context.Context, query Query, ) (interface{}, error) { handler, err := s.getQueryHandler(query) if err != nil { return nil, err } return handler.Handle(ctx, query) } // EmitEvent dispatches an event to all registered handlers for its type. // Returns an error if any handler processing the event fails. func (s *singularity) EmitEvent(ctx context.Context, event Event) error { handlers := s.getEventHandlers(event) if len(handlers) == 0 { log.Printf("[RuneBringer] No handler listening for event: %T", event) return nil } for _, handler := range handlers { if err := handler.Handle(ctx, event); err != nil { return err } } return nil } // emitEvents dispatches multiple events by calling EmitEvent for each one. // Returns an error if any event fails to emit. func (s *singularity) emitEvents(ctx context.Context, events ...Event) error { for _, event := range events { if err := s.EmitEvent(ctx, event); err != nil { return err } } return nil } // getCommandHandler retrieves a command handler for a given command type. // Returns an error if no handler is registered for the query type. func (s *singularity) getCommandHandler(cmd Command) (CommandHandler, error) { s.mutex.RLock() defer s.mutex.RUnlock() handler, exists := s.commandHandlers[typeName(cmd)] if !exists { return nil, fmt.Errorf( "no command handler registered for command type '%v'", typeName(cmd), ) } return handler, nil } // getQueryHandler retrieves a query handler for a given query type. // Returns an error if no handler is registered for the query type. func (s *singularity) getQueryHandler(query Query) (QueryHandler, error) { s.mutex.RLock() defer s.mutex.RUnlock() handler, exists := s.queryHandlers[typeName(query)] if !exists { return nil, fmt.Errorf( "no query handler registered for query type '%v'", typeName(query), ) } return handler, nil } // getEventHandlers retrieves all registered event handlers for a given event type. func (s *singularity) getEventHandlers(event Event) []EventHandler { s.mutex.RLock() defer s.mutex.RUnlock() return s.eventHandlers[typeName(event)] } // typeName returns the name of the type of the given value. // If the value is a pointer, it dereferences the type before returning its name. func typeName(v interface{}) string { t := reflect.TypeOf(v) if t.Kind() == reflect.Ptr { t = t.Elem() } return t.Name() } // applyCommandMiddleware applies a chain of middlewares to a CommandHandler. func applyCommandMiddleware( handler CommandHandler, middlewares []CommandMiddleware, ) CommandHandler { for i := len(middlewares) - 1; i >= 0; i-- { handler = middlewares[i](handler) } return handler } // applyQueryMiddleware applies a chain of middlewares to a QueryHandler. func applyQueryMiddleware( handler QueryHandler, middlewares []QueryMiddleware, ) QueryHandler { for i := len(middlewares) - 1; i >= 0; i-- { handler = middlewares[i](handler) } return handler } // applyEventMiddleware applies a chain of middleware to an EventHandler. func applyEventMiddleware( handler EventHandler, middlewares []EventMiddleware, ) EventHandler { for i := len(middlewares) - 1; i >= 0; i-- { handler = middlewares[i](handler) } return handler }