simulator
import "github.com/umbralcalc/stochadex/pkg/simulator"
Package simulator provides the core simulation engine and infrastructure for stochadex simulations. It includes the main simulation loop, state management, partition coordination, and execution control mechanisms.
Key Features:
- Partition-based simulation architecture
- Concurrent execution with goroutine coordination
- State history management and time tracking
- Configurable termination and output conditions
- Flexible timestep control
- Storage and persistence utilities
Architecture: The simulator uses a partition-based approach where simulations are divided into independent partitions that can be executed concurrently. Each partition maintains its own state history and can communicate with other partitions through defined interfaces.
Usage Patterns:
- Configure and run complex multi-partition simulations
- Manage simulation state across multiple timesteps
- Coordinate concurrent execution of simulation components
- Store and retrieve simulation results and intermediate states
- Implement custom termination and output conditions
Index
- Variables
- func RunWithHarnesses(settings *Settings, implementations *Implementations) error
- type ConfigGenerator
- func NewConfigGenerator() *ConfigGenerator
- func (c *ConfigGenerator) GenerateConfigs() (*Settings, *Implementations)
- func (c *ConfigGenerator) GetGlobalSeed() uint64
- func (c *ConfigGenerator) GetPartition(name string) *PartitionConfig
- func (c *ConfigGenerator) GetSimulation() *SimulationConfig
- func (c *ConfigGenerator) ResetPartition(name string, config *PartitionConfig)
- func (c *ConfigGenerator) SetGlobalSeed(seed uint64)
- func (c *ConfigGenerator) SetPartition(config *PartitionConfig)
- func (c *ConfigGenerator) SetSimulation(config *SimulationConfig)
- type ConstantTimestepFunction
- type CumulativeTimestepsHistory
- type DownstreamStateValues
- type EveryNStepsOutputCondition
- type EveryStepOutputCondition
- type ExponentialDistributionTimestepFunction
- type Implementations
- type Iteration
- type IterationSettings
- type IterationTestHarness
- type IteratorInputMessage
- type JsonLogChannelOutputFunction
- type JsonLogEntry
- type JsonLogOutputFunction
- type NamedPartitionIndex
- type NamedUpstreamConfig
- type NilOutputCondition
- type NilOutputFunction
- type NumberOfStepsTerminationCondition
- type OnlyGivenPartitionsOutputCondition
- type OutputCondition
- type OutputFunction
- type Params
- func NewParams(params map[string][]float64) Params
- func (p *Params) Get(name string) []float64
- func (p *Params) GetCopy(name string) []float64
- func (p *Params) GetCopyOk(name string) ([]float64, bool)
- func (p *Params) GetIndex(name string, index int) float64
- func (p *Params) GetOk(name string) ([]float64, bool)
- func (p *Params) Set(name string, values []float64)
- func (p *Params) SetIndex(name string, index int, value float64)
- func (p *Params) SetPartitionName(name string)
- type PartitionConfig
- type PartitionConfigOrdering
- type PartitionCoordinator
- func NewPartitionCoordinator(settings *Settings, implementations *Implementations) *PartitionCoordinator
- func (c *PartitionCoordinator) ReadyToTerminate() bool
- func (c *PartitionCoordinator) RequestMoreIterations(wg *sync.WaitGroup)
- func (c *PartitionCoordinator) Run()
- func (c *PartitionCoordinator) Step(wg *sync.WaitGroup)
- func (c *PartitionCoordinator) UpdateHistory(wg *sync.WaitGroup)
- type PartitionState
- func (*PartitionState) Descriptor() ([]byte, []int)
- func (x *PartitionState) GetCumulativeTimesteps() float64
- func (x *PartitionState) GetPartitionName() string
- func (x *PartitionState) GetState() []float64
- func (*PartitionState) ProtoMessage()
- func (x *PartitionState) ProtoReflect() protoreflect.Message
- func (x *PartitionState) Reset()
- func (x *PartitionState) String() string
- type Settings
- type SimulationConfig
- type SimulationConfigStrings
- type StateHistory
- type StateIterator
- func NewStateIterator(iteration Iteration, params Params, partitionName string, partitionIndex int, valueChannels StateValueChannels, outputCondition OutputCondition, outputFunction OutputFunction, initState []float64, initTime float64) *StateIterator
- func (s *StateIterator) Iterate(stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory) []float64
- func (s *StateIterator) ReceiveAndIteratePending(inputChannel <-chan *IteratorInputMessage)
- func (s *StateIterator) UpdateHistory(inputChannel <-chan *IteratorInputMessage)
- type StateTimeStorage
- func NewStateTimeStorage() *StateTimeStorage
- func (s *StateTimeStorage) ConcurrentAppend(name string, time float64, values []float64)
- func (s *StateTimeStorage) GetIndex(name string) int
- func (s *StateTimeStorage) GetNames() []string
- func (s *StateTimeStorage) GetTimes() []float64
- func (s *StateTimeStorage) GetValues(name string) [][]float64
- func (s *StateTimeStorage) SetTimes(times []float64)
- func (s *StateTimeStorage) SetValues(name string, values [][]float64)
- type StateTimeStorageOutputFunction
- type StateValueChannels
- type StdoutOutputFunction
- type TerminationCondition
- type TimeElapsedTerminationCondition
- type TimestepFunction
- type UpstreamConfig
- type UpstreamStateValues
- type WebsocketOutputFunction
Variables
var File_cmd_messages_partition_state_proto protoreflect.FileDescriptor
func RunWithHarnesses
func RunWithHarnesses(settings *Settings, implementations *Implementations) error
RunWithHarnesses runs all iterations, each wrapped in a test harness and returns any errors if found. The simulation is also run twice to check for statefulness residues.
type ConfigGenerator
ConfigGenerator builds Settings and Implementations programmatically and can generate runnable configs on demand.
type ConfigGenerator struct {
// contains filtered or unexported fields
}
func NewConfigGenerator
func NewConfigGenerator() *ConfigGenerator
NewConfigGenerator creates a new ConfigGenerator with empty ordering.
func (*ConfigGenerator) GenerateConfigs
func (c *ConfigGenerator) GenerateConfigs() (*Settings, *Implementations)
GenerateConfigs constructs Settings and Implementations ready to run. It computes state widths, converts named references, and configures iterations with their partition indices.
func (*ConfigGenerator) GetGlobalSeed
func (c *ConfigGenerator) GetGlobalSeed() uint64
GetGlobalSeed returns the current global seed.
func (*ConfigGenerator) GetPartition
func (c *ConfigGenerator) GetPartition(name string) *PartitionConfig
GetPartition retrieves a partition config by name.
func (*ConfigGenerator) GetSimulation
func (c *ConfigGenerator) GetSimulation() *SimulationConfig
GetSimulation returns the current simulation config.
func (*ConfigGenerator) ResetPartition
func (c *ConfigGenerator) ResetPartition(name string, config *PartitionConfig)
ResetPartition replaces the config for a partition by name.
func (*ConfigGenerator) SetGlobalSeed
func (c *ConfigGenerator) SetGlobalSeed(seed uint64)
SetGlobalSeed assigns a random seed to each partition derived from the provided global seed.
func (*ConfigGenerator) SetPartition
func (c *ConfigGenerator) SetPartition(config *PartitionConfig)
SetPartition adds a new partition config. Names must be unique.
func (*ConfigGenerator) SetSimulation
func (c *ConfigGenerator) SetSimulation(config *SimulationConfig)
SetSimulation sets the current simulation config.
type ConstantTimestepFunction
ConstantTimestepFunction uses a fixed stepsize.
type ConstantTimestepFunction struct {
float64
Stepsize }
func (*ConstantTimestepFunction) NextIncrement
func (t *ConstantTimestepFunction) NextIncrement(timestepsHistory *CumulativeTimestepsHistory) float64
type CumulativeTimestepsHistory
CumulativeTimestepsHistory is a rolling window of cumulative timesteps with NextIncrement and CurrentStepNumber.
type CumulativeTimestepsHistory struct {
float64
NextIncrement *mat.VecDense
Values int
CurrentStepNumber int
StateHistoryDepth }
type DownstreamStateValues
DownstreamStateValues contains information to broadcast state values to downstream iterators via channel.
type DownstreamStateValues struct {
chan []float64
Channel int
Copies }
type EveryNStepsOutputCondition
EveryNStepsOutputCondition emits output once every N steps.
type EveryNStepsOutputCondition struct {
int
N // contains filtered or unexported fields
}
func (*EveryNStepsOutputCondition) IsOutputStep
func (c *EveryNStepsOutputCondition) IsOutputStep(partitionName string, state []float64, cumulativeTimesteps float64) bool
type EveryStepOutputCondition
EveryStepOutputCondition calls the OutputFunction at every step.
type EveryStepOutputCondition struct{}
func (*EveryStepOutputCondition) IsOutputStep
func (c *EveryStepOutputCondition) IsOutputStep(partitionName string, state []float64, cumulativeTimesteps float64) bool
type ExponentialDistributionTimestepFunction
ExponentialDistributionTimestepFunction draws dt from an exponential distribution parameterised by Mean and Seed.
type ExponentialDistributionTimestepFunction struct {
float64
Mean uint64
Seed // contains filtered or unexported fields
}
func NewExponentialDistributionTimestepFunction
func NewExponentialDistributionTimestepFunction(mean float64, seed uint64) *ExponentialDistributionTimestepFunction
NewExponentialDistributionTimestepFunction constructs an exponential-dt timestep function given mean and seed.
func (*ExponentialDistributionTimestepFunction) NextIncrement
func (t *ExponentialDistributionTimestepFunction) NextIncrement(timestepsHistory *CumulativeTimestepsHistory) float64
type Implementations
Implementations provides concrete implementations for a simulation run.
type Implementations struct {
[]Iteration
Iterations
OutputCondition OutputCondition
OutputFunction OutputFunction
TerminationCondition TerminationCondition
TimestepFunction TimestepFunction}
type Iteration
Iteration defines the interface for per-partition state update functions in stochadex simulations.
The Iteration interface is the fundamental building block for defining how simulation state evolves over time. Each partition in a simulation uses an Iteration to compute its next state values based on the current state, parameters, and time information.
Design Philosophy: The Iteration interface emphasizes modularity and composability. By providing a simple, well-defined interface, it enables the creation of complex simulations through the combination of simple, focused iterations. This design supports both built-in iteration types and custom user-defined iterations.
Interface Methods:
- Configure: Initialize the iteration with simulation settings (called once)
- Iterate: Compute the next state values (called each simulation step)
Configuration Phase: Configure is called once per partition during simulation setup. It receives:
- partitionIndex: The index of this partition in the simulation
- settings: Global simulation settings and configuration
This phase is used for:
- Initializing random number generators
- Setting up internal data structures
- Configuring iteration-specific parameters
- Validating configuration parameters
Iteration Phase: Iterate is called each simulation step to compute the next state values. It receives:
- params: Current simulation parameters for this partition
- partitionIndex: The index of this partition
- stateHistories: State histories for all partitions (for cross-partition access)
- timestepsHistory: Time and timestep information
It must return:
- []float64: The next state values for this partition
Implementation Requirements:
- Configure must be called before Iterate
- Iterate must return a slice of the correct length (matching state width)
- Iterate should not modify the input parameters or state histories
- Iterate should be deterministic given the same inputs (for reproducible simulations)
Example Usage:
type MyIteration struct {
// Internal state
}
func (m *MyIteration) Configure(partitionIndex int, settings *Settings) {
// Initialize iteration
}
func (m *MyIteration) Iterate(params *Params, partitionIndex int,
[]*StateHistory,
stateHistories *CumulativeTimestepsHistory) []float64 {
timestepsHistory // Compute next state values
return []float64{newValue1, newValue2, ...}
}
Common Iteration Types:
- Stochastic processes: WienerProcessIteration, PoissonProcessIteration
- Deterministic functions: ValuesFunctionIteration, ConstantValuesIteration
- Aggregation functions: VectorMeanIteration, GroupedAggregationIteration
- User-defined iterations: Custom implementations for specific needs
Performance Considerations:
- Iterate is called frequently during simulation execution
- Implementations should be optimized for performance
- Avoid expensive computations or memory allocations in Iterate
- Consider caching expensive computations in Configure
Thread Safety:
- Iterate may be called concurrently from multiple goroutines
- Implementations should be thread-safe or stateless
- Shared mutable state should be protected by synchronization primitives
type Iteration interface {
(partitionIndex int, settings *Settings)
Configure(
Iterate*Params,
params int,
partitionIndex []*StateHistory,
stateHistories *CumulativeTimestepsHistory,
timestepsHistory ) []float64
}
type IterationSettings
IterationSettings is the YAML-loadable per-partition configuration.
Usage hints:
- Name is used to address partitions in other configs and params maps.
- ParamsFromUpstream forwards outputs from upstream partitions into Params.
- StateWidth and StateHistoryDepth control the size and depth of state.
type IterationSettings struct {
string `yaml:"name"`
Name `yaml:"params"`
Params Params map[string]UpstreamConfig `yaml:"params_from_upstream,omitempty"`
ParamsFromUpstream []float64 `yaml:"init_state_values"`
InitStateValues uint64 `yaml:"seed"`
Seed int `yaml:"state_width"`
StateWidth int `yaml:"state_history_depth"`
StateHistoryDepth }
type IterationTestHarness
IterationTestHarness wraps an iteration and performs checks on its behaviour while running.
type IterationTestHarness struct {
Iteration Iterationerror
Err // contains filtered or unexported fields
}
func (*IterationTestHarness) Configure
func (h *IterationTestHarness) Configure(partitionIndex int, settings *Settings)
func (*IterationTestHarness) Iterate
func (h *IterationTestHarness) Iterate(params *Params, partitionIndex int, stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory) []float64
type IteratorInputMessage
IteratorInputMessage carries shared histories into iterator jobs.
type IteratorInputMessage struct {
[]*StateHistory
StateHistories *CumulativeTimestepsHistory
TimestepsHistory }
type JsonLogChannelOutputFunction
JsonLogChannelOutputFunction writes JSON log entries via a background goroutine using a channel for improved throughput.
type JsonLogChannelOutputFunction struct {
// contains filtered or unexported fields
}
func NewJsonLogChannelOutputFunction
func NewJsonLogChannelOutputFunction(filePath string) *JsonLogChannelOutputFunction
NewJsonLogChannelOutputFunction creates a JsonLogChannelOutputFunction. Call Close (defer it) to ensure flushing at the end of a run.
func (*JsonLogChannelOutputFunction) Close
func (j *JsonLogChannelOutputFunction) Close()
Close ensures that the log channel flushes at the end of a run.
func (*JsonLogChannelOutputFunction) Output
func (j *JsonLogChannelOutputFunction) Output(partitionName string, state []float64, cumulativeTimesteps float64)
type JsonLogEntry
JsonLogEntry is the serialised record format used by JSON log outputs.
type JsonLogEntry struct {
string `json:"partition_name"`
PartitionName []float64 `json:"state"`
State float64 `json:"time"`
CumulativeTimesteps }
type JsonLogOutputFunction
JsonLogOutputFunction writes newline-delimited JSON log entries.
type JsonLogOutputFunction struct {
// contains filtered or unexported fields
}
func NewJsonLogOutputFunction
func NewJsonLogOutputFunction(filePath string) *JsonLogOutputFunction
NewJsonLogOutputFunction creates a new JsonLogOutputFunction.
func (*JsonLogOutputFunction) Output
func (j *JsonLogOutputFunction) Output(partitionName string, state []float64, cumulativeTimesteps float64)
type NamedPartitionIndex
NamedPartitionIndex pairs the name of a partition with the partition index assigned to it by the PartitionCoordinator.
type NamedPartitionIndex struct {
string
Name int
Index }
type NamedUpstreamConfig
NamedUpstreamConfig is like UpstreamConfig but refers to upstream by name.
type NamedUpstreamConfig struct {
string `yaml:"upstream"`
Upstream []int `yaml:"indices,omitempty"`
Indices }
type NilOutputCondition
NilOutputCondition never outputs.
type NilOutputCondition struct{}
func (*NilOutputCondition) IsOutputStep
func (c *NilOutputCondition) IsOutputStep(partitionName string, state []float64, cumulativeTimesteps float64) bool
type NilOutputFunction
NilOutputFunction outputs nothing from the simulation.
type NilOutputFunction struct{}
func (*NilOutputFunction) Output
func (f *NilOutputFunction) Output(partitionName string, state []float64, cumulativeTimesteps float64)
type NumberOfStepsTerminationCondition
NumberOfStepsTerminationCondition terminates after MaxNumberOfSteps.
type NumberOfStepsTerminationCondition struct {
int
MaxNumberOfSteps }
func (*NumberOfStepsTerminationCondition) Terminate
func (t *NumberOfStepsTerminationCondition) Terminate(stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory) bool
type OnlyGivenPartitionsOutputCondition
OnlyGivenPartitionsOutputCondition emits output only for listed partitions.
type OnlyGivenPartitionsOutputCondition struct {
map[string]bool
Partitions }
func (*OnlyGivenPartitionsOutputCondition) IsOutputStep
func (o *OnlyGivenPartitionsOutputCondition) IsOutputStep(partitionName string, state []float64, cumulativeTimesteps float64) bool
type OutputCondition
OutputCondition decides whether an output should be emitted this step.
type OutputCondition interface {
(partitionName string, state []float64, cumulativeTimesteps float64) bool
IsOutputStep}
type OutputFunction
OutputFunction writes state/time to an output sink when the OutputCondition is met.
type OutputFunction interface {
(partitionName string, state []float64, cumulativeTimesteps float64)
Output}
type Params
Params stores per-partition parameter values.
Usage hints:
- Use Get/GetIndex helpers to retrieve, Set/SetIndex to update.
- SetPartitionName improves error messages for missing params.
type Params struct {
map[string][]float64 `yaml:",inline"`
Map // contains filtered or unexported fields
}
func NewParams
func NewParams(params map[string][]float64) Params
NewParams constructs a Params instance.
func (*Params) Get
func (p *Params) Get(name string) []float64
Get returns parameter values or panics with a helpful message.
func (*Params) GetCopy
func (p *Params) GetCopy(name string) []float64
GetCopy returns a copy of parameter values or panics with a helpful message.
func (*Params) GetCopyOk
func (p *Params) GetCopyOk(name string) ([]float64, bool)
GetCopyOk returns a copy of parameter values if present along with a flag.
func (*Params) GetIndex
func (p *Params) GetIndex(name string, index int) float64
GetIndex returns a single parameter value or panics.
func (*Params) GetOk
func (p *Params) GetOk(name string) ([]float64, bool)
GetOk returns parameter values if present along with a boolean flag.
func (*Params) Set
func (p *Params) Set(name string, values []float64)
Set creates or updates parameter values by name.
func (*Params) SetIndex
func (p *Params) SetIndex(name string, index int, value float64)
SetIndex updates a single parameter value or panics on invalid index.
func (*Params) SetPartitionName
func (p *Params) SetPartitionName(name string)
SetPartitionName attaches the owning partition name for better errors.
type PartitionConfig
PartitionConfig defines a partition to add to a simulation.
Usage hints:
- Iteration is not YAML-serialised; set it programmatically.
- ParamsAsPartitions allows passing partition indices via their names.
- ParamsFromUpstream forwards outputs from named upstream partitions.
type PartitionConfig struct {
string `yaml:"name"`
Name `yaml:"-"`
Iteration Iteration `yaml:"params"`
Params Params map[string][]string `yaml:"params_as_partitions,omitempty"`
ParamsAsPartitions map[string]NamedUpstreamConfig `yaml:"params_from_upstream,omitempty"`
ParamsFromUpstream []float64 `yaml:"init_state_values"`
InitStateValues int `yaml:"state_history_depth"`
StateHistoryDepth uint64 `yaml:"seed"`
Seed }
func LoadPartitionConfigFromYaml
func LoadPartitionConfigFromYaml(path string) *PartitionConfig
LoadPartitionConfigFromYaml loads PartitionConfig from a YAML file path.
Usage hints:
- Calls Init to populate missing defaults after unmarshalling.
func (*PartitionConfig) Init
func (p *PartitionConfig) Init()
Init ensures params maps are initialised; call after unmarshalling YAML.
type PartitionConfigOrdering
PartitionConfigOrdering maintains the ordering and lookup for partitions. Can be updated dynamically via Append.
type PartitionConfigOrdering struct {
[]string
Names map[string]int
IndexByName map[string]*PartitionConfig
ConfigByName }
func (*PartitionConfigOrdering) Append
func (p *PartitionConfigOrdering) Append(config *PartitionConfig)
Append inserts another partition into the ordering and updates lookups.
type PartitionCoordinator
PartitionCoordinator orchestrates iteration work across partitions and applies state/time history updates in a coordinated manner.
The PartitionCoordinator is the central component that manages the execution of all partitions in a simulation. It coordinates the timing, communication, and state updates across all partitions, ensuring proper synchronization and maintaining simulation consistency.
Architecture: The coordinator uses a two-phase execution model:
- Iteration Phase: All partitions compute their next state values
- Update Phase: State and time histories are updated with new values
This design ensures that all partitions see consistent state information during each iteration, preventing race conditions and maintaining simulation determinism.
Concurrency Model:
- Each partition runs in its own goroutine for parallel execution
- Channels are used for inter-partition communication
- WaitGroups ensure proper synchronization between phases
- Shared state is protected by the coordinator’s control flow
Execution Flow:
- Compute next timestep increment using TimestepFunction
- Request iterations from all partitions (parallel execution)
- Wait for all iterations to complete
- Update state and time histories (parallel execution)
- Check termination condition
- Repeat until termination
Fields:
- Iterators: List of StateIterators, one per partition
- Shared: Shared state and time information accessible to all partitions
- TimestepFunction: Function that determines the next timestep increment
- TerminationCondition: Condition that determines when to stop the simulation
- newWorkChannels: Communication channels for coordinating partition work
Example Usage:
:= NewPartitionCoordinator(settings, implementations)
coordinator
// Run simulation until termination
.Run()
coordinator
// Or step-by-step control
for !coordinator.ReadyToTerminate() {
var wg sync.WaitGroup
.Step(&wg)
coordinator}
Performance:
- O(p) time complexity where p is the number of partitions
- Parallel execution of partition iterations
- Efficient channel-based communication
- Memory usage scales with partition count and state size
Thread Safety:
- Safe for concurrent access to coordinator methods
- Internal synchronization ensures consistent state updates
- Partition communication is thread-safe through channels
type PartitionCoordinator struct {
[]*StateIterator
Iterators *IteratorInputMessage
Shared
TimestepFunction TimestepFunction
TerminationCondition TerminationCondition// contains filtered or unexported fields
}
func NewPartitionCoordinator
func NewPartitionCoordinator(settings *Settings, implementations *Implementations) *PartitionCoordinator
NewPartitionCoordinator wires Settings and Implementations into a runnable coordinator with initial state/time histories and channels.
func (*PartitionCoordinator) ReadyToTerminate
func (c *PartitionCoordinator) ReadyToTerminate() bool
ReadyToTerminate returns whether the TerminationCondition is met.
func (*PartitionCoordinator) RequestMoreIterations
func (c *PartitionCoordinator) RequestMoreIterations(wg *sync.WaitGroup)
RequestMoreIterations spawns a goroutine per partition to run ReceiveAndIteratePending.
func (*PartitionCoordinator) Run
func (c *PartitionCoordinator) Run()
Run advances by repeatedly calling Step until termination.
func (*PartitionCoordinator) Step
func (c *PartitionCoordinator) Step(wg *sync.WaitGroup)
Step performs one simulation tick: compute dt, request iterations, then apply state/time updates.
func (*PartitionCoordinator) UpdateHistory
func (c *PartitionCoordinator) UpdateHistory(wg *sync.WaitGroup)
UpdateHistory spawns a goroutine per partition to run UpdateHistory and shifts time history forward, adding NextIncrement to t[0].
type PartitionState
type PartitionState struct {
float64 `protobuf:"fixed64,1,opt,name=cumulative_timesteps,json=cumulativeTimesteps,proto3" json:"cumulative_timesteps,omitempty"`
CumulativeTimesteps string `protobuf:"bytes,2,opt,name=partition_name,json=partitionName,proto3" json:"partition_name,omitempty"`
PartitionName []float64 `protobuf:"fixed64,3,rep,packed,name=state,proto3" json:"state,omitempty"`
State // contains filtered or unexported fields
}
func (*PartitionState) Descriptor
func (*PartitionState) Descriptor() ([]byte, []int)
Deprecated: Use PartitionState.ProtoReflect.Descriptor instead.
func (*PartitionState) GetCumulativeTimesteps
func (x *PartitionState) GetCumulativeTimesteps() float64
func (*PartitionState) GetPartitionName
func (x *PartitionState) GetPartitionName() string
func (*PartitionState) GetState
func (x *PartitionState) GetState() []float64
func (*PartitionState) ProtoMessage
func (*PartitionState) ProtoMessage()
func (*PartitionState) ProtoReflect
func (x *PartitionState) ProtoReflect() protoreflect.Message
func (*PartitionState) Reset
func (x *PartitionState) Reset()
func (*PartitionState) String
func (x *PartitionState) String() string
type Settings
Settings is the YAML-loadable top-level simulation configuration.
type Settings struct {
[]IterationSettings `yaml:"iterations"`
Iterations float64 `yaml:"init_time_value"`
InitTimeValue int `yaml:"timesteps_history_depth"`
TimestepsHistoryDepth }
func LoadSettingsFromYaml
func LoadSettingsFromYaml(path string) *Settings
LoadSettingsFromYaml loads Settings from a YAML file path.
Usage hints:
- Calls Init to populate missing defaults after unmarshalling.
func (*Settings) Init
func (s *Settings) Init()
Init fills in defaults and ensures maps are initialised. Call immediately after unmarshalling from YAML.
type SimulationConfig
SimulationConfig defines additional run-level configuration.
type SimulationConfig struct {
OutputCondition OutputCondition
OutputFunction OutputFunction
TerminationCondition TerminationCondition
TimestepFunction TimestepFunctionfloat64
InitTimeValue }
type SimulationConfigStrings
SimulationConfigStrings is the YAML-loadable version of SimulationConfig, referring to implementations by type names for templating.
type SimulationConfigStrings struct {
string `yaml:"output_condition"`
OutputCondition string `yaml:"output_function"`
OutputFunction string `yaml:"termination_condition"`
TerminationCondition string `yaml:"timestep_function"`
TimestepFunction float64 `yaml:"init_time_value"`
InitTimeValue }
func LoadSimulationConfigStringsFromYaml
func LoadSimulationConfigStringsFromYaml(path string) *SimulationConfigStrings
LoadSimulationConfigStringsFromYaml loads SimulationConfigStrings from YAML.
type StateHistory
StateHistory is a rolling window of state vectors.
Usage hints:
- Values holds rows of state (row 0 is most recent by convention).
- Use GetNextStateRowToUpdate when updating in multi-row histories.
type StateHistory struct {
// each row is a different state in the history, by convention,
// starting with the most recent at index = 0
*mat.Dense
Values // should be of length = StateWidth
[]float64
NextValues int
StateWidth int
StateHistoryDepth }
func (*StateHistory) CopyStateRow
func (s *StateHistory) CopyStateRow(index int) []float64
CopyStateRow copies a row from the state history given the index.
func (*StateHistory) GetNextStateRowToUpdate
func (s *StateHistory) GetNextStateRowToUpdate() []float64
GetNextStateRowToUpdate determines whether or not it is necessary to copy the previous row or simply expose it based on whether a history longer than 1 is needed.
type StateIterator
StateIterator runs an Iteration for a partition on a goroutine and manages reads/writes to history and output.
type StateIterator struct {
Iteration Iteration
Params Params
Partition NamedPartitionIndex
ValueChannels StateValueChannels
OutputCondition OutputCondition
OutputFunction OutputFunction}
func NewStateIterator
func NewStateIterator(iteration Iteration, params Params, partitionName string, partitionIndex int, valueChannels StateValueChannels, outputCondition OutputCondition, outputFunction OutputFunction, initState []float64, initTime float64) *StateIterator
NewStateIterator creates a StateIterator and may emit initial output if the condition is met by the initial state/time.
func (*StateIterator) Iterate
func (s *StateIterator) Iterate(stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory) []float64
Iterate runs the Iteration and optionally triggers output if the condition is met for the new state/time.
func (*StateIterator) ReceiveAndIteratePending
func (s *StateIterator) ReceiveAndIteratePending(inputChannel <-chan *IteratorInputMessage)
ReceiveAndIteratePending listens for an IteratorInputMessage, updates upstream-driven params, runs Iterate, and stores a pending state update.
func (*StateIterator) UpdateHistory
func (s *StateIterator) UpdateHistory(inputChannel <-chan *IteratorInputMessage)
UpdateHistory applies the pending state update to the partition history.
type StateTimeStorage
StateTimeStorage provides thread-safe storage for simulation time series data with minimal contention and efficient access patterns.
StateTimeStorage is designed to handle concurrent access from multiple simulation partitions while maintaining data consistency and performance. It uses a mutex-protected design optimized for the common case of appending new data points during simulation execution.
Data Organization:
- Time series are organized by partition name
- Each partition can have multiple state dimensions
- Time axis is shared across all partitions
- Data is stored in row-major format for efficient access
Thread Safety:
- ConcurrentAppend is safe for concurrent use from multiple goroutines
- GetValues/GetTimes are safe for concurrent reads
- SetValues/SetTimes should not be called concurrently with appends
- Internal mutex protects against race conditions
Performance Characteristics:
- O(1) lookup by partition name using hash map
- O(1) append operations with minimal locking
- Memory usage: O(total_samples * state_dimensions)
- Efficient for high-frequency data collection
Usage Patterns:
- Real-time data collection during simulation runs
- Batch data loading from external sources
- Result storage for post-simulation analysis
- Intermediate storage for multi-stage simulations
Example Usage:
:= NewStateTimeStorage()
storage
// Concurrent appends from multiple partitions
go func() {
.ConcurrentAppend("prices", 1.0, []float64{100.0, 101.0})
storage}()
go func() {
.ConcurrentAppend("volumes", 1.0, []float64{1000.0})
storage}()
// Retrieve data after simulation
:= storage.GetValues("prices")
priceData := storage.GetTimes() timeData
Memory Management:
- Automatic memory allocation for new partitions
- Efficient storage of sparse time series
- No automatic cleanup (caller responsible for memory management)
Error Handling:
- GetValues panics if partition name is not found
- Provides helpful error messages with available partition names
- ConcurrentAppend handles time deduplication automatically
type StateTimeStorage struct {
// contains filtered or unexported fields
}
func NewStateTimeStorage
func NewStateTimeStorage() *StateTimeStorage
NewStateTimeStorage constructs a new StateTimeStorage.
func (*StateTimeStorage) ConcurrentAppend
func (s *StateTimeStorage) ConcurrentAppend(name string, time float64, values []float64)
ConcurrentAppend appends values for name and updates the time axis at most once per unique timestamp. Safe for concurrent use.
func (*StateTimeStorage) GetIndex
func (s *StateTimeStorage) GetIndex(name string) int
GetIndex returns or creates the index for a name.
func (*StateTimeStorage) GetNames
func (s *StateTimeStorage) GetNames() []string
GetNames returns all partition names present in the store.
func (*StateTimeStorage) GetTimes
func (s *StateTimeStorage) GetTimes() []float64
GetTimes returns the time axis.
func (*StateTimeStorage) GetValues
func (s *StateTimeStorage) GetValues(name string) [][]float64
GetValues returns all time series for name, panicking if absent.
func (*StateTimeStorage) SetTimes
func (s *StateTimeStorage) SetTimes(times []float64)
SetTimes replaces the time axis.
func (*StateTimeStorage) SetValues
func (s *StateTimeStorage) SetValues(name string, values [][]float64)
SetValues replaces the entire series for name.
type StateTimeStorageOutputFunction
StateTimeStorageOutputFunction stores output into StateTimeStorage when the condition is met.
type StateTimeStorageOutputFunction struct {
*StateTimeStorage
Store }
func (*StateTimeStorageOutputFunction) Output
func (f *StateTimeStorageOutputFunction) Output(partitionName string, state []float64, cumulativeTimesteps float64)
type StateValueChannels
StateValueChannels provides upstream/downstream channels for inter-iterator communication.
type StateValueChannels struct {
map[string]*UpstreamStateValues
Upstreams *DownstreamStateValues
Downstream }
func (*StateValueChannels) BroadcastDownstream
func (s *StateValueChannels) BroadcastDownstream(stateValues []float64)
BroadcastDownstream sends state values to all configured downstream copies.
func (*StateValueChannels) UpdateUpstreamParams
func (s *StateValueChannels) UpdateUpstreamParams(params *Params)
UpdateUpstreamParams updates Params with values received from upstream channels.
type StdoutOutputFunction
StdoutOutputFunction outputs the state to the terminal.
type StdoutOutputFunction struct{}
func (*StdoutOutputFunction) Output
func (s *StdoutOutputFunction) Output(partitionName string, state []float64, cumulativeTimesteps float64)
type TerminationCondition
TerminationCondition decides when the simulation should end.
type TerminationCondition interface {
(
Terminate[]*StateHistory,
stateHistories *CumulativeTimestepsHistory,
timestepsHistory ) bool
}
type TimeElapsedTerminationCondition
TimeElapsedTerminationCondition terminates after MaxTimeElapsed.
type TimeElapsedTerminationCondition struct {
float64
MaxTimeElapsed }
func (*TimeElapsedTerminationCondition) Terminate
func (t *TimeElapsedTerminationCondition) Terminate(stateHistories []*StateHistory, timestepsHistory *CumulativeTimestepsHistory) bool
type TimestepFunction
TimestepFunction computes the next time increment.
type TimestepFunction interface {
(
NextIncrement*CumulativeTimestepsHistory,
timestepsHistory ) float64
}
type UpstreamConfig
UpstreamConfig is the YAML-loadable representation of a slice of data from the output of a partition which is computationally upstream.
type UpstreamConfig struct {
int `yaml:"upstream"`
Upstream []int `yaml:"indices,omitempty"`
Indices }
type UpstreamStateValues
UpstreamStateValues contains information to receive state values from an upstream iterator via channel.
type UpstreamStateValues struct {
chan []float64
Channel []int
Indices }
type WebsocketOutputFunction
WebsocketOutputFunction serialises and sends outputs via a websocket connection when the condition is met.
type WebsocketOutputFunction struct {
// contains filtered or unexported fields
}
func NewWebsocketOutputFunction
func NewWebsocketOutputFunction(connection *websocket.Conn, mutex *sync.Mutex) *WebsocketOutputFunction
NewWebsocketOutputFunction constructs a WebsocketOutputFunction with a connection and a mutex for safe concurrent writes.
func (*WebsocketOutputFunction) Output
func (w *WebsocketOutputFunction) Output(partitionName string, state []float64, cumulativeTimesteps float64)
Generated by gomarkdoc