
Complete Machine Learning pipeline for NLP tasks

Updated: Jan 7, 2022

End-to-end Machine Learning pipeline for Named Entity Recognition in emails with basic implementation

The pipeline architecture


Disclaimers

  1. This project was inspired by a problem I had a chance to solve in my professional career. However, the problem presented here is different, and this article does not contain any code or solutions used in the product.

  2. The solution presented here is a simplified one. Further steps required to bring it closer to a reliable, production-ready service are discussed at the end of the article.

  3. The given material assumes the reader is familiar with the basics of Machine Learning and Software Engineering but is curious to know how one can make them work together.


Introduction

Several years ago, a reliable ML system working in production and bringing real value was the prerogative of big tech; other companies didn’t have access to the talent and technologies that enable such products. However, times have changed, and nowadays more and more non-tech companies enjoy the benefits of real-world ML applications.


This article covers what is needed to turn a PoC into a full-blown ML product, including a reference implementation of a simplified pipeline and hints on what to do next. The particular problem the system solves can be described as follows: extract the names of companies from incoming emails and record them. This may sound like a simplistic problem, but it should work to demonstrate how an ML system can be put into production.


System overview

The proposed architecture is shown in the figure at the beginning of the article.


It has the following components:

1. Mailbox to be polled for new emails

2. Workflow Orchestrator that controls the flow of data:

  • Polls the mailbox (1)

  • Makes requests to the Prediction Worker (3)

  • Saves the email bodies along with extracted entities to the DB (4)

3. Prediction Worker that does the actual ML work: it applies a model to a given email body and extracts entities

4. Database storing email bodies and extracted entities

5. (Optional) Model Trainer that would:

  • Form a training dataset querying against the DB (4)

  • (Re)train models when needed

  • Validate the model against the test dataset

  • Store the model in the Model Storage (6)

  • Change production model pointer to the new model if validation is successful

6. (Optional) Model Storage in the form of object storage such as an S3 bucket
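
The steps of the optional Model Trainer (5) could be sketched roughly as follows. This is only an illustration under stated assumptions, not code from the accompanying repository: `ModelStorage`, `retrain_and_maybe_promote`, `train_fn`, `evaluate_fn`, the version strings and the `min_score` threshold are all hypothetical names, and an in-memory dict stands in for an S3-like object store.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple

Example = Tuple[str, List[str]]  # (email body, expected company names)


@dataclass
class ModelStorage:
    """In-memory stand-in for an object store such as an S3 bucket (hypothetical)."""
    models: Dict[str, Any] = field(default_factory=dict)
    production_version: Optional[str] = None  # the "production model pointer"

    def save(self, version: str, model: Any) -> None:
        self.models[version] = model

    def promote(self, version: str) -> None:
        self.production_version = version


def retrain_and_maybe_promote(
    train_set: List[Example],
    test_set: List[Example],
    train_fn: Callable[[List[Example]], Any],
    evaluate_fn: Callable[[Any, List[Example]], float],
    storage: ModelStorage,
    version: str,
    min_score: float,
) -> bool:
    """Train, validate and store a model; move the production pointer only on success."""
    model = train_fn(train_set)
    score = evaluate_fn(model, test_set)
    storage.save(version, model)  # every candidate is stored, promoted or not
    if score >= min_score:
        storage.promote(version)
        return True
    return False
```

Keeping the production pointer separate from the stored models means a failed validation leaves the currently served model untouched.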


Implementation

This article is accompanied by the repository that has the code for the complete system and running instructions.


Let’s review the components in detail.


Mailbox

The SMTP (sending emails) / IMAP (reading emails) mail server is mocked using the GreenMail Docker image, with the respective ports exposed:

mail:  
    image: greenmail/standalone:1.6.5  
    ports:    
        - "3025:3025" # : SMTP    
        - "3110:3110" # : POP3    
        - "3143:3143" # : IMAP    
        - "3465:3465" # : SMTPS    
        - "3993:3993" # : IMAPS    
        - "3995:3995" # : POP3s    
        - "8080:8080" # : GreenMail API


Workflow Orchestrator

In our system we use a very simple script that periodically polls the Mailbox (1), sends the body of each email to the Prediction Worker (3) and records the results to the Database (4):

orchestrator:  
    build:    
        context: ./orchestrator  
    depends_on:    
        - mail    
        - db

import json
import time

import psycopg2
import requests
from imap_tools import AND, MailBoxUnencrypted

sql = """INSERT INTO mail (subject, body, labels) VALUES (%s, %s, %s) RETURNING id;"""

# Give the mail server and the database some time to start
time.sleep(10)

mb = MailBoxUnencrypted(host="mail", port=3143).login("user", "pass")

with psycopg2.connect(
        host="db",
        database="maildb",
        user="pguser",
        password="password") as conn:
    while True:
        print("Polling mailbox...")
        time.sleep(5)
        # Fetch unread messages and mark them as seen so each one is processed once
        messages = mb.fetch(criteria=AND(seen=False),
                            mark_seen=True,
                            bulk=True)
        for msg in messages:
            # Ask the Prediction Worker to extract entities from the email body
            labels = requests.post("http://prediction-worker:1390/predict", json={"mail_body": msg.text})
            # The cursor is closed automatically when the block exits
            with conn.cursor() as cur:
                cur.execute(sql, (msg.subject, msg.text, json.dumps(labels.json())))
                mail_id = cur.fetchone()[0]
            conn.commit()
            print(f"Recoded to DB with id={mail_id}: {labels.json()}")


In a real production system, on the other hand, one of the workflow orchestration engines such as Apache Airflow or Argo Workflows can be used for reliability and easier workflow declaration (a good list of such tools can be found here).


Prediction Worker

The Prediction Worker is a microservice providing an endpoint for predicting entities in the incoming text. In our example, we limit ourselves to a very simple FastAPI-based web service that loads a pretrained model from spaCy and applies it to the input.

import spacy
from fastapi import FastAPI
from typing import List, Dict

from model import PredictedEntity, Input

app = FastAPI()
# Load the pretrained English pipeline once at startup
nlp = spacy.load("en_core_web_sm")

@app.get("/health")
async def ping() -> Dict:
    """ Healthcheck endpoint """
    return {"status": "healthy"}

@app.post("/predict")
def predict(inp: Input) -> List[PredictedEntity]:
    """ Return extracted entities from the document given its body """

    # The code below is definitely not production-ready but rather a quick implementation to show a working server
    # In a real prod system it would make sense to implement batching

    doc = nlp(inp.mail_body)
    result = []
    for ent in doc.ents:
        # Keep only organizations, i.e. company names
        if ent.label_ == "ORG":
            result.append({"entity_text": ent.text, "start": ent.start_char, "end": ent.end_char})
    return result
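
The comment in the handler above mentions batching. A minimal sketch of what batched extraction might look like is given below, assuming spaCy's `nlp.pipe` is used to run several email bodies through the model at once. The function name `extract_orgs_batch` and the default `batch_size` are hypothetical and not part of the accompanying repository.

```python
from typing import Any, Dict, Iterable, List


def extract_orgs_batch(nlp: Any, bodies: Iterable[str], batch_size: int = 32) -> List[List[Dict]]:
    """Run the pipeline over many email bodies and keep only ORG entities.

    `nlp` is expected to be a loaded spaCy pipeline,
    e.g. the result of spacy.load("en_core_web_sm").
    """
    results = []
    # nlp.pipe streams the texts through the model in batches instead of
    # calling nlp(text) once per document
    for doc in nlp.pipe(bodies, batch_size=batch_size):
        results.append([
            {"entity_text": ent.text, "start": ent.start_char, "end": ent.end_char}
            for ent in doc.ents
            if ent.label_ == "ORG"
        ])
    return results
```

The result keeps one list of entities per input body, in the same order as the input, so the caller can match predictions back to emails.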


Database

The DB is just a single PostgreSQL instance. Again, we assume this is enough for reasonable loads, and we don’t address reliability concerns that would call for a cluster, or for a read-only replica so that analysts’ queries can’t affect the production instance.


db:    
    image: postgres:10    
    environment:      
        - POSTGRES_USER=root      
        - POSTGRES_PASSWORD=password      
        - APP_DB_USER=pguser      
        - APP_DB_PASS=password      
        - APP_DB_NAME=maildb    
    healthcheck:      
        test: [ "CMD", "pg_isready", "-q", "-U", "root" ]      
        timeout: 60s      
        interval: 10s      
        retries: 10    
    deploy:      
        placement:        
            max_replicas_per_node: 1    
    ports:      
        - "5432:5432"    
    volumes:      
        - ./postgres-data:/var/lib/postgresql/data      
        # copy the sql script to create tables      
        - ./db:/docker-entrypoint-initdb.d/


Running the pipeline

This article is accompanied by a repository that includes all the code needed to run the end-to-end pipeline.


Requirements

The machine used to run the pipeline should have Docker and telnet installed. Please refer to the installation instructions for your system if needed.


The repository can be cloned with the following command:


Running it

The whole pipeline of 4 services (mail server, database, prediction service and orchestrator) can be started with one command:

docker-compose -f docker-compose.yaml up --build

It should start printing log messages from the services.


Sending an email

The pipeline is triggered by an unread email appearing in the mailbox. In order to send one, the telnet utility can be used.


Connecting to the SMTP mail server:

telnet localhost 3025

Sending the email with telnet:

EHLO user
MAIL FROM:<example@some-domain.com>
RCPT TO:<user>
DATA
Subject: Hello World
 
Hello! She works at Apple now but before that she worked at Microsoft.
.
QUIT
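
As an alternative to typing the SMTP session by hand, the same email could be sent from Python's standard library. The sketch below is not part of the accompanying repository; the helper names `build_message` and `send_message` are hypothetical, and it assumes the GreenMail container from the compose file is reachable on `localhost:3025`.

```python
import smtplib
from email.message import EmailMessage


def build_message(sender: str, recipient: str, subject: str, body: str) -> EmailMessage:
    """Assemble a plain-text email equivalent to the telnet session above."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = subject
    msg.set_content(body)
    return msg


def send_message(msg: EmailMessage, host: str = "localhost", port: int = 3025) -> None:
    """Deliver the message to the SMTP port exposed by the mail container."""
    with smtplib.SMTP(host, port) as smtp:
        smtp.send_message(msg)
```

With the pipeline running, calling `send_message(build_message("example@some-domain.com", "user", "Hello World", "She works at Apple now."))` should have the same effect as the telnet session.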

If everything went well, something like this should appear in logs:

orchestrator_1 | Polling mailbox...
prediction-worker_1 | INFO: 172.19.0.5:55294 - "POST /predict HTTP/1.1" 200 OK
orchestrator_1 | Recoded to DB with id=34: [{'entity_text': 'Apple', 'start': 24, 'end': 29}, {'entity_text': 'Microsoft', 'start': 58, 'end': 67}]


Checking the result

The data should also have been recorded to the database. To check that, connect with any DB client using the following connection parameters:

host: localhost
port: 5432
database: maildb
username: pguser
password: password

Then run the following query:

SELECT * FROM mail LIMIT 10;


Productionalizing

This implementation can give the reader an idea of what a real ML system would look like and how its components would interact. However, in order to make the system robust, reliable and performant, some more steps are required.


Async services

The given implementation uses the synchronous requests library in the Orchestrator (2) and synchronous handlers in the Prediction Worker (3), which means that several requests can’t be processed in parallel, i.e. one prediction must be completed before the next request can be sent. This might be suboptimal, as the Orchestrator is just idling while the Prediction Workers are doing their work.

To solve this issue, the following can be done:

  • Use aiohttp on the client (Orchestrator) side

  • Define async handlers on the server (Prediction Worker) side
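
The client side of this idea could be sketched as follows. This is a rough illustration rather than the repository's code: `predict_all`, `make_http_predict` and `max_concurrency` are hypothetical names, the URL is the one used by the orchestrator script above, and `session` is assumed to be an `aiohttp.ClientSession` (aiohttp is only needed for the real HTTP call, not for the concurrency logic).

```python
import asyncio
from typing import Awaitable, Callable, List


async def predict_all(
    bodies: List[str],
    predict: Callable[[str], Awaitable[list]],
    max_concurrency: int = 8,
) -> List[list]:
    """Send prediction requests concurrently, at most `max_concurrency` at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(body: str) -> list:
        async with semaphore:
            return await predict(body)

    # gather returns the results in the same order as `bodies`
    return await asyncio.gather(*(guarded(b) for b in bodies))


def make_http_predict(session, url: str = "http://prediction-worker:1390/predict"):
    """Build a `predict` coroutine on top of an aiohttp.ClientSession (assumed)."""
    async def predict(body: str) -> list:
        async with session.post(url, json={"mail_body": body}) as resp:
            resp.raise_for_status()
            return await resp.json()
    return predict
```

The semaphore caps the number of in-flight requests so that a burst of emails does not overwhelm the Prediction Worker.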


Message queues

Even though periodic mailbox polling can serve as a kind of queue, it might be a good idea to put a real queue, such as an Apache Kafka topic, between the Orchestrator (the data producer publishing to the topic) and the Prediction Worker (the data consumer subscribed to the topic), as well as between the source of data and the Orchestrator. This would decouple the services further, smooth out spikes in load, and help avoid timeouts when the Prediction Worker can’t keep up with the number of requests.
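
The producer/consumer relationship can be illustrated with a small in-process sketch. Note that this uses Python's standard `queue.Queue` as a stand-in and is not Kafka: a real topic adds persistence, partitioning and consumer groups, and the function names below are hypothetical.

```python
import queue
import threading
from typing import Callable, List

_STOP = object()  # sentinel telling the consumer to shut down


def run_consumer(topic: "queue.Queue", handle: Callable[[str], None]) -> None:
    """Prediction Worker side: take messages off the queue one by one."""
    while True:
        item = topic.get()
        if item is _STOP:
            break
        handle(item)


def start_consumer(topic: "queue.Queue", handle: Callable[[str], None]) -> threading.Thread:
    """Run the consumer in a background thread and return the thread."""
    worker = threading.Thread(target=run_consumer, args=(topic, handle), daemon=True)
    worker.start()
    return worker


def publish_all(topic: "queue.Queue", bodies: List[str]) -> None:
    """Orchestrator side: publish email bodies, then signal the end of the stream."""
    for body in bodies:
        # put() blocks while a bounded queue is full, which throttles the producer
        topic.put(body)
    topic.put(_STOP)
```

The producer never calls the consumer directly, so either side can be slowed down, restarted or replaced without the other one timing out.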


Deployment and scaling

One of the advantages of having a distributed system of microservices instead of one monolith that does everything is the ease of scaling. Reasonable loads would probably not require scaling the Workflow Orchestrator beyond a single host for reasons other than reliability. However, it is highly likely that a single Prediction Worker would not be able to cope with higher load. Luckily for us, the Prediction Worker is a stateless service, which means it does not need to maintain state outside of a single request context (i.e. no race conditions between instances of the service). This in turn makes scaling it horizontally as easy as adding a load balancing service like nginx in front of the fleet of workers.


The example accompanying the article uses docker-compose for deploying the pipeline, mostly for simplicity. However, real production systems rarely rely on this tool. I have seen two main ways of deploying microservices in tech companies:

  • A self-written system that manages deployments in the cloud or on-prem.

  • Different flavours of Kubernetes and wrappers around it, such as Kubeflow, which adds many ML-specific conveniences to bare Kubernetes.

The latter option, with the GitOps way of managing deployments, seems to be the most popular nowadays.


Monitoring

Some services can emit metrics even without any additional code, like the Airflow integration with statsd or the Datadog integration with gunicorn. While these integrations can provide us with basic metrics like the rate of HTTP response codes or response delays, an ML pipeline requires more than that. Let’s look at what else could/should be monitored:

  • Prediction time from both the server (the Prediction Worker (3) itself) and the client (Orchestrator (2)) side

  • Rate of HTTP codes returned by the Prediction Worker

  • Rate of different predicted labels and predicted confidences/similarities, for detecting drift in the model’s predictions. A change in the rate of detections of a certain entity can indicate potential model obsolescence and a need for retraining/fine-tuning, which can be triggered by the Orchestrator (2) based on some drift threshold, for example

  • Input data parameters for detecting input data distribution drift, for example, input text length, word count, etc. This is similar to the previous point, but here a change may indicate that the nature of the data has changed and the model might need to be adapted to it

  • Standard host-related metrics like CPU load, memory/disk space, etc.
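
The drift-related points above can be illustrated with a small sketch. It is only one possible approach, under stated assumptions: the metric names, the relative-change rule and the default `threshold` of 0.3 are hypothetical choices, and in a real system these values would be emitted to a metrics backend rather than computed in place.

```python
from statistics import mean
from typing import Dict, List, Sequence


def summarize_window(bodies: Sequence[str], predictions: Sequence[List[dict]]) -> Dict[str, float]:
    """Compute a few per-window statistics worth tracking over time."""
    if not bodies or len(bodies) != len(predictions):
        raise ValueError("need a non-empty window with one prediction list per email")
    return {
        "entities_per_email": mean(len(p) for p in predictions),
        "chars_per_email": mean(len(b) for b in bodies),
        "words_per_email": mean(len(b.split()) for b in bodies),
    }


def drifted_metrics(
    baseline: Dict[str, float],
    current: Dict[str, float],
    threshold: float = 0.3,
) -> List[str]:
    """Return the metrics whose relative change from the baseline exceeds `threshold`."""
    drifted = []
    for name, base in baseline.items():
        if base == 0:
            changed = current[name] != 0
        else:
            changed = abs(current[name] - base) / abs(base) > threshold
        if changed:
            drifted.append(name)
    return drifted
```

A non-empty result from `drifted_metrics` could then serve as the signal for the Orchestrator (2) to trigger retraining.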


Conclusion

This wraps up the short journey into the world of production-ready Machine Learning solutions and gives the reader directions for further exploration and keywords to google for each of the topics covered.



Source: Towards Data Science

