Event-Driven Flows
Overview
The Transactions Service participates in the FloatMe event architecture as both a consumer and a producer. It consumes events from Auth0 (Log Stream via EventBridge) and raw Plaid data (via Kinesis), and it produces account and transaction change events on an outbound Kinesis stream for downstream services.
| Direction | Source / Destination | Lambda | Purpose |
|---|---|---|---|
Inbound |
|
|
Triggers re-mining when a user logs in with stale transaction data |
Inbound |
|
|
Converts raw Plaid data into FloatMe format and persists to DynamoDB |
Internal |
|
|
Publishes account and transaction change events to the outbound Kinesis stream |
Internal |
|
|
Same feeder also watches the plaid table stream |
Outbound |
|
|
Delivers refined account and transaction events to downstream FloatMe services |
Outbound |
|
|
Domain events: new_account, user_new_txns_batch_completed |
Listener (Auth0 Login Events)
Trigger
Auth0 publishes Log Stream events to a special EventBridge event bus: prod-auth0 → SQS queue: prod-txn-listener-eventbridge
Events Handled
| Event Type | Description |
|---|---|
|
User successfully authenticated |
|
Suspicious email address pattern (still triggers a re-mine check) |
|
Suspicious IP address pattern (still triggers a re-mine check) |
(all others) |
Ignored |
Flow
Auth0 Log Stream event arrives (SQS message)
│
▼
Event type in [gd_auth_succeed, sepft, sertft]?
│ No → skip
│ Yes
▼
Look up most recent Plaid item for user
│
├─ No item found → skip (user has no linked bank)
│
└─ Item found
│
▼
Query webhook history for item:
- Was there ever a HISTORICAL_UPDATE webhook?
- Was there a DEFAULT_UPDATE webhook in the last 24 hours?
│
├─ No webhooks at all
│ └─► Send HISTORICAL_UPDATE to miner SQS
│ (first-time sync; mine last 365 days)
│
├─ Historical found + recent default found
│ └─► Skip (data is fresh, no action needed)
│
├─ Historical found + no recent default
│ └─► Send DEFAULT_UPDATE to miner SQS
│ (refresh last 14 days)
│
└─ No historical found
└─► Send HISTORICAL_UPDATE to miner SQS
(backfill; mine last 365 days)
The synthetic webhook message has source: "FROM_LISTENER" set so the miner can identify it and skip processing if the item has been removed in the meantime.
Refiner (Plaid Kinesis → DynamoDB)
Trigger
Kinesis stream: prod-txn-plaid-transactions (published by the miner and throttle-miner)
Parallelization factor: 2. Starting position: AT_TIMESTAMP (offset -15 minutes from deploy time).
Flow
Kinesis record received (prod-txn-plaid-transactions)
│
▼
Unmarshal Kinesis wrapper
(user_id, item_id, accounts, transactions, is_last_page)
│
├─ wrapper.Removed is set (TRANSACTIONS_REMOVED path)
│ │
│ └─ For each removed txn_id:
│ Write Remove record to DynamoDB
│ Update existing Transaction record → mark as removed
│
└─ Regular update path (accounts + transactions)
│
▼
Process accounts:
For each account in Plaid response:
Write Account record to DynamoDB (transactions table)
Write AccountBalance snapshot to DynamoDB (with TTL)
│
▼
Process transactions:
For each transaction in Plaid response:
├─ Transaction ID already marked removed (out-of-order)?
│ └─► Write Remove record
│ Send removal notification to Insight Service SQS
└─ New or valid transaction
└─► Batch write to Transaction table
│
▼
All transactions processed AND wrapper.LastPage = true
AND len(transactions) > 0?
└─► Emit user_new_txns_batch_completed to EventBridge
Feeder (DynamoDB Streams → Kinesis)
Trigger
DynamoDB Streams on two tables, both routing to the same feeder Lambda:
-
prod-txn-transactions— filters forACCOUNTandTRANSACTIONtype events -
prod-txn-plaid— does NOT use theACCOUNT/TRANSACTIONfilter set; surfacesITEMandLAYERTTL expirations and other Plaid-specific event types
Behaviour
The feeder reads the type attribute from each DynamoDB stream record’s new image to determine the entity type and route accordingly. TTL expiration events (identified by UserIdentity.PrincipalID == "dynamodb.amazonaws.com") are handled separately for cleanup logic.
Flow
DynamoDB Stream record received
│
▼
Event is a TTL expiration (UserIdentity = dynamodb.amazonaws.com)?
│ Yes
├─ type = LAYER → check if real item was created
│ └─ No real item found → call Plaid.RemoveItem()
└─ type = ITEM → mark item REMOVED, call Plaid.RemoveItem()
│ No
▼
Operation = DELETE?
│ Yes → skip
│ No
▼
Read type from new image
│
├─ type = TRANSACTION
│ Convert DynamoDB record → Transaction API model
│ Publish to prod-txn-floatme-transactions Kinesis
│ Partition key: user_id
│
├─ type = ACCOUNT
│ Convert DynamoDB record → Account model
│ Look up active items for user
│ Set is_main = true if account matches main_account_id
│ Emit new_account event to floatme-events EventBridge
│
└─ type = (other)
Skip (log debug)
Outbound Stream
The prod-txn-floatme-transactions Kinesis stream is consumed by downstream FloatMe services (underwriting, floats, subscriptions, etc.) for account and transaction change events.
Error Handling
The feeder processes all records in a batch sequentially. Errors on individual records are logged but do not stop processing of subsequent records. Failed records are reported as batch item failures in the DynamoDB event response, allowing the Lambda runtime to retry only the failed records.
Related Pages
-
Architecture — System context and component overview
-
Plaid Mining Pipeline — Miner, throttle-miner, and webhook routing details
-
Item Lifecycle — Item add/remove events and the item-created EventBridge event
-
Infrastructure — SQS queue configurations and Kinesis stream details