Textract#
Amazon Textract is a service that automatically extracts text and data from scanned documents. Amazon Textract goes beyond simple optical character recognition (OCR) to also identify the contents of fields in forms and information stored in tables. Many companies today extract data from documents and forms through manual data entry that’s slow and expensive or through simple optical character recognition (OCR) software that requires manual customization or configuration. Rules and workflows for each document and form often need to be hard-coded and updated with each change to the form or when dealing with multiple forms. If the form deviates from the rules, the output is often scrambled and unusable.
Using Textract with boto3#
If your EC Data Platform account/DSL component has Textract access, you can use the Textract API through the Lambda function dataplatform-textract-api,
which is based on the boto3 API. You can call it from Amazon WorkSpaces (after assuming an AWS role) or from any DSL component that has access to boto3 and the associated permissions.
The following actions are supported:
analyze_document()
detect_document_text()
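Below is a Python code example that shows how text can be extracted from a document that has been uploaded to S3. The script prints the key/value pairs it detects and displays the document image with boxes and polygons drawn around the detected elements.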
Python snippet:
# Analyzes a document stored in an S3 bucket and draws bounding boxes and polygons around the detected text, including angled text.
import base64
import io
import json
import logging
import math
import sys
from io import BytesIO

import boto3
from IPython.display import display
from PIL import Image, ImageDraw, ImageFont
def ShowBoundingBox(draw, box, width, height, boxColor):
    """Outline a bounding box on the image behind ``draw``.

    Args:
        draw: Drawing object exposing ``rectangle(xy, outline=...)``
            (the caller passes a ``PIL.ImageDraw`` drawing context).
        box: Mapping with ``Left``, ``Top``, ``Width`` and ``Height``
            entries; each is multiplied by the image size, so they are
            treated as fractions of the image dimensions.
        width: Image width in pixels.
        height: Image height in pixels.
        boxColor: Colour used for the rectangle outline.
    """
    # Scale the relative box to pixel coordinates.
    left = width * box['Left']
    top = height * box['Top']
    right = left + (width * box['Width'])
    bottom = top + (height * box['Height'])
    draw.rectangle([left, top, right, bottom], outline=boxColor)
def ShowSelectedElement(draw, box, width, height, boxColor):
    """Draw a filled rectangle over a bounding box (used for ticked boxes).

    Args:
        draw: Drawing object exposing ``rectangle(xy, fill=...)``
            (the caller passes a ``PIL.ImageDraw`` drawing context).
        box: Mapping with ``Left``, ``Top``, ``Width`` and ``Height``
            entries; each is multiplied by the image size, so they are
            treated as fractions of the image dimensions.
        width: Image width in pixels.
        height: Image height in pixels.
        boxColor: Colour used to fill the rectangle.
    """
    # Scale the relative box to pixel coordinates.
    left = width * box['Left']
    top = height * box['Top']
    right = left + (width * box['Width'])
    bottom = top + (height * box['Height'])
    draw.rectangle([left, top, right, bottom], fill=boxColor)
def get_kv_relationship(key_map, value_map, block_map):
    """Build a ``{key text: value text}`` dict from KEY/VALUE blocks.

    Args:
        key_map: Mapping of block id -> KEY block.
        value_map: Mapping of block id -> VALUE block.
        block_map: Mapping of block id -> block, for every block; used to
            resolve the child blocks that carry the text.

    Returns:
        Dict mapping the text of each key block to the text of its value
        block. Keys with identical text overwrite each other; a key whose
        value block cannot be found maps to ``''``.
    """
    kvs = {}
    # Only the blocks are needed; the ids are resolved through the maps.
    for key_block in key_map.values():
        value_block = find_value_block(key_block, value_map)
        key = get_text(key_block, block_map)
        # A key without a linked value block yields an empty value instead
        # of handing ``None`` to get_text.
        val = get_text(value_block, block_map) if value_block is not None else ''
        kvs[key] = val
    return kvs
def find_value_block(key_block, value_map):
    """Return the VALUE block linked to ``key_block``.

    Args:
        key_block: KEY block; its ``Relationships`` entries of type
            ``VALUE`` list the ids of the linked value blocks.
        value_map: Mapping of block id -> VALUE block.

    Returns:
        The value block for the last id listed in a ``VALUE``
        relationship, or ``None`` when the key block has no such
        relationship (or no ``Relationships`` entry at all).

    Raises:
        KeyError: If a referenced id is missing from ``value_map``.
    """
    # Start from None so that a key without any VALUE relationship does not
    # hit an unbound local at the return statement.
    value_block = None
    for relationship in key_block.get('Relationships') or ():
        if relationship['Type'] == 'VALUE':
            for value_id in relationship['Ids']:
                value_block = value_map[value_id]
    return value_block
def get_text(result, blocks_map):
    """Concatenate the text of a block's CHILD blocks.

    Args:
        result: Block whose ``CHILD`` relationships are followed. ``None``
            or a block without ``Relationships`` yields ``''``.
        blocks_map: Mapping of block id -> block for every block.

    Returns:
        The text of every ``WORD`` child followed by a space, with ``'X '``
        added for every ``SELECTION_ELEMENT`` child whose status is
        ``SELECTED``. Non-empty results therefore end with a trailing space.

    Raises:
        KeyError: If a child id is missing from ``blocks_map``.
    """
    if not result:
        return ''

    parts = []
    for relationship in result.get('Relationships') or ():
        if relationship['Type'] != 'CHILD':
            continue
        for child_id in relationship['Ids']:
            child = blocks_map[child_id]
            block_type = child['BlockType']
            if block_type == 'WORD':
                parts.append(child['Text'] + ' ')
            elif block_type == 'SELECTION_ELEMENT':
                # A ticked checkbox / radio button is rendered as "X".
                if child['SelectionStatus'] == 'SELECTED':
                    parts.append('X ')
    return ''.join(parts)
def _invoke_textract_lambda(image_bytes):
    """Call ``analyze_document`` through the dataplatform-textract-api Lambda.

    Args:
        image_bytes: Raw bytes of the document image.

    Returns:
        The decoded JSON payload returned by the Lambda.
        NOTE(review): assumed to be the Textract response as a JSON object
        with a ``Blocks`` list; confirm against the Lambda implementation.

    Raises:
        RuntimeError: If the Lambda reports a function error.
    """
    # args are the same arguments that you would give to the boto3 API
    # (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/textract.html).
    # Raw bytes are not JSON-serialisable, so they are sent as base64 text.
    args = {
        'Document': {'Bytes': base64.b64encode(image_bytes).decode('ascii')},
        'FeatureTypes': ['TABLES', 'FORMS'],
    }
    payload = {'Args': args, 'Method': 'analyze_document'}

    aws_lambda = boto3.client('lambda')
    response = aws_lambda.invoke(
        FunctionName='dataplatform-textract-api',
        InvocationType='RequestResponse',
        LogType='Tail',
        Payload=json.dumps(payload),
    )
    raw_payload = response['Payload'].read()

    # A failed invocation still returns normally; the error is flagged in
    # the response and described in the payload.
    if response.get('FunctionError'):
        raise RuntimeError(
            'dataplatform-textract-api failed: '
            + raw_payload.decode('utf-8', errors='replace')
        )
    # The payload arrives as bytes and must be parsed before indexing.
    return json.loads(raw_payload)


def process_text_analysis(bucket, document):
    """Analyse an image stored in S3 and display it with annotations.

    Downloads the object, sends it to Textract through the
    dataplatform-textract-api Lambda, prints the detected key/value pairs
    and displays the image with the detected blocks drawn on it.

    Args:
        bucket: Name of the S3 bucket holding the document.
        document: Key of the document inside the bucket; it must be in a
            format that ``PIL.Image.open`` can read.

    Returns:
        The number of blocks in the Textract response.
    """
    logger = logging.getLogger(__name__)

    # Get the document from S3
    s3_connection = boto3.resource('s3', region_name='eu-west-1')
    s3_object = s3_connection.Object(bucket, document)
    s3_response = s3_object.get()

    stream = io.BytesIO(s3_response['Body'].read())
    image = Image.open(stream)
    image_binary = stream.getvalue()
    logger.info('Loaded %d bytes (%s)', len(image_binary), type(image_binary))

    response = _invoke_textract_lambda(image_binary)

    # Get the text blocks
    blocks = response['Blocks']
    width, height = image.size
    print('Detected Document Text')

    # Index every block by id and split KEY_VALUE_SET blocks into keys and
    # values so that the pairs can be reassembled.
    key_map = {}
    value_map = {}
    block_map = {}
    for block in blocks:
        block_id = block['Id']
        block_map[block_id] = block
        if block['BlockType'] == "KEY_VALUE_SET":
            if 'KEY' in block['EntityTypes']:
                key_map[block_id] = block
            else:
                value_map[block_id] = block

    kvs = get_kv_relationship(key_map, value_map, block_map)
    print("\n\n== FOUND KEY : VALUE pairs ===\n")
    print_kvs(kvs)

    # Create image showing bounding box/polygon the detected lines/text.
    # One drawing context is enough for the whole image.
    draw = ImageDraw.Draw(image)
    for block in blocks:
        block_type = block['BlockType']
        bounding_box = block['Geometry']['BoundingBox']

        if block_type == "KEY_VALUE_SET":
            color = 'red' if block['EntityTypes'][0] == "KEY" else 'green'
            ShowBoundingBox(draw, bounding_box, width, height, color)
        elif block_type == 'TABLE':
            ShowBoundingBox(draw, bounding_box, width, height, 'blue')
        elif block_type == 'CELL':
            ShowBoundingBox(draw, bounding_box, width, height, 'yellow')
        elif block_type == 'SELECTION_ELEMENT':
            if block['SelectionStatus'] == 'SELECTED':
                ShowSelectedElement(draw, bounding_box, width, height, 'blue')

        # Draw the polygon for every block (remove this to keep only the
        # coloured boxes drawn above).
        points = [
            (width * point['X'], height * point['Y'])
            for point in block['Geometry']['Polygon']
        ]
        draw.polygon(points, outline='blue')

    # Display the image
    # Use this line if you are not using a Jupyter notebook
    # image.show()
    display(image)
    return len(blocks)
def print_kvs(kvs):
    """Print each key/value pair on its own line as ``key : value``.

    Args:
        kvs: Mapping of key text to value text.
    """
    for key, value in kvs.items():
        print(key, ":", value)
def main():
    """Run the analysis on one document and print how many blocks were found.

    The bucket and document below are placeholders; replace them with the
    name of your S3 bucket and the key of the image to analyse.
    """
    bucket = 'ENTER YOUR BUCKET HERE'
    document = 'ENTER YOUR IMAGE HERE'
    block_count = process_text_analysis(bucket, document)
    print("Blocks detected: " + str(block_count))
# Run the example only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()