# iOS SDK

On-device inference, model deployment, and federated learning for iOS.

GitHub: github.com/octomil-ai/octomil-ios
## Installation

### Swift Package Manager

Add to `Package.swift`:

```swift
dependencies: [
    .package(url: "https://github.com/octomil-ai/octomil-ios.git", from: "1.0.0")
]
```

Or in Xcode: File > Add Packages and enter `https://github.com/octomil-ai/octomil-ios.git`.

### CocoaPods

```ruby
pod 'Octomil', '~> 1.0'
```
## Quick Start

```swift
import Octomil

// 1. Initialize
let client = OctomilClient(apiKey: "edg_...", orgId: "your-org-id")

// 2. Register device
try await client.register()

// 3. Download model (cached locally for offline use)
let model = try await client.downloadModel(modelId: "fraud-detector")

// 4. Run inference — all on-device, zero cloud calls
let prediction = try model.predict(input: ["features": userFeatures])
print("Fraud score: \(prediction["score"]!)")

// 5. Participate in federated training (optional)
try await client.train(modelId: "fraud-detector", data: localData, samples: 1000)
```
## Device Pairing

Handle `octomil://pair` deep links with one line:

```swift
@main
struct MyApp: App {
    var body: some Scene {
        WindowGroup {
            ContentView()
                .edgeMLPairing() // handles deep links automatically
        }
    }
}
```

When a user scans a QR code from `octomil deploy --phone`, the deep link opens your app and presents a pairing flow.
## Model Wrapping

Wrap an existing CoreML model with telemetry, validation, and OTA updates. One line changes at model load — zero changes at call sites:

```swift
// Before
let model = try MLModel(contentsOf: modelURL)

// After
let model = try Octomil.wrap(MLModel(contentsOf: modelURL), modelId: "classifier")
let result = try model.prediction(from: input) // identical API
```

The wrapper adds contract validation, latency telemetry, and OTA model updates.
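The latency-telemetry half of such a wrapper is easy to picture: time each prediction and record the elapsed milliseconds. A minimal sketch in plain Swift (this `LatencyRecorder` type is illustrative, not the SDK's `Octomil.wrap` implementation):

```swift
import Foundation

// Hypothetical latency recorder: runs any throwing closure,
// records elapsed milliseconds, and returns the closure's result.
final class LatencyRecorder {
    private(set) var samplesMs: [Double] = []

    func measure<T>(_ work: () throws -> T) rethrows -> T {
        let start = DispatchTime.now()
        defer {
            let elapsed = DispatchTime.now().uptimeNanoseconds - start.uptimeNanoseconds
            samplesMs.append(Double(elapsed) / 1_000_000)
        }
        return try work()
    }

    var averageMs: Double {
        samplesMs.isEmpty ? 0 : samplesMs.reduce(0, +) / Double(samplesMs.count)
    }
}
```

A wrapped model would call `measure` around every `prediction(from:)` and periodically flush `samplesMs` to the telemetry backend.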
## Network Discovery

Make your device discoverable on the local network:

```swift
let discovery = DiscoveryManager()
discovery.startDiscoverable(deviceId: client.deviceId ?? "unknown")
```

The SDK advertises a `_octomil._tcp` Bonjour service. The CLI discovers this service and connects directly — no QR code needed.
## OctomilClient

### Constructor

```swift
OctomilClient(
    apiKey: String,
    orgId: String,
    serverURL: URL = URL(string: "https://api.octomil.com")!,
    configuration: OctomilConfiguration = .default,
    heartbeatInterval: TimeInterval = 300
)
```
### Key Methods

| Method | Description |
|---|---|
| `register()` | Register device with Octomil server |
| `sendHeartbeat()` | Send health report |
| `downloadModel(modelId:version:format:)` | Download and cache model |
| `getCachedModel(modelId:version:)` | Get cached model without downloading |
| `clearModelCache()` | Clear all cached models |
| `train(modelId:data:samples:)` | Train locally and upload |
| `participateInTrainingRound(...)` | Train and upload with full control |
| `uploadWeights(...)` | Upload raw weight update |
## Model Rollouts

When you download a model, Octomil automatically determines which version your device receives based on active rollouts. Always call `downloadModel()` without specifying a version — the server provides the correct one:

```swift
let model = try await client.downloadModel(modelId: "sentiment-classifier")
// Server decides version based on rollout percentage + device hash
```
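The version selection is opaque to the client, but percentage rollouts of this kind are commonly implemented by hashing a stable device identifier into a bucket. A sketch of the idea (the FNV-1a hashing scheme here is an assumption for illustration, not Octomil's actual server-side algorithm):

```swift
import Foundation

// Hypothetical rollout bucketing: a stable hash of the device ID maps
// each device into one of 100 buckets; devices whose bucket falls below
// the rollout percentage receive the new version.
func rolloutBucket(deviceId: String) -> Int {
    // FNV-1a, used because Swift's hashValue is randomized per process
    var hash: UInt64 = 0xcbf29ce484222325
    for byte in deviceId.utf8 {
        hash ^= UInt64(byte)
        hash = hash &* 0x100000001b3
    }
    return Int(hash % 100)
}

func receivesNewVersion(deviceId: String, rolloutPercent: Int) -> Bool {
    rolloutBucket(deviceId: deviceId) < rolloutPercent
}
```

Because the hash is deterministic, a given device stays in the same cohort as the rollout percentage ramps up, rather than flapping between versions.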
## Streaming Inference

The SDK supports streaming inference via `AsyncThrowingStream`. Implement `StreamingInferenceEngine` to plug in your own model backend, then wrap it with `InstrumentedStreamWrapper` for automatic timing metrics (TTFC, inter-chunk latency, throughput).

```swift
import Octomil

// 1. Implement the engine protocol for your model
class MyTextEngine: StreamingInferenceEngine {
    func generate(input: Any, modality: Modality) -> AsyncThrowingStream<InferenceChunk, Error> {
        AsyncThrowingStream { continuation in
            Task {
                // Your on-device generation logic here
                for i in 0..<tokenCount {
                    let chunk = InferenceChunk(
                        index: i,
                        data: tokenData,
                        modality: .text,
                        timestamp: Date(),
                        latencyMs: 0 // filled in by wrapper
                    )
                    continuation.yield(chunk)
                }
                continuation.finish()
            }
        }
    }
}

// 2. Wrap the engine with timing instrumentation
let engine = MyTextEngine()
let wrapper = InstrumentedStreamWrapper(sessionId: UUID().uuidString, modality: .text)
let (stream, getResult) = wrapper.wrap(engine, input: "Summarize this document...")

// 3. Consume the stream
for try await chunk in stream {
    let token = String(data: chunk.data, encoding: .utf8) ?? ""
    print(token, terminator: "") // stream tokens to UI
}

// 4. Get aggregated metrics after the stream completes
if let result = getResult() {
    print("TTFC: \(result.ttfcMs)ms")
    print("Avg chunk latency: \(result.avgChunkLatencyMs)ms")
    print("Throughput: \(result.throughput) chunks/sec")
    print("Total chunks: \(result.totalChunks)")
}
```

`InferenceChunk` carries the chunk index, raw data bytes, modality (`.text`, `.image`, `.audio`, `.video`), timestamp, and `latencyMs`. The `StreamingInferenceResult` returned after the stream finishes includes `sessionId`, `ttfcMs`, `avgChunkLatencyMs`, `totalChunks`, `totalDurationMs`, and `throughput`.
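For context on what the wrapper measures, all of the aggregate metrics can be derived from chunk arrival times relative to the moment the request was issued. A self-contained sketch of that derivation (plain Foundation, not the SDK's internal implementation):

```swift
import Foundation

// Hypothetical re-derivation of the streaming metrics from timestamps.
struct StreamMetrics {
    let ttfcMs: Double           // time to first chunk
    let avgChunkLatencyMs: Double // mean gap between consecutive chunks
    let throughput: Double       // chunks per second over the whole stream
}

func computeMetrics(start: Date, chunkTimes: [Date]) -> StreamMetrics? {
    guard let first = chunkTimes.first, let last = chunkTimes.last else { return nil }
    let ttfc = first.timeIntervalSince(start) * 1000
    // Inter-chunk latency: gaps between consecutive arrivals
    var gaps: [Double] = []
    for (prev, next) in zip(chunkTimes, chunkTimes.dropFirst()) {
        gaps.append(next.timeIntervalSince(prev) * 1000)
    }
    let avgGap = gaps.isEmpty ? 0 : gaps.reduce(0, +) / Double(gaps.count)
    let totalSec = last.timeIntervalSince(start)
    let throughput = totalSec > 0 ? Double(chunkTimes.count) / totalSec : 0
    return StreamMetrics(ttfcMs: ttfc, avgChunkLatencyMs: avgGap, throughput: throughput)
}
```

This is why a stream with a slow first token but fast subsequent tokens shows a high TTFC alongside a low average chunk latency.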
## Embeddings

Use `EmbeddingClient` to generate dense vector embeddings via the Octomil server.

```swift
import Octomil

let embeddingClient = EmbeddingClient(
    serverURL: URL(string: "https://api.octomil.com")!,
    apiKey: "edg_..."
)

// Single string
let result = try await embeddingClient.embed(
    modelId: "nomic-embed-text",
    input: "Hello, world!"
)
print(result.embeddings)         // [[0.1, 0.2, ...]]
print(result.model)              // "nomic-embed-text"
print(result.usage.promptTokens)
print(result.usage.totalTokens)

// Batch embedding
let batchResult = try await embeddingClient.embed(
    modelId: "nomic-embed-text",
    input: ["First document", "Second document", "Third document"]
)
// batchResult.embeddings contains one vector per input string
```

`EmbeddingResult` contains:

- `embeddings: [[Double]]` -- one dense vector per input string
- `model: String` -- the model that produced the embeddings
- `usage: EmbeddingUsage` -- token counts (`promptTokens`, `totalTokens`)
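Embedding vectors are typically compared with cosine similarity, for example to rank documents against a query. A self-contained helper (not part of the SDK) that operates directly on the `[[Double]]` vectors returned above:

```swift
import Foundation

// Cosine similarity between two embedding vectors: 1.0 means the same
// direction, 0.0 orthogonal, -1.0 opposite.
func cosineSimilarity(_ a: [Double], _ b: [Double]) -> Double {
    precondition(a.count == b.count, "vectors must have equal dimension")
    let dot = zip(a, b).map(*).reduce(0, +)
    let normA = sqrt(a.map { $0 * $0 }.reduce(0, +))
    let normB = sqrt(b.map { $0 * $0 }.reduce(0, +))
    guard normA > 0, normB > 0 else { return 0 }
    return dot / (normA * normB)
}

// Indices of documents ranked by similarity to the query, best first.
func rank(query: [Double], documents: [[Double]]) -> [Int] {
    documents.indices.sorted {
        cosineSimilarity(query, documents[$0]) > cosineSimilarity(query, documents[$1])
    }
}
```

With `batchResult.embeddings` as the document vectors and a single-string embed as the query, `rank` gives a minimal on-device semantic search.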
## Smart Routing

`RoutingClient` calls the Octomil routing API to decide whether inference should run on-device or in the cloud. Decisions are cached with a configurable TTL.

```swift
import Octomil

// 1. Configure the routing client
let routingConfig = RoutingConfig(
    serverURL: URL(string: "https://api.octomil.com")!,
    apiKey: "edg_...",
    cacheTtlSeconds: 300,        // cache decisions for 5 minutes
    prefer: .fastest,            // .device, .cloud, .cheapest, .fastest
    modelParams: 2_000_000_000,  // 2B parameter model
    modelSizeMb: 1400
)
let router = RoutingClient(config: routingConfig)

// 2. Get device capabilities
let capabilities = DeviceMetadata.current().routingCapabilities()
// RoutingDeviceCapabilities includes: platform, model, totalMemoryMb,
// gpuAvailable, npuAvailable, supportedRuntimes (["coreml", "metal"])

// 3. Ask the routing API for a decision
let decision = await router.route(
    modelId: "gemma-2b",
    deviceCapabilities: capabilities
)

// 4. Act on the decision
if let decision = decision {
    switch decision.target {
    case "device":
        // Run inference on-device using decision.format and decision.engine
        print("Run on-device with \(decision.engine) engine")
    case "cloud":
        // Fall back to cloud inference
        let response = try await router.cloudInfer(
            modelId: "gemma-2b",
            inputData: ["prompt": "Hello"]
        )
        print("Cloud result: \(response.output), latency: \(response.latencyMs)ms")
    default:
        break
    }
}
```

`RoutingClient` is an actor, so all calls are concurrency-safe. `route()` returns `nil` on any network failure, allowing you to fall back to local inference gracefully.

`RoutingDecision` contains: `id`, `target` (`"device"` or `"cloud"`), `format`, `engine`, and an optional `fallbackTarget` with an endpoint URL.
### Cache Management

```swift
// Invalidate a specific model's cached decision
await router.invalidate(modelId: "gemma-2b")

// Clear all cached decisions
await router.clearCache()
```
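Conceptually, the per-model decision cache behaves like a dictionary whose entries expire after the configured TTL. A minimal sketch of that idea (illustrative only, not the SDK's actual cache implementation):

```swift
import Foundation

// Hypothetical TTL cache: entries expire after ttlSeconds and are
// treated as missing (and evicted) on the next lookup.
struct TTLCache<Value> {
    private var store: [String: (value: Value, expires: Date)] = [:]
    let ttlSeconds: TimeInterval

    mutating func set(_ key: String, _ value: Value, now: Date = Date()) {
        store[key] = (value, now.addingTimeInterval(ttlSeconds))
    }

    mutating func get(_ key: String, now: Date = Date()) -> Value? {
        guard let entry = store[key] else { return nil }
        if now >= entry.expires {
            store[key] = nil  // expired: evict and report a miss
            return nil
        }
        return entry.value
    }

    mutating func invalidate(_ key: String) { store[key] = nil }
    mutating func clear() { store.removeAll() }
}
```

`invalidate(modelId:)` and `clearCache()` map onto `invalidate(_:)` and `clear()` here; a cache miss is what triggers a fresh call to the routing API.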
### Cloud Fallback

When the routing API determines a device cannot run a model (insufficient memory, missing runtime, etc.), it returns `target: "cloud"` with a `fallbackTarget` containing the cloud endpoint. Use `cloudInfer()` to run inference in the cloud:

```swift
let decision = await router.route(modelId: "large-model", deviceCapabilities: capabilities)
if decision?.target == "cloud" {
    do {
        let response = try await router.cloudInfer(
            modelId: "large-model",
            inputData: ["prompt": "Explain quantum computing"],
            parameters: ["max_tokens": 256]
        )
        print("Provider: \(response.provider)")
        print("Latency: \(response.latencyMs)ms")
    } catch {
        // Cloud also failed — handle gracefully
        print("Cloud inference failed: \(error)")
    }
}
```

If `route()` returns `nil` (network failure, server down), the caller should default to on-device inference. The SDK never silently falls back -- you control the fallback logic.
## Background Training

Use the `BackgroundSync` class with iOS `BGTaskScheduler` for background federated training:

```swift
// 1. Register background tasks in application(_:didFinishLaunchingWithOptions:)
BackgroundSync.registerBackgroundTasks()

// 2. Schedule training — uses BGProcessingTask under the hood
BackgroundSync.shared.scheduleNextTraining()

// 3. Handle app lifecycle
func applicationDidEnterBackground() {
    BackgroundSync.shared.applicationDidEnterBackground()
}
```

Add the task identifiers to your `Info.plist`:

```xml
<key>BGTaskSchedulerPermittedIdentifiers</key>
<array>
    <string>ai.octomil.training</string>
    <string>ai.octomil.sync</string>
</array>
```

`BackgroundSync` schedules a `BGProcessingTask` that requires network connectivity. It respects `BackgroundConstraints` (WiFi required, charging required, minimum battery level). Training survives app backgrounding and device sleep.
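The constraint gating amounts to a simple predicate over device state: training runs only when every enabled constraint is satisfied. A sketch of that check (the type and field names here are illustrative, not the SDK's `BackgroundConstraints` API):

```swift
import Foundation

// Hypothetical mirror of the constraints described above.
struct TrainingConstraints {
    var requiresWiFi = true
    var requiresCharging = true
    var minimumBatteryLevel = 0.2  // 20%
}

struct DeviceState {
    var onWiFi: Bool
    var isCharging: Bool
    var batteryLevel: Double  // 0.0 ... 1.0
}

// Training is allowed only if no enabled constraint is violated.
func canStartTraining(_ c: TrainingConstraints, _ s: DeviceState) -> Bool {
    if c.requiresWiFi && !s.onWiFi { return false }
    if c.requiresCharging && !s.isCharging { return false }
    if s.batteryLevel < c.minimumBatteryLevel { return false }
    return true
}
```

`BGProcessingTask` enforces the network and charging requirements at the system level; a check like this is what decides whether to begin a training round once the task actually fires.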
## Configuration

```swift
let config = OctomilConfiguration(
    privacyConfiguration: .highPrivacy, // staggered uploads, DP
    trainingConfiguration: TrainingConfiguration(
        batchSize: 64,
        learningRate: 0.01,
        epochs: 3
    ),
    enableLogging: true
)
let client = OctomilClient(apiKey: "edg_...", orgId: "my-org", configuration: config)
```

Privacy presets:

- `PrivacyConfiguration.default` — standard settings
- `PrivacyConfiguration.highPrivacy` — staggered uploads (1-10 min), DP with epsilon=0.5
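To make the `highPrivacy` preset concrete: staggered uploads decorrelate upload timing across devices so arrival time cannot identify a device, and differential privacy adds calibrated noise to weight updates. A sketch of both mechanisms (illustrative only; these functions are not the SDK's implementation):

```swift
import Foundation

// Hypothetical staggered-upload delay: each device waits a uniformly
// random interval inside the 1-10 minute window before uploading.
func staggeredUploadDelay(minSeconds: Double = 60, maxSeconds: Double = 600) -> Double {
    Double.random(in: minSeconds...maxSeconds)
}

// Hypothetical Laplace mechanism for differential privacy: noise scale
// grows as epsilon shrinks, so epsilon=0.5 means relatively heavy noise.
func laplaceNoise(scale: Double) -> Double {
    // Inverse-CDF sampling; the clamp avoids log(0) at the boundary
    let u = Double.random(in: -0.5..<0.5)
    let sign: Double = u < 0 ? -1 : 1
    return -scale * sign * log(max(1 - 2 * abs(u), .leastNonzeroMagnitude))
}
```

With sensitivity `s` and privacy budget `epsilon`, each weight delta would get `laplaceNoise(scale: s / epsilon)` added before upload.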
## Error Handling

All async methods throw `OctomilError`:

```swift
do {
    let result = try await client.train(modelId: "fraud-detector", data: data, samples: 100)
} catch OctomilError.networkError(let message) {
    print("Network error: \(message)")
} catch OctomilError.authenticationFailed {
    print("Invalid API key or orgId")
} catch OctomilError.deviceNotRegistered {
    print("Call register() first")
}
```
## Requirements

- iOS 15.0+, Xcode 14.0+, Swift 5.7+
## Gotchas

- **iOS 15.0+ required** — the SDK uses `async/await` and structured concurrency. Devices on iOS 14 or earlier are not supported.
- **`register()` must be called first** — all other methods throw `OctomilError.deviceNotRegistered` if the device hasn't registered. Call it once at app launch.
- **Model downloads are cached** — `downloadModel` caches to disk. Subsequent calls return the cached version unless a new version is available via rollout. Use `clearModelCache()` to force re-download.
- **CoreML compilation happens on first load** — the first inference after download may be slow (~1-2s) while CoreML compiles the model. Subsequent loads use the compiled cache.
- **Federated training requires Wi-Fi by default** — the SDK respects workspace training policies. If `wifi_only` is set (the default), training won't start on cellular.
- **`.edgeMLPairing()` requires a URL scheme** — add `octomil` to your app's URL schemes in Info.plist for deep link pairing to work.
- **`RoutingClient` is an actor** — all calls to `route()`, `cloudInfer()`, `clearCache()`, and `invalidate()` must be `await`ed.
- **Routing returns `nil` on failure** — `route()` never throws. It returns `nil` when the server is unreachable, so you always control fallback behavior.
- **Background training uses BGTaskScheduler** — register task identifiers in Info.plist and call `BackgroundSync.registerBackgroundTasks()` at app launch. Don't use a raw `Task {}` for background training; the system will suspend it.
## Related

- Python SDK — server-side orchestration
- Android SDK — native Android client
- Browser SDK — browser inference
- Model Catalog — model versioning
- Rollouts — progressive deployment