LivenStore
An Event Store implementation in Go with file-based JSON storage, stream linking, and pub/sub capabilities.
Installation
go get forge.evrard.online/livenstore
Usage
As a Library
package main
import (
"fmt"
"forge.evrard.online/livenstore"
)
func main() {
// Create a subscription manager for pub/sub
manager := livenstore.NewInMemorySubscriptionManager()
// Initialize the store with the subscription manager
store, err := livenstore.NewJSONEventStore("./data", manager)
if err != nil {
panic(err)
}
// Subscribe to events
store.Subscribe([]livenstore.EventType{"UserCreated"}, func(e livenstore.Event) {
fmt.Printf("Received event: %s\n", e.ID)
})
// Create and publish an event (stores and notifies subscribers)
event := livenstore.NewEvent("UserCreated", []byte(`{"name": "John"}`))
store.Publish(event)
// Or use lower-level operations:
// store.Append(event) // Store without notifying
// store.Read(event.ID) // Read by ID
// store.LinkToStream(event.ID, "users") // Link to a stream
}
Building the CLI
go build ./cmd/livenstore-cli
Project Structure
livenstore/
├── event.go # Event struct and constructor
├── store.go # EventStore interface
├── json_store.go # JSON file-based implementation
├── subscription_manager.go # Pub/sub subscription management
├── aggregate_root.go # Aggregate root for event sourcing
├── internal/ # Internal packages (not importable)
│ └── idgen/ # ULID-based ID generation
└── cmd/ # Applications
└── livenstore-cli/
Features
- Event Storage: Store events as JSON files with ULID-based identifiers
- Stream Linking: Organize events into streams using filesystem symlinks
- Pub/Sub: Subscribe to event types and receive notifications when events are published
- Lexicographic Ordering: ULIDs provide time-ordered, sortable event IDs
- Domain Events: Transform your business domain structs to/from events automatically
- Aggregate Root: Apply events to aggregates using convention-based method dispatch
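The lexicographic-ordering property can be illustrated with the time prefix of a ULID. The sketch below is not LivenStore's internal/idgen code; encodeTime and sortedPrefixes are hypothetical helpers that follow the ULID specification's encoding of a millisecond timestamp as 10 Crockford Base32 characters, which is why a plain string sort matches chronological order.

```go
package main

import (
	"fmt"
	"sort"
)

// crockford is the Base32 alphabet used by the ULID specification.
// Its characters are already in ascending ASCII order.
const crockford = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"

// encodeTime renders a millisecond timestamp as a 10-character ULID time prefix.
// Hypothetical helper for illustration only.
func encodeTime(ms uint64) string {
	out := make([]byte, 10)
	for i := 9; i >= 0; i-- {
		out[i] = crockford[ms&31] // lowest 5 bits go to the rightmost character
		ms >>= 5
	}
	return string(out)
}

// sortedPrefixes encodes each timestamp and sorts the results as plain strings.
func sortedPrefixes(timestamps []uint64) []string {
	prefixes := make([]string, len(timestamps))
	for i, ms := range timestamps {
		prefixes[i] = encodeTime(ms)
	}
	sort.Strings(prefixes)
	return prefixes
}

func main() {
	// Timestamps are given out of order; the string sort restores time order.
	for _, p := range sortedPrefixes([]uint64{3000, 1000, 2000}) {
		fmt.Println(p)
	}
}
```

Because the prefix is fixed-width and the alphabet is ordered, no parsing is needed to sort events by creation time; comparing file names is enough.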
Domain Events
LivenStore allows you to define your own business domain event structs and automatically convert them to and from the internal Event type. This keeps your domain logic clean and decoupled from the event store implementation.
Defining Domain Events
Define your domain events as regular Go structs:
type UserCreated struct {
UserID string `json:"user_id"`
Username string `json:"username"`
Email string `json:"email"`
}
type OrderPlaced struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
Amount float64 `json:"amount"`
}
Converting Domain Events
Use EventFrom to convert a domain event struct into a storable Event:
// Create a domain event
userCreated := &UserCreated{
UserID: "user-123",
Username: "john_doe",
Email: "john@example.com",
}
// Convert to Event (automatically generates ID and serializes payload)
event, err := livenstore.EventFrom(userCreated)
if err != nil {
panic(err)
}
// Store and publish
store.Publish(*event)
Use FromEvent to convert a stored Event back into your domain struct:
// Read event from store
event, err := store.Read(eventID)
if err != nil {
panic(err)
}
// Convert back to domain event
var userCreated UserCreated
err = livenstore.FromEvent(*event, &userCreated)
if err != nil {
panic(err)
}
fmt.Printf("User %s created\n", userCreated.Username)
Type Safety
FromEvent performs type checking to ensure the event type matches the target struct. If you try to deserialize an event into the wrong type, you'll get an error:
event, _ := livenstore.EventFrom(&UserCreated{...})
var order OrderPlaced
err := livenstore.FromEvent(*event, &order)
// Error: Wrong event type. Expected *main.UserCreated, got *main.OrderPlaced
Note: For proper roundtrip conversion, use a pointer when calling EventFrom (e.g., EventFrom(&domainEvent)) since FromEvent requires a pointer to unmarshal into.
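To make the type check concrete, here is a minimal sketch of how a type-checked roundtrip could be built with encoding/json and reflect. It is an assumption about the general technique, not LivenStore's actual implementation: event, eventFrom, fromEvent, and typeName are hypothetical names, and unlike the library this sketch normalizes pointers so that values and pointers map to the same type name.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
)

// event is a simplified stand-in for a stored event.
type event struct {
	Type    string
	Payload []byte
}

type UserCreated struct {
	UserID   string `json:"user_id"`
	Username string `json:"username"`
}

type OrderPlaced struct {
	OrderID string `json:"order_id"`
}

// typeName returns the struct name for a value or a pointer to it.
func typeName(v any) string {
	t := reflect.TypeOf(v)
	if t == nil {
		return ""
	}
	if t.Kind() == reflect.Pointer {
		t = t.Elem()
	}
	return t.Name()
}

// eventFrom serializes a domain struct and records its type name.
func eventFrom(domain any) (*event, error) {
	payload, err := json.Marshal(domain)
	if err != nil {
		return nil, err
	}
	return &event{Type: typeName(domain), Payload: payload}, nil
}

// fromEvent refuses to unmarshal into a target whose type name differs.
func fromEvent(e event, target any) error {
	t := reflect.TypeOf(target)
	if t == nil || t.Kind() != reflect.Pointer {
		return errors.New("target must be a non-nil pointer")
	}
	if name := typeName(target); name != e.Type {
		return fmt.Errorf("wrong event type: event is %s, target is %s", e.Type, name)
	}
	return json.Unmarshal(e.Payload, target)
}

func main() {
	e, _ := eventFrom(&UserCreated{UserID: "user-123", Username: "john_doe"})

	var user UserCreated
	fmt.Println(fromEvent(*e, &user), user.Username)

	var order OrderPlaced
	fmt.Println(fromEvent(*e, &order))
}
```

Checking the type name before unmarshalling matters because encoding/json would otherwise silently ignore unknown fields and leave the wrong struct zero-valued.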
Aggregate Root
LivenStore provides a generic AggregateRoot[T] type for implementing the event sourcing pattern. Aggregates can apply events using convention-based method dispatch: when you call Apply(aggregate, event), it automatically invokes the On<EventType> method on the aggregate.
The Aggregate Interface
All aggregates must implement the Aggregate interface, which requires an ID() method:
type Aggregate interface {
ID() string
}
This ensures that every aggregate has a unique identifier that can be used for event streams and persistence.
Defining an Aggregate
Use the self-referential generic pattern: embed AggregateRoot[*YourType] in your aggregate struct, implement the ID() method, and define handler methods following the On<EventType>(event) error pattern:
// User aggregate uses the self-referential generic pattern.
// The generic parameter *User allows AggregateRoot to call methods on User.
type User struct {
livenstore.AggregateRoot[*User]
id string // lowercase - accessed via ID() method
Username string
Active bool
}
// ID returns the unique identifier for this aggregate.
// Required by the livenstore.Aggregate interface.
func (u *User) ID() string {
return u.id
}
// Event types
type UserCreated struct {
UserID string `json:"user_id"`
Username string `json:"username"`
}
type UserRenamed struct {
NewUsername string `json:"new_username"`
}
type UserDeactivated struct {
Reason string `json:"reason"`
}
// Event handlers - must follow the pattern On<EventType>(event) error
func (u *User) OnUserCreated(e UserCreated) error {
u.id = e.UserID // Set the aggregate ID from the event
u.Username = e.Username
u.Active = true
return nil
}
func (u *User) OnUserRenamed(e UserRenamed) error {
if e.NewUsername == "" {
return errors.New("username cannot be empty")
}
u.Username = e.NewUsername
return nil
}
func (u *User) OnUserDeactivated(e UserDeactivated) error {
u.Active = false
return nil
}
Applying Events
Use the Apply method to apply events to your aggregate. The method dynamically finds and calls the appropriate On<EventType> handler:
user := &User{}
// Apply UserCreated - calls user.OnUserCreated(event)
err := user.Apply(user, UserCreated{UserID: "user-001", Username: "john_doe"})
if err != nil {
panic(err)
}
// user.ID() = "user-001", user.Username = "john_doe", user.Active = true
// Apply UserRenamed - calls user.OnUserRenamed(event)
err = user.Apply(user, UserRenamed{NewUsername: "john_smith"})
if err != nil {
panic(err)
}
// user.Username = "john_smith"
// Apply UserDeactivated - calls user.OnUserDeactivated(event)
err = user.Apply(user, UserDeactivated{Reason: "User requested deletion"})
if err != nil {
panic(err)
}
// user.Active = false
Replaying Events with OnEvent
The OnEvent method applies an event to the aggregate without tracking it in unsavedEvents. This is useful when replaying events from storage to rebuild aggregate state:
// Load events from store
storedEvents := loadEventsFromStore(userID)
// Replay events to rebuild state (without tracking)
user := &User{}
for _, storedEvent := range storedEvents {
var domainEvent any
// Convert stored event back to domain event...
err := user.OnEvent(user, domainEvent)
if err != nil {
panic(err)
}
}
// user state is rebuilt, unsavedEvents is empty
When to use which method:
| Method | Tracks Events | Use Case |
|---|---|---|
| Apply | Yes | New events that need to be persisted |
| OnEvent | No | Replaying events from storage |
How It Works
- Method Resolution: Both Apply and OnEvent extract the event type name (e.g., UserCreated) and look for a method named On<EventType> (e.g., OnUserCreated) on the handler.
- Signature Validation: The method must have exactly one parameter matching the event type and return exactly one error value.
- Event Tracking: Apply calls OnEvent internally, then tracks the event in unsavedEvents on success. OnEvent does not track events.
- Error Propagation: If the handler returns an error, both methods propagate it. Apply does not track events that fail.
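The resolution and validation steps above can be sketched with the standard reflect package. This is an illustration of convention-based dispatch under stated assumptions, not the library's source: dispatch is a hypothetical function, and its error messages only approximate the ones LivenStore returns.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errorType is the reflect.Type of the built-in error interface.
var errorType = reflect.TypeOf((*error)(nil)).Elem()

// dispatch finds a method named On<EventType> on handler, validates its
// signature, and calls it with event. Hypothetical helper for illustration.
func dispatch(handler any, event any) error {
	eventType := reflect.TypeOf(event)
	if eventType == nil {
		return errors.New("event must not be nil")
	}
	name := "On" + eventType.Name()

	method := reflect.ValueOf(handler).MethodByName(name)
	if !method.IsValid() {
		return fmt.Errorf("method %s not found on handler %T", name, handler)
	}

	// For a bound method value, the receiver is not counted in NumIn.
	sig := method.Type()
	if sig.NumIn() != 1 {
		return fmt.Errorf("method %s must have exactly 1 parameter, got %d", name, sig.NumIn())
	}
	if sig.In(0) != eventType {
		return fmt.Errorf("method %s expects %s, got %s", name, sig.In(0), eventType)
	}
	if sig.NumOut() != 1 || sig.Out(0) != errorType {
		return fmt.Errorf("method %s must return exactly one error value", name)
	}

	out := method.Call([]reflect.Value{reflect.ValueOf(event)})
	if err, ok := out[0].Interface().(error); ok {
		return err // propagate the handler's error unchanged
	}
	return nil
}

type UserCreated struct{ Username string }
type UserRenamed struct{ NewUsername string }
type Unknown struct{}

type User struct{ Username string }

func (u *User) OnUserCreated(e UserCreated) error {
	u.Username = e.Username
	return nil
}

func (u *User) OnUserRenamed(e UserRenamed) error {
	if e.NewUsername == "" {
		return errors.New("username cannot be empty")
	}
	u.Username = e.NewUsername
	return nil
}

func main() {
	u := &User{}
	fmt.Println(dispatch(u, UserCreated{Username: "john_doe"}), u.Username)
	fmt.Println(dispatch(u, UserRenamed{}))
	fmt.Println(dispatch(u, Unknown{}))
}
```

Note that MethodByName only sees exported methods, which is one reason the On<EventType> handlers must start with an uppercase letter.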
Handler Method Requirements
| Requirement | Example |
|---|---|
| Method name must be On + event type name | OnUserCreated, OnOrderPlaced |
| Must accept exactly 1 parameter (the event) | func (u *User) OnUserCreated(e UserCreated) error |
| Parameter must match event type (as value) | Event UserCreated{} requires param UserCreated |
| Must return exactly 1 value of type error | return nil or return errors.New("...") |
Error Handling
The Apply method returns descriptive errors for common issues:
// Missing handler method
err := user.Apply(user, UnknownEvent{})
// Error: method OnUnknownEvent not found on handler *main.User
// Wrong parameter count
// func (u *User) OnBadEvent(a, b int) error
err = user.Apply(user, BadEvent{})
// Error: method OnBadEvent must have exactly 1 parameter, got 2
// Handler returns error
// func (u *User) OnUserRenamed(e UserRenamed) error { return errors.New("invalid") }
err = user.Apply(user, UserRenamed{})
// Error: invalid
Collecting Unsaved Events
After applying events to an aggregate, you can collect them for persistence using CollectUnsavedEvents(). This method returns all successfully applied events and clears the internal tracking slice:
user := &User{}
// Apply multiple events
user.Apply(user, UserCreated{UserID: "user-001", Username: "john_doe"})
user.Apply(user, UserRenamed{NewUsername: "john_smith"})
// Collect events for persistence
events := user.CollectUnsavedEvents()
// events contains [UserCreated{...}, UserRenamed{...}]
// Persist events to store
for _, event := range events {
storeEvent, _ := livenstore.EventFrom(event)
store.Publish(*storeEvent)
}
// Subsequent calls return empty slice until new events are applied
events = user.CollectUnsavedEvents()
// events is empty []
Key behaviors:
- Returns all events that were successfully applied (handlers returned nil)
- Clears the internal slice after collection (events are only returned once)
- Failed events (where handler returned an error) are never tracked
- Always returns a non-nil slice (empty slice []any{} if no events, never nil)
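The tracking behaviors listed above fit in a few lines. The following is a simplified sketch, assuming a single handler function in place of On<EventType> dispatch; tracker, apply, onEvent, collect, and rejectEmpty are hypothetical names, not the library's types.

```go
package main

import (
	"errors"
	"fmt"
)

// tracker is a hypothetical stand-in for the tracking half of an aggregate root.
type tracker struct {
	unsaved []any
	handle  func(event any) error // stands in for On<EventType> dispatch
}

// onEvent runs the handler without recording the event (the replay path).
func (t *tracker) onEvent(event any) error {
	return t.handle(event)
}

// apply runs the handler and records the event only if the handler succeeds.
func (t *tracker) apply(event any) error {
	if err := t.onEvent(event); err != nil {
		return err // failed events are never tracked
	}
	t.unsaved = append(t.unsaved, event)
	return nil
}

// collect returns the recorded events and resets the tracker.
// It always returns a non-nil slice.
func (t *tracker) collect() []any {
	events := t.unsaved
	t.unsaved = nil
	if events == nil {
		return []any{}
	}
	return events
}

// rejectEmpty is a toy handler that fails on empty strings.
func rejectEmpty(event any) error {
	if s, ok := event.(string); ok && s == "" {
		return errors.New("empty event")
	}
	return nil
}

func main() {
	t := &tracker{handle: rejectEmpty}
	_ = t.onEvent("replayed") // handled, not tracked
	_ = t.apply("created")    // handled and tracked
	_ = t.apply("")           // handler fails, not tracked

	fmt.Println(t.collect()) // only the successfully applied event
	fmt.Println(t.collect()) // empty until new events are applied
}
```

Returning an empty slice rather than nil keeps callers from needing a nil check before ranging or serializing the result.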
Event Store Implementations
LivenStore defines an EventStore interface that can be implemented by different storage backends:
type EventStore interface {
Append(Event) error
Read(string) (*Event, error)
LinkToStream(string, string) error
ReadStream(string) ([]Event, error)
}
JSONEventStore
The JSONEventStore is a file-based implementation that stores events as individual JSON files on disk.
Storage Structure
data/
├── events/ # All events stored here
│ ├── 01HX7Y8K9M2N3P4Q5R6S7T8U9V.json
│ ├── 01HX7Y8K9M2N3P4Q5R6S7T8U9W.json
│ └── ...
└── streams/ # Stream directories with symlinks
├── users/
│ ├── 01HX7Y8K9M2N3P4Q5R6S7T8U9V.json -> ../../events/01HX...9V.json
│ └── 01HX7Y8K9M2N3P4Q5R6S7T8U9W.json -> ../../events/01HX...9W.json
└── orders/
└── 01HX7Y8K9M2N3P4Q5R6S7T8U9V.json -> ../../events/01HX...9V.json
How It Works
- Event Storage: Each event is serialized to JSON and written to events/<event-id>.json. The payload is base64-encoded to support binary data.
{
"id": "01HX7Y8K9M2N3P4Q5R6S7T8U9V",
"type": "UserCreated",
"payload": "eyJuYW1lIjogIkpvaG4ifQ=="
}
- Stream Linking: Streams are directories containing symbolic links to event files. This allows:
- An event to belong to multiple streams
- Efficient enumeration of events in a stream
- No data duplication
- Reading Streams: Retrieve all events in a stream using ReadStream:
// Read all events from the "users" stream
events, err := store.ReadStream("users")
if err != nil {
log.Fatal(err)
}
for _, event := range events {
var user UserCreated
livenstore.FromEvent(event, &user)
fmt.Printf("User: %s\n", user.Username)
}
Events are returned in lexicographic order by ID, which corresponds to chronological order due to ULID properties.
- Event IDs: Events are assigned ULID identifiers, which are:
- Lexicographically sortable by creation time
- Globally unique
- URL-safe and case-insensitive
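The on-disk layout described above can be reproduced with the standard library alone. This is a sketch of the technique, not the JSONEventStore source: storedEvent, appendEvent, linkToStream, readStream, and demo are hypothetical names, and the sketch assumes a filesystem that supports symlinks.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// storedEvent mirrors the JSON shape shown above. encoding/json
// base64-encodes []byte fields automatically.
type storedEvent struct {
	ID      string `json:"id"`
	Type    string `json:"type"`
	Payload []byte `json:"payload"`
}

// appendEvent writes the event to <root>/events/<id>.json.
func appendEvent(root string, e storedEvent) error {
	dir := filepath.Join(root, "events")
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	data, err := json.Marshal(e)
	if err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(dir, e.ID+".json"), data, 0o644)
}

// linkToStream creates a relative symlink in <root>/streams/<stream>/.
func linkToStream(root, id, stream string) error {
	dir := filepath.Join(root, "streams", stream)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	target := filepath.Join("..", "..", "events", id+".json")
	return os.Symlink(target, filepath.Join(dir, id+".json"))
}

// readStream loads every event linked into the stream, ordered by file name.
func readStream(root, stream string) ([]storedEvent, error) {
	dir := filepath.Join(root, "streams", stream)
	entries, err := os.ReadDir(dir) // entries come back sorted by filename
	if err != nil {
		return nil, err
	}
	events := make([]storedEvent, 0, len(entries))
	for _, entry := range entries {
		data, err := os.ReadFile(filepath.Join(dir, entry.Name())) // follows the symlink
		if err != nil {
			return nil, err
		}
		var e storedEvent
		if err := json.Unmarshal(data, &e); err != nil {
			return nil, err
		}
		events = append(events, e)
	}
	return events, nil
}

// demo stores two events out of order in a temp directory and returns
// the IDs in the order the stream yields them.
func demo() ([]string, error) {
	root, err := os.MkdirTemp("", "livenstore-sketch")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(root)
	for _, id := range []string{"01HX7Y8K9M2N3P4Q5R6S7T8U9W", "01HX7Y8K9M2N3P4Q5R6S7T8U9V"} {
		e := storedEvent{ID: id, Type: "UserCreated", Payload: []byte(`{"name": "John"}`)}
		if err := appendEvent(root, e); err != nil {
			return nil, err
		}
		if err := linkToStream(root, id, "users"); err != nil {
			return nil, err
		}
	}
	events, err := readStream(root, "users")
	if err != nil {
		return nil, err
	}
	ids := make([]string, len(events))
	for i, e := range events {
		ids[i] = e.ID
	}
	return ids, nil
}

func main() {
	fmt.Println(demo())
}
```

Because the stream directory holds only links, the ordering guarantee comes for free from sorted directory listings, at the cost of one file read per event.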
Limitations
- Not suitable for high-concurrency scenarios (no file locking)
- Performance degrades with very large numbers of events
- Symlinks may not work on all filesystems
Subscription Manager
LivenStore includes a pub/sub system for subscribing to and publishing events.
SubscriptionManager Interface
type EventHandler func(Event)
type SubscriptionManager interface {
Subscribe([]EventType, EventHandler)
Publish(Event)
}
InMemorySubscriptionManager
The InMemorySubscriptionManager is an in-memory implementation that delivers events to subscribers asynchronously via channels.
// Create a subscription manager
manager := livenstore.NewInMemorySubscriptionManager()
// Subscribe to multiple event types with a single handler
manager.Subscribe([]livenstore.EventType{"UserCreated", "UserUpdated"}, func(e livenstore.Event) {
fmt.Printf("User event: %s\n", e.Type)
})
// Publish an event (delivered to all matching subscribers)
manager.Publish(event)
Features
- Multiple Event Types: Subscribe to multiple event types with a single handler
- Multiple Subscribers: Multiple handlers can subscribe to the same event type
- Async Delivery: Events are delivered asynchronously via buffered channels
- Integrated with Store: JSONEventStore.Publish() both persists and notifies subscribers
Limitations
- In-memory only (subscriptions are lost on restart)
- No persistence of subscription state
- No guaranteed delivery or retry mechanism
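For readers curious how asynchronous delivery over buffered channels can work, here is a minimal sketch. It assumes one buffered channel and one goroutine per subscription; bus, subscribe, publish, shutdown, and demo are hypothetical names, and the real InMemorySubscriptionManager may be structured differently.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a simplified stand-in for a published event.
type event struct {
	ID   string
	Type string
}

// bus is a hypothetical in-memory pub/sub.
type bus struct {
	mu   sync.RWMutex
	subs map[string][]chan event
	all  []chan event
	wg   sync.WaitGroup
}

func newBus() *bus {
	return &bus{subs: make(map[string][]chan event)}
}

// subscribe registers h for every listed type, served by a single goroutine.
func (b *bus) subscribe(types []string, h func(event)) {
	ch := make(chan event, 16) // buffer size is an arbitrary choice
	b.wg.Add(1)
	go func() {
		defer b.wg.Done()
		for e := range ch {
			h(e)
		}
	}()
	b.mu.Lock()
	defer b.mu.Unlock()
	b.all = append(b.all, ch)
	for _, t := range types {
		b.subs[t] = append(b.subs[t], ch)
	}
}

// publish queues e for each matching subscriber and returns without
// waiting for handlers; it blocks only if a subscriber's buffer is full.
func (b *bus) publish(e event) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, ch := range b.subs[e.Type] {
		ch <- e
	}
}

// shutdown closes every subscription and waits for queued events to drain.
func (b *bus) shutdown() {
	b.mu.Lock()
	for _, ch := range b.all {
		close(ch)
	}
	b.all = nil
	b.subs = make(map[string][]chan event)
	b.mu.Unlock()
	b.wg.Wait()
}

// demo publishes three events and returns the types one handler received.
func demo() []string {
	b := newBus()
	var got []string
	b.subscribe([]string{"UserCreated", "UserUpdated"}, func(e event) {
		got = append(got, e.Type)
	})
	b.publish(event{ID: "1", Type: "UserCreated"})
	b.publish(event{ID: "2", Type: "OrderPlaced"}) // no subscriber, silently dropped
	b.publish(event{ID: "3", Type: "UserUpdated"})
	b.shutdown() // after this returns, reading got is safe
	return got
}

func main() {
	fmt.Println(demo())
}
```

The sketch shares the limitations listed above: nothing is persisted, and an event published with no matching subscriber is simply lost.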
License
MIT