Aggregation Pipeline

goodm provides a fluent builder for MongoDB aggregation pipelines. Chain stages together, then execute the pipeline against a model's collection.

Creating a Pipeline

pipe := goodm.NewPipeline(&User{})

The model parameter determines the collection. Pass options to override the database:

pipe := goodm.NewPipeline(&User{}, goodm.PipelineOptions{DB: otherDB})

Stages

Match

Filter documents using the same query syntax as find:

pipe.Match(bson.D{{Key: "age", Value: bson.D{{Key: "$gte", Value: 21}}}})

Group

Aggregate by grouping:

pipe.Group(bson.D{
    {Key: "_id", Value: "$role"},
    {Key: "count", Value: bson.D{{Key: "$sum", Value: 1}}},
    {Key: "avgAge", Value: bson.D{{Key: "$avg", Value: "$age"}}},
})
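
To make the semantics of this stage concrete, here is a minimal in-memory sketch of what the grouping above computes. It does not use goodm or the MongoDB driver; the `user` and `roleStat` types and the `groupByRole` helper are hypothetical names introduced only for illustration.

```go
package main

import "fmt"

// user is a hypothetical stand-in for a stored document.
type user struct {
    Role string
    Age  float64
}

// roleStat mirrors one output document of the $group stage.
type roleStat struct {
    Count  int
    AvgAge float64
}

// groupByRole emulates
// {_id: "$role", count: {$sum: 1}, avgAge: {$avg: "$age"}}
// and returns the result keyed by role (the group's _id).
func groupByRole(users []user) map[string]roleStat {
    sums := map[string]float64{}
    stats := map[string]roleStat{}
    for _, u := range users {
        s := stats[u.Role]
        s.Count++
        sums[u.Role] += u.Age
        s.AvgAge = sums[u.Role] / float64(s.Count)
        stats[u.Role] = s
    }
    return stats
}

func main() {
    stats := groupByRole([]user{
        {Role: "admin", Age: 30},
        {Role: "admin", Age: 40},
        {Role: "member", Age: 25},
    })
    for role, s := range stats {
        fmt.Printf("%s: count=%d avgAge=%.1f\n", role, s.Count, s.AvgAge)
    }
}
```

Each distinct value of the `_id` expression yields exactly one output document, which is why the result is naturally keyed by role.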

Sort

Order results:

pipe.Sort(bson.D{{Key: "count", Value: -1}}) // descending

Project

Reshape documents:

pipe.Project(bson.D{
    {Key: "email", Value: 1},
    {Key: "name", Value: 1},
    {Key: "_id", Value: 0},
})

Limit / Skip

Paginate results. Stages run in order, so call Skip before Limit:

pipe.Skip(20).Limit(10)
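
A small helper can translate a page number into the two arguments. The sketch below is one way to do it; `pageBounds` is a hypothetical name, and it assumes Skip and Limit accept `int64` values, which the text above does not confirm.

```go
package main

import "fmt"

// pageBounds converts a 1-based page number and a page size into the
// values to pass to Skip and Limit. Pages below 1 are treated as page 1.
func pageBounds(page, perPage int64) (skip, limit int64) {
    if page < 1 {
        page = 1
    }
    return (page - 1) * perPage, perPage
}

func main() {
    skip, limit := pageBounds(3, 10)
    fmt.Println(skip, limit) // page 3 of 10 items: skip 20, limit 10
}
```

With these values, the third page corresponds to the `Skip(20).Limit(10)` call shown above.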

Unwind

Deconstruct an array field into one document per element. The field name is automatically prefixed with $:

pipe.Unwind("tags")
// Produces: {$unwind: "$tags"}
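
The effect of `$unwind` on a single document can be sketched in memory as follows. This is an illustration of the server-side behavior with default options, not goodm code; the `unwind` helper and the sample document are hypothetical.

```go
package main

import "fmt"

// unwind emulates {$unwind: "$<field>"} for one document. It returns one
// copy of doc per array element, with the array replaced by that element.
// A missing field, a nil value, or an empty array produces no output.
// A non-array value is treated as a one-element array.
func unwind(doc map[string]any, field string) []map[string]any {
    v, present := doc[field]
    if !present || v == nil {
        return nil
    }
    items, isArray := v.([]any)
    if !isArray {
        items = []any{v}
    }
    out := make([]map[string]any, 0, len(items))
    for _, item := range items {
        clone := make(map[string]any, len(doc))
        for k, val := range doc {
            clone[k] = val
        }
        clone[field] = item
        out = append(out, clone)
    }
    return out
}

func main() {
    doc := map[string]any{"name": "ada", "tags": []any{"go", "mongodb"}}
    for _, d := range unwind(doc, "tags") {
        fmt.Println(d["name"], d["tags"])
    }
}
```

Because documents without the array drop out of the results, an unwind placed before a Group or Count stage changes which documents are counted.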

Lookup

Left outer join:

pipe.Lookup("orders", "user_id", "_id", "user_orders")
// Joins the "orders" collection where orders.user_id == users._id
// Results stored in "user_orders" array
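
The join described in the comments above can be pictured with a small in-memory sketch. It only illustrates the left outer join semantics and does not call goodm or the driver; the `order` and `user` types and the `lookupOrders` helper are hypothetical.

```go
package main

import "fmt"

// order and user are hypothetical stand-ins for the two collections.
type order struct {
    ID     int
    UserID int
}

type user struct {
    ID         int
    UserOrders []order
}

// lookupOrders attaches to each user the orders whose UserID equals the
// user's ID. Users without matching orders are kept with an empty slice,
// which is what makes the join "left outer".
func lookupOrders(users []user, orders []order) []user {
    byUser := map[int][]order{}
    for _, o := range orders {
        byUser[o.UserID] = append(byUser[o.UserID], o)
    }
    out := make([]user, len(users))
    for i, u := range users {
        u.UserOrders = append([]order{}, byUser[u.ID]...)
        out[i] = u
    }
    return out
}

func main() {
    users := []user{{ID: 1}, {ID: 2}}
    orders := []order{{ID: 10, UserID: 1}, {ID: 11, UserID: 1}}
    for _, u := range lookupOrders(users, orders) {
        fmt.Println(u.ID, len(u.UserOrders))
    }
}
```

Every input user appears in the output exactly once, regardless of how many orders match.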

AddFields

Add computed fields:

pipe.AddFields(bson.D{
    {Key: "fullName", Value: bson.D{{Key: "$concat", Value: bson.A{"$first", " ", "$last"}}}},
})
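
One detail of `$concat` worth keeping in mind: if any operand is null or refers to a missing field, the whole result is null. The sketch below emulates that rule in memory; `concat` is a hypothetical helper, and operands starting with `$` are treated as field references as in the stage above.

```go
package main

import "fmt"

// concat emulates $concat over string fields of doc. Operands that start
// with "$" are field references; anything else is a literal. ok is false
// (standing in for null) when a referenced field is missing or nil. The
// real operator fails on non-string values; this sketch reports those as
// ok == false as well.
func concat(doc map[string]any, parts ...string) (result string, ok bool) {
    for _, p := range parts {
        if len(p) > 0 && p[0] == '$' {
            s, isString := doc[p[1:]].(string)
            if !isString {
                return "", false
            }
            result += s
            continue
        }
        result += p
    }
    return result, true
}

func main() {
    full, ok := concat(map[string]any{"first": "Ada", "last": "Lovelace"}, "$first", " ", "$last")
    fmt.Println(full, ok)

    _, ok = concat(map[string]any{"first": "Ada"}, "$first", " ", "$last")
    fmt.Println(ok) // false: "last" is missing
}
```

If some documents may lack one of the fields, wrapping each operand in `$ifNull` with an empty-string default avoids a null `fullName`.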

Count

Count documents at the current stage:

pipe.Count("total")
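
A `$count` stage emits a single document holding the count under the given field name, and emits nothing at all when no documents reach it. The following sketch shows one way to read the value defensively after decoding; `totalFrom` is a hypothetical helper, and plain maps stand in for decoded `bson.M` values so the example runs without the driver.

```go
package main

import "fmt"

// totalFrom extracts the count from the decoded output of a $count stage.
// results is expected to hold zero or one document; field is the name
// passed to Count. An empty result set is reported as a count of zero.
func totalFrom(results []map[string]any, field string) (int64, error) {
    if len(results) == 0 {
        return 0, nil
    }
    switch n := results[0][field].(type) {
    case int32:
        return int64(n), nil
    case int64:
        return n, nil
    default:
        return 0, fmt.Errorf("field %q has unexpected type %T", field, n)
    }
}

func main() {
    total, err := totalFrom([]map[string]any{{"total": int32(42)}}, "total")
    fmt.Println(total, err)

    total, err = totalFrom(nil, "total")
    fmt.Println(total, err)
}
```

Handling both `int32` and `int64` covers the two integer widths a count can decode to in an untyped map.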

Stage (Raw)

Add any stage not covered by the builder:

pipe.Stage(bson.D{{Key: "$sample", Value: bson.D{{Key: "size", Value: 5}}}})
// $out writes the results to a collection and must be the last stage.
pipe.Stage(bson.D{{Key: "$out", Value: "results_collection"}})

Executing

Execute

Runs the pipeline and decodes all results into the given slice:

var results []bson.M
err := goodm.NewPipeline(&User{}).
    Match(bson.D{{Key: "age", Value: bson.D{{Key: "$gte", Value: 21}}}}).
    Group(bson.D{
        {Key: "_id", Value: "$role"},
        {Key: "count", Value: bson.D{{Key: "$sum", Value: 1}}},
    }).
    Sort(bson.D{{Key: "count", Value: -1}}).
    Execute(ctx, &results)

Results can be decoded into typed structs:

type RoleCount struct {
    Role  string `bson:"_id"`
    Count int    `bson:"count"`
}

var counts []RoleCount
err := pipe.Execute(ctx, &counts)

Cursor

Returns a raw *mongo.Cursor for streaming large result sets:

cursor, err := pipe.Cursor(ctx)
if err != nil {
    log.Fatal(err)
}
defer cursor.Close(ctx)

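for cursor.Next(ctx) {
    var doc bson.M
    if err := cursor.Decode(&doc); err != nil {
        log.Fatal(err)
    }
    // process doc
}
// Next returns false on both exhaustion and failure, so check for an error.
if err := cursor.Err(); err != nil {
    log.Fatal(err)
}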

Inspecting Stages

Stages returns the stages added so far:

stages := pipe.Stages() // []bson.D

Full Example

Users per role with average age, sorted by count:

type RoleStat struct {
    Role   string  `bson:"_id"`
    Count  int     `bson:"count"`
    AvgAge float64 `bson:"avg_age"`
}

var stats []RoleStat
err := goodm.NewPipeline(&User{}).
    Match(bson.D{{Key: "verified", Value: true}}).
    Group(bson.D{
        {Key: "_id", Value: "$role"},
        {Key: "count", Value: bson.D{{Key: "$sum", Value: 1}}},
        {Key: "avg_age", Value: bson.D{{Key: "$avg", Value: "$age"}}},
    }).
    Sort(bson.D{{Key: "count", Value: -1}}).
    Execute(ctx, &stats)